1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32
33 namespace DistributedDB {
34 namespace {
// String keys; ROWID, TIMESTAMP and FLAG are used as VBucket keys by the cloud helpers in this file.
constexpr const char *ROWID = "ROWID";
constexpr const char *TIMESTAMP = "TIMESTAMP";
constexpr const char *FLAG = "FLAG";
constexpr const char *DATAKEY = "DATA_KEY";
constexpr const char *DEVICE_FIELD = "DEVICE";
constexpr const char *CLOUD_GID_FIELD = "CLOUD_GID";

// SQL fragments and masks operating on the log flag (bits are counted from 0).
constexpr const char *FLAG_IS_CLOUD = "FLAG & 0x02 = 0"; // holds when bit 1 (0x02) is clear, i.e. cloud data
constexpr const char *SET_FLAG_LOCAL = "FLAG | 0x02"; // sets bit 1 (0x02), which marks the data as local
constexpr int SET_FLAG_ZERO_MASK = 0x03; // mask used to clear bit 2 of the flag
constexpr int SET_FLAG_ONE_MASK = 0x04; // mask used to set bit 2 of the flag
constexpr int SET_CLOUD_FLAG = 0x05; // mask used to set bit 1 of the flag to 0

// Result-column positions read from a log statement.
constexpr int DATA_KEY_INDEX = 0;
constexpr int TIMESTAMP_INDEX = 3;
constexpr int W_TIMESTAMP_INDEX = 4;
constexpr int FLAG_INDEX = 5;
constexpr int CLOUD_GID_INDEX = 7;
51
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)52 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
53 {
54 if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
55 return SQLITE_DENY;
56 }
57 return SQLITE_OK;
58 }
59 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)60 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
61 DistributedTableMode mode)
62 : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode)
63 {
64 bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
65 bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
66 bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
67 bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
68 bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
69 bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
70 bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
71 }
72
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)73 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
74 {
75 std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
76 if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
77 LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
78 return -E_NOT_SUPPORT;
79 }
80
81 if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
82 if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
83 LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
84 return -E_NOT_SUPPORT;
85 }
86
87 if (DBCommon::HasConstraint(trimedSql, "ON CONFLICT", " )", " ")) {
88 LOGE("[CreateDistributedTable] Not support create distributed table with 'ON CONFLICT' constraint.");
89 return -E_NOT_SUPPORT;
90 }
91
92 if (mode == DistributedTableMode::COLLABORATION) {
93 if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
94 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
95 return -E_NOT_SUPPORT;
96 }
97 }
98
99 if (syncType == CLOUD_COOPERATION) {
100 int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
101 if (errCode != E_OK) {
102 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
103 return errCode;
104 }
105 }
106 }
107
108 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
109 if (table.GetPrimaryKey().size() > 1) {
110 LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
111 return -E_NOT_SUPPORT;
112 }
113 }
114
115 return E_OK;
116 }
117
118 namespace {
GetExistedDataTimeOffset(sqlite3 * db,const std::string & tableName,bool isMem,int64_t & timeOffset)119 int GetExistedDataTimeOffset(sqlite3 *db, const std::string &tableName, bool isMem, int64_t &timeOffset)
120 {
121 std::string sql = "SELECT get_sys_time(0) - max(rowid) - 1 FROM '" + tableName + "';";
122 sqlite3_stmt *stmt = nullptr;
123 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
124 if (errCode != E_OK) {
125 return errCode;
126 }
127 errCode = SQLiteUtils::StepWithRetry(stmt, isMem);
128 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
129 timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
130 errCode = E_OK;
131 }
132 SQLiteUtils::ResetStatement(stmt, true, errCode);
133 return errCode;
134 }
135 }
136
GeneLogInfoForExistedData(sqlite3 * db,const std::string & tableName,const TableInfo & table,const std::string & calPrimaryKeyHash)137 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &tableName,
138 const TableInfo &table, const std::string &calPrimaryKeyHash)
139 {
140 int64_t timeOffset = 0;
141 int errCode = GetExistedDataTimeOffset(db, tableName, isMemDb_, timeOffset);
142 if (errCode != E_OK) {
143 return errCode;
144 }
145 std::string timeOffsetStr = std::to_string(timeOffset);
146 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
147 std::string sql = "INSERT INTO " + logTable +
148 " SELECT rowid, '', '', " + timeOffsetStr + " + rowid, " + timeOffsetStr + " + rowid, 0x2, " +
149 calPrimaryKeyHash + ", ''" + " FROM '" + tableName + "' AS a WHERE 1=1;";
150 return SQLiteUtils::ExecuteRawSQL(db, sql);
151 }
152
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table,TableSyncType syncType)153 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
154 const std::string &identity, TableInfo &table, TableSyncType syncType)
155 {
156 if (dbHandle_ == nullptr) {
157 return -E_INVALID_DB;
158 }
159
160 const std::string tableName = table.GetTableName();
161 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
162 if (errCode != E_OK) {
163 LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
164 return errCode;
165 }
166
167 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
168 bool isEmpty = false;
169 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
170 if (errCode != E_OK || !isEmpty) {
171 LOGE("[CreateDistributedTable] check table empty failed. error=%d, isEmpty=%d", errCode, isEmpty);
172 return -E_NOT_SUPPORT;
173 }
174 }
175
176 errCode = CheckTableConstraint(table, mode, syncType);
177 if (errCode != E_OK) {
178 LOGE("[CreateDistributedTable] check table constraint failed.");
179 return errCode;
180 }
181
182 // create log table
183 auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
184 errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
185 if (errCode != E_OK) {
186 LOGE("[CreateDistributedTable] create log table failed");
187 return errCode;
188 }
189
190 if (!isUpgraded) {
191 std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, identity);
192 errCode = GeneLogInfoForExistedData(dbHandle_, tableName, table, calPrimaryKeyHash);
193 if (errCode != E_OK) {
194 return errCode;
195 }
196 }
197
198 // add trigger
199 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
200 if (errCode != E_OK) {
201 LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
202 return errCode;
203 }
204 return SetLogTriggerStatus(true);
205 }
206
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)207 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
208 DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
209 {
210 if (dbHandle_ == nullptr) {
211 return -E_INVALID_DB;
212 }
213 TableInfo newTableInfo;
214 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
215 if (errCode != E_OK) {
216 LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
217 return errCode;
218 }
219
220 if (CheckTableConstraint(newTableInfo, mode, syncType)) {
221 LOGE("[UpgradeDistributedTable] Not support create distributed table without rowid.");
222 return -E_NOT_SUPPORT;
223 }
224
225 // new table should has same or compatible upgrade
226 TableInfo tableInfo = schema.GetTable(tableName);
227 errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
228 if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
229 LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
230 return -E_SCHEMA_MISMATCH;
231 } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
232 LOGD("[UpgradeDistributedTable] schema has not changed.");
233 return E_OK;
234 }
235
236 schemaChanged = true;
237 errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
238 if (errCode != E_OK) {
239 LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
240 }
241
242 schema.AddRelationalTable(newTableInfo);
243 return errCode;
244 }
245
246 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)247 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
248 std::vector<std::string> &deviceTables)
249 {
250 if (device.empty() && tableName.empty()) { // device and table name should not both be empty
251 return -E_INVALID_ARGS;
252 }
253 std::string devicePattern = device.empty() ? "%" : device;
254 std::string tablePattern = tableName.empty() ? "%" : tableName;
255 std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
256
257 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
258 deviceTableName + "';";
259 sqlite3_stmt *stmt = nullptr;
260 int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
261 if (errCode != E_OK) {
262 SQLiteUtils::ResetStatement(stmt, true, errCode);
263 return errCode;
264 }
265
266 do {
267 errCode = SQLiteUtils::StepWithRetry(stmt, false);
268 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
269 errCode = E_OK;
270 break;
271 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
272 LOGE("Get table name failed. %d", errCode);
273 break;
274 }
275 std::string realTableName;
276 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
277 if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
278 continue;
279 }
280 if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
281 continue;
282 }
283 deviceTables.emplace_back(realTableName);
284 } while (true);
285
286 SQLiteUtils::ResetStatement(stmt, true, errCode);
287 return errCode;
288 }
289
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)290 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
291 {
292 std::vector<FieldInfo> fields;
293 auto itOld = oldTableInfo.GetFields().begin();
294 auto itNew = newTableInfo.GetFields().begin();
295 for (; itNew != newTableInfo.GetFields().end(); itNew++) {
296 if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
297 fields.emplace_back(itNew->second);
298 continue;
299 }
300 itOld++;
301 }
302 return fields;
303 }
304
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)305 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
306 {
307 if (db == nullptr) {
308 return -E_INVALID_ARGS;
309 }
310
311 std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
312 return a.GetColumnId()< b.GetColumnId();
313 });
314 int errCode = E_OK;
315 for (const auto &table : tables) {
316 for (const auto &field : fields) {
317 std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
318 alterSql += "'" + field.GetDataType() + "'";
319 alterSql += field.IsNotNull() ? " NOT NULL" : "";
320 alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
321 alterSql += ";";
322 errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
323 if (errCode != E_OK) {
324 LOGE("Alter table failed. %d", errCode);
325 break;
326 }
327 }
328 }
329 return errCode;
330 }
331
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)332 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
333 {
334 IndexInfoMap indexes;
335 auto itOld = oldTableInfo.GetIndexDefine().begin();
336 auto itNew = newTableInfo.GetIndexDefine().begin();
337 auto itOldEnd = oldTableInfo.GetIndexDefine().end();
338 auto itNewEnd = newTableInfo.GetIndexDefine().end();
339
340 while (itOld != itOldEnd && itNew != itNewEnd) {
341 if (itOld->first == itNew->first) {
342 if (itOld->second != itNew->second) {
343 indexes.insert({itNew->first, itNew->second});
344 }
345 itOld++;
346 itNew++;
347 } else if (itOld->first < itNew->first) {
348 indexes.insert({itOld->first, {}});
349 itOld++;
350 } else if (itOld->first > itNew->first) {
351 indexes.insert({itNew->first, itNew->second});
352 itNew++;
353 }
354 }
355
356 while (itOld != itOldEnd) {
357 indexes.insert({itOld->first, {}});
358 itOld++;
359 }
360
361 while (itNew != itNewEnd) {
362 indexes.insert({itNew->first, itNew->second});
363 itNew++;
364 }
365
366 return indexes;
367 }
368
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)369 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
370 {
371 if (db == nullptr) {
372 return -E_INVALID_ARGS;
373 }
374
375 int errCode = E_OK;
376 for (const auto &table : tables) {
377 for (const auto &index : indexes) {
378 if (index.first.empty()) {
379 continue;
380 }
381 std::string realIndexName = table + "_" + index.first;
382 std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
383 errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
384 if (errCode != E_OK) {
385 LOGE("Drop index failed. %d", errCode);
386 return errCode;
387 }
388
389 if (index.second.empty()) { // empty means drop index only
390 continue;
391 }
392
393 auto it = index.second.begin();
394 std::string indexDefine = *it++;
395 while (it != index.second.end()) {
396 indexDefine += ", " + *it++;
397 }
398 std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
399 "(" + indexDefine + ");";
400 errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
401 if (errCode != E_OK) {
402 LOGE("Create index failed. %d", errCode);
403 break;
404 }
405 }
406 }
407 return errCode;
408 }
409 }
410
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)411 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
412 const TableInfo &newTableInfo)
413 {
414 std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
415 IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
416 std::vector<std::string> deviceTables;
417 int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
418 if (errCode != E_OK) {
419 LOGE("Get device table name for alter table failed. %d", errCode);
420 return errCode;
421 }
422
423 LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
424 upgradeIndexes.size(), deviceTables.size());
425 errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
426 if (errCode != E_OK) {
427 LOGE("upgrade fields failed. %d", errCode);
428 return errCode;
429 }
430
431 errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
432 if (errCode != E_OK) {
433 LOGE("upgrade indexes failed. %d", errCode);
434 }
435
436 return E_OK;
437 }
438
StartTransaction(TransactType type)439 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
440 {
441 if (dbHandle_ == nullptr) {
442 LOGE("Begin transaction failed, dbHandle is null.");
443 return -E_INVALID_DB;
444 }
445 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
446 if (errCode != E_OK) {
447 LOGE("Begin transaction failed, errCode = %d", errCode);
448 }
449 return errCode;
450 }
451
Commit()452 int SQLiteSingleVerRelationalStorageExecutor::Commit()
453 {
454 if (dbHandle_ == nullptr) {
455 return -E_INVALID_DB;
456 }
457
458 return SQLiteUtils::CommitTransaction(dbHandle_);
459 }
460
Rollback()461 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
462 {
463 if (dbHandle_ == nullptr) {
464 return -E_INVALID_DB;
465 }
466 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
467 if (errCode != E_OK) {
468 LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
469 }
470 return errCode;
471 }
472
// Copy-assigns tableInfo into the member table_.
void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
{
    table_ = tableInfo;
}
477
GetLogData(sqlite3_stmt * logStatement,LogInfo & logInfo)478 static int GetLogData(sqlite3_stmt *logStatement, LogInfo &logInfo)
479 {
480 logInfo.dataKey = sqlite3_column_int64(logStatement, 0); // 0 means dataKey index
481
482 std::vector<uint8_t> dev;
483 int errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 1, dev); // 1 means dev index
484 if (errCode != E_OK) {
485 return errCode;
486 }
487 logInfo.device = std::string(dev.begin(), dev.end());
488
489 std::vector<uint8_t> oriDev;
490 errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 2, oriDev); // 2 means ori_dev index
491 if (errCode != E_OK) {
492 return errCode;
493 }
494 logInfo.originDev = std::string(oriDev.begin(), oriDev.end());
495 logInfo.timestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 3)); // 3 means timestamp index
496 logInfo.wTimestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 4)); // 4 means w_timestamp index
497 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 5)); // 5 means flag index
498 logInfo.flag &= (~DataItem::LOCAL_FLAG);
499 logInfo.flag &= (~DataItem::UPDATE_FLAG);
500 return SQLiteUtils::GetColumnBlobValue(logStatement, 6, logInfo.hashKey); // 6 means hashKey index
501 }
502
503 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize)504 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize)
505 {
506 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD,
507 static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
508 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
509 static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
510 totalSize += sizeof(int64_t) + sizeof(int64_t);
511 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
512 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
513 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
514 if (!cloudGid.empty()) {
515 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
516 totalSize += cloudGid.size();
517 }
518 }
519 }
520
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)521 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
522 {
523 flags.insert_or_assign(ROWID,
524 static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
525 flags.insert_or_assign(TIMESTAMP,
526 static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
527 flags.insert_or_assign(FLAG,
528 static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
529 }
530
IdentifyCloudType(CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)531 int IdentifyCloudType(CloudSyncData &cloudSyncData, VBucket &data, VBucket &log, VBucket &flags)
532 {
533 int64_t *rowid = std::get_if<int64_t>(&flags[ROWID]);
534 int64_t *flag = std::get_if<int64_t>(&flags[FLAG]);
535 int64_t *timeStamp = std::get_if<int64_t>(&flags[TIMESTAMP]);
536 if (rowid == nullptr || flag == nullptr || timeStamp == nullptr) {
537 return -E_INVALID_DATA;
538 }
539 if ((static_cast<uint64_t>(*flag) & DataItem::DELETE_FLAG) != 0) {
540 cloudSyncData.delData.record.push_back(data);
541 cloudSyncData.delData.extend.push_back(log);
542 } else if (log.find(CloudDbConstant::GID_FIELD) == log.end()) {
543 if (data.empty()) {
544 LOGE("The cloud data to be inserted is empty.");
545 return -E_INVALID_DATA;
546 }
547 cloudSyncData.insData.record.push_back(data);
548 cloudSyncData.insData.rowid.push_back(*rowid);
549 VBucket asset;
550 CloudStorageUtils::ObtainAssetFromVBucket(data, asset);
551 cloudSyncData.insData.timestamp.push_back(*timeStamp);
552 cloudSyncData.insData.assets.push_back(asset);
553 cloudSyncData.insData.extend.push_back(log);
554 } else {
555 if (data.empty()) {
556 LOGE("The cloud data to be updated is empty.");
557 return -E_INVALID_DATA;
558 }
559 cloudSyncData.updData.record.push_back(data);
560 VBucket asset;
561 CloudStorageUtils::ObtainAssetFromVBucket(data, asset);
562 if (!asset.empty()) {
563 cloudSyncData.updData.rowid.push_back(*rowid);
564 cloudSyncData.updData.timestamp.push_back(*timeStamp);
565 cloudSyncData.updData.assets.push_back(asset);
566 }
567 cloudSyncData.updData.extend.push_back(log);
568 }
569 return E_OK;
570 }
571 }
572
GetDataItemSerialSize(DataItem & item,size_t appendLen)573 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
574 {
575 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
576 // the size would not be very large.
577 static const size_t maxOrigDevLength = 40;
578 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
579 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
580 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
581 return dataSize;
582 }
583
GetKvData(const Key & key,Value & value) const584 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
585 {
586 static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX +
587 "metadata WHERE key=?;";
588 sqlite3_stmt *statement = nullptr;
589 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
590 if (errCode != E_OK) {
591 goto END;
592 }
593
594 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
595 if (errCode != E_OK) {
596 goto END;
597 }
598
599 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
600 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
601 errCode = -E_NOT_FOUND;
602 goto END;
603 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
604 goto END;
605 }
606
607 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
608 END:
609 SQLiteUtils::ResetStatement(statement, true, errCode);
610 return errCode;
611 }
612
PutKvData(const Key & key,const Value & value) const613 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
614 {
615 static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX +
616 "metadata VALUES(?,?);";
617 sqlite3_stmt *statement = nullptr;
618 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
619 if (errCode != E_OK) {
620 goto ERROR;
621 }
622
623 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // 1 means key index
624 if (errCode != E_OK) {
625 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
626 goto ERROR;
627 }
628
629 errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true); // 2 means value index
630 if (errCode != E_OK) {
631 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
632 goto ERROR;
633 }
634 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
635 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
636 errCode = E_OK;
637 }
638 ERROR:
639 SQLiteUtils::ResetStatement(statement, true, errCode);
640 return errCode;
641 }
642
DeleteMetaData(const std::vector<Key> & keys) const643 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
644 {
645 static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
646 "metadata WHERE key=?;";
647 sqlite3_stmt *statement = nullptr;
648 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
649 if (errCode != E_OK) {
650 return errCode;
651 }
652
653 for (const auto &key : keys) {
654 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
655 if (errCode != E_OK) {
656 break;
657 }
658
659 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
660 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
661 break;
662 }
663 errCode = E_OK;
664 SQLiteUtils::ResetStatement(statement, false, errCode);
665 }
666 SQLiteUtils::ResetStatement(statement, true, errCode);
667 return CheckCorruptedStatus(errCode);
668 }
669
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const670 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
671 {
672 static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
673 "metadata WHERE key>=? AND key<=?;";
674 sqlite3_stmt *statement = nullptr;
675 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
676 if (errCode != E_OK) {
677 return errCode;
678 }
679
680 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
681 if (errCode == E_OK) {
682 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
683 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
684 errCode = E_OK;
685 }
686 }
687 SQLiteUtils::ResetStatement(statement, true, errCode);
688 return CheckCorruptedStatus(errCode);
689 }
690
GetAllMetaKeys(std::vector<Key> & keys) const691 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
692 {
693 static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + DBConstant::RELATIONAL_PREFIX + "metadata;";
694 sqlite3_stmt *statement = nullptr;
695 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
696 if (errCode != E_OK) {
697 LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
698 return errCode;
699 }
700 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
701 SQLiteUtils::ResetStatement(statement, true, errCode);
702 return errCode;
703 }
704
GetLogInfoPre(sqlite3_stmt * queryStmt,const DataItem & dataItem,LogInfo & logInfoGet)705 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoPre(sqlite3_stmt *queryStmt, const DataItem &dataItem,
706 LogInfo &logInfoGet)
707 {
708 if (queryStmt == nullptr) {
709 return -E_INVALID_ARGS;
710 }
711 int errCode = SQLiteUtils::BindBlobToStatement(queryStmt, 1, dataItem.hashKey); // 1 means hashkey index.
712 if (errCode != E_OK) {
713 return errCode;
714 }
715 if (mode_ != DistributedTableMode::COLLABORATION) {
716 errCode = SQLiteUtils::BindTextToStatement(queryStmt, 2, dataItem.dev); // 2 means device index.
717 if (errCode != E_OK) {
718 return errCode;
719 }
720 }
721
722 errCode = SQLiteUtils::StepWithRetry(queryStmt, isMemDb_);
723 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
724 errCode = -E_NOT_FOUND;
725 } else {
726 errCode = GetLogData(queryStmt, logInfoGet);
727 }
728 return errCode;
729 }
730
SaveSyncLog(sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,int64_t rowid)731 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncLog(sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
732 const DataItem &dataItem, int64_t rowid)
733 {
734 LogInfo logInfoGet;
735 int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
736 LogInfo logInfoBind;
737 logInfoBind.hashKey = dataItem.hashKey;
738 logInfoBind.device = dataItem.dev;
739 logInfoBind.timestamp = dataItem.timestamp;
740 logInfoBind.flag = dataItem.flag;
741
742 if (errCode == -E_NOT_FOUND) { // insert
743 logInfoBind.wTimestamp = dataItem.writeTimestamp;
744 logInfoBind.originDev = dataItem.dev;
745 } else if (errCode == E_OK) { // update
746 logInfoBind.wTimestamp = logInfoGet.wTimestamp;
747 logInfoBind.originDev = logInfoGet.originDev;
748 } else {
749 return errCode;
750 }
751
752 // bind
753 SQLiteUtils::BindInt64ToStatement(statement, 1, rowid); // 1 means dataKey index
754 std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
755 SQLiteUtils::BindBlobToStatement(statement, 2, originDev); // 2 means ori_dev index
756 SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp); // 3 means timestamp index
757 SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimestamp); // 4 means w_timestamp index
758 SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag); // 5 means flag index
759 SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey); // 6 means hashKey index
760 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
761 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
762 return E_OK;
763 }
764 return errCode;
765 }
766
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)767 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
768 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
769 {
770 if (stmt == nullptr) {
771 int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
772 if (errCode != E_OK) {
773 LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
774 return errCode;
775 }
776 }
777
778 int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hash_key index
779 if (errCode != E_OK) {
780 SQLiteUtils::ResetStatement(stmt, true, errCode);
781 return errCode;
782 }
783 if (mode_ != DistributedTableMode::COLLABORATION) {
784 errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
785 if (errCode != E_OK) {
786 SQLiteUtils::ResetStatement(stmt, true, errCode);
787 return errCode;
788 }
789 }
790 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
791 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
792 errCode = E_OK;
793 }
794 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
795 return errCode;
796 }
797
SaveSyncDataItem(const DataItem & dataItem,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,int64_t & rowid)798 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, SaveSyncDataStmt &saveStmt,
799 RelationalSyncDataInserter &inserter, int64_t &rowid)
800 {
801 if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
802 return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
803 }
804 if ((mode_ == DistributedTableMode::COLLABORATION && inserter.GetLocalTable().GetIdentifyKey().size() == 1u &&
805 inserter.GetLocalTable().GetIdentifyKey().at(0) == "rowid") ||
806 (mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().GetPrimaryKey().size() == 1u &&
807 inserter.GetLocalTable().GetPrimaryKey().at(0) == "rowid") ||
808 inserter.GetLocalTable().GetAutoIncrement()) { // No primary key of auto increment
809 int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
810 if (errCode != E_OK) {
811 LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
812 return errCode;
813 }
814 }
815
816 int errCode = inserter.BindInsertStatement(saveStmt.saveDataStmt, dataItem);
817 if (errCode != E_OK) {
818 LOGE("Bind data failed, errCode=%d.", errCode);
819 return errCode;
820 }
821 errCode = SQLiteUtils::StepWithRetry(saveStmt.saveDataStmt, isMemDb_);
822 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
823 rowid = SQLiteUtils::GetLastRowId(dbHandle_);
824 errCode = E_OK;
825 }
826 return errCode;
827 }
828
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)829 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
830 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
831 {
832 if (stmt == nullptr) {
833 int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
834 if (errCode != E_OK) {
835 LOGE("[DeleteSyncLog] Get statement fail!");
836 return errCode;
837 }
838 }
839
840 int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hashkey index
841 if (errCode != E_OK) {
842 SQLiteUtils::ResetStatement(stmt, true, errCode);
843 return errCode;
844 }
845 if (mode_ != DistributedTableMode::COLLABORATION) {
846 errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
847 if (errCode != E_OK) {
848 SQLiteUtils::ResetStatement(stmt, true, errCode);
849 return errCode;
850 }
851 }
852 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
853 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
854 errCode = E_OK;
855 }
856 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
857 return errCode;
858 }
859
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)860 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
861 RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
862 {
863 int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
864 if (errCode != E_OK) {
865 return errCode;
866 }
867 return DeleteSyncLog(item, inserter, rmLogStmt);
868 }
869
GetSyncDataPre(const DataItem & dataItem,sqlite3_stmt * queryStmt,DataItem & itemGet)870 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataPre(const DataItem &dataItem, sqlite3_stmt *queryStmt,
871 DataItem &itemGet)
872 {
873 LogInfo logInfoGet;
874 int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
875 itemGet.timestamp = logInfoGet.timestamp;
876 SQLiteUtils::ResetStatement(queryStmt, false, errCode);
877 return errCode;
878 }
879
CheckDataConflictDefeated(const DataItem & dataItem,sqlite3_stmt * queryStmt,bool & isDefeated)880 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
881 sqlite3_stmt *queryStmt, bool &isDefeated)
882 {
883 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
884 mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
885 isDefeated = false; // no need to solve conflict except miss query data
886 return E_OK;
887 }
888
889 DataItem itemGet;
890 int errCode = GetSyncDataPre(dataItem, queryStmt, itemGet);
891 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
892 LOGE("Failed to get raw data. %d", errCode);
893 return errCode;
894 }
895 isDefeated = (dataItem.timestamp <= itemGet.timestamp); // defeated if item timestamp is earlier then raw data
896 return E_OK;
897 }
898
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)899 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
900 SaveSyncDataStmt &saveStmt, DataItem &item)
901 {
902 bool isDefeated = false;
903 int errCode = CheckDataConflictDefeated(item, saveStmt.queryStmt, isDefeated);
904 if (errCode != E_OK) {
905 LOGE("check data conflict failed. %d", errCode);
906 return errCode;
907 }
908
909 if (isDefeated) {
910 LOGD("Data was defeated.");
911 return E_OK;
912 }
913 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
914 return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
915 }
916 int64_t rowid = -1;
917 errCode = SaveSyncDataItem(item, saveStmt, inserter, rowid);
918 if (errCode == E_OK || errCode == -E_NOT_FOUND) {
919 errCode = SaveSyncLog(saveStmt.saveLogStmt, saveStmt.queryStmt, item, rowid);
920 }
921 return errCode;
922 }
923
SaveSyncDataItems(RelationalSyncDataInserter & inserter)924 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
925 {
926 SaveSyncDataStmt saveStmt;
927 int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
928 if (errCode != E_OK) {
929 LOGE("Prepare insert sync data statement failed.");
930 return errCode;
931 }
932
933 errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
934 if (item.neglect) { // Do not save this record if it is neglected
935 return E_OK;
936 }
937 int errCode = SaveSyncDataItem(inserter, saveStmt, item);
938 if (errCode != E_OK) {
939 LOGE("save sync data item failed. err=%d", errCode);
940 return errCode;
941 }
942 // Need not reset rmDataStmt and rmLogStmt here.
943 return saveStmt.ResetStatements(false);
944 });
945
946 int ret = saveStmt.ResetStatements(true);
947 return errCode != E_OK ? errCode : ret;
948 }
949
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)950 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
951 {
952 if (useTrans) {
953 int errCode = StartTransaction(TransactType::IMMEDIATE);
954 if (errCode != E_OK) {
955 return errCode;
956 }
957 }
958
959 int errCode = SetLogTriggerStatus(false);
960 if (errCode != E_OK) {
961 goto END;
962 }
963
964 errCode = SaveSyncDataItems(inserter);
965 if (errCode != E_OK) {
966 LOGE("Save sync data items failed. errCode=%d", errCode);
967 goto END;
968 }
969
970 errCode = SetLogTriggerStatus(true);
971 END:
972 if (useTrans) {
973 if (errCode == E_OK) {
974 errCode = Commit();
975 } else {
976 (void)Rollback(); // Keep the error code of the first scene
977 }
978 }
979 return errCode;
980 }
981
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const982 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
983 bool isGettingDeletedData) const
984 {
985 RowDataWithLog data;
986 int errCode = GetLogData(stmt, data.logInfo);
987 if (errCode != E_OK) {
988 LOGE("relational data value transfer to kv fail");
989 return errCode;
990 }
991
992 if (!isGettingDeletedData) {
993 for (size_t cid = 0; cid < table_.GetFields().size(); ++cid) {
994 DataValue value;
995 errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
996 value);
997 if (errCode != E_OK) {
998 return errCode;
999 }
1000 data.rowData.push_back(std::move(value)); // sorted by cid
1001 }
1002 }
1003
1004 errCode = DataTransformer::SerializeDataItem(data,
1005 isGettingDeletedData ? std::vector<FieldInfo>() : table_.GetFieldInfos(), dataItem);
1006 if (errCode != E_OK) {
1007 LOGE("relational data value transfer to kv fail");
1008 }
1009 return errCode;
1010 }
1011
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1012 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1013 {
1014 int errCode = GetDataItemForSync(fullStmt, item, false);
1015 if (errCode != E_OK) {
1016 return errCode;
1017 }
1018 item.value = {};
1019 item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1020 return errCode;
1021 }
1022
1023 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1024 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp ×tamp)
1025 {
1026 if (stmt == nullptr) {
1027 return -E_INVALID_ARGS;
1028 }
1029 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1030 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1031 timestamp = INT64_MAX;
1032 errCode = E_OK;
1033 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1034 timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3)); // 3 means timestamp index
1035 errCode = E_OK;
1036 }
1037 return errCode;
1038 }
1039
StepNext(bool isMemDB,sqlite3_stmt * stmt)1040 int StepNext(bool isMemDB, sqlite3_stmt *stmt)
1041 {
1042 if (stmt == nullptr) {
1043 return -E_INVALID_ARGS;
1044 }
1045 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1046 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1047 errCode = -E_FINISHED;
1048 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1049 errCode = E_OK;
1050 }
1051 return errCode;
1052 }
1053
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1054 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1055 std::vector<DataItem> &dataItems, DataItem &&item)
1056 {
1057 // If one record is over 4M, ignore it.
1058 if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1059 overLongSize++;
1060 } else {
1061 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1062 dataTotalSize += GetDataItemSerialSize(item, appendLength);
1063 if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1064 return -E_UNFINISHED;
1065 } else {
1066 dataItems.push_back(item);
1067 }
1068 }
1069 return E_OK;
1070 }
1071 }
1072
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1073 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1074 sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1075 {
1076 if (!isFirstTime) { // For the first time, never step before, can get nothing
1077 int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1078 if (errCode != E_OK) {
1079 return errCode;
1080 }
1081 }
1082 return StepNext(isMemDb_, queryStmt, queryTime);
1083 }
1084
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1085 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1086 Timestamp &missQueryTime)
1087 {
1088 int errCode = GetMissQueryData(fullStmt, item);
1089 if (errCode != E_OK) {
1090 return errCode;
1091 }
1092 return StepNext(isMemDb_, fullStmt, missQueryTime);
1093 }
1094
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1095 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1096 const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1097 const TableInfo &tableInfo)
1098 {
1099 baseTblName_ = tableInfo.GetTableName();
1100 SetTableInfo(tableInfo);
1101 sqlite3_stmt *queryStmt = nullptr;
1102 sqlite3_stmt *fullStmt = nullptr;
1103 bool isGettingDeletedData = false;
1104 int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1105 if (errCode != E_OK) {
1106 return errCode;
1107 }
1108
1109 Timestamp queryTime = 0;
1110 Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1111
1112 bool isFirstTime = true;
1113 size_t dataTotalSize = 0;
1114 size_t overLongSize = 0;
1115 do {
1116 DataItem item;
1117 if (queryTime < missQueryTime) {
1118 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1119 } else if (queryTime == missQueryTime) {
1120 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1121 if (errCode != E_OK) {
1122 break;
1123 }
1124 errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1125 } else {
1126 errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1127 }
1128
1129 if (errCode == E_OK && !isFirstTime) {
1130 errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1131 }
1132
1133 if (errCode != E_OK) {
1134 break;
1135 }
1136
1137 isFirstTime = false;
1138 if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1139 errCode = -E_FINISHED;
1140 break;
1141 }
1142 } while (true);
1143 LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1144 errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1145 SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1146 SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1147 return errCode;
1148 }
1149
CheckDBModeForRelational()1150 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1151 {
1152 std::string journalMode;
1153 int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1154
1155 for (auto &c : journalMode) { // convert to lowercase
1156 c = static_cast<char>(std::tolower(c));
1157 }
1158
1159 if (errCode == E_OK && journalMode != "wal") {
1160 LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1161 return -E_NOT_SUPPORT;
1162 }
1163 return errCode;
1164 }
1165
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1166 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1167 const std::string &tableName)
1168 {
1169 std::vector<std::string> deviceTables;
1170 int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1171 if (errCode != E_OK) {
1172 LOGE("Get device table name for alter table failed. %d", errCode);
1173 return errCode;
1174 }
1175
1176 LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1177 for (const auto &table : deviceTables) {
1178 std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1179 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1180 if (errCode != E_OK) {
1181 LOGE("Delete device data failed. %d", errCode);
1182 break;
1183 }
1184 }
1185 return errCode;
1186 }
1187
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1188 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1189 {
1190 std::string deleteLogSql =
1191 "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName +
1192 "_log WHERE flag&0x02=0 and (cloud_gid = '' or cloud_gid is null)";
1193 return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1194 }
1195
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1196 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1197 const std::string &tableName)
1198 {
1199 std::string deleteLogSql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE device = ?";
1200 sqlite3_stmt *deleteLogStmt = nullptr;
1201 int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1202 if (errCode != E_OK) {
1203 LOGE("Get delete device data log statement failed. %d", errCode);
1204 return errCode;
1205 }
1206
1207 errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1208 if (errCode != E_OK) {
1209 LOGE("Bind device to delete data log statement failed. %d", errCode);
1210 SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1211 return errCode;
1212 }
1213
1214 errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1215 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1216 errCode = E_OK;
1217 } else {
1218 LOGE("Delete data log failed. %d", errCode);
1219 }
1220
1221 SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1222 return errCode;
1223 }
1224
DeleteDistributedLogTable(const std::string & tableName)1225 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1226 {
1227 if (tableName.empty()) {
1228 return -E_INVALID_ARGS;
1229 }
1230 std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1231 std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1232 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1233 if (errCode != E_OK) {
1234 LOGE("Delete distributed log table failed. %d", errCode);
1235 }
1236 return errCode;
1237 }
1238
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1239 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1240 std::vector<std::string> &missingTables)
1241 {
1242 if (tableNames.empty()) {
1243 return E_OK;
1244 }
1245 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1246 sqlite3_stmt *stmt = nullptr;
1247 int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1248 if (errCode != E_OK) {
1249 SQLiteUtils::ResetStatement(stmt, true, errCode);
1250 return errCode;
1251 }
1252 for (const auto &tableName : tableNames) {
1253 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1254 if (errCode != E_OK) {
1255 LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1256 break;
1257 }
1258
1259 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1260 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1261 errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1262 if (errCode != E_OK) {
1263 LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1264 break;
1265 }
1266 errCode = DeleteDistributedLogTable(tableName);
1267 if (errCode != E_OK) {
1268 LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1269 break;
1270 }
1271 missingTables.emplace_back(tableName);
1272 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1273 LOGE("Check distributed table failed. %d", errCode);
1274 break;
1275 }
1276 errCode = E_OK; // Check result ok for distributed table is still exists
1277 SQLiteUtils::ResetStatement(stmt, false, errCode);
1278 }
1279 SQLiteUtils::ResetStatement(stmt, true, errCode);
1280 return CheckCorruptedStatus(errCode);
1281 }
1282
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1283 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1284 const TableInfo &baseTbl, const StoreInfo &info)
1285 {
1286 if (dbHandle_ == nullptr) {
1287 return -E_INVALID_DB;
1288 }
1289
1290 if (device.empty() || !baseTbl.IsValid()) {
1291 return -E_INVALID_ARGS;
1292 }
1293
1294 std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1295 int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1296 if (errCode != E_OK) {
1297 LOGE("Create device table failed. %d", errCode);
1298 return errCode;
1299 }
1300
1301 errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1302 if (errCode != E_OK) {
1303 LOGE("Copy index to device table failed. %d", errCode);
1304 }
1305 return errCode;
1306 }
1307
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1308 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1309 const std::string &schemaVersion)
1310 {
1311 if (dbHandle_ == nullptr) {
1312 return -E_INVALID_DB;
1313 }
1314
1315 TableInfo newTable;
1316 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1317 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1318 LOGE("Check new schema failed. %d", errCode);
1319 return errCode;
1320 } else {
1321 errCode = table.CompareWithTable(newTable, schemaVersion);
1322 if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1323 LOGE("Check schema failed, schema was changed. %d", errCode);
1324 return -E_DISTRIBUTED_SCHEMA_CHANGED;
1325 } else {
1326 errCode = E_OK;
1327 }
1328 }
1329
1330 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1331 if (errCode != E_OK) {
1332 LOGE("Get query helper for check query failed. %d", errCode);
1333 return errCode;
1334 }
1335
1336 if (!query.IsQueryForRelationalDB()) {
1337 LOGE("Not support for this query type.");
1338 return -E_NOT_SUPPORT;
1339 }
1340
1341 SyncTimeRange defaultTimeRange;
1342 sqlite3_stmt *stmt = nullptr;
1343 errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1344 stmt);
1345 if (errCode != E_OK) {
1346 LOGE("Get query statement for check query failed. %d", errCode);
1347 }
1348
1349 SQLiteUtils::ResetStatement(stmt, true, errCode);
1350 return errCode;
1351 }
1352
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1353 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1354 Timestamp &maxTimestamp) const
1355 {
1356 maxTimestamp = 0;
1357 for (const auto &tableName : tableNames) {
1358 const std::string sql = "SELECT max(timestamp) from " + DBConstant::RELATIONAL_PREFIX + tableName + "_log;";
1359 sqlite3_stmt *stmt = nullptr;
1360 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1361 if (errCode != E_OK) {
1362 return errCode;
1363 }
1364 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1365 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1366 maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1367 errCode = E_OK;
1368 }
1369 SQLiteUtils::ResetStatement(stmt, true, errCode);
1370 if (errCode != E_OK) {
1371 maxTimestamp = 0;
1372 return errCode;
1373 }
1374 }
1375 return E_OK;
1376 }
1377
SetLogTriggerStatus(bool status)1378 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1379 {
1380 const std::string key = "log_trigger_switch";
1381 std::string val = status ? "true" : "false";
1382 std::string sql = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX + "metadata" +
1383 " VALUES ('" + key + "', '" + val + "')";
1384 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1385 if (errCode != E_OK) {
1386 LOGE("Set log trigger to %s failed. errCode=%d", val.c_str(), errCode);
1387 }
1388 return errCode;
1389 }
1390
1391 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1392 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1393 std::vector<RelationalRowData *> &data)
1394 {
1395 size_t totalLength = 0;
1396 do {
1397 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1398 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1399 return E_OK;
1400 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1401 LOGE("Get data by bind sql failed:%d", errCode);
1402 return errCode;
1403 }
1404
1405 if (colNames.empty()) {
1406 SQLiteUtils::GetSelectCols(stmt, colNames); // Get column names.
1407 }
1408 auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1409 if (relaRowData == nullptr) {
1410 LOGE("ExecuteQueryBySqlStmt OOM");
1411 return -E_OUT_OF_MEMORY;
1412 }
1413
1414 auto dataSz = relaRowData->CalcLength();
1415 if (dataSz == 0) { // invalid data
1416 delete relaRowData;
1417 relaRowData = nullptr;
1418 continue;
1419 }
1420
1421 totalLength += static_cast<size_t>(dataSz);
1422 if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) { // the set has been full
1423 delete relaRowData;
1424 relaRowData = nullptr;
1425 LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1426 return -E_REMOTE_OVER_SIZE;
1427 }
1428 data.push_back(relaRowData);
1429 } while (true);
1430 return E_OK;
1431 }
1432 }
1433
1434 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1435 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1436 const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1437 std::vector<RelationalRowData *> &data)
1438 {
1439 int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1440 if (errCode != E_OK) {
1441 return errCode;
1442 }
1443
1444 sqlite3_stmt *stmt = nullptr;
1445 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1446 if (errCode != E_OK) {
1447 (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1448 return errCode;
1449 }
1450 ResFinalizer finalizer([this, &stmt, &errCode] {
1451 (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1452 SQLiteUtils::ResetStatement(stmt, true, errCode);
1453 });
1454 for (size_t i = 0; i < bindArgs.size(); ++i) {
1455 errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1456 if (errCode != E_OK) {
1457 return errCode;
1458 }
1459 }
1460 return GetRowDatas(stmt, isMemDb_, colNames, data);
1461 }
1462
CheckEncryptedOrCorrupted() const1463 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1464 {
1465 if (dbHandle_ == nullptr) {
1466 return -E_INVALID_DB;
1467 }
1468
1469 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1470 if (errCode != E_OK) {
1471 LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1472 }
1473 return errCode;
1474 }
1475
GetExistsDeviceList(std::set<std::string> & devices) const1476 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1477 {
1478 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1479 isMemDb_, devices);
1480 }
1481
GetUploadCount(const std::string & tableName,const Timestamp & timestamp,const bool isCloudForcePush,int64_t & count)1482 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const std::string &tableName,
1483 const Timestamp ×tamp, const bool isCloudForcePush, int64_t &count)
1484 {
1485 std::string sql = isCloudForcePush ?
1486 ("SELECT count(rowid) from '" + DBCommon::GetLogTableName(tableName)
1487 + "' where timestamp > ? and (flag & 0x04) != 0x04 and (cloud_gid != ''"
1488 " or (cloud_gid == '' and (flag & 0x01) = 0 ));"):
1489 ("SELECT count(rowid) from '" + DBCommon::GetLogTableName(tableName)
1490 + "' where timestamp > ? and (flag & 0x02) = 0x02 and (cloud_gid != ''"
1491 " or (cloud_gid == '' and (flag & 0x01) = 0 ));");
1492
1493 sqlite3_stmt *stmt = nullptr;
1494 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1495 if (errCode != E_OK) {
1496 return errCode;
1497 }
1498 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, timestamp);
1499 if (errCode != E_OK) {
1500 SQLiteUtils::ResetStatement(stmt, true, errCode);
1501 return errCode;
1502 }
1503 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1504 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1505 count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1506 errCode = E_OK;
1507 } else {
1508 LOGE("Failed to get the count to be uploaded. %d", errCode);
1509 }
1510 SQLiteUtils::ResetStatement(stmt, true, errCode);
1511 LOGD("upload count is %d, isCloudForcePush is %d", count, isCloudForcePush);
1512 return errCode;
1513 }
1514
UpdateCloudLogGid(const CloudSyncData & cloudDataResult)1515 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult)
1516 {
1517 if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1518 cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1519 return -E_INVALID_ARGS;
1520 }
1521 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1522 + "' SET cloud_gid = ? where data_key = ? ";
1523 sqlite3_stmt *stmt = nullptr;
1524 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1525 if (errCode != E_OK) {
1526 return errCode;
1527 }
1528 for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
1529 auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
1530 int64_t rowid = cloudDataResult.insData.rowid[i];
1531 if (gidEntry == cloudDataResult.insData.extend[i].end()) {
1532 errCode = -E_INVALID_ARGS;
1533 break;
1534 }
1535 std::string val;
1536 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
1537 cloudDataResult.insData.extend[i], val) != E_OK) {
1538 errCode = -E_INVALID_DATA;
1539 break;
1540 }
1541 if (val.empty()) {
1542 errCode = -E_CLOUD_ERROR;
1543 break;
1544 }
1545 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, val);
1546 if (errCode != E_OK) {
1547 break;
1548 }
1549 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means the second bind args
1550 if (errCode != E_OK) {
1551 break;
1552 }
1553 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1554 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1555 errCode = E_OK;
1556 SQLiteUtils::ResetStatement(stmt, false, errCode);
1557 } else {
1558 LOGE("Update cloud log failed:%d", errCode);
1559 break;
1560 }
1561 }
1562 SQLiteUtils::ResetStatement(stmt, true, errCode);
1563 return errCode;
1564 }
1565
GetSyncCloudData(CloudSyncData & cloudDataResult,const uint32_t & maxSize,SQLiteSingleVerRelationalContinueToken & token)1566 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(CloudSyncData &cloudDataResult,
1567 const uint32_t &maxSize, SQLiteSingleVerRelationalContinueToken &token)
1568 {
1569 token.GetCloudTableSchema(tableSchema_);
1570 sqlite3_stmt *queryStmt = nullptr;
1571 bool isStepNext = false;
1572 int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult.isCloudForcePushStrategy, queryStmt, isStepNext);
1573 if (errCode != E_OK) {
1574 (void)token.ReleaseCloudStatement();
1575 return errCode;
1576 }
1577 uint32_t totalSize = 0;
1578 uint32_t stepNum = 0;
1579 do {
1580 if (isStepNext) {
1581 errCode = StepNext(isMemDb_, queryStmt);
1582 if (errCode != E_OK) {
1583 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1584 break;
1585 }
1586 }
1587 isStepNext = true;
1588 errCode = GetCloudDataForSync(queryStmt, cloudDataResult, stepNum++, totalSize, maxSize);
1589 } while (errCode == E_OK);
1590 LOGD("Get cloud sync data, insData:%u, upData:%u, delLog:%u", cloudDataResult.insData.record.size(),
1591 cloudDataResult.updData.record.size(), cloudDataResult.delData.extend.size());
1592 if (errCode != -E_UNFINISHED) {
1593 (void)token.ReleaseCloudStatement();
1594 }
1595 return errCode;
1596 }
1597
GetCloudDataForSync(sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t stepNum,uint32_t & totalSize,const uint32_t & maxSize)1598 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(sqlite3_stmt *statement,
1599 CloudSyncData &cloudDataResult, uint32_t stepNum, uint32_t &totalSize, const uint32_t &maxSize)
1600 {
1601 VBucket log;
1602 VBucket extraLog;
1603 GetCloudLog(statement, log, totalSize);
1604 GetCloudExtraLog(statement, extraLog);
1605
1606 VBucket data;
1607 int64_t flag = 0;
1608 int errCode = CloudStorageUtils::GetValueFromVBucket(FLAG, extraLog, flag);
1609 if (errCode != E_OK) {
1610 return errCode;
1611 }
1612 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1613 for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1614 Type cloudValue;
1615 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1616 tableSchema_.fields[cid].type, cid + 8, cloudValue); // 8 is the start index of query cloud data
1617 if (errCode != E_OK) {
1618 return errCode;
1619 }
1620 SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1621 errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1622 if (errCode != E_OK) {
1623 return errCode;
1624 }
1625 }
1626 }
1627
1628 if (IsGetCloudDataContinue(stepNum, totalSize, maxSize)) {
1629 errCode = IdentifyCloudType(cloudDataResult, data, log, extraLog);
1630 } else {
1631 errCode = -E_UNFINISHED;
1632 }
1633 return errCode;
1634 }
1635
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1636 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1637 {
1638 if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1639 Asset asset;
1640 int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1641 if (errCode != E_OK) {
1642 return errCode;
1643 }
1644 if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1645 return -E_CLOUD_INVALID_ASSET;
1646 }
1647 vBucket.insert_or_assign(field.colName, asset);
1648 } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1649 Assets assets;
1650 int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1651 if (errCode != E_OK) {
1652 return errCode;
1653 }
1654 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1655 return -E_CLOUD_ERROR;
1656 }
1657 if (!CloudStorageUtils::CheckAssetStatus(assets)) {
1658 return -E_CLOUD_INVALID_ASSET;
1659 }
1660 vBucket.insert_or_assign(field.colName, assets);
1661 } else {
1662 vBucket.insert_or_assign(field.colName, cloudValue);
1663 }
1664 return E_OK;
1665 }
1666
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1667 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
1668 const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1669 {
1670 std::string querySql;
1671 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1672 std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
1673 int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
1674 if (errCode != E_OK) {
1675 LOGE("Get query log sql fail, %d", errCode);
1676 return errCode;
1677 }
1678
1679 sqlite3_stmt *selectStmt = nullptr;
1680 errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, pkSet, selectStmt);
1681 if (errCode != E_OK) {
1682 LOGE("Get query log statement fail, %d", errCode);
1683 return errCode;
1684 }
1685
1686 bool alreadyFound = false;
1687 do {
1688 errCode = SQLiteUtils::StepWithRetry(selectStmt);
1689 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1690 if (alreadyFound) {
1691 LOGE("found more than one records in log table for one primary key or gid.");
1692 errCode = -E_CLOUD_ERROR;
1693 break;
1694 }
1695 alreadyFound = true;
1696 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1697 errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
1698 if (errCode != E_OK) {
1699 LOGE("Get info by statement fail, %d", errCode);
1700 break;
1701 }
1702 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1703 errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
1704 break;
1705 } else {
1706 LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
1707 break;
1708 }
1709 } while (errCode == E_OK);
1710
1711 int ret = E_OK;
1712 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1713 return errCode != E_OK ? errCode : ret;
1714 }
1715
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)1716 void SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
1717 {
1718 logInfo.dataKey = sqlite3_column_int64(statement, 0);
1719 std::vector<uint8_t> device;
1720 (void)SQLiteUtils::GetColumnBlobValue(statement, 1, device); // 1 is device
1721 DBCommon::VectorToString(device, logInfo.device);
1722 std::vector<uint8_t> originDev;
1723 (void)SQLiteUtils::GetColumnBlobValue(statement, 2, originDev); // 2 is originDev
1724 DBCommon::VectorToString(originDev, logInfo.originDev);
1725 logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, 3)); // 3 is timestamp
1726 logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, 4)); // 4 is wtimestamp
1727 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, 5)); // 5 is flag
1728 (void)SQLiteUtils::GetColumnBlobValue(statement, 6, logInfo.hashKey); // 6 is hash_key
1729 (void)SQLiteUtils::GetColumnTextValue(statement, 7, logInfo.cloudGid); // 7 is cloud_gid
1730 }
1731
GetInfoByStatement(sqlite3_stmt * statement,std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1732 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
1733 std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
1734 VBucket &assetInfo)
1735 {
1736 GetLogInfoByStatement(statement, dataInfoWithLog.logInfo);
1737 int index = 8; // 8 is start index of assetInfo or primary key
1738 int errCode = E_OK;
1739 for (const auto &field: assetFields) {
1740 Type cloudValue;
1741 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
1742 if (errCode != E_OK) {
1743 break;
1744 }
1745 errCode = PutVBucketByType(assetInfo, field, cloudValue);
1746 if (errCode != E_OK) {
1747 break;
1748 }
1749 }
1750 if (errCode != E_OK) {
1751 LOGE("set asset field failed, errCode = %d", errCode);
1752 return errCode;
1753 }
1754
1755 // fill primary key
1756 for (const auto &item : pkMap) {
1757 Type cloudValue;
1758 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
1759 if (errCode != E_OK) {
1760 break;
1761 }
1762 errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
1763 if (errCode != E_OK) {
1764 break;
1765 }
1766 }
1767 return errCode;
1768 }
1769
GetInsertSqlForCloudSync(const TableSchema & tableSchema)1770 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
1771 {
1772 std::string sql = "insert into " + tableSchema.name + "(";
1773 for (const auto &field : tableSchema.fields) {
1774 sql += field.colName + ",";
1775 }
1776 sql.pop_back();
1777 sql += ") values(";
1778 for (size_t i = 0; i < tableSchema.fields.size(); i++) {
1779 sql += "?,";
1780 }
1781 sql.pop_back();
1782 sql += ");";
1783 return sql;
1784 }
1785
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)1786 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
1787 const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
1788 {
1789 int errCode = E_OK;
1790 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1791 if (pkMap.size() == 0) {
1792 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
1793 std::vector<uint8_t> value;
1794 DBCommon::StringToVector(std::to_string(rowid), value);
1795 errCode = DBCommon::CalcValueHash(value, hashValue);
1796 } else if (pkMap.size() == 1) {
1797 std::vector<Field> pkVec = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
1798 errCode = CloudStorageUtils::CalculateHashKeyForOneField(pkVec.at(0), vBucket, allowEmpty, hashValue);
1799 } else {
1800 std::vector<uint8_t> tempRes;
1801 for (const auto &item: pkMap) {
1802 std::vector<uint8_t> temp;
1803 errCode = CloudStorageUtils::CalculateHashKeyForOneField(item.second, vBucket, allowEmpty, temp);
1804 if (errCode != E_OK) {
1805 LOGE("calc hash fail when there is more than one primary key. errCode = %d", errCode);
1806 return errCode;
1807 }
1808 tempRes.insert(tempRes.end(), temp.begin(), temp.end());
1809 }
1810 errCode = DBCommon::CalcValueHash(tempRes, hashValue);
1811 }
1812 return errCode;
1813 }
1814
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,std::set<std::string> & pkSet,sqlite3_stmt * & selectStmt)1815 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
1816 const VBucket &vBucket, const std::string &querySql, std::set<std::string> &pkSet, sqlite3_stmt *&selectStmt)
1817 {
1818 int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
1819 if (errCode != E_OK) {
1820 LOGE("Get select log statement failed, %d", errCode);
1821 return errCode;
1822 }
1823
1824 std::string cloudGid;
1825 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
1826 if (errCode != E_OK) {
1827 LOGE("Get cloud gid fail when bind query log statement.");
1828 return errCode;
1829 }
1830
1831 int index = 0;
1832 if (!cloudGid.empty()) {
1833 index++;
1834 errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
1835 if (errCode != E_OK) {
1836 LOGE("Bind cloud gid to query log statement failed. %d", errCode);
1837 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1838 return errCode;
1839 }
1840 }
1841
1842 std::vector<uint8_t> hashValue;
1843 if (!pkSet.empty()) {
1844 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashValue, true);
1845 }
1846 if (errCode != E_OK) {
1847 LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
1848 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1849 return errCode;
1850 }
1851
1852 index++;
1853 errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashValue, true);
1854 int ret = E_OK;
1855 if (errCode != E_OK) {
1856 LOGE("Bind hash key to query log statement failed. %d", errCode);
1857 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1858 }
1859 return errCode != E_OK ? errCode : ret;
1860 }
1861
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::string & querySql)1862 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
1863 std::set<std::string> &pkSet, std::string &querySql)
1864 {
1865 std::string cloudGid;
1866 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
1867 if (errCode != E_OK) {
1868 LOGE("Get cloud gid fail when query log table.");
1869 return errCode;
1870 }
1871
1872 if (pkSet.empty() && cloudGid.empty()) {
1873 LOGE("query log table failed because of both primary key and gid are empty.");
1874 return -E_CLOUD_ERROR;
1875 }
1876 std::string sql = "select data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid FROM "
1877 + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE ";
1878 if (!cloudGid.empty()) {
1879 sql += "cloud_gid = ? or ";
1880 }
1881 sql += "hash_key = ?";
1882
1883 querySql = sql;
1884 return E_OK;
1885 }
1886
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,DownloadData & downloadData,std::map<int,int> & statisticMap)1887 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
1888 const TableSchema &tableSchema, DownloadData &downloadData, std::map<int, int> &statisticMap)
1889 {
1890 int index = 0;
1891 int errCode = E_OK;
1892 for (OpType op : downloadData.opType) {
1893 VBucket &vBucket = downloadData.data[index];
1894 switch (op) {
1895 case OpType::INSERT:
1896 errCode = InsertCloudData(tableName, vBucket, tableSchema);
1897 break;
1898 case OpType::UPDATE:
1899 errCode = UpdateCloudData(tableName, vBucket, tableSchema);
1900 break;
1901 case OpType::DELETE:
1902 errCode = DeleteCloudData(tableName, vBucket, tableSchema);
1903 break;
1904 case OpType::ONLY_UPDATE_GID:
1905 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
1906 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
1907 case OpType::UPDATE_TIMESTAMP:
1908 case OpType::CLEAR_GID:
1909 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
1910 break;
1911 case OpType::NOT_HANDLE:
1912 break;
1913 default:
1914 errCode = -E_CLOUD_ERROR;
1915 break;
1916 }
1917 if (errCode != E_OK) {
1918 LOGE("put cloud sync data fail: %d", errCode);
1919 return errCode;
1920 }
1921 statisticMap[static_cast<int>(op)]++;
1922 index++;
1923 }
1924 return errCode;
1925 }
1926
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1927 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
1928 const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
1929 std::vector<Asset> &assets)
1930 {
1931 int errCode = SetLogTriggerStatus(false);
1932 if (errCode != E_OK) {
1933 LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
1934 return errCode;
1935 }
1936 if (mode == FLAG_ONLY) {
1937 errCode = DoCleanLogs(tableNameList);
1938 if (errCode != E_OK) {
1939 LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
1940 return errCode;
1941 }
1942 } else if (mode == FLAG_AND_DATA) {
1943 errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
1944 if (errCode != E_OK) {
1945 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
1946 return errCode;
1947 }
1948 }
1949 errCode = SetLogTriggerStatus(true);
1950 if (errCode != E_OK) {
1951 LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
1952 }
1953
1954 return errCode;
1955 }
1956
DoCleanLogs(const std::vector<std::string> & tableNameList)1957 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList)
1958 {
1959 int errCode = E_OK;
1960 int i = 1;
1961 for (const auto &tableName: tableNameList) {
1962 std::string logTableName = DBCommon::GetLogTableName(tableName);
1963 LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
1964 errCode = CleanCloudDataOnLogTable(logTableName);
1965 if (errCode != E_OK) {
1966 LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
1967 return errCode;
1968 }
1969 i++;
1970 }
1971
1972 return errCode;
1973 }
1974
CleanCloudDataOnLogTable(const std::string & logTableName)1975 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName)
1976 {
1977 std::string cleanLogSql = "UPDATE '" + logTableName + "' SET " + FLAG + " = " + SET_FLAG_LOCAL + ", " +
1978 DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '' WHERE " + CLOUD_GID_FIELD + " is not NULL and " +
1979 CLOUD_GID_FIELD + " != '';";
1980 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1981 }
1982
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName)1983 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1984 const std::string &logTableName)
1985 {
1986 std::string sql = "DELETE FROM '" + tableName + "' WHERE " + ROWID +" in (SELECT " + DATAKEY +
1987 " FROM '" + logTableName + "' WHERE CLOUD_GID is not NULL and CLOUD_GID != '' and " + FLAG_IS_CLOUD + ");";
1988 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1989 if (errCode != E_OK) {
1990 LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1991 return errCode;
1992 }
1993 std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD + " is not NULL and " +
1994 CLOUD_GID_FIELD + " != '' AND " + FLAG_IS_CLOUD + ";";
1995 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1996 if (errCode != E_OK) {
1997 LOGE("Failed to delete cloud data on log table, %d.", errCode);
1998 return errCode;
1999 }
2000 errCode = CleanCloudDataOnLogTable(logTableName);
2001 if (errCode != E_OK) {
2002 LOGE("Failed to clean gid on log table, %d.", errCode);
2003 }
2004 return errCode;
2005 }
2006
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys)2007 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
2008 std::vector<int64_t> &dataKeys)
2009 {
2010 sqlite3_stmt *selectStmt = nullptr;
2011 std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
2012 " is not NULL and " + CLOUD_GID_FIELD + " != '' AND " + FLAG_IS_CLOUD + ";";
2013
2014 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
2015 if (errCode != E_OK) {
2016 LOGE("Get select data_key statement failed, %d", errCode);
2017 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2018 return errCode;
2019 }
2020 do {
2021 errCode = SQLiteUtils::StepWithRetry(selectStmt);
2022 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2023 dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
2024 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2025 LOGE("SQLite step failed when query log's data_key : %d", errCode);
2026 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2027 break;
2028 }
2029 } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
2030 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2031 return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
2032 }
2033
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)2034 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
2035 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
2036 {
2037 int errCode = E_OK;
2038 for (size_t i = 0; i < tableNameList.size(); i++) {
2039 std::string tableName = tableNameList[i];
2040 std::string logTableName = DBCommon::GetLogTableName(tableName);
2041 std::vector<int64_t> dataKeys;
2042 errCode = GetCleanCloudDataKeys(logTableName, dataKeys);
2043 if (errCode != E_OK) {
2044 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
2045 return errCode;
2046 }
2047
2048 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
2049 errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
2050 if (errCode != E_OK) {
2051 LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
2052 return errCode;
2053 }
2054
2055 errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName);
2056 if (errCode != E_OK) {
2057 LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
2058 return errCode;
2059 }
2060 }
2061
2062 return errCode;
2063 }
2064
GetCloudAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)2065 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetOnTable(const std::string &tableName,
2066 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
2067 {
2068 int errCode = E_OK;
2069 for (const auto &rowId : dataKeys) {
2070 std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
2071 "' WHERE " + ROWID + " = " + std::to_string(rowId) + ";";
2072 sqlite3_stmt *selectStmt = nullptr;
2073 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
2074 if (errCode != E_OK) {
2075 LOGE("Get select asset statement failed, %d", errCode);
2076 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2077 return errCode;
2078 }
2079 errCode = SQLiteUtils::StepWithRetry(selectStmt);
2080 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2081 std::vector<uint8_t> blobValue;
2082 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
2083 if (errCode != E_OK) {
2084 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2085 return errCode;
2086 }
2087 Asset asset;
2088 errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
2089 if (errCode != E_OK) {
2090 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2091 return errCode;
2092 }
2093 assets.push_back(asset);
2094 }
2095 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2096 }
2097 return errCode;
2098 }
2099
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)2100 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
2101 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
2102 {
2103 int errCode = E_OK;
2104 int ret = E_OK;
2105 sqlite3_stmt *selectStmt = nullptr;
2106 for (const auto &rowId : dataKeys) {
2107 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
2108 "' WHERE " + ROWID + " = " + std::to_string(rowId) + ";";
2109 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
2110 if (errCode != E_OK) {
2111 LOGE("Get select assets statement failed, %d", errCode);
2112 goto END;
2113 }
2114 errCode = SQLiteUtils::StepWithRetry(selectStmt);
2115 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2116 std::vector<uint8_t> blobValue;
2117 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
2118 if (errCode != E_OK) {
2119 goto END;
2120 }
2121 Assets tmpAssets;
2122 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
2123 if (errCode != E_OK) {
2124 goto END;
2125 }
2126 for (const auto &asset: tmpAssets) {
2127 assets.push_back(asset);
2128 }
2129 }
2130 SQLiteUtils::ResetStatement(selectStmt, true, ret);
2131 }
2132 return errCode != E_OK ? errCode : ret;
2133 END:
2134 SQLiteUtils::ResetStatement(selectStmt, true, ret);
2135 return errCode != E_OK ? errCode : ret;
2136 }
2137
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)2138 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
2139 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
2140 {
2141 int errCode = E_OK;
2142 for (const auto &fieldInfo: fieldInfos) {
2143 if (fieldInfo.IsAssetType()) {
2144 errCode = GetCloudAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
2145 if (errCode != E_OK) {
2146 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
2147 return errCode;
2148 }
2149 } else if (fieldInfo.IsAssetsType()) {
2150 errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
2151 if (errCode != E_OK) {
2152 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
2153 return errCode;
2154 }
2155 }
2156 }
2157 return errCode;
2158 }
2159
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,DownloadData & downloadData)2160 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
2161 const TableSchema &tableSchema, DownloadData &downloadData)
2162 {
2163 if (downloadData.data.size() != downloadData.opType.size()) {
2164 LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
2165 downloadData.opType.size());
2166 return -E_CLOUD_ERROR;
2167 }
2168
2169 int errCode = SetLogTriggerStatus(false);
2170 if (errCode != E_OK) {
2171 LOGE("Fail to set log trigger off, %d", errCode);
2172 return errCode;
2173 }
2174
2175 std::map<int, int> statisticMap = {};
2176 errCode = ExecutePutCloudData(tableName, tableSchema, downloadData, statisticMap);
2177 int ret = SetLogTriggerStatus(true);
2178 if (ret != E_OK) {
2179 LOGE("Fail to set log trigger on, %d", ret);
2180 }
2181 LOGD("save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d, only update gid cnt = %d, "
2182 "set LCC flag zero cnt = %d, set LCC flag one cnt = %d, update timestamp cnt = %d, clear gid count = %d,"
2183 " not handle cnt = %d",
2184 errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
2185 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
2186 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
2187 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
2188 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
2189 statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
2190 return errCode == E_OK ? ret : errCode;
2191 }
2192
InsertCloudData(const std::string & tableName,VBucket & vBucket,const TableSchema & tableSchema)2193 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(const std::string &tableName, VBucket &vBucket,
2194 const TableSchema &tableSchema)
2195 {
2196 std::string sql = GetInsertSqlForCloudSync(tableSchema);
2197 sqlite3_stmt *insertStmt = nullptr;
2198 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
2199 if (errCode != E_OK) {
2200 LOGE("Get insert statement failed when save cloud data, %d", errCode);
2201 return errCode;
2202 }
2203 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
2204 errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
2205 if (errCode != E_OK) {
2206 SQLiteUtils::ResetStatement(insertStmt, true, errCode);
2207 return errCode;
2208 }
2209 // insert data
2210 errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
2211 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2212 errCode = E_OK;
2213 } else {
2214 int ret = E_OK;
2215 SQLiteUtils::ResetStatement(insertStmt, true, ret);
2216 LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
2217 return errCode;
2218 }
2219 SQLiteUtils::ResetStatement(insertStmt, true, errCode);
2220
2221 // insert log
2222 return InsertLogRecord(tableSchema, vBucket);
2223 }
2224
InsertLogRecord(const TableSchema & tableSchema,VBucket & vBucket)2225 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema, VBucket &vBucket)
2226 {
2227 if (!CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
2228 // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
2229 // so we need to delete the old log record according to the gid first
2230 std::string gidStr;
2231 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
2232 if (errCode != E_OK || gidStr.empty()) {
2233 LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
2234 return errCode;
2235 }
2236 std::string sql = "delete from " + DBCommon::GetLogTableName(tableSchema.name) + " where cloud_gid = '"
2237 + gidStr + "';";
2238 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
2239 if (errCode != E_OK) {
2240 LOGE("delete log record according gid fail, errCode = %d", errCode);
2241 return errCode;
2242 }
2243 }
2244
2245 std::string sql = "insert or replace into " + DBCommon::GetLogTableName(tableSchema.name) +
2246 " values(?, ?, ?, ?, ?, ?, ?, ?)";
2247 sqlite3_stmt *insertLogStmt = nullptr;
2248 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
2249 if (errCode != E_OK) {
2250 LOGE("Get insert log statement failed when save cloud data, %d", errCode);
2251 return errCode;
2252 }
2253
2254 errCode = BindValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt);
2255 if (errCode != E_OK) {
2256 SQLiteUtils::ResetStatement(insertLogStmt, true, errCode);
2257 return errCode;
2258 }
2259
2260 errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
2261 int ret = E_OK;
2262 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2263 errCode = E_OK;
2264 } else {
2265 SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
2266 LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
2267 return errCode;
2268 }
2269
2270 SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
2271 return errCode != E_OK ? errCode : ret;
2272 }
2273
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)2274 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
2275 sqlite3_stmt *updateStmt)
2276 {
2277 auto it = bindCloudFieldFuncMap_.find(field.type);
2278 if (it == bindCloudFieldFuncMap_.end()) {
2279 LOGE("unknown cloud type when bind one field.");
2280 return -E_CLOUD_ERROR;
2281 }
2282 return it->second(index, vBucket, field, updateStmt);
2283 }
2284
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)2285 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
2286 const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
2287 {
2288 int errCode = E_OK;
2289 int index = 0;
2290 for (const auto &field : fields) {
2291 index++;
2292 errCode = BindOneField(index, vBucket, field, upsertStmt);
2293 if (errCode != E_OK) {
2294 return errCode;
2295 }
2296 }
2297 return errCode;
2298 }
2299
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt)2300 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
2301 const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt)
2302 {
2303 std::vector<uint8_t> hashKey;
2304 int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
2305 if (errCode != E_OK) {
2306 return errCode;
2307 }
2308 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 7, hashKey); // 7 is hash_key
2309 if (errCode != E_OK) {
2310 LOGE("Bind hash_key to insert log statement failed, %d", errCode);
2311 return errCode;
2312 }
2313
2314 std::string cloudGid;
2315 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
2316 if (errCode != E_OK) {
2317 LOGE("get gid for insert log statement failed, %d", errCode);
2318 return -E_CLOUD_ERROR;
2319 }
2320
2321 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 8, cloudGid); // 8 is cloud_gid
2322 if (errCode != E_OK) {
2323 LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
2324 }
2325 return errCode;
2326 }
2327
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt)2328 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
2329 const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt)
2330 {
2331 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
2332 int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 1, rowid);
2333 if (errCode != E_OK) {
2334 LOGE("Bind rowid to insert log statement failed, %d", errCode);
2335 return errCode;
2336 }
2337
2338 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 2, "cloud"); // 2 is device
2339 if (errCode != E_OK) {
2340 LOGE("Bind device to insert log statement failed, %d", errCode);
2341 return errCode;
2342 }
2343
2344 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 3, "cloud"); // 3 is ori_device
2345 if (errCode != E_OK) {
2346 LOGE("Bind ori_device to insert log statement failed, %d", errCode);
2347 return errCode;
2348 }
2349
2350 int64_t val = 0;
2351 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
2352 if (errCode != E_OK) {
2353 LOGE("get modify time for insert log statement failed, %d", errCode);
2354 return -E_CLOUD_ERROR;
2355 }
2356
2357 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 4, val); // 4 is timestamp
2358 if (errCode != E_OK) {
2359 LOGE("Bind timestamp to insert log statement failed, %d", errCode);
2360 return errCode;
2361 }
2362
2363 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
2364 if (errCode != E_OK) {
2365 LOGE("get create time for insert log statement failed, %d", errCode);
2366 return -E_CLOUD_ERROR;
2367 }
2368
2369 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 5, val); // 5 is wtimestamp
2370 if (errCode != E_OK) {
2371 LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
2372 return errCode;
2373 }
2374
2375 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, 6, 0)); // 6 is flag
2376 if (errCode != E_OK) {
2377 LOGE("Bind flag to insert log statement failed, %d", errCode);
2378 return errCode;
2379 }
2380
2381 vBucket[CloudDbConstant::ROW_ID_FIELD_NAME] = rowid; // fill rowid to cloud data to notify user
2382 return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, insertLogStmt);
2383 }
2384
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)2385 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
2386 const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
2387 {
2388 std::string where = " where";
2389 if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
2390 where += " rowid = (select data_key from " + DBCommon::GetLogTableName(tableName) +
2391 " where cloud_gid = '" + gidStr + "')";
2392 }
2393 if (!pkSet.empty() && queryByPk) {
2394 if (!gidStr.empty()) {
2395 where += " or";
2396 }
2397 where += " (1 = 1";
2398 for (const auto &pk : pkSet) {
2399 where += (" and " + pk + " = ?");
2400 }
2401 where += ");";
2402 }
2403 return where;
2404 }
2405
GetUpdateSqlForCloudSync(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)2406 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const TableSchema &tableSchema,
2407 const VBucket &vBucket, const std::string &gidStr, const std::set<std::string> &pkSet, std::string &updateSql)
2408 {
2409 if (pkSet.empty() && gidStr.empty()) {
2410 LOGE("update data fail because both primary key and gid is empty.");
2411 return -E_CLOUD_ERROR;
2412 }
2413 std::string sql = "update " + tableSchema.name + " set";
2414 for (const auto &field : tableSchema.fields) {
2415 sql += " " + field.colName + " = ?,";
2416 }
2417 sql.pop_back();
2418 sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
2419 updateSql = sql;
2420 return E_OK;
2421 }
2422
// A gid is acceptable when it contains no single quote; an empty gid is acceptable too.
static bool IsGidValid(const std::string &gidStr)
{
    // searching an empty string finds nothing, so one lookup covers both the empty and non-empty case
    return gidStr.find('\'') == std::string::npos;
}
2430
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)2431 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
2432 const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
2433 {
2434 std::string gidStr;
2435 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
2436 if (errCode != E_OK) {
2437 LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
2438 return errCode;
2439 }
2440 if (!IsGidValid(gidStr)) {
2441 LOGE("invalid char in cloud gid");
2442 return -E_CLOUD_ERROR;
2443 }
2444
2445 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
2446 std::string updateSql;
2447 errCode = GetUpdateSqlForCloudSync(tableSchema, vBucket, gidStr, pkSet, updateSql);
2448 if (errCode != E_OK) {
2449 return errCode;
2450 }
2451
2452 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
2453 if (errCode != E_OK) {
2454 LOGE("Get update statement failed when update cloud data, %d", errCode);
2455 return errCode;
2456 }
2457
2458 // bind value
2459 std::vector<Field> fields = tableSchema.fields;
2460 if (!pkSet.empty()) {
2461 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
2462 fields.insert(fields.end(), pkFields.begin(), pkFields.end());
2463 }
2464 errCode = BindValueToUpsertStatement(vBucket, fields, updateStmt);
2465 if (errCode != E_OK) {
2466 LOGE("bind value to update statement failed when update cloud data, %d", errCode);
2467 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
2468 }
2469 return errCode;
2470 }
2471
UpdateCloudData(const std::string & tableName,VBucket & vBucket,const TableSchema & tableSchema)2472 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(const std::string &tableName, VBucket &vBucket,
2473 const TableSchema &tableSchema)
2474 {
2475 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
2476 sqlite3_stmt *updateStmt = nullptr;
2477 int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
2478 if (errCode != E_OK) {
2479 LOGE("Get update data table statement fail, %d", errCode);
2480 return errCode;
2481 }
2482
2483 // update data
2484 errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
2485 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2486 errCode = E_OK;
2487 } else {
2488 LOGE("update data failed when save cloud data:%d", errCode);
2489 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
2490 return errCode;
2491 }
2492 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
2493
2494 // update log
2495 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
2496 if (errCode != E_OK) {
2497 LOGE("update log record failed when update cloud data, errCode = %d", errCode);
2498 }
2499 return errCode;
2500 }
2501
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)2502 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
2503 const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
2504 {
2505 std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
2506 if (opType == OpType::ONLY_UPDATE_GID) {
2507 updateLogSql += "cloud_gid = ?";
2508 updateColName.push_back(CloudDbConstant::GID_FIELD);
2509 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
2510 updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
2511 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
2512 updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
2513 } else if (opType == OpType::UPDATE_TIMESTAMP) {
2514 updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
2515 ", timestamp = ?, cloud_gid = ''";
2516 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
2517 } else if (opType == OpType::CLEAR_GID) {
2518 updateLogSql += "cloud_gid = '', flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK);
2519 } else {
2520 if (opType == OpType::DELETE) {
2521 updateLogSql += "data_key = -1, flag = 1, cloud_gid = '', ";
2522 } else {
2523 updateLogSql += "flag = 0, cloud_gid = ?, ";
2524 updateColName.push_back(CloudDbConstant::GID_FIELD);
2525 }
2526 updateLogSql += "device = 'cloud', timestamp = ?";
2527 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
2528 }
2529
2530 std::string gidStr;
2531 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
2532 if (errCode != E_OK) {
2533 LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d", errCode);
2534 return errCode;
2535 }
2536
2537 updateLogSql += " where ";
2538 if (!gidStr.empty()) {
2539 updateLogSql += "cloud_gid = '" + gidStr + "'";
2540 }
2541 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
2542 if (!pkMap.empty()) {
2543 if (!gidStr.empty()) {
2544 updateLogSql += " or ";
2545 }
2546 updateLogSql += "(hash_key = ?);";
2547 }
2548
2549 errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
2550 if (errCode != E_OK) {
2551 LOGE("Get update log statement failed when update cloud data, %d", errCode);
2552 }
2553 return errCode;
2554 }
2555
IsAllowWithPrimaryKey(OpType opType)2556 static inline bool IsAllowWithPrimaryKey(OpType opType)
2557 {
2558 return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
2559 opType == OpType::ONLY_UPDATE_GID);
2560 }
2561
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)2562 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
2563 OpType opType)
2564 {
2565 sqlite3_stmt *updateLogStmt = nullptr;
2566 std::vector<std::string> updateColName;
2567 int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
2568 if (errCode != E_OK) {
2569 LOGE("Get update log statement failed, errCode = %d", errCode);
2570 return errCode;
2571 }
2572
2573 errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
2574 updateLogStmt);
2575 int ret = E_OK;
2576 if (errCode != E_OK) {
2577 LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
2578 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
2579 return errCode != E_OK ? errCode : ret;
2580 }
2581
2582 errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
2583 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2584 errCode = E_OK;
2585 } else {
2586 LOGE("update log record failed when update cloud data:%d", errCode);
2587 }
2588 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
2589 return errCode != E_OK ? errCode : ret;
2590 }
2591
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)2592 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
2593 const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
2594 sqlite3_stmt *updateLogStmt)
2595 {
2596 int index = 0;
2597 int errCode = E_OK;
2598 for (const auto &colName : colNames) {
2599 index++;
2600 if (colName == CloudDbConstant::GID_FIELD) {
2601 if (vBucket.find(colName) == vBucket.end()) {
2602 LOGE("cloud data doesn't contain gid field when bind update log stmt.");
2603 return -E_CLOUD_ERROR;
2604 }
2605 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
2606 std::get<std::string>(vBucket.at(colName)));
2607 } else if (colName == CloudDbConstant::MODIFY_FIELD) {
2608 if (vBucket.find(colName) == vBucket.end()) {
2609 LOGE("cloud data doesn't contain modify field when bind update log stmt.");
2610 return -E_CLOUD_ERROR;
2611 }
2612 errCode = SQLiteUtils::BindInt64ToStatement(updateLogStmt, index, std::get<int64_t>(vBucket.at(colName)));
2613 } else {
2614 LOGE("invalid col name when bind value to update log statement.");
2615 return -E_INTERNAL_ERROR;
2616 }
2617 if (errCode != E_OK) {
2618 LOGE("fail to bind value to update log statement.");
2619 return errCode;
2620 }
2621 }
2622 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
2623 if (pkMap.empty()) {
2624 return E_OK;
2625 }
2626
2627 std::vector<uint8_t> hashKey;
2628 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
2629 if (errCode != E_OK) {
2630 return errCode;
2631 }
2632 return SQLiteUtils::BindBlobToStatement(updateLogStmt, index + 1, hashKey);
2633 }
2634
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)2635 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
2636 const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
2637 {
2638 std::string gidStr;
2639 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
2640 if (errCode != E_OK) {
2641 LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
2642 return errCode;
2643 }
2644 if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
2645 LOGE("empty or invalid char in cloud gid");
2646 return -E_CLOUD_ERROR;
2647 }
2648
2649 bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
2650 std::string deleteSql = "delete from " + tableSchema.name;
2651 deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
2652 errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
2653 if (errCode != E_OK) {
2654 LOGE("Get delete statement failed when delete data, %d", errCode);
2655 return errCode;
2656 }
2657
2658 int ret = E_OK;
2659 if (!pkSet.empty() && queryByPk) {
2660 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
2661 errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
2662 if (errCode != E_OK) {
2663 LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
2664 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
2665 }
2666 }
2667 return errCode != E_OK ? errCode : ret;
2668 }
2669
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema)2670 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
2671 const TableSchema &tableSchema)
2672 {
2673 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
2674 sqlite3_stmt *deleteStmt = nullptr;
2675 int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
2676 if (errCode != E_OK) {
2677 return errCode;
2678 }
2679
2680 errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
2681 int ret = E_OK;
2682 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
2683 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2684 LOGE("delete data failed when sync with cloud:%d", errCode);
2685 return errCode;
2686 }
2687 if (ret != E_OK) {
2688 LOGE("reset delete statement failed:%d", ret);
2689 return ret;
2690 }
2691
2692 // update log
2693 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
2694 if (errCode != E_OK) {
2695 LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
2696 }
2697 return errCode;
2698 }
2699
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)2700 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
2701 const TableSchema &tableSchema, OpType opType)
2702 {
2703 return UpdateLogRecord(vBucket, tableSchema, opType);
2704 }
2705 } // namespace DistributedDB
2706 #endif
2707