1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "value_hash_calc.h"
33
34 namespace DistributedDB {
35 namespace {
// Upper-case field-name literals (presumably log-table columns; their use is outside this view).
constexpr const char *DATAKEY = "DATA_KEY";
constexpr const char *DEVICE_FIELD = "DEVICE";
constexpr const char *CLOUD_GID_FIELD = "CLOUD_GID";
constexpr const char *VERSION = "VERSION";
constexpr const char *SHARING_RESOURCE = "SHARING_RESOURCE";

// SQL predicate: the 0x02 ("local") bit of FLAG is clear, i.e. the record is marked as cloud data.
constexpr const char *FLAG_IS_CLOUD = "FLAG & 0x02 = 0";
// SQL predicate on the 0x20 (cloud consistency) bit of FLAG: true when that bit is clear.
constexpr const char *FLAG_IS_CLOUD_CONSISTENCY = "FLAG & 0x20 = 0";
// SQL expression producing a new FLAG value:
//   - rows with data_key = -1 whose 0x02 bit is already set: clear 0x10 and 0x20;
//   - every other row: set 0x02 (local) and 0x20, and clear 0x10 (wait compensated sync).
constexpr const char *SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC =
    "(CASE WHEN data_key = -1 and FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) "
    "ELSE (FLAG | 0x02 | 0x20) & (~0x10) END)";
// Same as above, except that the ELSE branch does not set the 0x02 (local) bit.
constexpr const char *SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC =
    "(CASE WHEN data_key = -1 and FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) "
    "ELSE (FLAG | 0x20) & (~0x10) END)";
// SQL predicate: the 0x08 (logic delete) bit of FLAG is set.
constexpr const char *FLAG_IS_LOGIC_DELETE = "FLAG & 0x08 != 0";
// SQL expression: set the 0x08 (logic delete), 0x800 and 0x01 bits and clear the 0x02 bit.
// The original note describes this as "set data logic delete and exist passport".
constexpr const char *SET_FLAG_LOGIC_DELETE = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x02)";
// SQL predicate: the row is deleted (data_key = -1) and is not a logic delete (0x08 clear).
constexpr const char *DATA_IS_DELETE = "data_key = -1 AND FLAG & 0X08 = 0";
// SQL assignment fragment that refreshes the cursor column through update_cursor().
constexpr const char *UPDATE_CURSOR_SQL = "cursor=update_cursor()";

// Integer masks applied to a flag value.
constexpr int SET_FLAG_ZERO_MASK = ~0x04; // AND-mask that clears the 0x04 bit
constexpr int SET_FLAG_ONE_MASK = 0x04;   // OR-mask that sets the 0x04 bit
constexpr int SET_CLOUD_FLAG = ~0x02;     // AND-mask that clears the 0x02 (local) bit

// Result-column positions used when reading a log query row.
constexpr int DATA_KEY_INDEX = 0;
constexpr int TIMESTAMP_INDEX = 3;
constexpr int W_TIMESTAMP_INDEX = 4;
constexpr int FLAG_INDEX = 5;
constexpr int HASH_KEY_INDEX = 6;
constexpr int CLOUD_GID_INDEX = 7;
constexpr int VERSION_INDEX = 8;
constexpr int STATUS_INDEX = 9;
65
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)66 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
67 {
68 if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
69 return SQLITE_DENY;
70 }
71 if (b == SQLITE_FUNCTION) {
72 if (d != nullptr && (strcmp(d, "fts3_tokenizer") == 0)) {
73 LOGE("Deny fts3_tokenizer in remote query");
74 return SQLITE_DENY;
75 }
76 }
77 return SQLITE_OK;
78 }
79 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)80 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
81 DistributedTableMode mode)
82 : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode), isLogicDelete_(false),
83 assetLoader_(nullptr), putDataMode_(PutDataMode::SYNC), markFlagOption_(MarkFlagOption::DEFAULT),
84 maxUploadCount_(0), maxUploadSize_(0)
85 {
86 bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
87 bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
88 bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
89 bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
90 bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
91 bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
92 bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
93 }
94
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)95 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
96 {
97 std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
98 if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
99 LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
100 return -E_NOT_SUPPORT;
101 }
102 std::vector<FieldInfo> fieldInfos = table.GetFieldInfos();
103 for (const auto &field : fieldInfos) {
104 if (DBCommon::CaseInsensitiveCompare(field.GetFieldName(), std::string(DBConstant::SQLITE_INNER_ROWID))) {
105 LOGE("[CreateDistributedTable] Not support create distributed table with _rowid_ column.");
106 return -E_NOT_SUPPORT;
107 }
108 }
109
110 if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
111 if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
112 LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
113 return -E_NOT_SUPPORT;
114 }
115
116 if (DBCommon::HasConstraint(trimedSql, "ON CONFLICT", " )", " ")) {
117 LOGE("[CreateDistributedTable] Not support create distributed table with 'ON CONFLICT' constraint.");
118 return -E_NOT_SUPPORT;
119 }
120
121 if (mode == DistributedTableMode::COLLABORATION) {
122 if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
123 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
124 return -E_NOT_SUPPORT;
125 }
126 }
127
128 if (syncType == CLOUD_COOPERATION) {
129 int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
130 if (errCode != E_OK) {
131 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
132 return errCode;
133 }
134 }
135 }
136
137 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
138 if (table.GetPrimaryKey().size() > 1) {
139 LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
140 return -E_NOT_SUPPORT;
141 }
142 }
143
144 return E_OK;
145 }
146
147 namespace {
GetExistedDataTimeOffset(sqlite3 * db,const std::string & tableName,bool isMem,int64_t & timeOffset)148 int GetExistedDataTimeOffset(sqlite3 *db, const std::string &tableName, bool isMem, int64_t &timeOffset)
149 {
150 std::string sql = "SELECT get_sys_time(0) - max(" + std::string(DBConstant::SQLITE_INNER_ROWID) + ") - 1 FROM '" +
151 tableName + "';";
152 sqlite3_stmt *stmt = nullptr;
153 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
154 if (errCode != E_OK) {
155 return errCode;
156 }
157 errCode = SQLiteUtils::StepWithRetry(stmt, isMem);
158 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
159 timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
160 errCode = E_OK;
161 }
162 SQLiteUtils::ResetStatement(stmt, true, errCode);
163 return errCode;
164 }
165 }
166
GeneLogInfoForExistedData(sqlite3 * db,const std::string & tableName,const std::string & calPrimaryKeyHash,TableInfo & tableInfo)167 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &tableName,
168 const std::string &calPrimaryKeyHash, TableInfo &tableInfo)
169 {
170 int64_t timeOffset = 0;
171 int errCode = GetExistedDataTimeOffset(db, tableName, isMemDb_, timeOffset);
172 if (errCode != E_OK) {
173 return errCode;
174 }
175 errCode = SetLogTriggerStatus(false);
176 if (errCode != E_OK) {
177 return errCode;
178 }
179 std::string timeOffsetStr = std::to_string(timeOffset);
180 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
181 std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
182 std::string flag = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL) |
183 static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_CONSISTENCY));
184 if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
185 std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid + ", '', '', " + timeOffsetStr +
186 " + " + rowid + ", " + timeOffsetStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash + ", '', " +
187 "'', '', '', '', 0 FROM '" + tableName + "' AS a WHERE 1=1;";
188 errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
189 if (errCode != E_OK) {
190 LOGE("Failed to initialize device type log data.%d", errCode);
191 }
192 return errCode;
193 }
194 TrackerTable trackerTable = tableInfo.GetTrackerTable();
195 trackerTable.SetTableName(tableName);
196 std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid +
197 ", '', '', " + timeOffsetStr + " + " + rowid + ", " +
198 timeOffsetStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash + ", '', ";
199 sql += tableInfo.GetTrackerTable().GetExtendName().empty() ? "''" : tableInfo.GetTrackerTable().GetExtendName();
200 sql += ", 0, '', '', 0 FROM '" + tableName + "' AS a WHERE 1=1;";
201 errCode = trackerTable.ReBuildTempTrigger(db, TriggerMode::TriggerModeEnum::INSERT, [db, &sql]() {
202 int ret = SQLiteUtils::ExecuteRawSQL(db, sql);
203 if (ret != E_OK) {
204 LOGE("Failed to initialize cloud type log data.%d", ret);
205 }
206 return ret;
207 });
208 return errCode;
209 }
210
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table,TableSyncType syncType)211 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
212 const std::string &identity, TableInfo &table, TableSyncType syncType)
213 {
214 if (dbHandle_ == nullptr) {
215 return -E_INVALID_DB;
216 }
217
218 const std::string tableName = table.GetTableName();
219 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
220 if (errCode != E_OK) {
221 LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
222 return errCode;
223 }
224
225 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
226 bool isEmpty = false;
227 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
228 if (errCode != E_OK) {
229 LOGE("[CreateDistributedTable] check table empty failed. error=%d", errCode);
230 return -E_NOT_SUPPORT;
231 }
232 if (!isEmpty) {
233 LOGW("[CreateDistributedTable] generate %.3s log for existed data, table type %d",
234 DBCommon::TransferStringToHex(DBCommon::TransferHashString(tableName)).c_str(),
235 static_cast<int>(syncType));
236 }
237 }
238
239 errCode = CheckTableConstraint(table, mode, syncType);
240 if (errCode != E_OK) {
241 LOGE("[CreateDistributedTable] check table constraint failed.");
242 return errCode;
243 }
244
245 // create log table
246 auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
247 errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
248 if (errCode != E_OK) {
249 LOGE("[CreateDistributedTable] create log table failed");
250 return errCode;
251 }
252
253 if (!isUpgraded) {
254 std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, identity);
255 errCode = GeneLogInfoForExistedData(dbHandle_, tableName, calPrimaryKeyHash, table);
256 if (errCode != E_OK) {
257 return errCode;
258 }
259 }
260
261 // add trigger
262 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
263 if (errCode != E_OK) {
264 LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
265 return errCode;
266 }
267 return SetLogTriggerStatus(true);
268 }
269
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)270 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
271 DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
272 {
273 if (dbHandle_ == nullptr) {
274 return -E_INVALID_DB;
275 }
276 TableInfo newTableInfo;
277 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
278 if (errCode != E_OK) {
279 LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
280 return errCode;
281 }
282
283 if (CheckTableConstraint(newTableInfo, mode, syncType)) {
284 LOGE("[UpgradeDistributedTable] Not support create distributed table when violate constraints.");
285 return -E_NOT_SUPPORT;
286 }
287
288 // new table should has same or compatible upgrade
289 TableInfo tableInfo = schema.GetTable(tableName);
290 errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
291 if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
292 LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
293 return -E_SCHEMA_MISMATCH;
294 } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
295 LOGD("[UpgradeDistributedTable] schema has not changed.");
296 // update table if tableName changed
297 schema.RemoveRelationalTable(tableName);
298 tableInfo.SetTableName(tableName);
299 schema.AddRelationalTable(tableInfo);
300 return E_OK;
301 }
302
303 schemaChanged = true;
304 errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
305 if (errCode != E_OK) {
306 LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
307 }
308
309 schema.AddRelationalTable(newTableInfo);
310 return errCode;
311 }
312
313 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)314 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
315 std::vector<std::string> &deviceTables)
316 {
317 if (device.empty() && tableName.empty()) { // device and table name should not both be empty
318 return -E_INVALID_ARGS;
319 }
320 std::string devicePattern = device.empty() ? "%" : device;
321 std::string tablePattern = tableName.empty() ? "%" : tableName;
322 std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
323
324 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
325 deviceTableName + "';";
326 sqlite3_stmt *stmt = nullptr;
327 int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
328 if (errCode != E_OK) {
329 return errCode;
330 }
331
332 do {
333 errCode = SQLiteUtils::StepWithRetry(stmt, false);
334 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
335 errCode = E_OK;
336 break;
337 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
338 LOGE("Get table name failed. %d", errCode);
339 break;
340 }
341 std::string realTableName;
342 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
343 if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
344 continue;
345 }
346 if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
347 continue;
348 }
349 deviceTables.emplace_back(realTableName);
350 } while (true);
351
352 SQLiteUtils::ResetStatement(stmt, true, errCode);
353 return errCode;
354 }
355
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)356 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
357 {
358 std::vector<FieldInfo> fields;
359 auto itOld = oldTableInfo.GetFields().begin();
360 auto itNew = newTableInfo.GetFields().begin();
361 for (; itNew != newTableInfo.GetFields().end(); itNew++) {
362 if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
363 fields.emplace_back(itNew->second);
364 continue;
365 }
366 itOld++;
367 }
368 return fields;
369 }
370
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)371 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
372 {
373 if (db == nullptr) {
374 return -E_INVALID_ARGS;
375 }
376
377 std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
378 return a.GetColumnId()< b.GetColumnId();
379 });
380 int errCode = E_OK;
381 for (const auto &table : tables) {
382 for (const auto &field : fields) {
383 std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
384 alterSql += "'" + field.GetDataType() + "'";
385 alterSql += field.IsNotNull() ? " NOT NULL" : "";
386 alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
387 alterSql += ";";
388 errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
389 if (errCode != E_OK) {
390 LOGE("Alter table failed. %d", errCode);
391 break;
392 }
393 }
394 }
395 return errCode;
396 }
397
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)398 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
399 {
400 IndexInfoMap indexes;
401 auto itOld = oldTableInfo.GetIndexDefine().begin();
402 auto itNew = newTableInfo.GetIndexDefine().begin();
403 auto itOldEnd = oldTableInfo.GetIndexDefine().end();
404 auto itNewEnd = newTableInfo.GetIndexDefine().end();
405
406 while (itOld != itOldEnd && itNew != itNewEnd) {
407 if (itOld->first == itNew->first) {
408 if (itOld->second != itNew->second) {
409 indexes.insert({itNew->first, itNew->second});
410 }
411 itOld++;
412 itNew++;
413 } else if (itOld->first < itNew->first) {
414 indexes.insert({itOld->first, {}});
415 itOld++;
416 } else {
417 indexes.insert({itNew->first, itNew->second});
418 itNew++;
419 }
420 }
421
422 while (itOld != itOldEnd) {
423 indexes.insert({itOld->first, {}});
424 itOld++;
425 }
426
427 while (itNew != itNewEnd) {
428 indexes.insert({itNew->first, itNew->second});
429 itNew++;
430 }
431
432 return indexes;
433 }
434
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)435 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
436 {
437 if (db == nullptr) {
438 return -E_INVALID_ARGS;
439 }
440
441 int errCode = E_OK;
442 for (const auto &table : tables) {
443 for (const auto &index : indexes) {
444 if (index.first.empty()) {
445 continue;
446 }
447 std::string realIndexName = table + "_" + index.first;
448 std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
449 errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
450 if (errCode != E_OK) {
451 LOGE("Drop index failed. %d", errCode);
452 return errCode;
453 }
454
455 if (index.second.empty()) { // empty means drop index only
456 continue;
457 }
458
459 auto it = index.second.begin();
460 std::string indexDefine = *it++;
461 while (it != index.second.end()) {
462 indexDefine += ", " + *it++;
463 }
464 std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
465 "(" + indexDefine + ");";
466 errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
467 if (errCode != E_OK) {
468 LOGE("Create index failed. %d", errCode);
469 break;
470 }
471 }
472 }
473 return errCode;
474 }
475 }
476
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)477 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
478 const TableInfo &newTableInfo)
479 {
480 std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
481 IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
482 std::vector<std::string> deviceTables;
483 int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
484 if (errCode != E_OK) {
485 LOGE("Get device table name for alter table failed. %d", errCode);
486 return errCode;
487 }
488
489 LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
490 upgradeIndexes.size(), deviceTables.size());
491 errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
492 if (errCode != E_OK) {
493 LOGE("upgrade fields failed. %d", errCode);
494 return errCode;
495 }
496
497 errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
498 if (errCode != E_OK) {
499 LOGE("upgrade indexes failed. %d", errCode);
500 }
501
502 return errCode;
503 }
504
StartTransaction(TransactType type)505 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
506 {
507 if (dbHandle_ == nullptr) {
508 LOGE("Begin transaction failed, dbHandle is null.");
509 return -E_INVALID_DB;
510 }
511 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
512 if (errCode != E_OK) {
513 LOGE("Begin transaction failed, errCode = %d", errCode);
514 }
515 return errCode;
516 }
517
Commit()518 int SQLiteSingleVerRelationalStorageExecutor::Commit()
519 {
520 if (dbHandle_ == nullptr) {
521 return -E_INVALID_DB;
522 }
523
524 return SQLiteUtils::CommitTransaction(dbHandle_);
525 }
526
Rollback()527 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
528 {
529 if (dbHandle_ == nullptr) {
530 return -E_INVALID_DB;
531 }
532 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
533 if (errCode != E_OK) {
534 LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
535 }
536 return errCode;
537 }
538
SetTableInfo(const TableInfo & tableInfo)539 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
540 {
541 table_ = tableInfo;
542 }
543
GetLogData(sqlite3_stmt * logStatement,LogInfo & logInfo)544 static int GetLogData(sqlite3_stmt *logStatement, LogInfo &logInfo)
545 {
546 logInfo.dataKey = sqlite3_column_int64(logStatement, 0); // 0 means dataKey index
547
548 std::vector<uint8_t> dev;
549 int errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 1, dev); // 1 means dev index
550 if (errCode != E_OK) {
551 return errCode;
552 }
553 logInfo.device = std::string(dev.begin(), dev.end());
554
555 std::vector<uint8_t> oriDev;
556 errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 2, oriDev); // 2 means ori_dev index
557 if (errCode != E_OK) {
558 return errCode;
559 }
560 logInfo.originDev = std::string(oriDev.begin(), oriDev.end());
561 logInfo.timestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 3)); // 3 means timestamp index
562 logInfo.wTimestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 4)); // 4 means w_timestamp index
563 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 5)); // 5 means flag index
564 logInfo.flag &= (~DataItem::LOCAL_FLAG);
565 logInfo.flag &= (~DataItem::UPDATE_FLAG);
566 return SQLiteUtils::GetColumnBlobValue(logStatement, 6, logInfo.hashKey); // 6 means hashKey index
567 }
568
569 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize)570 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize)
571 {
572 int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX));
573 uint64_t curTime = 0;
574 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
575 if (modifyTime > static_cast<int64_t>(curTime)) {
576 modifyTime = static_cast<int64_t>(curTime);
577 }
578 } else {
579 LOGW("[Relational] get raw sys time failed.");
580 }
581 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
582 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
583 static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
584 totalSize += sizeof(int64_t) + sizeof(int64_t);
585 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
586 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
587 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
588 if (!cloudGid.empty()) {
589 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
590 totalSize += cloudGid.size();
591 }
592 }
593 std::string version;
594 SQLiteUtils::GetColumnTextValue(logStatement, VERSION_INDEX, version);
595 logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
596 totalSize += version.size();
597 }
598
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)599 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
600 {
601 flags.insert_or_assign(CloudDbConstant::ROWID,
602 static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
603 flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
604 static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
605 flags.insert_or_assign(CloudDbConstant::FLAG,
606 static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
607 Bytes hashKey;
608 (void)SQLiteUtils::GetColumnBlobValue(logStatement, HASH_KEY_INDEX, hashKey);
609 flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
610 flags.insert_or_assign(CloudDbConstant::STATUS,
611 static_cast<int64_t>(sqlite3_column_int(logStatement, STATUS_INDEX)));
612 }
613
GetCloudGid(sqlite3_stmt * logStatement,std::vector<std::string> & cloudGid)614 void GetCloudGid(sqlite3_stmt *logStatement, std::vector<std::string> &cloudGid)
615 {
616 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) == nullptr) {
617 return;
618 }
619 std::string gid = reinterpret_cast<const std::string::value_type *>(
620 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
621 if (gid.empty()) {
622 LOGW("[Relational] Get cloud gid is null.");
623 return;
624 }
625 cloudGid.emplace_back(gid);
626 }
627 }
628
GetDataItemSerialSize(DataItem & item,size_t appendLen)629 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
630 {
631 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
632 // the size would not be very large.
633 static const size_t maxOrigDevLength = 40;
634 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
635 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
636 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
637 return dataSize;
638 }
639
GetKvData(const Key & key,Value & value) const640 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
641 {
642 static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX +
643 "metadata WHERE key=?;";
644 sqlite3_stmt *statement = nullptr;
645 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
646 if (errCode != E_OK) {
647 goto END;
648 }
649
650 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
651 if (errCode != E_OK) {
652 goto END;
653 }
654
655 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
656 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
657 errCode = -E_NOT_FOUND;
658 goto END;
659 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
660 goto END;
661 }
662
663 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
664 END:
665 SQLiteUtils::ResetStatement(statement, true, errCode);
666 return errCode;
667 }
668
PutKvData(const Key & key,const Value & value) const669 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
670 {
671 static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX +
672 "metadata VALUES(?,?);";
673 sqlite3_stmt *statement = nullptr;
674 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
675 if (errCode != E_OK) {
676 return errCode;
677 }
678
679 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // 1 means key index
680 if (errCode != E_OK) {
681 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
682 goto ERROR;
683 }
684
685 errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true); // 2 means value index
686 if (errCode != E_OK) {
687 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
688 goto ERROR;
689 }
690 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
691 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
692 errCode = E_OK;
693 }
694 ERROR:
695 SQLiteUtils::ResetStatement(statement, true, errCode);
696 return errCode;
697 }
698
DeleteMetaData(const std::vector<Key> & keys) const699 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
700 {
701 static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
702 "metadata WHERE key=?;";
703 sqlite3_stmt *statement = nullptr;
704 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
705 if (errCode != E_OK) {
706 return errCode;
707 }
708
709 for (const auto &key : keys) {
710 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
711 if (errCode != E_OK) {
712 break;
713 }
714
715 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
716 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
717 break;
718 }
719 errCode = E_OK;
720 SQLiteUtils::ResetStatement(statement, false, errCode);
721 }
722 SQLiteUtils::ResetStatement(statement, true, errCode);
723 return CheckCorruptedStatus(errCode);
724 }
725
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const726 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
727 {
728 static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
729 "metadata WHERE key>=? AND key<=?;";
730 sqlite3_stmt *statement = nullptr;
731 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
732 if (errCode != E_OK) {
733 return errCode;
734 }
735
736 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
737 if (errCode == E_OK) {
738 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
739 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
740 errCode = E_OK;
741 }
742 }
743 SQLiteUtils::ResetStatement(statement, true, errCode);
744 return CheckCorruptedStatus(errCode);
745 }
746
GetAllMetaKeys(std::vector<Key> & keys) const747 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
748 {
749 static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + DBConstant::RELATIONAL_PREFIX + "metadata;";
750 sqlite3_stmt *statement = nullptr;
751 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
752 if (errCode != E_OK) {
753 LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
754 return errCode;
755 }
756 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
757 SQLiteUtils::ResetStatement(statement, true, errCode);
758 return errCode;
759 }
760
GetLogInfoPre(sqlite3_stmt * queryStmt,const DataItem & dataItem,LogInfo & logInfoGet)761 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoPre(sqlite3_stmt *queryStmt, const DataItem &dataItem,
762 LogInfo &logInfoGet)
763 {
764 if (queryStmt == nullptr) {
765 return -E_INVALID_ARGS;
766 }
767 int errCode = SQLiteUtils::BindBlobToStatement(queryStmt, 1, dataItem.hashKey); // 1 means hashkey index.
768 if (errCode != E_OK) {
769 return errCode;
770 }
771 if (mode_ != DistributedTableMode::COLLABORATION) {
772 errCode = SQLiteUtils::BindTextToStatement(queryStmt, 2, dataItem.dev); // 2 means device index.
773 if (errCode != E_OK) {
774 return errCode;
775 }
776 }
777
778 errCode = SQLiteUtils::StepWithRetry(queryStmt, isMemDb_);
779 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
780 errCode = -E_NOT_FOUND;
781 } else {
782 errCode = GetLogData(queryStmt, logInfoGet);
783 }
784 return errCode;
785 }
786
SaveSyncLog(sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,int64_t rowid)787 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncLog(sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
788 const DataItem &dataItem, int64_t rowid)
789 {
790 LogInfo logInfoGet;
791 int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
792 LogInfo logInfoBind;
793 logInfoBind.hashKey = dataItem.hashKey;
794 logInfoBind.device = dataItem.dev;
795 logInfoBind.timestamp = dataItem.timestamp;
796 logInfoBind.flag = dataItem.flag;
797
798 if (errCode == -E_NOT_FOUND) { // insert
799 logInfoBind.wTimestamp = dataItem.writeTimestamp;
800 logInfoBind.originDev = dataItem.dev;
801 } else if (errCode == E_OK) { // update
802 logInfoBind.wTimestamp = logInfoGet.wTimestamp;
803 logInfoBind.originDev = logInfoGet.originDev;
804 } else {
805 return errCode;
806 }
807
808 // bind
809 SQLiteUtils::BindInt64ToStatement(statement, 1, rowid); // 1 means dataKey index
810 std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
811 SQLiteUtils::BindBlobToStatement(statement, 2, originDev); // 2 means ori_dev index
812 SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp); // 3 means timestamp index
813 SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimestamp); // 4 means w_timestamp index
814 SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag); // 5 means flag index
815 SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey); // 6 means hashKey index
816 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
817 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
818 return E_OK;
819 }
820 return errCode;
821 }
822
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)823 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
824 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
825 {
826 if (stmt == nullptr) {
827 int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
828 if (errCode != E_OK) {
829 LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
830 return errCode;
831 }
832 }
833
834 int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hash_key index
835 if (errCode != E_OK) {
836 SQLiteUtils::ResetStatement(stmt, true, errCode);
837 return errCode;
838 }
839 if (mode_ != DistributedTableMode::COLLABORATION) {
840 errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
841 if (errCode != E_OK) {
842 SQLiteUtils::ResetStatement(stmt, true, errCode);
843 return errCode;
844 }
845 }
846 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
847 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
848 errCode = E_OK;
849 }
850 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
851 return errCode;
852 }
853
SaveSyncDataItem(const DataItem & dataItem,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,int64_t & rowid)854 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, SaveSyncDataStmt &saveStmt,
855 RelationalSyncDataInserter &inserter, int64_t &rowid)
856 {
857 if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
858 return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
859 }
860 if ((mode_ == DistributedTableMode::COLLABORATION && inserter.GetLocalTable().GetIdentifyKey().size() == 1u &&
861 inserter.GetLocalTable().GetIdentifyKey().at(0) == "rowid") ||
862 (mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().GetPrimaryKey().size() == 1u &&
863 inserter.GetLocalTable().GetPrimaryKey().at(0) == "rowid") ||
864 inserter.GetLocalTable().GetAutoIncrement()) { // No primary key of auto increment
865 int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
866 if (errCode != E_OK) {
867 LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
868 return errCode;
869 }
870 }
871
872 int errCode = inserter.BindInsertStatement(saveStmt.saveDataStmt, dataItem);
873 if (errCode != E_OK) {
874 LOGE("Bind data failed, errCode=%d.", errCode);
875 return errCode;
876 }
877 errCode = SQLiteUtils::StepWithRetry(saveStmt.saveDataStmt, isMemDb_);
878 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
879 rowid = SQLiteUtils::GetLastRowId(dbHandle_);
880 errCode = E_OK;
881 }
882 return errCode;
883 }
884
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)885 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
886 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
887 {
888 if (stmt == nullptr) {
889 int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
890 if (errCode != E_OK) {
891 LOGE("[DeleteSyncLog] Get statement fail!");
892 return errCode;
893 }
894 }
895
896 int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hashkey index
897 if (errCode != E_OK) {
898 SQLiteUtils::ResetStatement(stmt, true, errCode);
899 return errCode;
900 }
901 if (mode_ != DistributedTableMode::COLLABORATION) {
902 errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
903 if (errCode != E_OK) {
904 SQLiteUtils::ResetStatement(stmt, true, errCode);
905 return errCode;
906 }
907 }
908 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
909 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
910 errCode = E_OK;
911 }
912 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
913 return errCode;
914 }
915
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)916 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
917 RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
918 {
919 int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
920 if (errCode != E_OK) {
921 return errCode;
922 }
923 return DeleteSyncLog(item, inserter, rmLogStmt);
924 }
925
GetSyncDataPre(const DataItem & dataItem,sqlite3_stmt * queryStmt,DataItem & itemGet)926 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataPre(const DataItem &dataItem, sqlite3_stmt *queryStmt,
927 DataItem &itemGet)
928 {
929 LogInfo logInfoGet;
930 int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
931 itemGet.timestamp = logInfoGet.timestamp;
932 SQLiteUtils::ResetStatement(queryStmt, false, errCode);
933 return errCode;
934 }
935
CheckDataConflictDefeated(const DataItem & dataItem,sqlite3_stmt * queryStmt,bool & isDefeated)936 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
937 sqlite3_stmt *queryStmt, bool &isDefeated)
938 {
939 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
940 mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
941 isDefeated = false; // no need to solve conflict except miss query data
942 return E_OK;
943 }
944
945 DataItem itemGet;
946 int errCode = GetSyncDataPre(dataItem, queryStmt, itemGet);
947 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
948 LOGE("Failed to get raw data. %d", errCode);
949 return errCode;
950 }
951 isDefeated = (dataItem.timestamp <= itemGet.timestamp); // defeated if item timestamp is earlier then raw data
952 return E_OK;
953 }
954
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)955 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
956 SaveSyncDataStmt &saveStmt, DataItem &item)
957 {
958 bool isDefeated = false;
959 int errCode = CheckDataConflictDefeated(item, saveStmt.queryStmt, isDefeated);
960 if (errCode != E_OK) {
961 LOGE("check data conflict failed. %d", errCode);
962 return errCode;
963 }
964
965 if (isDefeated) {
966 LOGD("Data was defeated.");
967 return E_OK;
968 }
969 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
970 return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
971 }
972 int64_t rowid = -1;
973 errCode = SaveSyncDataItem(item, saveStmt, inserter, rowid);
974 if (errCode == E_OK || errCode == -E_NOT_FOUND) {
975 errCode = SaveSyncLog(saveStmt.saveLogStmt, saveStmt.queryStmt, item, rowid);
976 }
977 return errCode;
978 }
979
SaveSyncDataItems(RelationalSyncDataInserter & inserter)980 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
981 {
982 SaveSyncDataStmt saveStmt;
983 int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
984 if (errCode != E_OK) {
985 LOGE("Prepare insert sync data statement failed.");
986 return errCode;
987 }
988
989 errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
990 if (item.neglect) { // Do not save this record if it is neglected
991 return E_OK;
992 }
993 int errCode = SaveSyncDataItem(inserter, saveStmt, item);
994 if (errCode != E_OK) {
995 LOGE("save sync data item failed. err=%d", errCode);
996 return errCode;
997 }
998 // Need not reset rmDataStmt and rmLogStmt here.
999 return saveStmt.ResetStatements(false);
1000 });
1001
1002 int ret = saveStmt.ResetStatements(true);
1003 return errCode != E_OK ? errCode : ret;
1004 }
1005
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)1006 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
1007 {
1008 if (useTrans) {
1009 int errCode = StartTransaction(TransactType::IMMEDIATE);
1010 if (errCode != E_OK) {
1011 return errCode;
1012 }
1013 }
1014
1015 int errCode = SetLogTriggerStatus(false);
1016 if (errCode != E_OK) {
1017 goto END;
1018 }
1019
1020 errCode = SaveSyncDataItems(inserter);
1021 if (errCode != E_OK) {
1022 LOGE("Save sync data items failed. errCode=%d", errCode);
1023 goto END;
1024 }
1025
1026 errCode = SetLogTriggerStatus(true);
1027 END:
1028 if (useTrans) {
1029 if (errCode == E_OK) {
1030 errCode = Commit();
1031 } else {
1032 (void)Rollback(); // Keep the error code of the first scene
1033 }
1034 }
1035 return errCode;
1036 }
1037
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const1038 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
1039 bool isGettingDeletedData) const
1040 {
1041 RowDataWithLog data;
1042 int errCode = GetLogData(stmt, data.logInfo);
1043 if (errCode != E_OK) {
1044 LOGE("relational data value transfer to kv fail");
1045 return errCode;
1046 }
1047
1048 if (!isGettingDeletedData) {
1049 for (size_t cid = 0; cid < table_.GetFields().size(); ++cid) {
1050 DataValue value;
1051 errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
1052 value);
1053 if (errCode != E_OK) {
1054 return errCode;
1055 }
1056 data.rowData.push_back(std::move(value)); // sorted by cid
1057 }
1058 }
1059
1060 errCode = DataTransformer::SerializeDataItem(data,
1061 isGettingDeletedData ? std::vector<FieldInfo>() : table_.GetFieldInfos(), dataItem);
1062 if (errCode != E_OK) {
1063 LOGE("relational data value transfer to kv fail");
1064 }
1065 return errCode;
1066 }
1067
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1068 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1069 {
1070 int errCode = GetDataItemForSync(fullStmt, item, false);
1071 if (errCode != E_OK) {
1072 return errCode;
1073 }
1074 item.value = {};
1075 item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1076 return E_OK;
1077 }
1078
1079 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1080 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp ×tamp)
1081 {
1082 if (stmt == nullptr) {
1083 return -E_INVALID_ARGS;
1084 }
1085 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1086 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1087 timestamp = INT64_MAX;
1088 errCode = E_OK;
1089 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1090 timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3)); // 3 means timestamp index
1091 errCode = E_OK;
1092 }
1093 return errCode;
1094 }
1095
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1096 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1097 std::vector<DataItem> &dataItems, DataItem &&item)
1098 {
1099 // If one record is over 4M, ignore it.
1100 if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1101 overLongSize++;
1102 } else {
1103 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1104 dataTotalSize += GetDataItemSerialSize(item, appendLength);
1105 if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1106 return -E_UNFINISHED;
1107 } else {
1108 dataItems.push_back(item);
1109 }
1110 }
1111 return E_OK;
1112 }
1113 }
1114
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1115 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1116 sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1117 {
1118 if (!isFirstTime) { // For the first time, never step before, can get nothing
1119 int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1120 if (errCode != E_OK) {
1121 return errCode;
1122 }
1123 }
1124 return StepNext(isMemDb_, queryStmt, queryTime);
1125 }
1126
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1127 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1128 Timestamp &missQueryTime)
1129 {
1130 int errCode = GetMissQueryData(fullStmt, item);
1131 if (errCode != E_OK) {
1132 return errCode;
1133 }
1134 return StepNext(isMemDb_, fullStmt, missQueryTime);
1135 }
1136
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1137 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1138 const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1139 const TableInfo &tableInfo)
1140 {
1141 baseTblName_ = tableInfo.GetTableName();
1142 SetTableInfo(tableInfo);
1143 sqlite3_stmt *queryStmt = nullptr;
1144 sqlite3_stmt *fullStmt = nullptr;
1145 bool isGettingDeletedData = false;
1146 int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1147 if (errCode != E_OK) {
1148 return errCode;
1149 }
1150
1151 Timestamp queryTime = 0;
1152 Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1153
1154 bool isFirstTime = true;
1155 size_t dataTotalSize = 0;
1156 size_t overLongSize = 0;
1157 do {
1158 DataItem item;
1159 if (queryTime < missQueryTime) {
1160 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1161 } else if (queryTime == missQueryTime) {
1162 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1163 if (errCode != E_OK) {
1164 break;
1165 }
1166 errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1167 } else {
1168 errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1169 }
1170
1171 if (errCode == E_OK && !isFirstTime) {
1172 errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1173 }
1174
1175 if (errCode != E_OK) {
1176 break;
1177 }
1178
1179 isFirstTime = false;
1180 if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1181 errCode = -E_FINISHED;
1182 break;
1183 }
1184 } while (true);
1185 LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1186 errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1187 SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1188 SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1189 return errCode;
1190 }
1191
CheckDBModeForRelational()1192 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1193 {
1194 std::string journalMode;
1195 int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1196
1197 for (auto &c : journalMode) { // convert to lowercase
1198 c = static_cast<char>(std::tolower(c));
1199 }
1200
1201 if (errCode == E_OK && journalMode != "wal") {
1202 LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1203 return -E_NOT_SUPPORT;
1204 }
1205 return errCode;
1206 }
1207
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1208 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1209 const std::string &tableName)
1210 {
1211 std::vector<std::string> deviceTables;
1212 int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1213 if (errCode != E_OK) {
1214 LOGE("Get device table name for alter table failed. %d", errCode);
1215 return errCode;
1216 }
1217
1218 LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1219 for (const auto &table : deviceTables) {
1220 std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1221 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1222 if (errCode != E_OK) {
1223 LOGE("Delete device data failed. %d", errCode);
1224 break;
1225 }
1226 }
1227 return errCode;
1228 }
1229
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1230 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1231 {
1232 std::string deleteLogSql =
1233 "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName +
1234 "_log WHERE flag&0x02=0 AND (cloud_gid = '' OR cloud_gid IS NULL)";
1235 return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1236 }
1237
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1238 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1239 const std::string &tableName)
1240 {
1241 std::string deleteLogSql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE device = ?";
1242 sqlite3_stmt *deleteLogStmt = nullptr;
1243 int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1244 if (errCode != E_OK) {
1245 LOGE("Get delete device data log statement failed. %d", errCode);
1246 return errCode;
1247 }
1248
1249 errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1250 if (errCode != E_OK) {
1251 LOGE("Bind device to delete data log statement failed. %d", errCode);
1252 SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1253 return errCode;
1254 }
1255
1256 errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1257 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1258 errCode = E_OK;
1259 } else {
1260 LOGE("Delete data log failed. %d", errCode);
1261 }
1262
1263 SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1264 return errCode;
1265 }
1266
DeleteDistributedLogTable(const std::string & tableName)1267 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1268 {
1269 if (tableName.empty()) {
1270 return -E_INVALID_ARGS;
1271 }
1272 std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1273 std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1274 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1275 if (errCode != E_OK) {
1276 LOGE("Delete distributed log table failed. %d", errCode);
1277 }
1278 return errCode;
1279 }
1280
IsTableOnceDropped(const std::string & tableName,int execCode,bool & onceDropped)1281 int SQLiteSingleVerRelationalStorageExecutor::IsTableOnceDropped(const std::string &tableName, int execCode,
1282 bool &onceDropped)
1283 {
1284 if (execCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1285 onceDropped = true;
1286 return E_OK;
1287 } else if (execCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1288 std::string keyStr = DBConstant::TABLE_IS_DROPPED + tableName;
1289 Key key;
1290 DBCommon::StringToVector(keyStr, key);
1291 Value value;
1292
1293 int errCode = GetKvData(key, value);
1294 if (errCode == E_OK) {
1295 // if user drop table first, then create again(but don't create distributed table), will reach this branch
1296 onceDropped = true;
1297 return E_OK;
1298 } else if (errCode == -E_NOT_FOUND) {
1299 onceDropped = false;
1300 return E_OK;
1301 } else {
1302 LOGE("[IsTableOnceDropped] query is table dropped failed, %d", errCode);
1303 return errCode;
1304 }
1305 } else {
1306 return execCode;
1307 }
1308 }
1309
CleanResourceForDroppedTable(const std::string & tableName)1310 int SQLiteSingleVerRelationalStorageExecutor::CleanResourceForDroppedTable(const std::string &tableName)
1311 {
1312 int errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1313 if (errCode != E_OK) {
1314 LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1315 return errCode;
1316 }
1317 errCode = DeleteDistributedLogTable(tableName);
1318 if (errCode != E_OK) {
1319 LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1320 return errCode;
1321 }
1322 errCode = DeleteTableTrigger(tableName);
1323 if (errCode != E_OK) {
1324 LOGE("Delete trigger for missing distributed table failed. %d", errCode);
1325 }
1326 return errCode;
1327 }
1328
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1329 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1330 std::vector<std::string> &missingTables)
1331 {
1332 if (tableNames.empty()) {
1333 return E_OK;
1334 }
1335 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1336 sqlite3_stmt *stmt = nullptr;
1337 int ret = E_OK;
1338 int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1339 if (errCode != E_OK) {
1340 SQLiteUtils::ResetStatement(stmt, true, ret);
1341 return errCode;
1342 }
1343 for (const auto &tableName : tableNames) {
1344 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1345 if (errCode != E_OK) {
1346 LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1347 break;
1348 }
1349
1350 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1351 bool onceDropped = false;
1352 errCode = IsTableOnceDropped(tableName, errCode, onceDropped);
1353 if (errCode != E_OK) {
1354 LOGE("query is table once dropped failed. %d", errCode);
1355 break;
1356 }
1357 SQLiteUtils::ResetStatement(stmt, false, ret);
1358 if (onceDropped) { // The table in schema was once dropped
1359 errCode = CleanResourceForDroppedTable(tableName);
1360 if (errCode != E_OK) {
1361 break;
1362 }
1363 missingTables.emplace_back(tableName);
1364 }
1365 }
1366 SQLiteUtils::ResetStatement(stmt, true, ret);
1367 return CheckCorruptedStatus(errCode);
1368 }
1369
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1370 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1371 const TableInfo &baseTbl, const StoreInfo &info)
1372 {
1373 if (dbHandle_ == nullptr) {
1374 return -E_INVALID_DB;
1375 }
1376
1377 if (device.empty() || !baseTbl.IsValid()) {
1378 return -E_INVALID_ARGS;
1379 }
1380
1381 std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1382 int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1383 if (errCode != E_OK) {
1384 LOGE("Create device table failed. %d", errCode);
1385 return errCode;
1386 }
1387
1388 errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1389 if (errCode != E_OK) {
1390 LOGE("Copy index to device table failed. %d", errCode);
1391 }
1392 return errCode;
1393 }
1394
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1395 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1396 const std::string &schemaVersion)
1397 {
1398 if (dbHandle_ == nullptr) {
1399 return -E_INVALID_DB;
1400 }
1401
1402 TableInfo newTable;
1403 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1404 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1405 LOGE("Check new schema failed. %d", errCode);
1406 return errCode;
1407 } else {
1408 errCode = table.CompareWithTable(newTable, schemaVersion);
1409 if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1410 LOGE("Check schema failed, schema was changed. %d", errCode);
1411 return -E_DISTRIBUTED_SCHEMA_CHANGED;
1412 } else {
1413 errCode = E_OK;
1414 }
1415 }
1416
1417 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1418 if (errCode != E_OK) {
1419 LOGE("Get query helper for check query failed. %d", errCode);
1420 return errCode;
1421 }
1422
1423 if (!query.IsQueryForRelationalDB()) {
1424 LOGE("Not support for this query type.");
1425 return -E_NOT_SUPPORT;
1426 }
1427
1428 SyncTimeRange defaultTimeRange;
1429 sqlite3_stmt *stmt = nullptr;
1430 errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1431 stmt);
1432 if (errCode != E_OK) {
1433 LOGE("Get query statement for check query failed. %d", errCode);
1434 }
1435
1436 SQLiteUtils::ResetStatement(stmt, true, errCode);
1437 return errCode;
1438 }
1439
CheckQueryObjectLegal(const QuerySyncObject & query)1440 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const QuerySyncObject &query)
1441 {
1442 if (dbHandle_ == nullptr) {
1443 return -E_INVALID_DB;
1444 }
1445 TableInfo newTable;
1446 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, query.GetTableName(), newTable);
1447 if (errCode != E_OK) {
1448 LOGE("Check new schema failed. %d", errCode);
1449 }
1450 return errCode;
1451 }
1452
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1453 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1454 Timestamp &maxTimestamp) const
1455 {
1456 maxTimestamp = 0;
1457 for (const auto &tableName : tableNames) {
1458 const std::string sql = "SELECT MAX(timestamp) FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log;";
1459 sqlite3_stmt *stmt = nullptr;
1460 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1461 if (errCode != E_OK) {
1462 return errCode;
1463 }
1464 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1465 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1466 maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1467 errCode = E_OK;
1468 }
1469 SQLiteUtils::ResetStatement(stmt, true, errCode);
1470 if (errCode != E_OK) {
1471 maxTimestamp = 0;
1472 return errCode;
1473 }
1474 }
1475 return E_OK;
1476 }
1477
SetLogTriggerStatus(bool status)1478 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1479 {
1480 const std::string key = "log_trigger_switch";
1481 std::string val = status ? "true" : "false";
1482 std::string sql = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX + "metadata" +
1483 " VALUES ('" + key + "', '" + val + "')";
1484 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1485 if (errCode != E_OK) {
1486 LOGE("Set log trigger to %s failed. errCode=%d", val.c_str(), errCode);
1487 }
1488 return errCode;
1489 }
1490
1491 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1492 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1493 std::vector<RelationalRowData *> &data)
1494 {
1495 size_t totalLength = 0;
1496 do {
1497 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1498 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1499 return E_OK;
1500 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1501 LOGE("Get data by bind sql failed:%d", errCode);
1502 return errCode;
1503 }
1504
1505 if (colNames.empty()) {
1506 SQLiteUtils::GetSelectCols(stmt, colNames); // Get column names.
1507 }
1508 auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1509 if (relaRowData == nullptr) {
1510 LOGE("ExecuteQueryBySqlStmt OOM");
1511 return -E_OUT_OF_MEMORY;
1512 }
1513
1514 auto dataSz = relaRowData->CalcLength();
1515 if (dataSz == 0) { // invalid data
1516 delete relaRowData;
1517 relaRowData = nullptr;
1518 continue;
1519 }
1520
1521 totalLength += static_cast<size_t>(dataSz);
1522 if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) { // the set has been full
1523 delete relaRowData;
1524 relaRowData = nullptr;
1525 LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1526 return -E_REMOTE_OVER_SIZE;
1527 }
1528 data.push_back(relaRowData);
1529 } while (true);
1530 return E_OK;
1531 }
1532 }
1533
1534 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1535 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1536 const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1537 std::vector<RelationalRowData *> &data)
1538 {
1539 int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1540 if (errCode != E_OK) {
1541 return errCode;
1542 }
1543
1544 sqlite3_stmt *stmt = nullptr;
1545 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1546 if (errCode != E_OK) {
1547 (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1548 return errCode;
1549 }
1550 ResFinalizer finalizer([this, &stmt, &errCode] {
1551 (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1552 SQLiteUtils::ResetStatement(stmt, true, errCode);
1553 });
1554 for (size_t i = 0; i < bindArgs.size(); ++i) {
1555 errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1556 if (errCode != E_OK) {
1557 return errCode;
1558 }
1559 }
1560 return GetRowDatas(stmt, isMemDb_, colNames, data);
1561 }
1562
CheckEncryptedOrCorrupted() const1563 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1564 {
1565 if (dbHandle_ == nullptr) {
1566 return -E_INVALID_DB;
1567 }
1568
1569 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1570 if (errCode != E_OK) {
1571 LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1572 }
1573 return errCode;
1574 }
1575
GetExistsDeviceList(std::set<std::string> & devices) const1576 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1577 {
1578 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1579 isMemDb_, devices);
1580 }
1581
GetUploadCountInner(const Timestamp & timestamp,SqliteQueryHelper & helper,std::string & sql,int64_t & count)1582 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCountInner(const Timestamp ×tamp,
1583 SqliteQueryHelper &helper, std::string &sql, int64_t &count)
1584 {
1585 sqlite3_stmt *stmt = nullptr;
1586 int errCode = helper.GetCloudQueryStatement(false, dbHandle_, timestamp, sql, stmt);
1587 if (errCode != E_OK) {
1588 LOGE("failed to get count statement %d", errCode);
1589 return errCode;
1590 }
1591 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1592 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1593 count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1594 errCode = E_OK;
1595 } else {
1596 LOGE("Failed to get the count to be uploaded. %d", errCode);
1597 }
1598 SQLiteUtils::ResetStatement(stmt, true, errCode);
1599 return errCode;
1600 }
1601
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1602 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp ×tamp, bool isCloudForcePush,
1603 bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1604 {
1605 int errCode;
1606 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1607 if (errCode != E_OK) {
1608 return errCode;
1609 }
1610 std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask,
1611 CloudWaterType::DELETE);
1612 return GetUploadCountInner(timestamp, helper, sql, count);
1613 }
1614
GetAllUploadCount(const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1615 int SQLiteSingleVerRelationalStorageExecutor::GetAllUploadCount(const std::vector<Timestamp> ×tampVec,
1616 bool isCloudForcePush, bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1617 {
1618 std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1619 if (timestampVec.size() != typeVec.size()) {
1620 return -E_INVALID_ARGS;
1621 }
1622 int errCode;
1623 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1624 if (errCode != E_OK) {
1625 return errCode;
1626 }
1627 count = 0;
1628 for (size_t i = 0; i < typeVec.size(); i++) {
1629 std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask, typeVec[i]);
1630 int64_t tempCount = 0;
1631 helper.AppendCloudQueryToGetDiffData(sql, typeVec[i]);
1632 errCode = GetUploadCountInner(timestampVec[i], helper, sql, tempCount);
1633 if (errCode != E_OK) {
1634 return errCode;
1635 }
1636 count += tempCount;
1637 }
1638 return E_OK;
1639 }
1640
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1641 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1642 bool ignoreEmptyGid)
1643 {
1644 if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1645 cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1646 return -E_INVALID_ARGS;
1647 }
1648 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1649 + "' SET cloud_gid = ? WHERE data_key = ? ";
1650 sqlite3_stmt *stmt = nullptr;
1651 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1652 if (errCode != E_OK) {
1653 return errCode;
1654 }
1655 errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1656 int resetCode = E_OK;
1657 SQLiteUtils::ResetStatement(stmt, true, resetCode);
1658 return errCode == E_OK ? resetCode : errCode;
1659 }
1660
GetSyncCloudData(const CloudUploadRecorder & uploadRecorder,CloudSyncData & cloudDataResult,SQLiteSingleVerRelationalContinueToken & token)1661 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(const CloudUploadRecorder &uploadRecorder,
1662 CloudSyncData &cloudDataResult, SQLiteSingleVerRelationalContinueToken &token)
1663 {
1664 token.GetCloudTableSchema(tableSchema_);
1665 sqlite3_stmt *queryStmt = nullptr;
1666 bool isStepNext = false;
1667 int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult, queryStmt, isStepNext);
1668 if (errCode != E_OK) {
1669 (void)token.ReleaseCloudStatement();
1670 return errCode;
1671 }
1672 uint32_t totalSize = 0;
1673 uint32_t stepNum = -1;
1674 do {
1675 if (isStepNext) {
1676 errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1677 if (errCode != E_OK) {
1678 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1679 break;
1680 }
1681 }
1682 isStepNext = true;
1683 errCode = GetCloudDataForSync(uploadRecorder, queryStmt, cloudDataResult, ++stepNum, totalSize);
1684 } while (errCode == E_OK);
1685 if (errCode != -E_UNFINISHED) {
1686 (void)token.ReleaseCloudStatement();
1687 }
1688 return errCode;
1689 }
1690
GetSyncCloudGid(QuerySyncObject & query,const SyncTimeRange & syncTimeRange,bool isCloudForcePushStrategy,bool isCompensatedTask,std::vector<std::string> & cloudGid)1691 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudGid(QuerySyncObject &query,
1692 const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy,
1693 bool isCompensatedTask, std::vector<std::string> &cloudGid)
1694 {
1695 sqlite3_stmt *queryStmt = nullptr;
1696 int errCode = E_OK;
1697 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1698 if (errCode != E_OK) {
1699 return errCode;
1700 }
1701 std::string sql = helper.GetGidRelationalCloudQuerySql(tableSchema_.fields, isCloudForcePushStrategy,
1702 isCompensatedTask);
1703 errCode = helper.GetCloudQueryStatement(false, dbHandle_, syncTimeRange.beginTime, sql, queryStmt);
1704 if (errCode != E_OK) {
1705 return errCode;
1706 }
1707 do {
1708 errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1709 if (errCode != E_OK) {
1710 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1711 break;
1712 }
1713 GetCloudGid(queryStmt, cloudGid);
1714 } while (errCode == E_OK);
1715 int resetStatementErrCode = E_OK;
1716 SQLiteUtils::ResetStatement(queryStmt, true, resetStatementErrCode);
1717 queryStmt = nullptr;
1718 return (errCode == E_OK ? resetStatementErrCode : errCode);
1719 }
1720
GetCloudDataForSync(const CloudUploadRecorder & uploadRecorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t & stepNum,uint32_t & totalSize)1721 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(const CloudUploadRecorder &uploadRecorder,
1722 sqlite3_stmt *statement, CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize)
1723 {
1724 VBucket log;
1725 VBucket extraLog;
1726 uint32_t preSize = totalSize;
1727 GetCloudLog(statement, log, totalSize);
1728 GetCloudExtraLog(statement, extraLog);
1729
1730 VBucket data;
1731 int64_t flag = 0;
1732 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
1733 if (errCode != E_OK) {
1734 return errCode;
1735 }
1736 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1737 for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1738 Type cloudValue;
1739 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1740 tableSchema_.fields[cid].type, cid + STATUS_INDEX + 1, cloudValue);
1741 if (errCode != E_OK) {
1742 return errCode;
1743 }
1744 SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1745 errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1746 if (errCode != E_OK) {
1747 return errCode;
1748 }
1749 }
1750 }
1751
1752 if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, maxUploadSize_, maxUploadCount_)) {
1753 errCode = CloudStorageUtils::IdentifyCloudType(uploadRecorder, cloudDataResult, data, log, extraLog);
1754 } else {
1755 errCode = -E_UNFINISHED;
1756 }
1757 if (errCode == -E_IGNORE_DATA) {
1758 errCode = E_OK;
1759 totalSize = preSize;
1760 stepNum--;
1761 }
1762 return errCode;
1763 }
1764
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1765 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1766 {
1767 if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1768 Asset asset;
1769 int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1770 if (errCode != E_OK) {
1771 return errCode;
1772 }
1773 if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1774 return -E_CLOUD_INVALID_ASSET;
1775 }
1776 vBucket.insert_or_assign(field.colName, asset);
1777 } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1778 Assets assets;
1779 int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1780 if (errCode != E_OK) {
1781 return errCode;
1782 }
1783 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1784 return -E_CLOUD_ERROR;
1785 }
1786 if (!CloudStorageUtils::CheckAssetStatus(assets)) {
1787 return -E_CLOUD_INVALID_ASSET;
1788 }
1789 vBucket.insert_or_assign(field.colName, assets);
1790 } else {
1791 vBucket.insert_or_assign(field.colName, cloudValue);
1792 }
1793 return E_OK;
1794 }
1795
SetLocalSchema(const RelationalSchemaObject & localSchema)1796 void SQLiteSingleVerRelationalStorageExecutor::SetLocalSchema(const RelationalSchemaObject &localSchema)
1797 {
1798 localSchema_ = localSchema;
1799 }
1800
CleanCloudDataOnLogTable(const std::string & logTableName,ClearMode mode)1801 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName, ClearMode mode)
1802 {
1803 std::string setFlag;
1804 if (mode == FLAG_ONLY && isLogicDelete_) {
1805 setFlag = SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC;
1806 } else {
1807 setFlag = SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC;
1808 }
1809 std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " + setFlag +
1810 ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1811 SHARING_RESOURCE + " = '' " + "WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR " +
1812 CLOUD_GID_FIELD + " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '';";
1813 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1814 if (errCode != E_OK) {
1815 LOGE("clean cloud log failed, %d", errCode);
1816 return errCode;
1817 }
1818 cleanLogSql = "DELETE FROM " + logTableName + " WHERE " + FLAG_IS_CLOUD + " AND " + DATA_IS_DELETE + ";";
1819 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1820 if (errCode != E_OK) {
1821 LOGE("delete cloud log failed, %d", errCode);
1822 return errCode;
1823 }
1824 // set all flag logout and data upload is not finished.
1825 cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG;
1826 if (mode == FLAG_ONLY) {
1827 cleanLogSql += " = flag | 0x800 & ~0x400;";
1828 } else {
1829 cleanLogSql += " = flag & ~0x400;";
1830 }
1831 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1832 }
1833
CleanUploadFinishedFlag(const std::string & tableName)1834 int SQLiteSingleVerRelationalStorageExecutor::CleanUploadFinishedFlag(const std::string &tableName)
1835 {
1836 // unset upload finished flag
1837 std::string cleanUploadFinishedSql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " +
1838 CloudDbConstant::FLAG + " = flag & ~0x400;";
1839 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanUploadFinishedSql);
1840 }
1841
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName,const RelationalSchemaObject & localSchema)1842 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1843 const std::string &logTableName, const RelationalSchemaObject &localSchema)
1844 {
1845 std::string sql = "DELETE FROM '" + tableName + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
1846 " IN (SELECT " + DATAKEY + " FROM '" + logTableName + "' WHERE (" + FLAG_IS_LOGIC_DELETE +
1847 ") OR CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD + " OR " + FLAG_IS_CLOUD_CONSISTENCY +
1848 "));";
1849 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1850 if (errCode != E_OK) {
1851 LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1852 return errCode;
1853 }
1854 std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + FLAG_IS_CLOUD + " OR " +
1855 FLAG_IS_CLOUD_CONSISTENCY + ";";
1856 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1857 if (errCode != E_OK) {
1858 LOGE("Failed to delete cloud data on log table, %d.", errCode);
1859 return errCode;
1860 }
1861 errCode = DoCleanAssetId(tableName, localSchema);
1862 if (errCode != E_OK) {
1863 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
1864 return errCode;
1865 }
1866 errCode = CleanCloudDataOnLogTable(logTableName, FLAG_AND_DATA);
1867 if (errCode != E_OK) {
1868 LOGE("Failed to clean gid on log table, %d.", errCode);
1869 }
1870 return errCode;
1871 }
1872
ChangeCloudDataFlagOnLogTable(const std::string & logTableName)1873 int SQLiteSingleVerRelationalStorageExecutor::ChangeCloudDataFlagOnLogTable(const std::string &logTableName)
1874 {
1875 std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " +
1876 SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1877 CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '' " + "WHERE NOT " + FLAG_IS_CLOUD_CONSISTENCY + ";";
1878 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1879 if (errCode != E_OK) {
1880 LOGE("change cloud log flag failed, %d", errCode);
1881 }
1882 return errCode;
1883 }
1884
SetDataOnUserTablWithLogicDelete(const std::string & tableName,const std::string & logTableName)1885 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnUserTablWithLogicDelete(const std::string &tableName,
1886 const std::string &logTableName)
1887 {
1888 UpdateCursorContext context;
1889 context.cursor = GetCursor(tableName);
1890 LOGI("removeData start and cursor is, %d.", context.cursor);
1891 int errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1892 std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " + SET_FLAG_LOGIC_DELETE +
1893 ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1894 SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1895 " WHERE (CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND " + FLAG_IS_CLOUD_CONSISTENCY + ");";
1896 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1897 if (errCode != E_OK) {
1898 LOGE("Failed to change cloud data flag on usertable, %d.", errCode);
1899 CreateFuncUpdateCursor(context, nullptr);
1900 return errCode;
1901 }
1902 // deal when data is logicDelete
1903 sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1904 " = '', " + SHARING_RESOURCE + " = '' WHERE " + FLAG_IS_LOGIC_DELETE + ";";
1905 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1906 if (errCode != E_OK) {
1907 LOGE("Failed to deal logic delete data flag on usertable, %d.", errCode);
1908 CreateFuncUpdateCursor(context, nullptr);
1909 return errCode;
1910 }
1911 LOGI("removeData finish and cursor is %d.", context.cursor);
1912 errCode = SetCursor(tableName, context.cursor);
1913 if (errCode != E_OK) {
1914 CreateFuncUpdateCursor(context, nullptr);
1915 LOGE("set new cursor after removeData error %d.", errCode);
1916 return errCode;
1917 }
1918 errCode = CreateFuncUpdateCursor(context, nullptr);
1919 if (errCode != E_OK) {
1920 LOGE("Clear FuncUpdateCursor error %d.", errCode);
1921 }
1922 return ChangeCloudDataFlagOnLogTable(logTableName);
1923 }
1924
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys,bool distinguishCloudFlag)1925 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
1926 std::vector<int64_t> &dataKeys, bool distinguishCloudFlag)
1927 {
1928 sqlite3_stmt *selectStmt = nullptr;
1929 std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
1930 " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '' AND data_key != '-1'";
1931 if (distinguishCloudFlag) {
1932 sql += " AND (";
1933 sql += FLAG_IS_CLOUD;
1934 sql += " OR ";
1935 sql += FLAG_IS_CLOUD_CONSISTENCY;
1936 sql += " ) ";
1937 }
1938 sql += ";";
1939 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
1940 if (errCode != E_OK) {
1941 LOGE("Get select data_key statement failed, %d", errCode);
1942 return errCode;
1943 }
1944 do {
1945 errCode = SQLiteUtils::StepWithRetry(selectStmt);
1946 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1947 dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
1948 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1949 LOGE("SQLite step failed when query log's data_key : %d", errCode);
1950 break;
1951 }
1952 } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
1953 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1954 return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
1955 }
1956
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)1957 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
1958 const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
1959 {
1960 std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
1961 if (opType == OpType::ONLY_UPDATE_GID) {
1962 updateLogSql += "cloud_gid = ?";
1963 updateColName.push_back(CloudDbConstant::GID_FIELD);
1964 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1965 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1966 updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
1967 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1968 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
1969 updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
1970 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1971 } else if (opType == OpType::UPDATE_TIMESTAMP) {
1972 updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
1973 ", timestamp = ?, cloud_gid = '', version = '', sharing_resource = ''";
1974 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1975 } else if (opType == OpType::CLEAR_GID) {
1976 updateLogSql += "cloud_gid = '', version = '', sharing_resource = '', flag = flag & " +
1977 std::to_string(SET_FLAG_ZERO_MASK);
1978 } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1979 updateLogSql += std::string(CloudDbConstant::TO_LOCAL_CHANGE) + ", cloud_gid = ?";
1980 updateColName.push_back(CloudDbConstant::GID_FIELD);
1981 } else {
1982 updateLogSql += " device = 'cloud', timestamp = ?,";
1983 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1984 if (opType == OpType::DELETE) {
1985 updateLogSql += GetCloudDeleteSql(DBCommon::GetLogTableName(tableSchema.name));
1986 } else {
1987 updateLogSql += GetUpdateDataFlagSql() + ", cloud_gid = ?";
1988 updateColName.push_back(CloudDbConstant::GID_FIELD);
1989 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1990 }
1991 }
1992
1993 int errCode = AppendUpdateLogRecordWhereSqlCondition(tableSchema, vBucket, updateLogSql);
1994 if (errCode != E_OK) {
1995 return errCode;
1996 }
1997
1998 errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
1999 if (errCode != E_OK) {
2000 LOGE("Get update log statement failed when update cloud data, %d", errCode);
2001 }
2002 return errCode;
2003 }
2004 } // namespace DistributedDB
2005 #endif
2006