1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "value_hash_calc.h"
33 #include "device_tracker_log_table_manager.h"
34
35 namespace DistributedDB {
36 namespace {
37 static constexpr const char *DATAKEY = "DATA_KEY";
38 static constexpr const char *DEVICE_FIELD = "DEVICE";
39 static constexpr const char *CLOUD_GID_FIELD = "CLOUD_GID";
40 static constexpr const char *VERSION = "VERSION";
41 static constexpr const char *SHARING_RESOURCE = "SHARING_RESOURCE";
42 static constexpr const char *FLAG_IS_CLOUD = "FLAG & 0x02 = 0"; // see if 1th bit of a flag is cloud
43 static constexpr const char *FLAG_IS_CLOUD_CONSISTENCY = "FLAG & 0x20 = 0"; // see if flag is cloud_consistency
44 // set 1th bit of flag to one which is local, clean 5th bit of flag to one which is wait compensated sync
45 static constexpr const char *SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC = "(CASE WHEN data_key = -1 and "
46 "FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) ELSE (FLAG | 0x02 | 0x20) & (~0x10) END)";
47 // clean 5th bit of flag to one which is wait compensated sync
48 static constexpr const char *SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC = "(CASE WHEN data_key = -1 and "
49 "FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) ELSE (FLAG | 0x20) & (~0x10) END)";
50 static constexpr const char *FLAG_IS_LOGIC_DELETE = "FLAG & 0x08 != 0"; // see if 3th bit of a flag is logic delete
51 // set data logic delete, exist passport, delete, not compensated, cloud and consistency
52 static constexpr const char *SET_FLAG_WHEN_LOGOUT = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x12) & (~0x20)";
53 // set data logic delete, exist passport, delete, not compensated and cloud
54 static constexpr const char *SET_FLAG_WHEN_LOGOUT_FOR_SHARE_TABLE = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x12)";
55 static constexpr const char *DATA_IS_DELETE = "data_key = -1 AND FLAG & 0X08 = 0"; // see if data is delete
56 static constexpr const char *UPDATE_CURSOR_SQL = "cursor=update_cursor()";
57 static constexpr const int SET_FLAG_ZERO_MASK = ~0x04; // clear 2th bit of flag
58 static constexpr const int SET_FLAG_ONE_MASK = 0x04; // set 2th bit of flag
59 static constexpr const int SET_CLOUD_FLAG = ~0x02; // set 1th bit of flag to 0
60 static constexpr const int DATA_KEY_INDEX = 0;
61 static constexpr const int TIMESTAMP_INDEX = 3;
62 static constexpr const int W_TIMESTAMP_INDEX = 4;
63 static constexpr const int FLAG_INDEX = 5;
64 static constexpr const int HASH_KEY_INDEX = 6;
65 static constexpr const int CLOUD_GID_INDEX = 7;
66 static constexpr const int VERSION_INDEX = 8;
67 static constexpr const int STATUS_INDEX = 9;
68
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)69 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
70 {
71 if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
72 return SQLITE_DENY;
73 }
74 if (b == SQLITE_FUNCTION) {
75 if (d != nullptr && (strcmp(d, "fts3_tokenizer") == 0)) {
76 LOGE("Deny fts3_tokenizer in remote query");
77 return SQLITE_DENY;
78 }
79 }
80 return SQLITE_OK;
81 }
82 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)83 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
84 DistributedTableMode mode)
85 : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode), isLogicDelete_(false),
86 assetLoader_(nullptr), putDataMode_(PutDataMode::SYNC), markFlagOption_(MarkFlagOption::DEFAULT),
87 maxUploadCount_(0), maxUploadSize_(0)
88 {
89 bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
90 bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
91 bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
92 bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
93 bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
94 bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
95 bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
96 }
97
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)98 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
99 {
100 std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
101 if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
102 LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
103 return -E_NOT_SUPPORT;
104 }
105 std::vector<FieldInfo> fieldInfos = table.GetFieldInfos();
106 for (const auto &field : fieldInfos) {
107 if (DBCommon::CaseInsensitiveCompare(field.GetFieldName(), std::string(DBConstant::SQLITE_INNER_ROWID))) {
108 LOGE("[CreateDistributedTable] Not support create distributed table with _rowid_ column.");
109 return -E_NOT_SUPPORT;
110 }
111 }
112
113 if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
114 if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
115 LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
116 return -E_NOT_SUPPORT;
117 }
118
119 if (mode == DistributedTableMode::COLLABORATION) {
120 if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
121 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
122 return -E_NOT_SUPPORT;
123 }
124 }
125
126 if (syncType == CLOUD_COOPERATION) {
127 int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
128 if (errCode != E_OK) {
129 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
130 return errCode;
131 }
132 }
133 }
134
135 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
136 if (table.GetPrimaryKey().size() > 1) {
137 LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
138 return -E_NOT_SUPPORT;
139 }
140 }
141
142 return E_OK;
143 }
144
GeneLogInfoForExistedData(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTrackerTable)145 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &identity,
146 const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTrackerTable)
147 {
148 if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
149 return UpdateTrackerTable(db, identity, tableInfo, logMgrPtr, false);
150 }
151 SQLiteRelationalUtils::GenLogParam param = {db, isMemDb_, isTrackerTable};
152 return SQLiteRelationalUtils::GeneLogInfoForExistedData(identity, tableInfo, logMgrPtr, param);
153 }
154
ResetLogStatus(std::string & tableName)155 int SQLiteSingleVerRelationalStorageExecutor::ResetLogStatus(std::string &tableName)
156 {
157 int errCode = SetLogTriggerStatus(false);
158 if (errCode != E_OK) {
159 LOGE("Fail to set log trigger on when reset log status, %d", errCode);
160 return errCode;
161 }
162 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
163 std::string sql = "UPDATE " + logTable + " SET status = 0;";
164 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
165 if (errCode != E_OK) {
166 LOGE("Failed to initialize cloud type log data.%d", errCode);
167 }
168 return errCode;
169 }
170
UpdateTrackerTable(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTimestampOnly)171 int SQLiteSingleVerRelationalStorageExecutor::UpdateTrackerTable(sqlite3 *db, const std::string &identity,
172 const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTimestampOnly)
173 {
174 int errCode = SetLogTriggerStatus(false);
175 if (errCode != E_OK) {
176 return errCode;
177 }
178 std::string currentLocalTimeStr;
179 std::tie(errCode, currentLocalTimeStr) = SQLiteRelationalUtils::GetCurrentVirtualTime(db);
180 if (errCode != E_OK) {
181 LOGE("Failed to get local timeOffset.%d", errCode);
182 return errCode;
183 }
184 std::string tableName = tableInfo.GetTableName();
185 std::string logTable = DBCommon::GetLogTableName(tableName);
186 TrackerTable trackerTable = tableInfo.GetTrackerTable();
187 trackerTable.SetTableName(tableName);
188 if (isTimestampOnly) {
189 std::string sql = "update " + logTable + " set timestamp = " + currentLocalTimeStr +
190 " + data_key, wtimestamp = " + currentLocalTimeStr + " + data_key where data_key != -1;";
191 errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
192 if (errCode != E_OK) {
193 LOGE("Failed to initialize device type log data.%d", errCode);
194 return errCode;
195 }
196 return UpgradedLogForExistedData(tableInfo, false);
197 }
198 std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
199 std::string flag = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL) |
200 static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
201 std::string calPrimaryKeyHash = logMgrPtr->CalcPrimaryKeyHash("a.", tableInfo, identity);
202 std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid + ", '', '', " + currentLocalTimeStr +
203 " + " + rowid + ", " + currentLocalTimeStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash +
204 ", '', '', '', '', '', 0 FROM '" + tableName + "' AS a WHERE 1=1;";
205 errCode = trackerTable.ReBuildTempTrigger(db, TriggerMode::TriggerModeEnum::INSERT, [db, &sql]() {
206 int ret = SQLiteUtils::ExecuteRawSQL(db, sql);
207 if (ret != E_OK) {
208 LOGE("Failed to initialize cloud type log data.%d", ret);
209 }
210 return ret;
211 });
212 if (errCode != E_OK) {
213 LOGE("Failed to initialize device type log data.%d", errCode);
214 }
215 return errCode;
216 }
217
CreateRelationalLogTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table)218 int SQLiteSingleVerRelationalStorageExecutor::CreateRelationalLogTable(DistributedTableMode mode, bool isUpgraded,
219 const std::string &identity, TableInfo &table)
220 {
221 // create log table
222 std::unique_ptr<SqliteLogTableManager> tableManager;
223 tableManager = LogTableManagerFactory::GetTableManager(table, mode, table.GetTableSyncType());
224 if (tableManager == nullptr) {
225 LOGE("[CreateRelationalLogTable] get table manager failed");
226 return -E_INVALID_DB;
227 }
228 int errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
229 if (errCode != E_OK) {
230 LOGE("[CreateDistributedTable] create log table failed");
231 return errCode;
232 }
233 std::string tableName = table.GetTableName();
234 bool isOnceDropped = false;
235 (void)IsTableOnceDropped(tableName, isOnceDropped);
236 if (isOnceDropped) {
237 SQLiteRelationalUtils::GenLogParam param = {
238 dbHandle_, isMemDb_, table.GetTrackerTable().GetTableName().empty()
239 };
240 errCode = SQLiteRelationalUtils::GeneLogInfoForExistedData(identity, table, tableManager, param);
241 } else if (!isUpgraded) {
242 if (table.GetTrackerTable().GetTableName().empty()) {
243 errCode = GeneLogInfoForExistedData(dbHandle_, identity, table, tableManager, false);
244 } else if (table.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
245 // tracker table -> distributed device table
246 errCode = UpdateTrackerTable(dbHandle_, tableName, table, tableManager, true);
247 } else {
248 // tracker table -> distributed cloud table
249 errCode = ResetLogStatus(tableName);
250 }
251 }
252 if (errCode != E_OK) {
253 LOGE("[CreateDistributedTable] generate log isUpgraded %d failed %d.", static_cast<int>(isUpgraded), errCode);
254 return errCode;
255 }
256
257 // add trigger
258 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
259 if (errCode != E_OK) {
260 LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
261 return errCode;
262 }
263 return SetLogTriggerStatus(true);
264 }
265
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table)266 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
267 const std::string &identity, TableInfo &table)
268 {
269 if (dbHandle_ == nullptr) {
270 return -E_INVALID_DB;
271 }
272
273 const std::string tableName = table.GetTableName();
274 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
275 if (errCode != E_OK) {
276 LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
277 return errCode;
278 }
279
280 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
281 bool isEmpty = false;
282 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
283 if (errCode != E_OK) {
284 LOGE("[CreateDistributedTable] check table empty failed. error=%d", errCode);
285 return -E_NOT_SUPPORT;
286 }
287 if (!isEmpty) {
288 LOGW("[CreateDistributedTable] generate %.3s log for existed data, table type %d",
289 DBCommon::TransferStringToHex(DBCommon::TransferHashString(tableName)).c_str(),
290 static_cast<int>(table.GetTableSyncType()));
291 }
292 }
293
294 errCode = CheckTableConstraint(table, mode, table.GetTableSyncType());
295 if (errCode != E_OK) {
296 LOGE("[CreateDistributedTable] check table constraint failed.");
297 return errCode;
298 }
299
300 return CreateRelationalLogTable(mode, isUpgraded, identity, table);
301 }
302
CompareSchemaTableColumns(const std::string & tableName)303 int SQLiteSingleVerRelationalStorageExecutor::CompareSchemaTableColumns(const std::string &tableName)
304 {
305 bool onceDropped = false;
306 int errCode = IsTableOnceDropped(tableName, onceDropped);
307 if (!onceDropped) {
308 // do not return error code to make sure main procedure will continue
309 return E_OK;
310 }
311 LOGI("[CompareSchemaTableColumns] table once dropped check schema. table name is %s length is %zu",
312 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
313 TableInfo newTableInfo;
314 errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
315 if (errCode != E_OK) {
316 LOGE("[CompareSchemaTableColumns] analysis table schema failed. %d", errCode);
317 return errCode;
318 }
319 // new table should has same or compatible upgrade
320 TableInfo tableInfo = localSchema_.GetTable(tableName);
321 errCode = tableInfo.CompareWithTable(newTableInfo, localSchema_.GetSchemaVersion());
322 if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
323 LOGE("[CompareSchemaTableColumns] Not support with incompatible table.");
324 return -E_SCHEMA_MISMATCH;
325 }
326 return errCode;
327 }
328
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)329 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
330 DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
331 {
332 if (dbHandle_ == nullptr) {
333 return -E_INVALID_DB;
334 }
335 TableInfo newTableInfo;
336 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
337 if (errCode != E_OK) {
338 LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
339 return errCode;
340 }
341
342 if (CheckTableConstraint(newTableInfo, mode, syncType)) {
343 LOGE("[UpgradeDistributedTable] Not support create distributed table when violate constraints.");
344 return -E_NOT_SUPPORT;
345 }
346
347 // new table should has same or compatible upgrade
348 TableInfo tableInfo = schema.GetTable(tableName);
349 errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
350 bool onceDropped = false;
351 int ret = IsTableOnceDropped(tableName, onceDropped);
352 if (ret != E_OK) {
353 LOGE("[UpgradeDistributedTable] Get table %s dropped status fail %d.",
354 DBCommon::StringMiddleMasking(tableName).c_str(), ret);
355 return ret;
356 }
357 LOGI("[UpgradeDistributedTable] table name is %s length is %zu, the dropped status is %d, errCode is %d",
358 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), onceDropped, errCode);
359 if (!onceDropped) {
360 if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
361 LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
362 return -E_SCHEMA_MISMATCH;
363 } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
364 LOGD("[UpgradeDistributedTable] schema has not changed.");
365 // update table if tableName changed
366 schema.RemoveRelationalTable(tableName);
367 tableInfo.SetTableName(tableName);
368 schema.AddRelationalTable(tableInfo);
369 return E_OK;
370 }
371 }
372
373 schemaChanged = true;
374 errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
375 if (errCode != E_OK) {
376 LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
377 }
378
379 schema.AddRelationalTable(newTableInfo);
380 return errCode;
381 }
382
383 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)384 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
385 std::vector<std::string> &deviceTables)
386 {
387 if (device.empty() && tableName.empty()) { // device and table name should not both be empty
388 return -E_INVALID_ARGS;
389 }
390 std::string devicePattern = device.empty() ? "%" : device;
391 std::string tablePattern = tableName.empty() ? "%" : tableName;
392 std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
393
394 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
395 deviceTableName + "';";
396 sqlite3_stmt *stmt = nullptr;
397 int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
398 if (errCode != E_OK) {
399 return errCode;
400 }
401
402 do {
403 errCode = SQLiteUtils::StepWithRetry(stmt, false);
404 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
405 errCode = E_OK;
406 break;
407 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
408 LOGE("Get table name failed. %d", errCode);
409 break;
410 }
411 std::string realTableName;
412 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
413 if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
414 continue;
415 }
416 if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
417 continue;
418 }
419 deviceTables.emplace_back(realTableName);
420 } while (true);
421
422 int ret = E_OK;
423 SQLiteUtils::ResetStatement(stmt, true, ret);
424 return errCode != E_OK ? errCode : ret;
425 }
426
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)427 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
428 {
429 std::vector<FieldInfo> fields;
430 auto itOld = oldTableInfo.GetFields().begin();
431 auto itNew = newTableInfo.GetFields().begin();
432 for (; itNew != newTableInfo.GetFields().end(); itNew++) {
433 if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
434 fields.emplace_back(itNew->second);
435 continue;
436 }
437 itOld++;
438 }
439 return fields;
440 }
441
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)442 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
443 {
444 if (db == nullptr) {
445 return -E_INVALID_ARGS;
446 }
447
448 std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
449 return a.GetColumnId()< b.GetColumnId();
450 });
451 int errCode = E_OK;
452 for (const auto &table : tables) {
453 for (const auto &field : fields) {
454 std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
455 alterSql += "'" + field.GetDataType() + "'";
456 alterSql += field.IsNotNull() ? " NOT NULL" : "";
457 alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
458 alterSql += ";";
459 errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
460 if (errCode != E_OK) {
461 LOGE("Alter table failed. %d", errCode);
462 break;
463 }
464 }
465 }
466 return errCode;
467 }
468
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)469 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
470 {
471 IndexInfoMap indexes;
472 auto itOld = oldTableInfo.GetIndexDefine().begin();
473 auto itNew = newTableInfo.GetIndexDefine().begin();
474 auto itOldEnd = oldTableInfo.GetIndexDefine().end();
475 auto itNewEnd = newTableInfo.GetIndexDefine().end();
476
477 while (itOld != itOldEnd && itNew != itNewEnd) {
478 if (itOld->first == itNew->first) {
479 if (itOld->second != itNew->second) {
480 indexes.insert({itNew->first, itNew->second});
481 }
482 itOld++;
483 itNew++;
484 } else if (itOld->first < itNew->first) {
485 indexes.insert({itOld->first, {}});
486 itOld++;
487 } else {
488 indexes.insert({itNew->first, itNew->second});
489 itNew++;
490 }
491 }
492
493 while (itOld != itOldEnd) {
494 indexes.insert({itOld->first, {}});
495 itOld++;
496 }
497
498 while (itNew != itNewEnd) {
499 indexes.insert({itNew->first, itNew->second});
500 itNew++;
501 }
502
503 return indexes;
504 }
505
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)506 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
507 {
508 if (db == nullptr) {
509 return -E_INVALID_ARGS;
510 }
511
512 int errCode = E_OK;
513 for (const auto &table : tables) {
514 for (const auto &index : indexes) {
515 if (index.first.empty()) {
516 continue;
517 }
518 std::string realIndexName = table + "_" + index.first;
519 std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
520 errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
521 if (errCode != E_OK) {
522 LOGE("Drop index failed. %d", errCode);
523 return errCode;
524 }
525
526 if (index.second.empty()) { // empty means drop index only
527 continue;
528 }
529
530 auto it = index.second.begin();
531 std::string indexDefine = *it++;
532 while (it != index.second.end()) {
533 indexDefine += ", " + *it++;
534 }
535 std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
536 "(" + indexDefine + ");";
537 errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
538 if (errCode != E_OK) {
539 LOGE("Create index failed. %d", errCode);
540 break;
541 }
542 }
543 }
544 return errCode;
545 }
546 }
547
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)548 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
549 const TableInfo &newTableInfo)
550 {
551 std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
552 IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
553 std::vector<std::string> deviceTables;
554 int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
555 if (errCode != E_OK) {
556 LOGE("Get device table name for alter table failed. %d", errCode);
557 return errCode;
558 }
559
560 LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
561 upgradeIndexes.size(), deviceTables.size());
562 errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
563 if (errCode != E_OK) {
564 LOGE("upgrade fields failed. %d", errCode);
565 return errCode;
566 }
567
568 errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
569 if (errCode != E_OK) {
570 LOGE("upgrade indexes failed. %d", errCode);
571 }
572
573 return errCode;
574 }
575
StartTransaction(TransactType type)576 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
577 {
578 if (dbHandle_ == nullptr) {
579 LOGE("Begin transaction failed, dbHandle is null.");
580 return -E_INVALID_DB;
581 }
582 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
583 if (errCode != E_OK) {
584 LOGE("Begin transaction failed, errCode = %d", errCode);
585 }
586 return errCode;
587 }
588
Commit()589 int SQLiteSingleVerRelationalStorageExecutor::Commit()
590 {
591 if (dbHandle_ == nullptr) {
592 return -E_INVALID_DB;
593 }
594
595 return SQLiteUtils::CommitTransaction(dbHandle_);
596 }
597
Rollback()598 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
599 {
600 if (dbHandle_ == nullptr) {
601 return -E_INVALID_DB;
602 }
603 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
604 if (errCode != E_OK) {
605 LOGE("sqlite single ver relation storage executor rollback fail! errCode = [%d]", errCode);
606 }
607 return errCode;
608 }
609
SetTableInfo(const TableInfo & tableInfo)610 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
611 {
612 table_ = tableInfo;
613 }
614
615 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize,int64_t & revisedTime,int64_t & invalidTime)616 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize,
617 int64_t &revisedTime, int64_t &invalidTime)
618 {
619 int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX));
620 uint64_t curTime = 0;
621 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
622 if (modifyTime > static_cast<int64_t>(curTime)) {
623 invalidTime = modifyTime;
624 modifyTime = static_cast<int64_t>(curTime);
625 revisedTime = modifyTime;
626 }
627 } else {
628 LOGW("[Relational] get raw sys time failed.");
629 }
630 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
631 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
632 static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
633 totalSize += sizeof(int64_t) + sizeof(int64_t);
634 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
635 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
636 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
637 if (!cloudGid.empty()) {
638 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
639 totalSize += cloudGid.size();
640 }
641 }
642 if (revisedTime != 0) {
643 std::string cloudGid;
644 if (logInfo.count(CloudDbConstant::GID_FIELD) != 0) {
645 cloudGid = std::get<std::string>(logInfo[CloudDbConstant::GID_FIELD]);
646 }
647 LOGW("[Relational] Found invalid mod time: %lld, curTime: %lld, gid: %s", invalidTime, revisedTime,
648 DBCommon::StringMiddleMasking(cloudGid).c_str());
649 }
650 std::string version;
651 SQLiteUtils::GetColumnTextValue(logStatement, VERSION_INDEX, version);
652 logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
653 totalSize += version.size();
654 }
655
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)656 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
657 {
658 flags.insert_or_assign(DBConstant::ROWID,
659 static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
660 flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
661 static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
662 flags.insert_or_assign(CloudDbConstant::FLAG,
663 static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
664 Bytes hashKey;
665 (void)SQLiteUtils::GetColumnBlobValue(logStatement, HASH_KEY_INDEX, hashKey);
666 flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
667 flags.insert_or_assign(CloudDbConstant::STATUS,
668 static_cast<int64_t>(sqlite3_column_int(logStatement, STATUS_INDEX)));
669 }
670
GetCloudGid(sqlite3_stmt * logStatement,std::vector<std::string> & cloudGid)671 void GetCloudGid(sqlite3_stmt *logStatement, std::vector<std::string> &cloudGid)
672 {
673 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) == nullptr) {
674 return;
675 }
676 std::string gid = reinterpret_cast<const std::string::value_type *>(
677 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
678 if (gid.empty()) {
679 LOGW("[Relational] Get cloud gid is null.");
680 return;
681 }
682 cloudGid.emplace_back(gid);
683 }
684 }
685
GetDataItemSerialSize(DataItem & item,size_t appendLen)686 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
687 {
688 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
689 // the size would not be very large.
690 static const size_t maxOrigDevLength = 40;
691 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
692 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
693 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
694 return dataSize;
695 }
696
GetKvData(const Key & key,Value & value) const697 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
698 {
699 return SQLiteRelationalUtils::GetKvData(dbHandle_, isMemDb_, key, value);
700 }
701
GetKvDataByPrefixKey(const Key & keyPrefix,std::map<Key,Value> & data) const702 int SQLiteSingleVerRelationalStorageExecutor::GetKvDataByPrefixKey(const Key &keyPrefix,
703 std::map<Key, Value> &data) const
704 {
705 std::string metaTableName = std::string(DBConstant::RELATIONAL_PREFIX) + "metadata";
706 return SqliteMetaExecutor::GetMetaDataByPrefixKey(dbHandle_, isMemDb_, metaTableName, keyPrefix, data);
707 }
708
PutKvData(const Key & key,const Value & value) const709 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
710 {
711 return SQLiteRelationalUtils::PutKvData(dbHandle_, isMemDb_, key, value);
712 }
713
DeleteMetaData(const std::vector<Key> & keys) const714 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
715 {
716 static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
717 "metadata WHERE key=?;";
718 sqlite3_stmt *statement = nullptr;
719 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
720 if (errCode != E_OK) {
721 return errCode;
722 }
723
724 int ret = E_OK;
725 for (const auto &key : keys) {
726 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
727 if (errCode != E_OK) {
728 break;
729 }
730
731 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
732 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
733 break;
734 }
735 errCode = E_OK;
736 SQLiteUtils::ResetStatement(statement, false, ret);
737 }
738 SQLiteUtils::ResetStatement(statement, true, ret);
739 return CheckCorruptedStatus(errCode);
740 }
741
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const742 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
743 {
744 static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " +
745 std::string(DBConstant::RELATIONAL_PREFIX) + "metadata WHERE key>=? AND key<=?;";
746 sqlite3_stmt *statement = nullptr;
747 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
748 if (errCode != E_OK) {
749 return errCode;
750 }
751
752 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
753 if (errCode == E_OK) {
754 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
755 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
756 errCode = E_OK;
757 }
758 }
759 int ret = E_OK;
760 SQLiteUtils::ResetStatement(statement, true, ret);
761 return CheckCorruptedStatus(errCode);
762 }
763
GetAllMetaKeys(std::vector<Key> & keys) const764 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
765 {
766 static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
767 "metadata;";
768 sqlite3_stmt *statement = nullptr;
769 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
770 if (errCode != E_OK) {
771 LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
772 return errCode;
773 }
774 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
775 int ret = E_OK;
776 SQLiteUtils::ResetStatement(statement, true, ret);
777 return errCode != E_OK ? errCode : ret;
778 }
779
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)780 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
781 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
782 {
783 if (stmt == nullptr) {
784 int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
785 if (errCode != E_OK) {
786 LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
787 return errCode;
788 }
789 }
790
791 int errCode = inserter.BindHashKeyAndDev(dataItem, stmt, 1); // 1 is hashKey begin index
792 if (errCode != E_OK) {
793 return errCode;
794 }
795 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
796 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
797 errCode = E_OK;
798 }
799 int ret = E_OK;
800 SQLiteUtils::ResetStatement(stmt, false, ret); // Finalize outside.
801 return errCode != E_OK ? errCode : ret;
802 }
803
SaveSyncDataItem(const DataItem & dataItem,bool isUpdate,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,std::map<std::string,Type> & saveVals)804 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, bool isUpdate,
805 SaveSyncDataStmt &saveStmt, RelationalSyncDataInserter &inserter, std::map<std::string, Type> &saveVals)
806 {
807 if (std::get_if<int64_t>(&saveVals[DBConstant::ROWID]) == nullptr) {
808 LOGE("[SaveSyncDataItem] Invalid args because of no rowid!");
809 return -E_INVALID_ARGS;
810 }
811 if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
812 int errCode = inserter.GetObserverDataByRowId(dbHandle_, std::get<int64_t>(saveVals[DBConstant::ROWID]),
813 ChangeType::OP_DELETE);
814 if (errCode != E_OK) {
815 LOGE("[SaveSyncDataItem] Failed to get primary data before deletion, errCode: %d", errCode);
816 return errCode;
817 }
818 saveVals[DBConstant::ROWID] = static_cast<int64_t>(-1);
819 return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
820 }
821 // we don't know the rowid if user drop device table
822 // SPLIT_BY_DEVICE use insert or replace to update data
823 // no pk table should delete by hash key(rowid) first
824 if ((mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().IsNoPkTable())) {
825 int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
826 if (errCode != E_OK) {
827 LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
828 return errCode;
829 }
830 }
831 int errCode = inserter.SaveData(isUpdate, dataItem, saveStmt, saveVals);
832 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
833 return errCode;
834 }
835 if (!isUpdate) {
836 saveVals[DBConstant::ROWID] = SQLiteUtils::GetLastRowId(dbHandle_);
837 }
838 errCode = inserter.GetObserverDataByRowId(dbHandle_, std::get<int64_t>(saveVals[DBConstant::ROWID]),
839 isUpdate ? ChangeType::OP_UPDATE : ChangeType::OP_INSERT);
840 if (errCode != E_OK) {
841 LOGE("Get observer data by rowid failed, errCode=%d.", errCode);
842 }
843 return errCode;
844 }
845
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)846 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
847 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
848 {
849 if (stmt == nullptr) {
850 int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
851 if (errCode != E_OK) {
852 LOGE("[DeleteSyncLog] Get statement fail!");
853 return errCode;
854 }
855 }
856
857 int errCode = inserter.BindHashKeyAndDev(dataItem, stmt, 1); // 1 is hashKey begin index
858 if (errCode != E_OK) {
859 return errCode;
860 }
861 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
862 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
863 errCode = E_OK;
864 }
865 int ret = E_OK;
866 SQLiteUtils::ResetStatement(stmt, false, ret); // Finalize outside.
867 return errCode != E_OK ? errCode : ret;
868 }
869
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)870 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
871 RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
872 {
873 int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
874 if (errCode != E_OK) {
875 return errCode;
876 }
877 return DeleteSyncLog(item, inserter, rmLogStmt);
878 }
879
CheckDataConflictDefeated(const DataItem & dataItem,const RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DeviceSyncSaveDataInfo & saveDataInfo)880 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
881 const RelationalSyncDataInserter &inserter, SaveSyncDataStmt &saveStmt, DeviceSyncSaveDataInfo &saveDataInfo)
882 {
883 auto &logInfoGet = saveDataInfo.localLogInfo;
884 int errCode = SQLiteRelationalUtils::GetLocalLogInfo(inserter, dataItem, mode_, logInfoGet, saveStmt);
885 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
886 LOGE("Failed to get raw data. %d", errCode);
887 return errCode;
888 }
889
890 auto &rowId = saveDataInfo.rowId;
891 auto &isExist = saveDataInfo.isExist;
892 auto &isDefeated = saveDataInfo.isDefeated;
893 rowId = logInfoGet.dataKey;
894 isExist = (errCode != -E_NOT_FOUND) && ((logInfoGet.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) == 0);
895 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
896 mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
897 isDefeated = false; // no need to solve conflict except miss query data
898 return E_OK;
899 }
900 // defeated if item timestamp is earlier than raw data
901 isDefeated = (dataItem.timestamp < logInfoGet.timestamp);
902 return E_OK;
903 }
904
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)905 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
906 SaveSyncDataStmt &saveStmt, DataItem &item)
907 {
908 DeviceSyncSaveDataInfo deviceSyncSaveDataInfo;
909 int errCode = CheckDataConflictDefeated(item, inserter, saveStmt, deviceSyncSaveDataInfo);
910 if (errCode != E_OK) {
911 LOGE("check data conflict failed. %d", errCode);
912 return errCode;
913 }
914 bool &isDefeated = deviceSyncSaveDataInfo.isDefeated;
915 bool &isExist = deviceSyncSaveDataInfo.isExist;
916 int64_t &rowid = deviceSyncSaveDataInfo.rowId;
917
918 if (isDefeated) {
919 LOGD("Data was defeated.");
920 return E_OK;
921 }
922 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
923 return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
924 }
925 if (!isExist && ((item.flag & DataItem::DELETE_FLAG) != 0)) {
926 LOGI("[RelationalStorageExecutor][SaveSyncDataItem] Delete non-exist data. Nothing to save.");
927 return E_OK;
928 }
929 bool isUpdate = isExist && mode_ == DistributedTableMode::COLLABORATION;
930 Key sourceHash = item.hashKey;
931 if (isUpdate) {
932 item.hashKey = deviceSyncSaveDataInfo.localLogInfo.hashKey;
933 }
934 std::map<std::string, Type> saveVals;
935 saveVals[DBConstant::ROWID] = rowid;
936 errCode = SaveSyncDataItem(item, isUpdate, saveStmt, inserter, saveVals);
937 if (errCode == E_OK) {
938 item.hashKey = sourceHash;
939 errCode = inserter.SaveSyncLog(dbHandle_, item, deviceSyncSaveDataInfo, saveVals, saveStmt);
940 }
941 return errCode;
942 }
943
SaveSyncDataItems(RelationalSyncDataInserter & inserter)944 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
945 {
946 SaveSyncDataStmt saveStmt;
947 int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
948 if (errCode != E_OK) {
949 LOGE("Prepare insert sync data statement failed.");
950 return errCode;
951 }
952
953 errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
954 if (item.neglect) { // Do not save this record if it is neglected
955 return E_OK;
956 }
957 int errCode = SaveSyncDataItem(inserter, saveStmt, item);
958 if (errCode != E_OK) {
959 LOGE("save sync data item failed. err=%d", errCode);
960 return errCode;
961 }
962 // Need not reset rmDataStmt and rmLogStmt here.
963 return saveStmt.ResetStatements(false);
964 });
965
966 int ret = saveStmt.ResetStatements(true);
967 return errCode != E_OK ? errCode : ret;
968 }
969
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)970 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
971 {
972 if (useTrans) {
973 int errCode = StartTransaction(TransactType::IMMEDIATE);
974 if (errCode != E_OK) {
975 return errCode;
976 }
977 }
978
979 int errCode = SetLogTriggerStatus(false);
980 if (errCode != E_OK) {
981 goto END;
982 }
983 errCode = SaveSyncDataItems(inserter);
984 if (errCode != E_OK) {
985 LOGE("Save sync data items failed. errCode=%d", errCode);
986 goto END;
987 }
988 errCode = SetLogTriggerStatus(true);
989 END:
990 if (useTrans) {
991 if (errCode == E_OK) {
992 errCode = Commit();
993 } else {
994 (void)Rollback(); // Keep the error code of the first scene
995 }
996 }
997 return errCode;
998 }
999
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const1000 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
1001 bool isGettingDeletedData) const
1002 {
1003 RowDataWithLog data;
1004 int errCode = SQLiteRelationalUtils::GetLogData(stmt, data.logInfo);
1005 if (errCode != E_OK) {
1006 LOGE("relational data value transfer to kv fail");
1007 return errCode;
1008 }
1009 std::vector<FieldInfo> serializeFields;
1010 if (!isGettingDeletedData) {
1011 auto fields = table_.GetFieldInfos();
1012 for (size_t cid = 0; cid < fields.size(); ++cid) {
1013 if (localSchema_.IsNeedSkipSyncField(fields[cid], table_.GetTableName())) {
1014 continue;
1015 }
1016 DataValue value;
1017 errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
1018 value);
1019 if (errCode != E_OK) {
1020 return errCode;
1021 }
1022 data.rowData.push_back(std::move(value)); // sorted by cid
1023 serializeFields.push_back(fields[cid]);
1024 }
1025 }
1026
1027 errCode = DataTransformer::SerializeDataItem(data,
1028 isGettingDeletedData ? std::vector<FieldInfo>() : serializeFields, dataItem);
1029 if (errCode != E_OK) {
1030 LOGE("relational data value transfer to kv fail");
1031 }
1032 return errCode;
1033 }
1034
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1035 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1036 {
1037 int errCode = GetDataItemForSync(fullStmt, item, false);
1038 if (errCode != E_OK) {
1039 return errCode;
1040 }
1041 item.value = {};
1042 item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1043 return E_OK;
1044 }
1045
1046 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1047 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp ×tamp)
1048 {
1049 if (stmt == nullptr) {
1050 return -E_INVALID_ARGS;
1051 }
1052 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1053 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1054 timestamp = INT64_MAX;
1055 errCode = E_OK;
1056 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1057 timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3)); // 3 means timestamp index
1058 errCode = E_OK;
1059 }
1060 return errCode;
1061 }
1062
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1063 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1064 std::vector<DataItem> &dataItems, DataItem &&item)
1065 {
1066 // If one record is over 4M, ignore it.
1067 if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1068 overLongSize++;
1069 } else {
1070 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1071 dataTotalSize += GetDataItemSerialSize(item, appendLength);
1072 if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1073 return -E_UNFINISHED;
1074 } else {
1075 dataItems.push_back(item);
1076 }
1077 }
1078 return E_OK;
1079 }
1080 }
1081
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1082 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1083 sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1084 {
1085 if (!isFirstTime) { // For the first time, never step before, can get nothing
1086 int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1087 if (errCode != E_OK) {
1088 return errCode;
1089 }
1090 }
1091 return StepNext(isMemDb_, queryStmt, queryTime);
1092 }
1093
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1094 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1095 Timestamp &missQueryTime)
1096 {
1097 int errCode = GetMissQueryData(fullStmt, item);
1098 if (errCode != E_OK) {
1099 return errCode;
1100 }
1101 return StepNext(isMemDb_, fullStmt, missQueryTime);
1102 }
1103
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1104 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1105 const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1106 const TableInfo &tableInfo)
1107 {
1108 baseTblName_ = tableInfo.GetTableName();
1109 SetTableInfo(tableInfo);
1110 sqlite3_stmt *queryStmt = nullptr;
1111 sqlite3_stmt *fullStmt = nullptr;
1112 bool isGettingDeletedData = false;
1113 int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1114 if (errCode != E_OK) {
1115 return errCode;
1116 }
1117
1118 Timestamp queryTime = 0;
1119 Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1120
1121 bool isFirstTime = true;
1122 size_t dataTotalSize = 0;
1123 size_t overLongSize = 0;
1124 do {
1125 DataItem item;
1126 if (queryTime < missQueryTime) {
1127 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1128 } else if (queryTime == missQueryTime) {
1129 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1130 if (errCode != E_OK) {
1131 break;
1132 }
1133 errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1134 } else {
1135 errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1136 }
1137
1138 if (errCode == E_OK && !isFirstTime) {
1139 errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1140 }
1141
1142 if (errCode != E_OK) {
1143 break;
1144 }
1145
1146 isFirstTime = false;
1147 if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1148 errCode = -E_FINISHED;
1149 break;
1150 }
1151 } while (true);
1152 LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1153 errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1154 int ret = E_OK;
1155 SQLiteUtils::ResetStatement(queryStmt, true, ret);
1156 SQLiteUtils::ResetStatement(fullStmt, true, ret);
1157 return errCode != E_OK ? errCode : ret;
1158 }
1159
CheckDBModeForRelational()1160 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1161 {
1162 std::string journalMode;
1163 int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1164
1165 for (auto &c : journalMode) { // convert to lowercase
1166 c = static_cast<char>(std::tolower(c));
1167 }
1168
1169 if (errCode == E_OK && journalMode != "wal") {
1170 LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1171 return -E_NOT_SUPPORT;
1172 }
1173 return errCode;
1174 }
1175
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1176 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1177 const std::string &tableName)
1178 {
1179 std::vector<std::string> deviceTables;
1180 int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1181 if (errCode != E_OK) {
1182 LOGE("Get device table name for alter table failed. %d", errCode);
1183 return errCode;
1184 }
1185
1186 LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1187 for (const auto &table : deviceTables) {
1188 std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1189 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1190 if (errCode != E_OK) {
1191 LOGE("Delete device data failed. %d", errCode);
1192 break;
1193 }
1194 }
1195 return errCode;
1196 }
1197
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1198 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1199 {
1200 std::string deleteLogSql =
1201 "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1202 "_log WHERE flag&0x02=0 AND (cloud_gid = '' OR cloud_gid IS NULL)";
1203 return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1204 }
1205
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1206 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1207 const std::string &tableName)
1208 {
1209 std::string deleteLogSql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1210 "_log WHERE device = ?";
1211 sqlite3_stmt *deleteLogStmt = nullptr;
1212 int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1213 if (errCode != E_OK) {
1214 LOGE("Get delete device data log statement failed. %d", errCode);
1215 return errCode;
1216 }
1217
1218 int ret = E_OK;
1219 errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1220 if (errCode != E_OK) {
1221 LOGE("Bind device to delete data log statement failed. %d", errCode);
1222 SQLiteUtils::ResetStatement(deleteLogStmt, true, ret);
1223 return errCode;
1224 }
1225
1226 errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1227 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1228 errCode = E_OK;
1229 } else {
1230 LOGE("Delete data log failed. %d", errCode);
1231 }
1232
1233 SQLiteUtils::ResetStatement(deleteLogStmt, true, ret);
1234 return errCode != E_OK ? errCode : ret;
1235 }
1236
DeleteDistributedLogTable(const std::string & tableName)1237 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1238 {
1239 if (tableName.empty()) {
1240 return -E_INVALID_ARGS;
1241 }
1242 std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1243 std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1244 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1245 if (errCode != E_OK) {
1246 LOGE("Delete distributed log table failed. %d", errCode);
1247 }
1248 return errCode;
1249 }
1250
IsTableOnceDropped(const std::string & tableName,bool & onceDropped)1251 int SQLiteSingleVerRelationalStorageExecutor::IsTableOnceDropped(const std::string &tableName, bool &onceDropped)
1252 {
1253 std::string keyStr = DBConstant::TABLE_IS_DROPPED + tableName;
1254 Key key;
1255 DBCommon::StringToVector(keyStr, key);
1256 Value value;
1257
1258 int errCode = GetKvData(key, value);
1259 if (errCode == E_OK) {
1260 onceDropped = true;
1261 LOGW("[RDBExecutor] Table %s once dropped!", DBCommon::StringMiddleMasking(tableName).c_str());
1262 return E_OK;
1263 } else if (errCode == -E_NOT_FOUND) {
1264 onceDropped = false;
1265 return E_OK;
1266 }
1267 return errCode;
1268 }
1269
CleanResourceForDroppedTable(const std::string & tableName)1270 int SQLiteSingleVerRelationalStorageExecutor::CleanResourceForDroppedTable(const std::string &tableName)
1271 {
1272 int errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1273 if (errCode != E_OK) {
1274 LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1275 return errCode;
1276 }
1277 errCode = DeleteDistributedLogTable(tableName);
1278 if (errCode != E_OK) {
1279 LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1280 return errCode;
1281 }
1282 errCode = DeleteTableTrigger(tableName);
1283 if (errCode != E_OK) {
1284 LOGE("Delete trigger for missing distributed table failed. %d", errCode);
1285 }
1286 return errCode;
1287 }
1288
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1289 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1290 std::vector<std::string> &missingTables)
1291 {
1292 if (tableNames.empty()) {
1293 return E_OK;
1294 }
1295 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1296 sqlite3_stmt *stmt = nullptr;
1297 int ret = E_OK;
1298 int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1299 if (errCode != E_OK) {
1300 SQLiteUtils::ResetStatement(stmt, true, ret);
1301 return errCode;
1302 }
1303 for (const auto &tableName : tableNames) {
1304 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1305 if (errCode != E_OK) {
1306 LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1307 break;
1308 }
1309
1310 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1311 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1312 errCode = CleanResourceForDroppedTable(tableName);
1313 if (errCode != E_OK) {
1314 LOGE("Clean resource for dropped table failed. %d", errCode);
1315 break;
1316 }
1317 missingTables.emplace_back(tableName);
1318 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1319 LOGE("[CheckAndCleanDisTable]Check distributed table failed. %d", errCode);
1320 break;
1321 }
1322 errCode = E_OK; // Check result ok for distributed table is still exists
1323 SQLiteUtils::ResetStatement(stmt, false, ret);
1324 }
1325 SQLiteUtils::ResetStatement(stmt, true, ret);
1326 return CheckCorruptedStatus(errCode);
1327 }
1328
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1329 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1330 const TableInfo &baseTbl, const StoreInfo &info)
1331 {
1332 if (dbHandle_ == nullptr) {
1333 return -E_INVALID_DB;
1334 }
1335
1336 if (device.empty() || !baseTbl.IsValid()) {
1337 return -E_INVALID_ARGS;
1338 }
1339
1340 std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1341 int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1342 if (errCode != E_OK) {
1343 LOGE("Create device table failed. %d", errCode);
1344 return errCode;
1345 }
1346
1347 errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1348 if (errCode != E_OK) {
1349 LOGE("Copy index to device table failed. %d", errCode);
1350 }
1351 return errCode;
1352 }
1353
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1354 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1355 const std::string &schemaVersion)
1356 {
1357 if (dbHandle_ == nullptr) {
1358 return -E_INVALID_DB;
1359 }
1360
1361 TableInfo newTable;
1362 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1363 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1364 LOGE("Check new schema failed. %d", errCode);
1365 return errCode;
1366 } else {
1367 errCode = table.CompareWithTable(newTable, schemaVersion);
1368 if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1369 LOGE("Check schema failed, schema was changed. %d", errCode);
1370 return -E_DISTRIBUTED_SCHEMA_CHANGED;
1371 } else {
1372 errCode = E_OK;
1373 }
1374 }
1375
1376 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1377 if (errCode != E_OK) {
1378 LOGE("Get query helper for check query failed. %d", errCode);
1379 return errCode;
1380 }
1381
1382 if (!query.IsQueryForRelationalDB()) {
1383 LOGE("Not support for this query type.");
1384 return -E_NOT_SUPPORT;
1385 }
1386
1387 SyncTimeRange defaultTimeRange;
1388 sqlite3_stmt *stmt = nullptr;
1389 errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1390 stmt);
1391 if (errCode != E_OK) {
1392 LOGE("Get query statement for check query failed. %d", errCode);
1393 }
1394
1395 int ret = E_OK;
1396 SQLiteUtils::ResetStatement(stmt, true, ret);
1397 return errCode != E_OK ? errCode : ret;
1398 }
1399
CheckQueryObjectLegal(const QuerySyncObject & query)1400 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const QuerySyncObject &query)
1401 {
1402 if (dbHandle_ == nullptr) {
1403 return -E_INVALID_DB;
1404 }
1405 TableInfo newTable;
1406 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, query.GetTableName(), newTable);
1407 if (errCode != E_OK) {
1408 LOGE("Check new schema failed. %d", errCode);
1409 }
1410 return errCode;
1411 }
1412
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1413 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1414 Timestamp &maxTimestamp) const
1415 {
1416 maxTimestamp = 0;
1417 int ret = E_OK;
1418 for (const auto &tableName : tableNames) {
1419 const std::string sql = "SELECT MAX(timestamp) FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1420 "_log;";
1421 sqlite3_stmt *stmt = nullptr;
1422 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1423 if (errCode != E_OK) {
1424 return errCode;
1425 }
1426 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1427 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1428 maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1429 errCode = E_OK;
1430 }
1431 SQLiteUtils::ResetStatement(stmt, true, ret);
1432 if (errCode != E_OK) {
1433 maxTimestamp = 0;
1434 return errCode;
1435 }
1436 }
1437 return E_OK;
1438 }
1439
SetLogTriggerStatus(bool status)1440 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1441 {
1442 return SQLiteRelationalUtils::SetLogTriggerStatus(dbHandle_, status);
1443 }
1444
1445 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1446 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1447 std::vector<RelationalRowData *> &data)
1448 {
1449 size_t totalLength = 0;
1450 do {
1451 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1452 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1453 return E_OK;
1454 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1455 LOGE("Get data by bind sql failed:%d", errCode);
1456 return errCode;
1457 }
1458
1459 if (colNames.empty()) {
1460 SQLiteUtils::GetSelectCols(stmt, colNames); // Get column names.
1461 }
1462 auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1463 if (relaRowData == nullptr) {
1464 LOGE("ExecuteQueryBySqlStmt OOM");
1465 return -E_OUT_OF_MEMORY;
1466 }
1467
1468 auto dataSz = relaRowData->CalcLength();
1469 if (dataSz == 0) { // invalid data
1470 delete relaRowData;
1471 relaRowData = nullptr;
1472 continue;
1473 }
1474
1475 totalLength += static_cast<size_t>(dataSz);
1476 if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) { // the set has been full
1477 delete relaRowData;
1478 relaRowData = nullptr;
1479 LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1480 return -E_REMOTE_OVER_SIZE;
1481 }
1482 data.push_back(relaRowData);
1483 } while (true);
1484 return E_OK;
1485 }
1486 }
1487
1488 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1489 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1490 const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1491 std::vector<RelationalRowData *> &data)
1492 {
1493 int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1494 if (errCode != E_OK) {
1495 return errCode;
1496 }
1497
1498 sqlite3_stmt *stmt = nullptr;
1499 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1500 if (errCode != E_OK) {
1501 (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1502 return errCode;
1503 }
1504 ResFinalizer finalizer([this, &stmt, &errCode] {
1505 (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1506 SQLiteUtils::ResetStatement(stmt, true, errCode);
1507 });
1508 for (size_t i = 0; i < bindArgs.size(); ++i) {
1509 errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1510 if (errCode != E_OK) {
1511 return errCode;
1512 }
1513 }
1514 return GetRowDatas(stmt, isMemDb_, colNames, data);
1515 }
1516
CheckEncryptedOrCorrupted() const1517 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1518 {
1519 if (dbHandle_ == nullptr) {
1520 return -E_INVALID_DB;
1521 }
1522
1523 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1524 if (errCode != E_OK) {
1525 LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1526 }
1527 return errCode;
1528 }
1529
GetExistsDeviceList(std::set<std::string> & devices) const1530 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1531 {
1532 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1533 isMemDb_, devices);
1534 }
1535
GetSyncCloudGid(QuerySyncObject & query,const SyncTimeRange & syncTimeRange,bool isCloudForcePushStrategy,bool isCompensatedTask,std::vector<std::string> & cloudGid)1536 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudGid(QuerySyncObject &query,
1537 const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy,
1538 bool isCompensatedTask, std::vector<std::string> &cloudGid)
1539 {
1540 sqlite3_stmt *queryStmt = nullptr;
1541 int errCode = E_OK;
1542 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1543 if (errCode != E_OK) {
1544 return errCode;
1545 }
1546 std::string sql = helper.GetGidRelationalCloudQuerySql(tableSchema_.fields, isCloudForcePushStrategy,
1547 isCompensatedTask);
1548 errCode = helper.GetCloudQueryStatement(false, dbHandle_, sql, queryStmt);
1549 if (errCode != E_OK) {
1550 return errCode;
1551 }
1552 do {
1553 errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1554 if (errCode != E_OK) {
1555 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1556 break;
1557 }
1558 GetCloudGid(queryStmt, cloudGid);
1559 } while (errCode == E_OK);
1560 int resetStatementErrCode = E_OK;
1561 SQLiteUtils::ResetStatement(queryStmt, true, resetStatementErrCode);
1562 queryStmt = nullptr;
1563 return (errCode == E_OK ? resetStatementErrCode : errCode);
1564 }
1565
GetCloudDataForSync(const CloudUploadRecorder & uploadRecorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t & stepNum,uint32_t & totalSize)1566 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(const CloudUploadRecorder &uploadRecorder,
1567 sqlite3_stmt *statement, CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize)
1568 {
1569 VBucket log;
1570 VBucket extraLog;
1571 uint32_t preSize = totalSize;
1572 int64_t revisedTime = 0;
1573 int64_t invalidTime = 0;
1574 GetCloudLog(statement, log, totalSize, revisedTime, invalidTime);
1575 GetCloudExtraLog(statement, extraLog);
1576 if (revisedTime != 0) {
1577 Bytes hashKey = std::get<Bytes>(extraLog[CloudDbConstant::HASH_KEY]);
1578 cloudDataResult.revisedData.push_back({hashKey, revisedTime, invalidTime});
1579 }
1580
1581 VBucket data;
1582 int64_t flag = 0;
1583 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
1584 if (errCode != E_OK) {
1585 return errCode;
1586 }
1587 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1588 for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1589 Type cloudValue;
1590 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1591 tableSchema_.fields[cid].type, cid + STATUS_INDEX + 1, cloudValue);
1592 if (errCode != E_OK) {
1593 return errCode;
1594 }
1595 SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1596 errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1597 if (errCode != E_OK) {
1598 return errCode;
1599 }
1600 }
1601 }
1602
1603 if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, maxUploadSize_, maxUploadCount_)) {
1604 errCode = CloudStorageUtils::IdentifyCloudType(uploadRecorder, cloudDataResult, data, log, extraLog);
1605 } else {
1606 errCode = -E_UNFINISHED;
1607 }
1608 if (errCode == -E_IGNORE_DATA) {
1609 errCode = E_OK;
1610 totalSize = preSize;
1611 stepNum--;
1612 }
1613 return errCode;
1614 }
1615
SetLocalSchema(const RelationalSchemaObject & localSchema)1616 void SQLiteSingleVerRelationalStorageExecutor::SetLocalSchema(const RelationalSchemaObject &localSchema)
1617 {
1618 localSchema_ = localSchema;
1619 }
1620
CleanCloudDataOnLogTable(const std::string & logTableName,ClearMode mode)1621 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName, ClearMode mode)
1622 {
1623 std::string setFlag;
1624 if (mode == FLAG_ONLY && isLogicDelete_) {
1625 setFlag = SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC;
1626 } else {
1627 setFlag = SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC;
1628 }
1629 std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " + setFlag +
1630 ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1631 SHARING_RESOURCE + " = '' " + "WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR " +
1632 CLOUD_GID_FIELD + " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '';";
1633 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1634 if (errCode != E_OK) {
1635 LOGE("clean cloud log failed, %d", errCode);
1636 return errCode;
1637 }
1638 cleanLogSql = "DELETE FROM " + logTableName + " WHERE " + FLAG_IS_CLOUD + " AND " + DATA_IS_DELETE + ";";
1639 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1640 if (errCode != E_OK) {
1641 LOGE("delete cloud log failed, %d", errCode);
1642 return errCode;
1643 }
1644 // set all flag logout and data upload is not finished.
1645 cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG;
1646 if (mode == FLAG_ONLY) {
1647 cleanLogSql += " = flag | 0x800 & ~0x400;";
1648 } else {
1649 cleanLogSql += " = flag & ~0x400;";
1650 }
1651 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1652 }
1653
ClearVersionOnLogTable(const std::string & logTableName)1654 int SQLiteSingleVerRelationalStorageExecutor::ClearVersionOnLogTable(const std::string &logTableName)
1655 {
1656 std::string cleanLogSql = "UPDATE " + logTableName + " SET " + VERSION + " = '';";
1657 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1658 }
1659
CleanUploadFinishedFlag(const std::string & tableName)1660 int SQLiteSingleVerRelationalStorageExecutor::CleanUploadFinishedFlag(const std::string &tableName)
1661 {
1662 // unset upload finished flag
1663 std::string cleanUploadFinishedSql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " +
1664 CloudDbConstant::FLAG + " = flag & ~0x400;";
1665 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanUploadFinishedSql);
1666 }
1667
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName,const RelationalSchemaObject & localSchema)1668 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1669 const std::string &logTableName, const RelationalSchemaObject &localSchema)
1670 {
1671 std::string sql = "DELETE FROM '" + tableName + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
1672 " IN (SELECT " + DATAKEY + " FROM '" + logTableName + "' WHERE (" + FLAG_IS_LOGIC_DELETE +
1673 ") OR CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD + " OR " + FLAG_IS_CLOUD_CONSISTENCY +
1674 "));";
1675 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1676 if (errCode != E_OK) {
1677 LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1678 return errCode;
1679 }
1680 std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + FLAG_IS_CLOUD + " OR " +
1681 FLAG_IS_CLOUD_CONSISTENCY + ";";
1682 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1683 if (errCode != E_OK) {
1684 LOGE("Failed to delete cloud data on log table, %d.", errCode);
1685 return errCode;
1686 }
1687 errCode = DoCleanAssetId(tableName, localSchema);
1688 if (errCode != E_OK) {
1689 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
1690 return errCode;
1691 }
1692 errCode = CleanCloudDataOnLogTable(logTableName, FLAG_AND_DATA);
1693 if (errCode != E_OK) {
1694 LOGE("Failed to clean gid on log table, %d.", errCode);
1695 }
1696 return errCode;
1697 }
1698
GetFlagIsLocalCount(const std::string & logTableName,int32_t & count)1699 int SQLiteSingleVerRelationalStorageExecutor::GetFlagIsLocalCount(const std::string &logTableName, int32_t &count)
1700 {
1701 std::string sql = "SELECT count(*) from " + logTableName + " WHERE FLAG&0x02 = 0x02;";
1702 int errCode = SQLiteUtils::GetCountBySql(dbHandle_, sql, count);
1703 if (errCode != E_OK) {
1704 LOGW("[RDBExecutor] get mark local count from log table failed: %d", errCode);
1705 }
1706 return errCode;
1707 }
1708
ChangeCloudDataFlagOnLogTable(const std::string & logTableName)1709 int SQLiteSingleVerRelationalStorageExecutor::ChangeCloudDataFlagOnLogTable(const std::string &logTableName)
1710 {
1711 std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " +
1712 SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1713 CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '' " + "WHERE NOT " + FLAG_IS_CLOUD_CONSISTENCY + ";";
1714 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1715 if (errCode != E_OK) {
1716 LOGE("change cloud log flag failed, %d", errCode);
1717 }
1718 return errCode;
1719 }
1720
SetDataOnUserTableWithLogicDelete(const std::string & tableName,const std::string & logTableName)1721 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnUserTableWithLogicDelete(const std::string &tableName,
1722 const std::string &logTableName)
1723 {
1724 UpdateCursorContext context;
1725 int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, context.cursor);
1726 LOGI("removeData on userTable:%s length:%zu start and cursor is %llu.",
1727 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1728 errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1729 if (errCode != E_OK) {
1730 LOGE("Failed to create updateCursor func on userTable errCode=%d.", errCode);
1731 return errCode;
1732 }
1733 // data from cloud and not modified by local or consistency with cloud to flag logout
1734 std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " + SET_FLAG_WHEN_LOGOUT +
1735 ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1736 SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1737 " WHERE (CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD_CONSISTENCY + " OR " +
1738 FLAG_IS_CLOUD + ") AND NOT (" + DATA_IS_DELETE + ") " + " AND NOT (" + FLAG_IS_LOGIC_DELETE
1739 + "));";
1740 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1741 // here just clear updateCursor func, fail will not influence other function
1742 (void)CreateFuncUpdateCursor(context, nullptr);
1743 if (errCode != E_OK) {
1744 LOGE("Failed to change cloud data flag on userTable, %d.", errCode);
1745 return errCode;
1746 }
1747 // clear some column when data is logicDelete or physical delete
1748 sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1749 " = '', " + SHARING_RESOURCE + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR (" + DATA_IS_DELETE + ");";
1750 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1751 if (errCode != E_OK) {
1752 LOGE("Failed to deal logic delete data flag on userTable, %d.", errCode);
1753 return errCode;
1754 }
1755 LOGI("removeData on userTable:%s length:%zu finish and cursor is %llu.",
1756 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1757 errCode = SetCursor(tableName, context.cursor);
1758 if (errCode != E_OK) {
1759 LOGE("set new cursor after removeData error %d.", errCode);
1760 return errCode;
1761 }
1762 return ChangeCloudDataFlagOnLogTable(logTableName);
1763 }
1764
SetDataOnShareTableWithLogicDelete(const std::string & tableName,const std::string & logTableName)1765 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnShareTableWithLogicDelete(const std::string &tableName,
1766 const std::string &logTableName)
1767 {
1768 UpdateCursorContext context;
1769 int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, context.cursor);
1770 LOGI("removeData on shareTable:%s length:%zu start and cursor is %llu.",
1771 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1772 errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1773 if (errCode != E_OK) {
1774 LOGE("Failed to create updateCursor func on shareTable errCode=%d.", errCode);
1775 return errCode;
1776 }
1777 std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " +
1778 SET_FLAG_WHEN_LOGOUT_FOR_SHARE_TABLE + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1779 CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1780 " WHERE (NOT (" + DATA_IS_DELETE + ") " + " AND NOT (" + FLAG_IS_LOGIC_DELETE + "));";
1781 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1782 // here just clear updateCursor func, fail will not influence other function
1783 (void)CreateFuncUpdateCursor(context, nullptr);
1784 if (errCode != E_OK) {
1785 LOGE("Failed to change cloud data flag on shareTable, %d.", errCode);
1786 return errCode;
1787 }
1788 // clear some column when data is logicDelete or physical delete
1789 sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1790 " = '', " + SHARING_RESOURCE + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR (" + DATA_IS_DELETE + ");";
1791 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1792 if (errCode != E_OK) {
1793 LOGE("Failed to deal logic delete data flag on shareTable, %d.", errCode);
1794 return errCode;
1795 }
1796 LOGI("removeData on shareTable:%s length:%zu finish and cursor is %llu.",
1797 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1798 return SetCursor(tableName, context.cursor);
1799 }
1800
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys,bool distinguishCloudFlag)1801 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
1802 std::vector<int64_t> &dataKeys, bool distinguishCloudFlag)
1803 {
1804 sqlite3_stmt *selectStmt = nullptr;
1805 std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
1806 " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '' AND data_key != '-1'";
1807 if (distinguishCloudFlag) {
1808 sql += " AND (";
1809 sql += FLAG_IS_CLOUD;
1810 sql += " OR ";
1811 sql += FLAG_IS_CLOUD_CONSISTENCY;
1812 sql += " )";
1813 }
1814 sql += ";";
1815 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
1816 if (errCode != E_OK) {
1817 LOGE("Get select data_key statement failed, %d", errCode);
1818 return errCode;
1819 }
1820 do {
1821 errCode = SQLiteUtils::StepWithRetry(selectStmt);
1822 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1823 dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
1824 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1825 LOGE("SQLite step failed when query log's data_key : %d", errCode);
1826 break;
1827 }
1828 } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
1829 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1830 return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
1831 }
1832
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)1833 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
1834 const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
1835 {
1836 std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
1837 if (opType == OpType::ONLY_UPDATE_GID) {
1838 updateLogSql += "cloud_gid = ?";
1839 updateColName.push_back(CloudDbConstant::GID_FIELD);
1840 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1841 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1842 updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
1843 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1844 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
1845 updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
1846 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1847 } else if (opType == OpType::UPDATE_TIMESTAMP) {
1848 updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
1849 ", timestamp = ?, cloud_gid = '', version = '', sharing_resource = ''";
1850 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1851 } else if (opType == OpType::CLEAR_GID) {
1852 updateLogSql += "cloud_gid = '', version = '', sharing_resource = '', flag = flag & " +
1853 std::to_string(SET_FLAG_ZERO_MASK);
1854 } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1855 updateLogSql += std::string(CloudDbConstant::TO_LOCAL_CHANGE) + ", cloud_gid = ?";
1856 updateColName.push_back(CloudDbConstant::GID_FIELD);
1857 updateLogSql += ", version = ?";
1858 updateColName.push_back(CloudDbConstant::VERSION_FIELD);
1859 } else {
1860 updateLogSql += " device = 'cloud', timestamp = ?,";
1861 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1862 if (opType == OpType::DELETE) {
1863 int errCode = GetCloudDeleteSql(tableSchema.name, updateLogSql);
1864 if (errCode != E_OK) {
1865 return errCode;
1866 }
1867 } else {
1868 updateLogSql += GetUpdateDataFlagSql(vBucket) + ", cloud_gid = ?";
1869 updateColName.push_back(CloudDbConstant::GID_FIELD);
1870 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1871 }
1872 }
1873
1874 int errCode = AppendUpdateLogRecordWhereSqlCondition(tableSchema, vBucket, updateLogSql);
1875 if (errCode != E_OK) {
1876 return errCode;
1877 }
1878
1879 errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
1880 if (errCode != E_OK) {
1881 LOGE("Get update log statement failed when update cloud data, %d", errCode);
1882 }
1883 return errCode;
1884 }
1885
GetLockStatusByGid(const std::string & tableName,const std::string & gid,LockStatus & status)1886 int SQLiteSingleVerRelationalStorageExecutor::GetLockStatusByGid(const std::string &tableName, const std::string &gid,
1887 LockStatus &status)
1888 {
1889 std::string sql = "select status from " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = '" +
1890 gid + "';";
1891 sqlite3_stmt *stmt = nullptr;
1892 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1893 if (errCode != E_OK) {
1894 LOGE("Get stmt failed when get lock status: %d", errCode);
1895 return errCode;
1896 }
1897 errCode = SQLiteUtils::StepWithRetry(stmt);
1898 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1899 status = static_cast<LockStatus>(sqlite3_column_int(stmt, 0));
1900 errCode = E_OK;
1901 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1902 LOGE("Not found lock status.");
1903 errCode = -E_NOT_FOUND;
1904 } else {
1905 LOGE("Get lock status failed: %d", errCode);
1906 }
1907 int resetRet;
1908 SQLiteUtils::ResetStatement(stmt, true, resetRet);
1909 return errCode;
1910 }
1911
UpdateHashKey(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType,const std::string & localIdentity)1912 int SQLiteSingleVerRelationalStorageExecutor::UpdateHashKey(DistributedTableMode mode, const TableInfo &tableInfo,
1913 TableSyncType syncType, const std::string &localIdentity)
1914 {
1915 if (tableInfo.GetIdentifyKey().size() == 1u && tableInfo.GetIdentifyKey().at(0) == "rowid") {
1916 return UpdateHashKeyWithOutPk(mode, tableInfo, syncType, localIdentity);
1917 }
1918 auto tableManager = LogTableManagerFactory::GetTableManager(tableInfo, mode, syncType);
1919 auto logName = DBCommon::GetLogTableName(tableInfo.GetTableName());
1920 std::string sql = "UPDATE OR REPLACE " + logName + " SET hash_key = hash_key || '_old' where data_key in " +
1921 "(select _rowid_ from '" + tableInfo.GetTableName() + "') AND device = '';";
1922 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1923 if (errCode != E_OK) {
1924 return errCode;
1925 }
1926 sql = "UPDATE OR REPLACE " + logName + " SET hash_key = data.hash_value FROM (SELECT _rowid_, " +
1927 tableManager->CalcPrimaryKeyHash("dataTable.", tableInfo, "") + " AS hash_value " +
1928 "FROM '" + tableInfo.GetTableName() + "' AS dataTable) AS data WHERE data._rowid_ = " + logName +
1929 ".data_key AND device = '';";
1930 return SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1931 }
1932
UpdateHashKeyWithOutPk(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType,const std::string & localIdentity)1933 int SQLiteSingleVerRelationalStorageExecutor::UpdateHashKeyWithOutPk(
1934 DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, const std::string &localIdentity)
1935 {
1936 auto tableManager = LogTableManagerFactory::GetTableManager(tableInfo, mode, syncType);
1937 auto logName = DBCommon::GetLogTableName(tableInfo.GetTableName());
1938 std::string extendStr = "_tmp";
1939 std::string tmpTable = logName + extendStr;
1940 std::string sql = tableManager->GetCreateRelationalLogTableSql(tableInfo, extendStr);
1941 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1942 if (errCode != E_OK) {
1943 return errCode;
1944 }
1945 sql = "INSERT OR REPLACE INTO " + tmpTable + " SELECT * FROM " + logName + " WHERE data_key != -1;";
1946 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1947 if (errCode != E_OK) {
1948 return errCode;
1949 }
1950
1951 sql = "UPDATE " + logName + " SET flag = flag | 0x01, data_key = -1 WHERE data_key IN " + "(SELECT _rowid_ FROM '" +
1952 tableInfo.GetTableName() + "');";
1953 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1954 if (errCode != E_OK) {
1955 return errCode;
1956 }
1957
1958 sql = "UPDATE " + tmpTable + " SET hash_key = data.hash_value FROM (SELECT _rowid_, " +
1959 tableManager->CalcPrimaryKeyHash("dataTable.", tableInfo, localIdentity) + " AS hash_value " + "FROM '" +
1960 tableInfo.GetTableName() + "' AS dataTable) AS data WHERE data._rowid_ = " + tmpTable +
1961 ".data_key AND device = '';";
1962 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1963 if (errCode != E_OK) {
1964 return errCode;
1965 }
1966
1967 sql = "INSERT OR REPLACE INTO " + logName + " SELECT * FROM " + tmpTable + ";";
1968 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1969 if (errCode != E_OK) {
1970 return errCode;
1971 }
1972
1973 sql = "DROP TABLE " + tmpTable + ";";
1974 return SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1975 }
1976
SetTableMode(DistributedTableMode mode)1977 void SQLiteSingleVerRelationalStorageExecutor::SetTableMode(DistributedTableMode mode)
1978 {
1979 mode_ = mode;
1980 }
1981 } // namespace DistributedDB
1982 #endif
1983