1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "value_hash_calc.h"
33 #include "device_tracker_log_table_manager.h"
34
35 namespace DistributedDB {
36 namespace {
37 static constexpr const char *DATAKEY = "DATA_KEY";
38 static constexpr const char *DEVICE_FIELD = "DEVICE";
39 static constexpr const char *CLOUD_GID_FIELD = "CLOUD_GID";
40 static constexpr const char *VERSION = "VERSION";
41 static constexpr const char *SHARING_RESOURCE = "SHARING_RESOURCE";
42 static constexpr const char *FLAG_IS_CLOUD = "FLAG & 0x02 = 0"; // see if 1th bit of a flag is cloud
43 static constexpr const char *FLAG_IS_CLOUD_CONSISTENCY = "FLAG & 0x20 = 0"; // see if flag is cloud_consistency
44 // set 1th bit of flag to one which is local, clean 5th bit of flag to one which is wait compensated sync
45 static constexpr const char *SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC = "(CASE WHEN data_key = -1 and "
46 "FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) ELSE (FLAG | 0x02 | 0x20) & (~0x10) END)";
47 // clean 5th bit of flag to one which is wait compensated sync
48 static constexpr const char *SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC = "(CASE WHEN data_key = -1 and "
49 "FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) ELSE (FLAG | 0x20) & (~0x10) END)";
50 static constexpr const char *FLAG_IS_LOGIC_DELETE = "FLAG & 0x08 != 0"; // see if 3th bit of a flag is logic delete
51 // set data logic delete, exist passport, delete, not compensated, cloud and consistency
52 static constexpr const char *SET_FLAG_WHEN_LOGOUT = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x12) & (~0x20)";
53 // set data logic delete, exist passport, delete, not compensated and cloud
54 static constexpr const char *SET_FLAG_WHEN_LOGOUT_FOR_SHARE_TABLE = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x12)";
55 static constexpr const char *DATA_IS_DELETE = "data_key = -1 AND FLAG & 0X08 = 0"; // see if data is delete
56 static constexpr const char *UPDATE_CURSOR_SQL = "cursor=update_cursor()";
57 static constexpr const int SET_FLAG_ZERO_MASK = ~0x04; // clear 2th bit of flag
58 static constexpr const int SET_FLAG_ONE_MASK = 0x04; // set 2th bit of flag
59 static constexpr const int SET_CLOUD_FLAG = ~0x02; // set 1th bit of flag to 0
60 static constexpr const int DATA_KEY_INDEX = 0;
61 static constexpr const int TIMESTAMP_INDEX = 3;
62 static constexpr const int W_TIMESTAMP_INDEX = 4;
63 static constexpr const int FLAG_INDEX = 5;
64 static constexpr const int HASH_KEY_INDEX = 6;
65 static constexpr const int CLOUD_GID_INDEX = 7;
66 static constexpr const int VERSION_INDEX = 8;
67 static constexpr const int STATUS_INDEX = 9;
68
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)69 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
70 {
71 if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
72 return SQLITE_DENY;
73 }
74 if (b == SQLITE_FUNCTION) {
75 if (d != nullptr && (strcmp(d, "fts3_tokenizer") == 0)) {
76 LOGE("Deny fts3_tokenizer in remote query");
77 return SQLITE_DENY;
78 }
79 }
80 return SQLITE_OK;
81 }
82 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)83 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
84 DistributedTableMode mode)
85 : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode), isLogicDelete_(false),
86 assetLoader_(nullptr), putDataMode_(PutDataMode::SYNC), markFlagOption_(MarkFlagOption::DEFAULT),
87 maxUploadCount_(0), maxUploadSize_(0)
88 {
89 bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
90 bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
91 bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
92 bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
93 bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
94 bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
95 bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
96 }
97
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)98 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
99 {
100 std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
101 if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
102 LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
103 return -E_NOT_SUPPORT;
104 }
105 std::vector<FieldInfo> fieldInfos = table.GetFieldInfos();
106 for (const auto &field : fieldInfos) {
107 if (DBCommon::CaseInsensitiveCompare(field.GetFieldName(), std::string(DBConstant::SQLITE_INNER_ROWID))) {
108 LOGE("[CreateDistributedTable] Not support create distributed table with _rowid_ column.");
109 return -E_NOT_SUPPORT;
110 }
111 }
112
113 if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
114 if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
115 LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
116 return -E_NOT_SUPPORT;
117 }
118
119 if (mode == DistributedTableMode::COLLABORATION) {
120 if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
121 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
122 return -E_NOT_SUPPORT;
123 }
124 }
125
126 if (syncType == CLOUD_COOPERATION) {
127 int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
128 if (errCode != E_OK) {
129 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
130 return errCode;
131 }
132 }
133 }
134
135 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
136 if (table.GetPrimaryKey().size() > 1) {
137 LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
138 return -E_NOT_SUPPORT;
139 }
140 }
141
142 return E_OK;
143 }
144
145 namespace {
GetExistedDataTimeOffset(sqlite3 * db,const std::string & tableName,bool isMem,int64_t & timeOffset)146 int GetExistedDataTimeOffset(sqlite3 *db, const std::string &tableName, bool isMem, int64_t &timeOffset)
147 {
148 std::string sql = "SELECT get_sys_time(0) - max(" + std::string(DBConstant::SQLITE_INNER_ROWID) + ") - 1 FROM '" +
149 tableName + "';";
150 sqlite3_stmt *stmt = nullptr;
151 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
152 if (errCode != E_OK) {
153 return errCode;
154 }
155 errCode = SQLiteUtils::StepWithRetry(stmt, isMem);
156 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
157 timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
158 errCode = E_OK;
159 }
160 SQLiteUtils::ResetStatement(stmt, true, errCode);
161 return errCode;
162 }
163
GetMetaLocalTimeOffset(sqlite3 * db,int64_t & timeOffset)164 int GetMetaLocalTimeOffset(sqlite3 *db, int64_t &timeOffset)
165 {
166 std::string sql = "SELECT value FROM " + DBCommon::GetMetaTableName() + " WHERE key=x'" +
167 DBCommon::TransferStringToHex(std::string(DBConstant::LOCALTIME_OFFSET_KEY)) + "';";
168 sqlite3_stmt *stmt = nullptr;
169 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
170 if (errCode != E_OK) {
171 return errCode;
172 }
173 errCode = SQLiteUtils::StepWithRetry(stmt);
174 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
175 timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
176 if (timeOffset < 0) {
177 LOGE("TimeOffset %" PRId64 "is invalid.", timeOffset);
178 SQLiteUtils::ResetStatement(stmt, true, errCode);
179 return -E_INTERNAL_ERROR;
180 }
181 errCode = E_OK;
182 }
183 SQLiteUtils::ResetStatement(stmt, true, errCode);
184 return errCode;
185 }
186 }
187
GetExtendValue(const TrackerTable & trackerTable)188 std::string GetExtendValue(const TrackerTable &trackerTable)
189 {
190 std::string extendValue;
191 const std::set<std::string> &extendNames = trackerTable.GetExtendNames();
192 if (!extendNames.empty()) {
193 extendValue += "json_object(";
194 for (const auto &extendName : extendNames) {
195 extendValue += "'" + extendName + "'," + extendName + ",";
196 }
197 extendValue.pop_back();
198 extendValue += ")";
199 } else {
200 extendValue = "''";
201 }
202 return extendValue;
203 }
204
GeneLogInfoForExistedDataInner(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTrackerTable)205 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedDataInner(sqlite3 *db, const std::string &identity,
206 const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTrackerTable)
207 {
208 std::string tableName = tableInfo.GetTableName();
209 int64_t timeOffset = 0;
210 int errCode = GetExistedDataTimeOffset(db, tableName, isMemDb_, timeOffset);
211 if (errCode != E_OK) {
212 return errCode;
213 }
214 errCode = SetLogTriggerStatus(false);
215 if (errCode != E_OK) {
216 return errCode;
217 }
218 std::string timeOffsetStr = std::to_string(timeOffset);
219 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
220 std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
221 std::string flag = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL) |
222 static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
223 TrackerTable trackerTable = tableInfo.GetTrackerTable();
224 trackerTable.SetTableName(tableName);
225 std::string calPrimaryKeyHash = logMgrPtr->CalcPrimaryKeyHash("a.", tableInfo, identity);
226 std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid +
227 ", '', '', " + timeOffsetStr + " + " + rowid + ", " +
228 timeOffsetStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash + ", '', ";
229 sql += GetExtendValue(tableInfo.GetTrackerTable());
230 sql += ", 0, '', '', 0 FROM '" + tableName + "' AS a ";
231 if (isTrackerTable) {
232 sql += " WHERE 1 = 1;";
233 } else {
234 sql += "WHERE NOT EXISTS (SELECT 1 FROM " + logTable + " WHERE data_key = a._rowid_);";
235 }
236 errCode = trackerTable.ReBuildTempTrigger(db, TriggerMode::TriggerModeEnum::INSERT, [db, &sql]() {
237 int ret = SQLiteUtils::ExecuteRawSQL(db, sql);
238 if (ret != E_OK) {
239 LOGE("Failed to initialize cloud type log data.%d", ret);
240 }
241 return ret;
242 });
243 return errCode;
244 }
245
GeneLogInfoForExistedData(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTrackerTable)246 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &identity,
247 const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTrackerTable)
248 {
249 if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
250 return UpdateTrackerTable(db, identity, tableInfo, logMgrPtr, false);
251 }
252 return GeneLogInfoForExistedDataInner(db, identity, tableInfo, logMgrPtr, isTrackerTable);
253 }
254
ResetLogStatus(std::string & tableName)255 int SQLiteSingleVerRelationalStorageExecutor::ResetLogStatus(std::string &tableName)
256 {
257 int errCode = SetLogTriggerStatus(false);
258 if (errCode != E_OK) {
259 LOGE("Fail to set log trigger on when reset log status, %d", errCode);
260 return errCode;
261 }
262 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
263 std::string sql = "UPDATE " + logTable + " SET status = 0;";
264 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
265 if (errCode != E_OK) {
266 LOGE("Failed to initialize cloud type log data.%d", errCode);
267 }
268 return errCode;
269 }
270
UpdateTrackerTable(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTimestampOnly)271 int SQLiteSingleVerRelationalStorageExecutor::UpdateTrackerTable(sqlite3 *db, const std::string &identity,
272 const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTimestampOnly)
273 {
274 int64_t errCode = SetLogTriggerStatus(false);
275 if (errCode != E_OK) {
276 return errCode;
277 }
278 int64_t localTimeOffset = 0;
279 errCode = GetMetaLocalTimeOffset(db, localTimeOffset);
280 if (errCode != E_OK) {
281 LOGE("Failed to get local timeOffset.%d", errCode);
282 return errCode;
283 }
284 std::string tableName = tableInfo.GetTableName();
285 std::string logTable = DBCommon::GetLogTableName(tableName);
286 Timestamp currentSysTime = TimeHelper::GetSysCurrentTime();
287 Timestamp currentLocalTime = currentSysTime + static_cast<uint64_t>(localTimeOffset);
288 std::string currentLocalTimeStr = std::to_string(currentLocalTime);
289 if (isTimestampOnly) {
290 std::string sql = "update " + logTable + " set timestamp = " + currentLocalTimeStr +
291 " + data_key, wtimestamp = " + currentLocalTimeStr + " + data_key where data_key != -1;";
292 errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
293 if (errCode != E_OK) {
294 LOGE("Failed to initialize device type log data.%d", errCode);
295 }
296 return errCode;
297 }
298 std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
299 std::string flag = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL) |
300 static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
301 std::string calPrimaryKeyHash = logMgrPtr->CalcPrimaryKeyHash("a.", tableInfo, identity);
302 std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid + ", '', '', " + currentLocalTimeStr +
303 " + " + rowid + ", " + currentLocalTimeStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash +
304 ", '', '', '', '', '', 0 FROM '" + tableName + "' AS a WHERE 1=1;";
305 errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
306 if (errCode != E_OK) {
307 LOGE("Failed to initialize device type log data.%d", errCode);
308 }
309 return errCode;
310 }
311
CreateRelationalLogTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table)312 int SQLiteSingleVerRelationalStorageExecutor::CreateRelationalLogTable(DistributedTableMode mode, bool isUpgraded,
313 const std::string &identity, TableInfo &table)
314 {
315 // create log table
316 std::unique_ptr<SqliteLogTableManager> tableManager;
317 if (!table.GetTrackerTable().IsEmpty() && table.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
318 tableManager = std::make_unique<DeviceTrackerLogTableManager>();
319 } else {
320 tableManager = LogTableManagerFactory::GetTableManager(mode, table.GetTableSyncType());
321 }
322 if (tableManager == nullptr) {
323 LOGE("[CreateRelationalLogTable] get table manager failed");
324 return -E_INVALID_DB;
325 }
326 int errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
327 if (errCode != E_OK) {
328 LOGE("[CreateDistributedTable] create log table failed");
329 return errCode;
330 }
331 std::string tableName = table.GetTableName();
332 if (!isUpgraded) {
333 if (table.GetTrackerTable().GetTableName().empty()) {
334 errCode = GeneLogInfoForExistedData(dbHandle_, identity, table, tableManager, false);
335 } else if (table.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
336 // tracker table -> distributed device table
337 errCode = UpdateTrackerTable(dbHandle_, tableName, table, tableManager, true);
338 } else {
339 // tracker table -> distributed cloud table
340 errCode = ResetLogStatus(tableName);
341 }
342 } else {
343 if (table.GetTrackerTable().GetTableName().empty()) {
344 errCode = GeneLogInfoForExistedDataInner(dbHandle_, identity, table, tableManager, false);
345 }
346 }
347 if (errCode != E_OK) {
348 LOGE("[CreateDistributedTable] generate log isUpgraded %d failed %d.", static_cast<int>(isUpgraded), errCode);
349 return errCode;
350 }
351
352 // add trigger
353 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
354 if (errCode != E_OK) {
355 LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
356 return errCode;
357 }
358 return SetLogTriggerStatus(true);
359 }
360
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table)361 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
362 const std::string &identity, TableInfo &table)
363 {
364 if (dbHandle_ == nullptr) {
365 return -E_INVALID_DB;
366 }
367
368 const std::string tableName = table.GetTableName();
369 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
370 if (errCode != E_OK) {
371 LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
372 return errCode;
373 }
374
375 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
376 bool isEmpty = false;
377 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
378 if (errCode != E_OK) {
379 LOGE("[CreateDistributedTable] check table empty failed. error=%d", errCode);
380 return -E_NOT_SUPPORT;
381 }
382 if (!isEmpty) {
383 LOGW("[CreateDistributedTable] generate %.3s log for existed data, table type %d",
384 DBCommon::TransferStringToHex(DBCommon::TransferHashString(tableName)).c_str(),
385 static_cast<int>(table.GetTableSyncType()));
386 }
387 }
388
389 errCode = CheckTableConstraint(table, mode, table.GetTableSyncType());
390 if (errCode != E_OK) {
391 LOGE("[CreateDistributedTable] check table constraint failed.");
392 return errCode;
393 }
394
395 return CreateRelationalLogTable(mode, isUpgraded, identity, table);
396 }
397
CompareSchemaTableColumns(const std::string & tableName)398 int SQLiteSingleVerRelationalStorageExecutor::CompareSchemaTableColumns(const std::string &tableName)
399 {
400 bool onceDropped = false;
401 int errCode = IsTableOnceDropped(tableName, onceDropped);
402 if (!onceDropped) {
403 // do not return error code to make sure main procedure will continue
404 return E_OK;
405 }
406 LOGI("[CompareSchemaTableColumns] table once dropped check schema. table name is %s length is %zu",
407 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
408 TableInfo newTableInfo;
409 errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
410 if (errCode != E_OK) {
411 LOGE("[CompareSchemaTableColumns] analysis table schema failed. %d", errCode);
412 return errCode;
413 }
414 // new table should has same or compatible upgrade
415 TableInfo tableInfo = localSchema_.GetTable(tableName);
416 errCode = tableInfo.CompareWithTable(newTableInfo, localSchema_.GetSchemaVersion());
417 if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
418 LOGE("[CompareSchemaTableColumns] Not support with incompatible table.");
419 return -E_SCHEMA_MISMATCH;
420 }
421 return errCode;
422 }
423
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)424 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
425 DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
426 {
427 if (dbHandle_ == nullptr) {
428 return -E_INVALID_DB;
429 }
430 TableInfo newTableInfo;
431 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
432 if (errCode != E_OK) {
433 LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
434 return errCode;
435 }
436
437 if (CheckTableConstraint(newTableInfo, mode, syncType)) {
438 LOGE("[UpgradeDistributedTable] Not support create distributed table when violate constraints.");
439 return -E_NOT_SUPPORT;
440 }
441
442 // new table should has same or compatible upgrade
443 TableInfo tableInfo = schema.GetTable(tableName);
444 errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
445 if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
446 LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
447 return -E_SCHEMA_MISMATCH;
448 } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
449 LOGD("[UpgradeDistributedTable] schema has not changed.");
450 // update table if tableName changed
451 schema.RemoveRelationalTable(tableName);
452 tableInfo.SetTableName(tableName);
453 schema.AddRelationalTable(tableInfo);
454 return E_OK;
455 }
456
457 schemaChanged = true;
458 errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
459 if (errCode != E_OK) {
460 LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
461 }
462
463 schema.AddRelationalTable(newTableInfo);
464 return errCode;
465 }
466
467 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)468 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
469 std::vector<std::string> &deviceTables)
470 {
471 if (device.empty() && tableName.empty()) { // device and table name should not both be empty
472 return -E_INVALID_ARGS;
473 }
474 std::string devicePattern = device.empty() ? "%" : device;
475 std::string tablePattern = tableName.empty() ? "%" : tableName;
476 std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
477
478 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
479 deviceTableName + "';";
480 sqlite3_stmt *stmt = nullptr;
481 int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
482 if (errCode != E_OK) {
483 return errCode;
484 }
485
486 do {
487 errCode = SQLiteUtils::StepWithRetry(stmt, false);
488 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
489 errCode = E_OK;
490 break;
491 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
492 LOGE("Get table name failed. %d", errCode);
493 break;
494 }
495 std::string realTableName;
496 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
497 if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
498 continue;
499 }
500 if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
501 continue;
502 }
503 deviceTables.emplace_back(realTableName);
504 } while (true);
505
506 SQLiteUtils::ResetStatement(stmt, true, errCode);
507 return errCode;
508 }
509
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)510 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
511 {
512 std::vector<FieldInfo> fields;
513 auto itOld = oldTableInfo.GetFields().begin();
514 auto itNew = newTableInfo.GetFields().begin();
515 for (; itNew != newTableInfo.GetFields().end(); itNew++) {
516 if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
517 fields.emplace_back(itNew->second);
518 continue;
519 }
520 itOld++;
521 }
522 return fields;
523 }
524
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)525 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
526 {
527 if (db == nullptr) {
528 return -E_INVALID_ARGS;
529 }
530
531 std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
532 return a.GetColumnId()< b.GetColumnId();
533 });
534 int errCode = E_OK;
535 for (const auto &table : tables) {
536 for (const auto &field : fields) {
537 std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
538 alterSql += "'" + field.GetDataType() + "'";
539 alterSql += field.IsNotNull() ? " NOT NULL" : "";
540 alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
541 alterSql += ";";
542 errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
543 if (errCode != E_OK) {
544 LOGE("Alter table failed. %d", errCode);
545 break;
546 }
547 }
548 }
549 return errCode;
550 }
551
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)552 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
553 {
554 IndexInfoMap indexes;
555 auto itOld = oldTableInfo.GetIndexDefine().begin();
556 auto itNew = newTableInfo.GetIndexDefine().begin();
557 auto itOldEnd = oldTableInfo.GetIndexDefine().end();
558 auto itNewEnd = newTableInfo.GetIndexDefine().end();
559
560 while (itOld != itOldEnd && itNew != itNewEnd) {
561 if (itOld->first == itNew->first) {
562 if (itOld->second != itNew->second) {
563 indexes.insert({itNew->first, itNew->second});
564 }
565 itOld++;
566 itNew++;
567 } else if (itOld->first < itNew->first) {
568 indexes.insert({itOld->first, {}});
569 itOld++;
570 } else {
571 indexes.insert({itNew->first, itNew->second});
572 itNew++;
573 }
574 }
575
576 while (itOld != itOldEnd) {
577 indexes.insert({itOld->first, {}});
578 itOld++;
579 }
580
581 while (itNew != itNewEnd) {
582 indexes.insert({itNew->first, itNew->second});
583 itNew++;
584 }
585
586 return indexes;
587 }
588
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)589 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
590 {
591 if (db == nullptr) {
592 return -E_INVALID_ARGS;
593 }
594
595 int errCode = E_OK;
596 for (const auto &table : tables) {
597 for (const auto &index : indexes) {
598 if (index.first.empty()) {
599 continue;
600 }
601 std::string realIndexName = table + "_" + index.first;
602 std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
603 errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
604 if (errCode != E_OK) {
605 LOGE("Drop index failed. %d", errCode);
606 return errCode;
607 }
608
609 if (index.second.empty()) { // empty means drop index only
610 continue;
611 }
612
613 auto it = index.second.begin();
614 std::string indexDefine = *it++;
615 while (it != index.second.end()) {
616 indexDefine += ", " + *it++;
617 }
618 std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
619 "(" + indexDefine + ");";
620 errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
621 if (errCode != E_OK) {
622 LOGE("Create index failed. %d", errCode);
623 break;
624 }
625 }
626 }
627 return errCode;
628 }
629 }
630
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)631 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
632 const TableInfo &newTableInfo)
633 {
634 std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
635 IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
636 std::vector<std::string> deviceTables;
637 int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
638 if (errCode != E_OK) {
639 LOGE("Get device table name for alter table failed. %d", errCode);
640 return errCode;
641 }
642
643 LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
644 upgradeIndexes.size(), deviceTables.size());
645 errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
646 if (errCode != E_OK) {
647 LOGE("upgrade fields failed. %d", errCode);
648 return errCode;
649 }
650
651 errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
652 if (errCode != E_OK) {
653 LOGE("upgrade indexes failed. %d", errCode);
654 }
655
656 return errCode;
657 }
658
StartTransaction(TransactType type)659 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
660 {
661 if (dbHandle_ == nullptr) {
662 LOGE("Begin transaction failed, dbHandle is null.");
663 return -E_INVALID_DB;
664 }
665 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
666 if (errCode != E_OK) {
667 LOGE("Begin transaction failed, errCode = %d", errCode);
668 }
669 return errCode;
670 }
671
Commit()672 int SQLiteSingleVerRelationalStorageExecutor::Commit()
673 {
674 if (dbHandle_ == nullptr) {
675 return -E_INVALID_DB;
676 }
677
678 return SQLiteUtils::CommitTransaction(dbHandle_);
679 }
680
Rollback()681 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
682 {
683 if (dbHandle_ == nullptr) {
684 return -E_INVALID_DB;
685 }
686 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
687 if (errCode != E_OK) {
688 LOGE("sqlite single ver relation storage executor rollback fail! errCode = [%d]", errCode);
689 }
690 return errCode;
691 }
692
SetTableInfo(const TableInfo & tableInfo)693 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
694 {
695 table_ = tableInfo;
696 }
697
698 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize,int64_t & revisedTime,int64_t & invalidTime)699 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize,
700 int64_t &revisedTime, int64_t &invalidTime)
701 {
702 int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX));
703 uint64_t curTime = 0;
704 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
705 if (modifyTime > static_cast<int64_t>(curTime)) {
706 invalidTime = modifyTime;
707 modifyTime = static_cast<int64_t>(curTime);
708 revisedTime = modifyTime;
709 }
710 } else {
711 LOGW("[Relational] get raw sys time failed.");
712 }
713 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
714 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
715 static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
716 totalSize += sizeof(int64_t) + sizeof(int64_t);
717 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
718 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
719 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
720 if (!cloudGid.empty()) {
721 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
722 totalSize += cloudGid.size();
723 }
724 }
725 if (revisedTime != 0) {
726 std::string cloudGid;
727 if (logInfo.count(CloudDbConstant::GID_FIELD) != 0) {
728 cloudGid = std::get<std::string>(logInfo[CloudDbConstant::GID_FIELD]);
729 }
730 LOGW("[Relational] Found invalid mod time: %lld, curTime: %lld, gid: %s", invalidTime, revisedTime,
731 DBCommon::StringMiddleMasking(cloudGid).c_str());
732 }
733 std::string version;
734 SQLiteUtils::GetColumnTextValue(logStatement, VERSION_INDEX, version);
735 logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
736 totalSize += version.size();
737 }
738
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)739 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
740 {
741 flags.insert_or_assign(DBConstant::ROWID,
742 static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
743 flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
744 static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
745 flags.insert_or_assign(CloudDbConstant::FLAG,
746 static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
747 Bytes hashKey;
748 (void)SQLiteUtils::GetColumnBlobValue(logStatement, HASH_KEY_INDEX, hashKey);
749 flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
750 flags.insert_or_assign(CloudDbConstant::STATUS,
751 static_cast<int64_t>(sqlite3_column_int(logStatement, STATUS_INDEX)));
752 }
753
GetCloudGid(sqlite3_stmt * logStatement,std::vector<std::string> & cloudGid)754 void GetCloudGid(sqlite3_stmt *logStatement, std::vector<std::string> &cloudGid)
755 {
756 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) == nullptr) {
757 return;
758 }
759 std::string gid = reinterpret_cast<const std::string::value_type *>(
760 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
761 if (gid.empty()) {
762 LOGW("[Relational] Get cloud gid is null.");
763 return;
764 }
765 cloudGid.emplace_back(gid);
766 }
767 }
768
GetDataItemSerialSize(DataItem & item,size_t appendLen)769 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
770 {
771 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
772 // the size would not be very large.
773 static const size_t maxOrigDevLength = 40;
774 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
775 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
776 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
777 return dataSize;
778 }
779
GetKvData(const Key & key,Value & value) const780 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
781 {
782 static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
783 "metadata WHERE key=?;";
784 sqlite3_stmt *statement = nullptr;
785 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
786 if (errCode != E_OK) {
787 goto END;
788 }
789
790 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
791 if (errCode != E_OK) {
792 goto END;
793 }
794
795 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
796 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
797 errCode = -E_NOT_FOUND;
798 goto END;
799 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
800 goto END;
801 }
802
803 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
804 END:
805 SQLiteUtils::ResetStatement(statement, true, errCode);
806 return errCode;
807 }
808
PutKvData(const Key & key,const Value & value) const809 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
810 {
811 static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + std::string(DBConstant::RELATIONAL_PREFIX) +
812 "metadata VALUES(?,?);";
813 sqlite3_stmt *statement = nullptr;
814 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
815 if (errCode != E_OK) {
816 return errCode;
817 }
818
819 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // 1 means key index
820 if (errCode != E_OK) {
821 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
822 goto ERROR;
823 }
824
825 errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true); // 2 means value index
826 if (errCode != E_OK) {
827 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
828 goto ERROR;
829 }
830 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
831 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
832 errCode = E_OK;
833 }
834 ERROR:
835 SQLiteUtils::ResetStatement(statement, true, errCode);
836 return errCode;
837 }
838
DeleteMetaData(const std::vector<Key> & keys) const839 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
840 {
841 static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
842 "metadata WHERE key=?;";
843 sqlite3_stmt *statement = nullptr;
844 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
845 if (errCode != E_OK) {
846 return errCode;
847 }
848
849 for (const auto &key : keys) {
850 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
851 if (errCode != E_OK) {
852 break;
853 }
854
855 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
856 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
857 break;
858 }
859 errCode = E_OK;
860 SQLiteUtils::ResetStatement(statement, false, errCode);
861 }
862 SQLiteUtils::ResetStatement(statement, true, errCode);
863 return CheckCorruptedStatus(errCode);
864 }
865
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const866 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
867 {
868 static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " +
869 std::string(DBConstant::RELATIONAL_PREFIX) + "metadata WHERE key>=? AND key<=?;";
870 sqlite3_stmt *statement = nullptr;
871 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
872 if (errCode != E_OK) {
873 return errCode;
874 }
875
876 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
877 if (errCode == E_OK) {
878 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
879 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
880 errCode = E_OK;
881 }
882 }
883 SQLiteUtils::ResetStatement(statement, true, errCode);
884 return CheckCorruptedStatus(errCode);
885 }
886
GetAllMetaKeys(std::vector<Key> & keys) const887 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
888 {
889 static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
890 "metadata;";
891 sqlite3_stmt *statement = nullptr;
892 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
893 if (errCode != E_OK) {
894 LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
895 return errCode;
896 }
897 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
898 SQLiteUtils::ResetStatement(statement, true, errCode);
899 return errCode;
900 }
901
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)902 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
903 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
904 {
905 if (stmt == nullptr) {
906 int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
907 if (errCode != E_OK) {
908 LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
909 return errCode;
910 }
911 }
912
913 int errCode = inserter.BindHashKeyAndDev(dataItem, stmt, 1); // 1 is hashKey begin index
914 if (errCode != E_OK) {
915 return errCode;
916 }
917 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
918 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
919 errCode = E_OK;
920 }
921 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
922 return errCode;
923 }
924
SaveSyncDataItem(const DataItem & dataItem,bool isUpdate,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,int64_t & rowid)925 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, bool isUpdate,
926 SaveSyncDataStmt &saveStmt, RelationalSyncDataInserter &inserter, int64_t &rowid)
927 {
928 auto &data = inserter.GetChangedData();
929 if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
930 std::vector<Type> primaryValues;
931 primaryValues.push_back(rowid);
932 data.primaryData[ChangeType::OP_DELETE].push_back(primaryValues);
933 rowid = -1;
934 return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
935 }
936 // we don't know the rowid if user drop device table
937 // SPLIT_BY_DEVICE use insert or replace to update data
938 // no pk table should delete by hash key(rowid) first
939 if ((mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().IsNoPkTable())) {
940 int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
941 if (errCode != E_OK) {
942 LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
943 return errCode;
944 }
945 std::vector<Type> primaryValues;
946 primaryValues.push_back(rowid);
947 data.primaryData[ChangeType::OP_DELETE].push_back(primaryValues);
948 }
949 std::map<std::string, Type> pkVals;
950 int errCode = inserter.SaveData(isUpdate, dataItem, saveStmt, pkVals);
951 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
952 if (!isUpdate) {
953 rowid = SQLiteUtils::GetLastRowId(dbHandle_);
954 }
955 std::vector<Type> primaryValues;
956 std::vector<std::string> primaryKeys;
957 if (inserter.GetLocalTable().IsMultiPkTable() || inserter.GetLocalTable().IsNoPkTable()) {
958 primaryKeys.push_back(DBConstant::ROWID);
959 primaryValues.push_back(rowid);
960 }
961 for (const auto &pkVal : pkVals) {
962 primaryKeys.push_back(pkVal.first);
963 primaryValues.push_back(pkVal.second);
964 }
965 ChangeType type = isUpdate ? ChangeType::OP_UPDATE : ChangeType::OP_INSERT;
966 data.primaryData[type].push_back(primaryValues);
967 data.field = primaryKeys;
968 errCode = E_OK;
969 }
970 return errCode;
971 }
972
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)973 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
974 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
975 {
976 if (stmt == nullptr) {
977 int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
978 if (errCode != E_OK) {
979 LOGE("[DeleteSyncLog] Get statement fail!");
980 return errCode;
981 }
982 }
983
984 int errCode = inserter.BindHashKeyAndDev(dataItem, stmt, 1); // 1 is hashKey begin index
985 if (errCode != E_OK) {
986 return errCode;
987 }
988 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
989 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
990 errCode = E_OK;
991 }
992 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
993 return errCode;
994 }
995
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)996 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
997 RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
998 {
999 int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
1000 if (errCode != E_OK) {
1001 return errCode;
1002 }
1003 return DeleteSyncLog(item, inserter, rmLogStmt);
1004 }
1005
CheckDataConflictDefeated(const DataItem & dataItem,sqlite3_stmt * queryStmt,bool & isDefeated,bool & isExist,int64_t & rowId)1006 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
1007 sqlite3_stmt *queryStmt, bool &isDefeated, bool &isExist, int64_t &rowId)
1008 {
1009 LogInfo logInfoGet;
1010 int errCode = SQLiteRelationalUtils::GetLogInfoPre(queryStmt, mode_, dataItem, logInfoGet);
1011 int ret = E_OK;
1012 SQLiteUtils::ResetStatement(queryStmt, false, ret);
1013 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1014 LOGE("Failed to get raw data. %d", errCode);
1015 return errCode;
1016 }
1017 rowId = logInfoGet.dataKey;
1018 isExist = (errCode != -E_NOT_FOUND) && ((logInfoGet.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) == 0);
1019 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
1020 mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
1021 isDefeated = false; // no need to solve conflict except miss query data
1022 return E_OK;
1023 }
1024 if (!isExist || dataItem.dev != logInfoGet.device) {
1025 // defeated if item timestamp is earlier than raw data
1026 isDefeated = (dataItem.timestamp <= logInfoGet.timestamp);
1027 }
1028 return E_OK;
1029 }
1030
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)1031 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
1032 SaveSyncDataStmt &saveStmt, DataItem &item)
1033 {
1034 bool isDefeated = false;
1035 bool isExist = false;
1036 int64_t rowid = -1;
1037 int errCode = CheckDataConflictDefeated(item, saveStmt.queryStmt, isDefeated, isExist, rowid);
1038 if (errCode != E_OK) {
1039 LOGE("check data conflict failed. %d", errCode);
1040 return errCode;
1041 }
1042
1043 if (isDefeated) {
1044 LOGD("Data was defeated.");
1045 return E_OK;
1046 }
1047 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
1048 return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
1049 }
1050 bool isUpdate = isExist && mode_ == DistributedTableMode::COLLABORATION;
1051 errCode = SaveSyncDataItem(item, isUpdate, saveStmt, inserter, rowid);
1052 if (errCode == E_OK || errCode == -E_NOT_FOUND) {
1053 errCode = inserter.SaveSyncLog(dbHandle_, saveStmt.saveLogStmt, saveStmt.queryStmt, item, rowid);
1054 }
1055 return errCode;
1056 }
1057
SaveSyncDataItems(RelationalSyncDataInserter & inserter)1058 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
1059 {
1060 SaveSyncDataStmt saveStmt;
1061 int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
1062 if (errCode != E_OK) {
1063 LOGE("Prepare insert sync data statement failed.");
1064 return errCode;
1065 }
1066
1067 errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
1068 if (item.neglect) { // Do not save this record if it is neglected
1069 return E_OK;
1070 }
1071 int errCode = SaveSyncDataItem(inserter, saveStmt, item);
1072 if (errCode != E_OK) {
1073 LOGE("save sync data item failed. err=%d", errCode);
1074 return errCode;
1075 }
1076 // Need not reset rmDataStmt and rmLogStmt here.
1077 return saveStmt.ResetStatements(false);
1078 });
1079
1080 int ret = saveStmt.ResetStatements(true);
1081 return errCode != E_OK ? errCode : ret;
1082 }
1083
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)1084 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
1085 {
1086 if (useTrans) {
1087 int errCode = StartTransaction(TransactType::IMMEDIATE);
1088 if (errCode != E_OK) {
1089 return errCode;
1090 }
1091 }
1092
1093 int errCode = SaveSyncDataItems(inserter);
1094 if (errCode != E_OK) {
1095 LOGE("Save sync data items failed. errCode=%d", errCode);
1096 goto END;
1097 }
1098 END:
1099 if (useTrans) {
1100 if (errCode == E_OK) {
1101 errCode = Commit();
1102 } else {
1103 (void)Rollback(); // Keep the error code of the first scene
1104 }
1105 }
1106 return errCode;
1107 }
1108
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const1109 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
1110 bool isGettingDeletedData) const
1111 {
1112 RowDataWithLog data;
1113 int errCode = SQLiteRelationalUtils::GetLogData(stmt, data.logInfo);
1114 if (errCode != E_OK) {
1115 LOGE("relational data value transfer to kv fail");
1116 return errCode;
1117 }
1118 std::vector<FieldInfo> serializeFields;
1119 if (!isGettingDeletedData) {
1120 auto fields = table_.GetFieldInfos();
1121 for (size_t cid = 0; cid < fields.size(); ++cid) {
1122 if (localSchema_.IsNeedSkipSyncField(fields[cid], table_.GetTableName())) {
1123 continue;
1124 }
1125 DataValue value;
1126 errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
1127 value);
1128 if (errCode != E_OK) {
1129 return errCode;
1130 }
1131 data.rowData.push_back(std::move(value)); // sorted by cid
1132 serializeFields.push_back(fields[cid]);
1133 }
1134 }
1135
1136 errCode = DataTransformer::SerializeDataItem(data,
1137 isGettingDeletedData ? std::vector<FieldInfo>() : serializeFields, dataItem);
1138 if (errCode != E_OK) {
1139 LOGE("relational data value transfer to kv fail");
1140 }
1141 return errCode;
1142 }
1143
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1144 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1145 {
1146 int errCode = GetDataItemForSync(fullStmt, item, false);
1147 if (errCode != E_OK) {
1148 return errCode;
1149 }
1150 item.value = {};
1151 item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1152 return E_OK;
1153 }
1154
1155 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1156 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp ×tamp)
1157 {
1158 if (stmt == nullptr) {
1159 return -E_INVALID_ARGS;
1160 }
1161 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1162 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1163 timestamp = INT64_MAX;
1164 errCode = E_OK;
1165 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1166 timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3)); // 3 means timestamp index
1167 errCode = E_OK;
1168 }
1169 return errCode;
1170 }
1171
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1172 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1173 std::vector<DataItem> &dataItems, DataItem &&item)
1174 {
1175 // If one record is over 4M, ignore it.
1176 if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1177 overLongSize++;
1178 } else {
1179 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1180 dataTotalSize += GetDataItemSerialSize(item, appendLength);
1181 if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1182 return -E_UNFINISHED;
1183 } else {
1184 dataItems.push_back(item);
1185 }
1186 }
1187 return E_OK;
1188 }
1189 }
1190
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1191 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1192 sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1193 {
1194 if (!isFirstTime) { // For the first time, never step before, can get nothing
1195 int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1196 if (errCode != E_OK) {
1197 return errCode;
1198 }
1199 }
1200 return StepNext(isMemDb_, queryStmt, queryTime);
1201 }
1202
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1203 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1204 Timestamp &missQueryTime)
1205 {
1206 int errCode = GetMissQueryData(fullStmt, item);
1207 if (errCode != E_OK) {
1208 return errCode;
1209 }
1210 return StepNext(isMemDb_, fullStmt, missQueryTime);
1211 }
1212
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1213 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1214 const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1215 const TableInfo &tableInfo)
1216 {
1217 baseTblName_ = tableInfo.GetTableName();
1218 SetTableInfo(tableInfo);
1219 sqlite3_stmt *queryStmt = nullptr;
1220 sqlite3_stmt *fullStmt = nullptr;
1221 bool isGettingDeletedData = false;
1222 int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1223 if (errCode != E_OK) {
1224 return errCode;
1225 }
1226
1227 Timestamp queryTime = 0;
1228 Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1229
1230 bool isFirstTime = true;
1231 size_t dataTotalSize = 0;
1232 size_t overLongSize = 0;
1233 do {
1234 DataItem item;
1235 if (queryTime < missQueryTime) {
1236 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1237 } else if (queryTime == missQueryTime) {
1238 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1239 if (errCode != E_OK) {
1240 break;
1241 }
1242 errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1243 } else {
1244 errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1245 }
1246
1247 if (errCode == E_OK && !isFirstTime) {
1248 errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1249 }
1250
1251 if (errCode != E_OK) {
1252 break;
1253 }
1254
1255 isFirstTime = false;
1256 if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1257 errCode = -E_FINISHED;
1258 break;
1259 }
1260 } while (true);
1261 LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1262 errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1263 SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1264 SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1265 return errCode;
1266 }
1267
CheckDBModeForRelational()1268 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1269 {
1270 std::string journalMode;
1271 int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1272
1273 for (auto &c : journalMode) { // convert to lowercase
1274 c = static_cast<char>(std::tolower(c));
1275 }
1276
1277 if (errCode == E_OK && journalMode != "wal") {
1278 LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1279 return -E_NOT_SUPPORT;
1280 }
1281 return errCode;
1282 }
1283
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1284 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1285 const std::string &tableName)
1286 {
1287 std::vector<std::string> deviceTables;
1288 int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1289 if (errCode != E_OK) {
1290 LOGE("Get device table name for alter table failed. %d", errCode);
1291 return errCode;
1292 }
1293
1294 LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1295 for (const auto &table : deviceTables) {
1296 std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1297 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1298 if (errCode != E_OK) {
1299 LOGE("Delete device data failed. %d", errCode);
1300 break;
1301 }
1302 }
1303 return errCode;
1304 }
1305
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1306 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1307 {
1308 std::string deleteLogSql =
1309 "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1310 "_log WHERE flag&0x02=0 AND (cloud_gid = '' OR cloud_gid IS NULL)";
1311 return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1312 }
1313
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1314 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1315 const std::string &tableName)
1316 {
1317 std::string deleteLogSql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1318 "_log WHERE device = ?";
1319 sqlite3_stmt *deleteLogStmt = nullptr;
1320 int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1321 if (errCode != E_OK) {
1322 LOGE("Get delete device data log statement failed. %d", errCode);
1323 return errCode;
1324 }
1325
1326 errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1327 if (errCode != E_OK) {
1328 LOGE("Bind device to delete data log statement failed. %d", errCode);
1329 SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1330 return errCode;
1331 }
1332
1333 errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1334 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1335 errCode = E_OK;
1336 } else {
1337 LOGE("Delete data log failed. %d", errCode);
1338 }
1339
1340 SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1341 return errCode;
1342 }
1343
DeleteDistributedLogTable(const std::string & tableName)1344 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1345 {
1346 if (tableName.empty()) {
1347 return -E_INVALID_ARGS;
1348 }
1349 std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1350 std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1351 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1352 if (errCode != E_OK) {
1353 LOGE("Delete distributed log table failed. %d", errCode);
1354 }
1355 return errCode;
1356 }
1357
IsTableOnceDropped(const std::string & tableName,bool & onceDropped)1358 int SQLiteSingleVerRelationalStorageExecutor::IsTableOnceDropped(const std::string &tableName, bool &onceDropped)
1359 {
1360 std::string keyStr = DBConstant::TABLE_IS_DROPPED + tableName;
1361 Key key;
1362 DBCommon::StringToVector(keyStr, key);
1363 Value value;
1364
1365 int errCode = GetKvData(key, value);
1366 if (errCode == E_OK) {
1367 onceDropped = true;
1368 return E_OK;
1369 } else if (errCode == -E_NOT_FOUND) {
1370 onceDropped = false;
1371 return E_OK;
1372 }
1373 return errCode;
1374 }
1375
CleanResourceForDroppedTable(const std::string & tableName)1376 int SQLiteSingleVerRelationalStorageExecutor::CleanResourceForDroppedTable(const std::string &tableName)
1377 {
1378 int errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1379 if (errCode != E_OK) {
1380 LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1381 return errCode;
1382 }
1383 errCode = DeleteDistributedLogTable(tableName);
1384 if (errCode != E_OK) {
1385 LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1386 return errCode;
1387 }
1388 errCode = DeleteTableTrigger(tableName);
1389 if (errCode != E_OK) {
1390 LOGE("Delete trigger for missing distributed table failed. %d", errCode);
1391 }
1392 return errCode;
1393 }
1394
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1395 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1396 std::vector<std::string> &missingTables)
1397 {
1398 if (tableNames.empty()) {
1399 return E_OK;
1400 }
1401 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1402 sqlite3_stmt *stmt = nullptr;
1403 int ret = E_OK;
1404 int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1405 if (errCode != E_OK) {
1406 SQLiteUtils::ResetStatement(stmt, true, ret);
1407 return errCode;
1408 }
1409 for (const auto &tableName : tableNames) {
1410 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1411 if (errCode != E_OK) {
1412 LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1413 break;
1414 }
1415
1416 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1417 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1418 errCode = CleanResourceForDroppedTable(tableName);
1419 if (errCode != E_OK) {
1420 LOGE("Clean resource for dropped table failed. %d", errCode);
1421 break;
1422 }
1423 missingTables.emplace_back(tableName);
1424 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1425 LOGE("[CheckAndCleanDisTable]Check distributed table failed. %d", errCode);
1426 break;
1427 }
1428 errCode = E_OK; // Check result ok for distributed table is still exists
1429 SQLiteUtils::ResetStatement(stmt, false, ret);
1430 }
1431 SQLiteUtils::ResetStatement(stmt, true, ret);
1432 return CheckCorruptedStatus(errCode);
1433 }
1434
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1435 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1436 const TableInfo &baseTbl, const StoreInfo &info)
1437 {
1438 if (dbHandle_ == nullptr) {
1439 return -E_INVALID_DB;
1440 }
1441
1442 if (device.empty() || !baseTbl.IsValid()) {
1443 return -E_INVALID_ARGS;
1444 }
1445
1446 std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1447 int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1448 if (errCode != E_OK) {
1449 LOGE("Create device table failed. %d", errCode);
1450 return errCode;
1451 }
1452
1453 errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1454 if (errCode != E_OK) {
1455 LOGE("Copy index to device table failed. %d", errCode);
1456 }
1457 return errCode;
1458 }
1459
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1460 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1461 const std::string &schemaVersion)
1462 {
1463 if (dbHandle_ == nullptr) {
1464 return -E_INVALID_DB;
1465 }
1466
1467 TableInfo newTable;
1468 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1469 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1470 LOGE("Check new schema failed. %d", errCode);
1471 return errCode;
1472 } else {
1473 errCode = table.CompareWithTable(newTable, schemaVersion);
1474 if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1475 LOGE("Check schema failed, schema was changed. %d", errCode);
1476 return -E_DISTRIBUTED_SCHEMA_CHANGED;
1477 } else {
1478 errCode = E_OK;
1479 }
1480 }
1481
1482 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1483 if (errCode != E_OK) {
1484 LOGE("Get query helper for check query failed. %d", errCode);
1485 return errCode;
1486 }
1487
1488 if (!query.IsQueryForRelationalDB()) {
1489 LOGE("Not support for this query type.");
1490 return -E_NOT_SUPPORT;
1491 }
1492
1493 SyncTimeRange defaultTimeRange;
1494 sqlite3_stmt *stmt = nullptr;
1495 errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1496 stmt);
1497 if (errCode != E_OK) {
1498 LOGE("Get query statement for check query failed. %d", errCode);
1499 }
1500
1501 SQLiteUtils::ResetStatement(stmt, true, errCode);
1502 return errCode;
1503 }
1504
CheckQueryObjectLegal(const QuerySyncObject & query)1505 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const QuerySyncObject &query)
1506 {
1507 if (dbHandle_ == nullptr) {
1508 return -E_INVALID_DB;
1509 }
1510 TableInfo newTable;
1511 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, query.GetTableName(), newTable);
1512 if (errCode != E_OK) {
1513 LOGE("Check new schema failed. %d", errCode);
1514 }
1515 return errCode;
1516 }
1517
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1518 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1519 Timestamp &maxTimestamp) const
1520 {
1521 maxTimestamp = 0;
1522 for (const auto &tableName : tableNames) {
1523 const std::string sql = "SELECT MAX(timestamp) FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1524 "_log;";
1525 sqlite3_stmt *stmt = nullptr;
1526 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1527 if (errCode != E_OK) {
1528 return errCode;
1529 }
1530 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1531 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1532 maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1533 errCode = E_OK;
1534 }
1535 SQLiteUtils::ResetStatement(stmt, true, errCode);
1536 if (errCode != E_OK) {
1537 maxTimestamp = 0;
1538 return errCode;
1539 }
1540 }
1541 return E_OK;
1542 }
1543
SetLogTriggerStatus(bool status)1544 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1545 {
1546 const std::string key = "log_trigger_switch";
1547 std::string val = status ? "true" : "false";
1548 std::string sql = "INSERT OR REPLACE INTO " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata" +
1549 " VALUES ('" + key + "', '" + val + "')";
1550 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1551 if (errCode != E_OK) {
1552 LOGE("Set log trigger to %s failed. errCode=%d", val.c_str(), errCode);
1553 }
1554 return errCode;
1555 }
1556
1557 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1558 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1559 std::vector<RelationalRowData *> &data)
1560 {
1561 size_t totalLength = 0;
1562 do {
1563 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1564 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1565 return E_OK;
1566 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1567 LOGE("Get data by bind sql failed:%d", errCode);
1568 return errCode;
1569 }
1570
1571 if (colNames.empty()) {
1572 SQLiteUtils::GetSelectCols(stmt, colNames); // Get column names.
1573 }
1574 auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1575 if (relaRowData == nullptr) {
1576 LOGE("ExecuteQueryBySqlStmt OOM");
1577 return -E_OUT_OF_MEMORY;
1578 }
1579
1580 auto dataSz = relaRowData->CalcLength();
1581 if (dataSz == 0) { // invalid data
1582 delete relaRowData;
1583 relaRowData = nullptr;
1584 continue;
1585 }
1586
1587 totalLength += static_cast<size_t>(dataSz);
1588 if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) { // the set has been full
1589 delete relaRowData;
1590 relaRowData = nullptr;
1591 LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1592 return -E_REMOTE_OVER_SIZE;
1593 }
1594 data.push_back(relaRowData);
1595 } while (true);
1596 return E_OK;
1597 }
1598 }
1599
1600 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1601 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1602 const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1603 std::vector<RelationalRowData *> &data)
1604 {
1605 int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1606 if (errCode != E_OK) {
1607 return errCode;
1608 }
1609
1610 sqlite3_stmt *stmt = nullptr;
1611 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1612 if (errCode != E_OK) {
1613 (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1614 return errCode;
1615 }
1616 ResFinalizer finalizer([this, &stmt, &errCode] {
1617 (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1618 SQLiteUtils::ResetStatement(stmt, true, errCode);
1619 });
1620 for (size_t i = 0; i < bindArgs.size(); ++i) {
1621 errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1622 if (errCode != E_OK) {
1623 return errCode;
1624 }
1625 }
1626 return GetRowDatas(stmt, isMemDb_, colNames, data);
1627 }
1628
CheckEncryptedOrCorrupted() const1629 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1630 {
1631 if (dbHandle_ == nullptr) {
1632 return -E_INVALID_DB;
1633 }
1634
1635 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1636 if (errCode != E_OK) {
1637 LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1638 }
1639 return errCode;
1640 }
1641
GetExistsDeviceList(std::set<std::string> & devices) const1642 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1643 {
1644 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1645 isMemDb_, devices);
1646 }
1647
GetSyncCloudGid(QuerySyncObject & query,const SyncTimeRange & syncTimeRange,bool isCloudForcePushStrategy,bool isCompensatedTask,std::vector<std::string> & cloudGid)1648 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudGid(QuerySyncObject &query,
1649 const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy,
1650 bool isCompensatedTask, std::vector<std::string> &cloudGid)
1651 {
1652 sqlite3_stmt *queryStmt = nullptr;
1653 int errCode = E_OK;
1654 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1655 if (errCode != E_OK) {
1656 return errCode;
1657 }
1658 std::string sql = helper.GetGidRelationalCloudQuerySql(tableSchema_.fields, isCloudForcePushStrategy,
1659 isCompensatedTask);
1660 errCode = helper.GetCloudQueryStatement(false, dbHandle_, sql, queryStmt);
1661 if (errCode != E_OK) {
1662 return errCode;
1663 }
1664 do {
1665 errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1666 if (errCode != E_OK) {
1667 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1668 break;
1669 }
1670 GetCloudGid(queryStmt, cloudGid);
1671 } while (errCode == E_OK);
1672 int resetStatementErrCode = E_OK;
1673 SQLiteUtils::ResetStatement(queryStmt, true, resetStatementErrCode);
1674 queryStmt = nullptr;
1675 return (errCode == E_OK ? resetStatementErrCode : errCode);
1676 }
1677
GetCloudDataForSync(const CloudUploadRecorder & uploadRecorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t & stepNum,uint32_t & totalSize)1678 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(const CloudUploadRecorder &uploadRecorder,
1679 sqlite3_stmt *statement, CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize)
1680 {
1681 VBucket log;
1682 VBucket extraLog;
1683 uint32_t preSize = totalSize;
1684 int64_t revisedTime = 0;
1685 int64_t invalidTime = 0;
1686 GetCloudLog(statement, log, totalSize, revisedTime, invalidTime);
1687 GetCloudExtraLog(statement, extraLog);
1688 if (revisedTime != 0) {
1689 Bytes hashKey = std::get<Bytes>(extraLog[CloudDbConstant::HASH_KEY]);
1690 cloudDataResult.revisedData.push_back({hashKey, revisedTime, invalidTime});
1691 }
1692
1693 VBucket data;
1694 int64_t flag = 0;
1695 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
1696 if (errCode != E_OK) {
1697 return errCode;
1698 }
1699 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1700 for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1701 Type cloudValue;
1702 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1703 tableSchema_.fields[cid].type, cid + STATUS_INDEX + 1, cloudValue);
1704 if (errCode != E_OK) {
1705 return errCode;
1706 }
1707 SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1708 errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1709 if (errCode != E_OK) {
1710 return errCode;
1711 }
1712 }
1713 }
1714
1715 if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, maxUploadSize_, maxUploadCount_)) {
1716 errCode = CloudStorageUtils::IdentifyCloudType(uploadRecorder, cloudDataResult, data, log, extraLog);
1717 } else {
1718 errCode = -E_UNFINISHED;
1719 }
1720 if (errCode == -E_IGNORE_DATA) {
1721 errCode = E_OK;
1722 totalSize = preSize;
1723 stepNum--;
1724 }
1725 return errCode;
1726 }
1727
SetLocalSchema(const RelationalSchemaObject & localSchema)1728 void SQLiteSingleVerRelationalStorageExecutor::SetLocalSchema(const RelationalSchemaObject &localSchema)
1729 {
1730 localSchema_ = localSchema;
1731 }
1732
CleanCloudDataOnLogTable(const std::string & logTableName,ClearMode mode)1733 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName, ClearMode mode)
1734 {
1735 std::string setFlag;
1736 if (mode == FLAG_ONLY && isLogicDelete_) {
1737 setFlag = SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC;
1738 } else {
1739 setFlag = SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC;
1740 }
1741 std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " + setFlag +
1742 ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1743 SHARING_RESOURCE + " = '' " + "WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR " +
1744 CLOUD_GID_FIELD + " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '';";
1745 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1746 if (errCode != E_OK) {
1747 LOGE("clean cloud log failed, %d", errCode);
1748 return errCode;
1749 }
1750 cleanLogSql = "DELETE FROM " + logTableName + " WHERE " + FLAG_IS_CLOUD + " AND " + DATA_IS_DELETE + ";";
1751 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1752 if (errCode != E_OK) {
1753 LOGE("delete cloud log failed, %d", errCode);
1754 return errCode;
1755 }
1756 // set all flag logout and data upload is not finished.
1757 cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG;
1758 if (mode == FLAG_ONLY) {
1759 cleanLogSql += " = flag | 0x800 & ~0x400;";
1760 } else {
1761 cleanLogSql += " = flag & ~0x400;";
1762 }
1763 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1764 }
1765
CleanUploadFinishedFlag(const std::string & tableName)1766 int SQLiteSingleVerRelationalStorageExecutor::CleanUploadFinishedFlag(const std::string &tableName)
1767 {
1768 // unset upload finished flag
1769 std::string cleanUploadFinishedSql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " +
1770 CloudDbConstant::FLAG + " = flag & ~0x400;";
1771 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanUploadFinishedSql);
1772 }
1773
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName,const RelationalSchemaObject & localSchema)1774 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1775 const std::string &logTableName, const RelationalSchemaObject &localSchema)
1776 {
1777 std::string sql = "DELETE FROM '" + tableName + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
1778 " IN (SELECT " + DATAKEY + " FROM '" + logTableName + "' WHERE (" + FLAG_IS_LOGIC_DELETE +
1779 ") OR CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD + " OR " + FLAG_IS_CLOUD_CONSISTENCY +
1780 "));";
1781 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1782 if (errCode != E_OK) {
1783 LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1784 return errCode;
1785 }
1786 std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + FLAG_IS_CLOUD + " OR " +
1787 FLAG_IS_CLOUD_CONSISTENCY + ";";
1788 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1789 if (errCode != E_OK) {
1790 LOGE("Failed to delete cloud data on log table, %d.", errCode);
1791 return errCode;
1792 }
1793 errCode = DoCleanAssetId(tableName, localSchema);
1794 if (errCode != E_OK) {
1795 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
1796 return errCode;
1797 }
1798 errCode = CleanCloudDataOnLogTable(logTableName, FLAG_AND_DATA);
1799 if (errCode != E_OK) {
1800 LOGE("Failed to clean gid on log table, %d.", errCode);
1801 }
1802 return errCode;
1803 }
1804
ChangeCloudDataFlagOnLogTable(const std::string & logTableName)1805 int SQLiteSingleVerRelationalStorageExecutor::ChangeCloudDataFlagOnLogTable(const std::string &logTableName)
1806 {
1807 std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " +
1808 SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1809 CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '' " + "WHERE NOT " + FLAG_IS_CLOUD_CONSISTENCY + ";";
1810 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1811 if (errCode != E_OK) {
1812 LOGE("change cloud log flag failed, %d", errCode);
1813 }
1814 return errCode;
1815 }
1816
SetDataOnUserTableWithLogicDelete(const std::string & tableName,const std::string & logTableName)1817 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnUserTableWithLogicDelete(const std::string &tableName,
1818 const std::string &logTableName)
1819 {
1820 UpdateCursorContext context;
1821 int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, context.cursor);
1822 LOGI("removeData on userTable:%s length:%d start and cursor is %llu.",
1823 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1824 errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1825 if (errCode != E_OK) {
1826 LOGE("Failed to create updateCursor func on userTable errCode=%d.", errCode);
1827 return errCode;
1828 }
1829 // data from cloud and not modified by local or consistency with cloud to flag logout
1830 std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " + SET_FLAG_WHEN_LOGOUT +
1831 ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1832 SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1833 " WHERE (CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD_CONSISTENCY + " OR " +
1834 FLAG_IS_CLOUD + ") AND NOT (" + DATA_IS_DELETE + ") " + " AND NOT (" + FLAG_IS_LOGIC_DELETE
1835 + "));";
1836 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1837 // here just clear updateCursor func, fail will not influence other function
1838 (void)CreateFuncUpdateCursor(context, nullptr);
1839 if (errCode != E_OK) {
1840 LOGE("Failed to change cloud data flag on userTable, %d.", errCode);
1841 return errCode;
1842 }
1843 // clear some column when data is logicDelete or physical delete
1844 sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1845 " = '', " + SHARING_RESOURCE + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR (" + DATA_IS_DELETE + ");";
1846 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1847 if (errCode != E_OK) {
1848 LOGE("Failed to deal logic delete data flag on userTable, %d.", errCode);
1849 return errCode;
1850 }
1851 LOGI("removeData on userTable:%s length:%d finish and cursor is %llu.",
1852 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1853 errCode = SetCursor(tableName, context.cursor);
1854 if (errCode != E_OK) {
1855 LOGE("set new cursor after removeData error %d.", errCode);
1856 return errCode;
1857 }
1858 return ChangeCloudDataFlagOnLogTable(logTableName);
1859 }
1860
SetDataOnShareTableWithLogicDelete(const std::string & tableName,const std::string & logTableName)1861 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnShareTableWithLogicDelete(const std::string &tableName,
1862 const std::string &logTableName)
1863 {
1864 UpdateCursorContext context;
1865 int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, context.cursor);
1866 LOGI("removeData on shareTable:%s length:%d start and cursor is %llu.",
1867 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1868 errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1869 if (errCode != E_OK) {
1870 LOGE("Failed to create updateCursor func on shareTable errCode=%d.", errCode);
1871 return errCode;
1872 }
1873 std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " +
1874 SET_FLAG_WHEN_LOGOUT_FOR_SHARE_TABLE + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1875 CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1876 " WHERE (NOT (" + DATA_IS_DELETE + ") " + " AND NOT (" + FLAG_IS_LOGIC_DELETE + "));";
1877 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1878 // here just clear updateCursor func, fail will not influence other function
1879 (void)CreateFuncUpdateCursor(context, nullptr);
1880 if (errCode != E_OK) {
1881 LOGE("Failed to change cloud data flag on shareTable, %d.", errCode);
1882 return errCode;
1883 }
1884 // clear some column when data is logicDelete or physical delete
1885 sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1886 " = '', " + SHARING_RESOURCE + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR (" + DATA_IS_DELETE + ");";
1887 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1888 if (errCode != E_OK) {
1889 LOGE("Failed to deal logic delete data flag on shareTable, %d.", errCode);
1890 return errCode;
1891 }
1892 LOGI("removeData on shareTable:%s length:%d finish and cursor is %llu.",
1893 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1894 return SetCursor(tableName, context.cursor);
1895 }
1896
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys,bool distinguishCloudFlag)1897 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
1898 std::vector<int64_t> &dataKeys, bool distinguishCloudFlag)
1899 {
1900 sqlite3_stmt *selectStmt = nullptr;
1901 std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
1902 " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '' AND data_key != '-1'";
1903 if (distinguishCloudFlag) {
1904 sql += " AND (";
1905 sql += FLAG_IS_CLOUD;
1906 sql += " OR ";
1907 sql += FLAG_IS_CLOUD_CONSISTENCY;
1908 sql += " )";
1909 }
1910 sql += ";";
1911 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
1912 if (errCode != E_OK) {
1913 LOGE("Get select data_key statement failed, %d", errCode);
1914 return errCode;
1915 }
1916 do {
1917 errCode = SQLiteUtils::StepWithRetry(selectStmt);
1918 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1919 dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
1920 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1921 LOGE("SQLite step failed when query log's data_key : %d", errCode);
1922 break;
1923 }
1924 } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
1925 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1926 return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
1927 }
1928
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)1929 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
1930 const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
1931 {
1932 std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
1933 if (opType == OpType::ONLY_UPDATE_GID) {
1934 updateLogSql += "cloud_gid = ?";
1935 updateColName.push_back(CloudDbConstant::GID_FIELD);
1936 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1937 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1938 updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
1939 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1940 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
1941 updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
1942 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1943 } else if (opType == OpType::UPDATE_TIMESTAMP) {
1944 updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
1945 ", timestamp = ?, cloud_gid = '', version = '', sharing_resource = ''";
1946 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1947 } else if (opType == OpType::CLEAR_GID) {
1948 updateLogSql += "cloud_gid = '', version = '', sharing_resource = '', flag = flag & " +
1949 std::to_string(SET_FLAG_ZERO_MASK);
1950 } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1951 updateLogSql += std::string(CloudDbConstant::TO_LOCAL_CHANGE) + ", cloud_gid = ?";
1952 updateColName.push_back(CloudDbConstant::GID_FIELD);
1953 updateLogSql += ", version = ?";
1954 updateColName.push_back(CloudDbConstant::VERSION_FIELD);
1955 } else {
1956 updateLogSql += " device = 'cloud', timestamp = ?,";
1957 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1958 if (opType == OpType::DELETE) {
1959 int errCode = GetCloudDeleteSql(tableSchema.name, updateLogSql);
1960 if (errCode != E_OK) {
1961 return errCode;
1962 }
1963 } else {
1964 updateLogSql += GetUpdateDataFlagSql(vBucket) + ", cloud_gid = ?";
1965 updateColName.push_back(CloudDbConstant::GID_FIELD);
1966 CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1967 }
1968 }
1969
1970 int errCode = AppendUpdateLogRecordWhereSqlCondition(tableSchema, vBucket, updateLogSql);
1971 if (errCode != E_OK) {
1972 return errCode;
1973 }
1974
1975 errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
1976 if (errCode != E_OK) {
1977 LOGE("Get update log statement failed when update cloud data, %d", errCode);
1978 }
1979 return errCode;
1980 }
1981
GetLockStatusByGid(const std::string & tableName,const std::string & gid,LockStatus & status)1982 int SQLiteSingleVerRelationalStorageExecutor::GetLockStatusByGid(const std::string &tableName, const std::string &gid,
1983 LockStatus &status)
1984 {
1985 std::string sql = "select status from " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = '" +
1986 gid + "';";
1987 sqlite3_stmt *stmt = nullptr;
1988 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1989 if (errCode != E_OK) {
1990 LOGE("Get stmt failed when get lock status: %d", errCode);
1991 return errCode;
1992 }
1993 errCode = SQLiteUtils::StepWithRetry(stmt);
1994 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1995 status = static_cast<LockStatus>(sqlite3_column_int(stmt, 0));
1996 errCode = E_OK;
1997 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1998 LOGE("Not found lock status.");
1999 errCode = -E_NOT_FOUND;
2000 } else {
2001 LOGE("Get lock status failed: %d", errCode);
2002 }
2003 int resetRet;
2004 SQLiteUtils::ResetStatement(stmt, true, resetRet);
2005 return errCode;
2006 }
2007
UpdateHashKey(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType)2008 int SQLiteSingleVerRelationalStorageExecutor::UpdateHashKey(DistributedTableMode mode, const TableInfo &tableInfo,
2009 TableSyncType syncType)
2010 {
2011 auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
2012 auto logName = DBCommon::GetLogTableName(tableInfo.GetTableName());
2013 std::string sql = "UPDATE " + logName + " SET hash_key = hash_key || '_old' where data_key in " +
2014 "(select _rowid_ from '" + tableInfo.GetTableName() + "');";
2015 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
2016 if (errCode != E_OK) {
2017 return errCode;
2018 }
2019 sql = "UPDATE " + logName + " SET hash_key = data.hash_value FROM (SELECT _rowid_, " +
2020 tableManager->CalcPrimaryKeyHash("dataTable.", tableInfo, "") + " AS hash_value " +
2021 "FROM '" + tableInfo.GetTableName() + "' AS dataTable) AS data WHERE data._rowid_ = " + logName + ".data_key;";
2022 return SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
2023 }
2024
SetTableMode(DistributedTableMode mode)2025 void SQLiteSingleVerRelationalStorageExecutor::SetTableMode(DistributedTableMode mode)
2026 {
2027 mode_ = mode;
2028 }
2029 } // namespace DistributedDB
2030 #endif
2031