1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32
33 namespace DistributedDB {
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)34 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
35 const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
36 {
37 std::string querySql;
38 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
39 std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
40 int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
41 if (errCode != E_OK) {
42 LOGE("Get query log sql fail, %d", errCode);
43 return errCode;
44 }
45 if (!pkSet.empty()) {
46 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, dataInfoWithLog.logInfo.hashKey, true);
47 if (errCode != E_OK) {
48 LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
49 return errCode;
50 }
51 }
52 sqlite3_stmt *selectStmt = nullptr;
53 errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, dataInfoWithLog.logInfo.hashKey, selectStmt);
54 if (errCode != E_OK) {
55 LOGE("Get query log statement fail, %d", errCode);
56 return errCode;
57 }
58
59 bool alreadyFound = false;
60 do {
61 errCode = SQLiteUtils::StepWithRetry(selectStmt);
62 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
63 if (alreadyFound) {
64 LOGE("found more than one records in log table for one primary key or gid.");
65 errCode = -E_CLOUD_ERROR;
66 break;
67 }
68 alreadyFound = true;
69 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
70 errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
71 if (errCode != E_OK) {
72 LOGE("Get info by statement fail, %d", errCode);
73 break;
74 }
75 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
76 errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
77 break;
78 } else {
79 LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
80 break;
81 }
82 } while (errCode == E_OK);
83
84 int ret = E_OK;
85 SQLiteUtils::ResetStatement(selectStmt, true, ret);
86 return errCode != E_OK ? errCode : ret;
87 }
88
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)89 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
90 {
91 int index = 0;
92 logInfo.dataKey = sqlite3_column_int64(statement, index++);
93 std::vector<uint8_t> device;
94 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, device); // 1 is device
95 DBCommon::VectorToString(device, logInfo.device);
96 std::vector<uint8_t> originDev;
97 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, originDev); // 2 is originDev
98 DBCommon::VectorToString(originDev, logInfo.originDev);
99 logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 3 is timestamp
100 logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 4 is wtimestamp
101 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, index++)); // 5 is flag
102 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, logInfo.hashKey); // 6 is hash_key
103 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.cloudGid); // 7 is cloud_gid
104 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.sharingResource); // 8 is sharing_resource
105 logInfo.status = static_cast<uint64_t>(sqlite3_column_int64(statement, index++)); // 9 is status
106 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.version); // 10 is version
107 return index;
108 }
109
GetInfoByStatement(sqlite3_stmt * statement,const std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)110 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
111 const std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
112 VBucket &assetInfo)
113 {
114 int index = GetLogInfoByStatement(statement, dataInfoWithLog.logInfo); // start index of assetInfo or primary key
115 int errCode = E_OK;
116 for (const auto &field: assetFields) {
117 Type cloudValue;
118 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
119 if (errCode != E_OK) {
120 break;
121 }
122 errCode = PutVBucketByType(assetInfo, field, cloudValue);
123 if (errCode != E_OK) {
124 break;
125 }
126 }
127 if (errCode != E_OK) {
128 LOGE("set asset field failed, errCode = %d", errCode);
129 return errCode;
130 }
131
132 // fill primary key
133 for (const auto &item : pkMap) {
134 Type cloudValue;
135 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
136 if (errCode != E_OK) {
137 break;
138 }
139 errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
140 if (errCode != E_OK) {
141 break;
142 }
143 }
144 return errCode;
145 }
146
GetInsertSqlForCloudSync(const TableSchema & tableSchema)147 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
148 {
149 std::string sql = "insert into " + tableSchema.name + "(";
150 for (const auto &field : tableSchema.fields) {
151 sql += field.colName + ",";
152 }
153 sql.pop_back();
154 sql += ") values(";
155 for (size_t i = 0; i < tableSchema.fields.size(); i++) {
156 sql += "?,";
157 }
158 sql.pop_back();
159 sql += ");";
160 return sql;
161 }
162
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)163 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
164 const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
165 {
166 int errCode = E_OK;
167 TableInfo localTable = localSchema_.GetTable(tableSchema.name);
168 // table name in cloud schema is in lower case
169 if (!DBCommon::CaseInsensitiveCompare(localTable.GetTableName(), tableSchema.name)) {
170 LOGE("localSchema doesn't contain table from cloud");
171 return -E_INTERNAL_ERROR;
172 }
173
174 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
175 if (pkMap.size() == 0) {
176 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
177 std::vector<uint8_t> value;
178 DBCommon::StringToVector(std::to_string(rowid), value);
179 errCode = DBCommon::CalcValueHash(value, hashValue);
180 } else {
181 std::tie(errCode, hashValue) = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(vBucket,
182 tableSchema, localTable, pkMap, allowEmpty);
183 }
184 return errCode;
185 }
186
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,const Key & hashKey,sqlite3_stmt * & selectStmt)187 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
188 const VBucket &vBucket, const std::string &querySql, const Key &hashKey, sqlite3_stmt *&selectStmt)
189 {
190 int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
191 if (errCode != E_OK) {
192 LOGE("Get select log statement failed, %d", errCode);
193 return errCode;
194 }
195
196 std::string cloudGid;
197 int ret = E_OK;
198 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
199 if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
200 SQLiteUtils::ResetStatement(selectStmt, true, ret);
201 LOGE("Get cloud gid fail when bind query log statement.");
202 return errCode;
203 }
204
205 int index = 0;
206 if (!cloudGid.empty()) {
207 index++;
208 errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
209 if (errCode != E_OK) {
210 LOGE("Bind cloud gid to query log statement failed. %d", errCode);
211 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
212 return errCode;
213 }
214 }
215
216 index++;
217 errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashKey, true);
218 if (errCode != E_OK) {
219 LOGE("Bind hash key to query log statement failed. %d", errCode);
220 SQLiteUtils::ResetStatement(selectStmt, true, ret);
221 }
222 return errCode != E_OK ? errCode : ret;
223 }
224
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,const std::set<std::string> & pkSet,std::string & querySql)225 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
226 const std::set<std::string> &pkSet, std::string &querySql)
227 {
228 std::string cloudGid;
229 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
230 if (errCode != E_OK) {
231 LOGE("Get cloud gid fail when query log table.");
232 return errCode;
233 }
234
235 if (pkSet.empty() && cloudGid.empty()) {
236 LOGE("query log table failed because of both primary key and gid are empty.");
237 return -E_CLOUD_ERROR;
238 }
239 std::string sql = "SELECT data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid,"
240 " sharing_resource, status, version FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE ";
241 if (!cloudGid.empty()) {
242 sql += "cloud_gid = ? OR ";
243 }
244 sql += "hash_key = ?";
245
246 querySql = sql;
247 return E_OK;
248 }
249
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData,std::map<int,int> & statisticMap)250 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
251 const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData,
252 std::map<int, int> &statisticMap)
253 {
254 int index = 0;
255 int errCode = E_OK;
256 for (OpType op : downloadData.opType) {
257 VBucket &vBucket = downloadData.data[index];
258 switch (op) {
259 case OpType::INSERT:
260 errCode = InsertCloudData(vBucket, tableSchema, trackerTable, GetLocalDataKey(index, downloadData));
261 break;
262 case OpType::UPDATE:
263 errCode = UpdateCloudData(vBucket, tableSchema);
264 break;
265 case OpType::DELETE:
266 errCode = DeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
267 break;
268 case OpType::ONLY_UPDATE_GID:
269 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
270 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
271 case OpType::UPDATE_TIMESTAMP:
272 case OpType::CLEAR_GID:
273 case OpType::LOCKED_NOT_HANDLE:
274 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
275 [[fallthrough]];
276 case OpType::NOT_HANDLE:
277 errCode = errCode == E_OK ? OnlyUpdateAssetId(tableName, tableSchema, vBucket,
278 GetLocalDataKey(index, downloadData), op) : errCode;
279 break;
280 default:
281 errCode = -E_CLOUD_ERROR;
282 break;
283 }
284 if (errCode != E_OK) {
285 LOGE("put cloud sync data fail: %d", errCode);
286 return errCode;
287 }
288 statisticMap[static_cast<int>(op)]++;
289 index++;
290 }
291 return errCode;
292 }
293
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets,std::vector<std::string> & notifyTableList)294 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
295 const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
296 std::vector<Asset> &assets, std::vector<std::string> ¬ifyTableList)
297 {
298 int errCode = SetLogTriggerStatus(false);
299 if (errCode != E_OK) {
300 LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
301 return errCode;
302 }
303 if (mode == FLAG_ONLY) {
304 errCode = DoCleanLogs(tableNameList, localSchema);
305 if (errCode != E_OK) {
306 LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
307 return errCode;
308 }
309 notifyTableList = tableNameList;
310 } else if (mode == FLAG_AND_DATA) {
311 errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
312 if (errCode != E_OK) {
313 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
314 return errCode;
315 }
316 notifyTableList = tableNameList;
317 } else if (mode == CLEAR_SHARED_TABLE) {
318 errCode = DoCleanShareTableDataAndLog(tableNameList);
319 if (errCode != E_OK) {
320 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
321 return errCode;
322 }
323 }
324 errCode = SetLogTriggerStatus(true);
325 if (errCode != E_OK) {
326 LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
327 }
328
329 return errCode;
330 }
331
DoCleanLogs(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)332 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList,
333 const RelationalSchemaObject &localSchema)
334 {
335 int errCode = E_OK;
336 int i = 1;
337 for (const auto &tableName: tableNameList) {
338 std::string logTableName = DBCommon::GetLogTableName(tableName);
339 LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
340 errCode = DoCleanAssetId(tableName, localSchema);
341 if (errCode != E_OK) {
342 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
343 return errCode;
344 }
345 errCode = CleanCloudDataOnLogTable(logTableName, FLAG_ONLY);
346 if (errCode != E_OK) {
347 LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
348 return errCode;
349 }
350 i++;
351 }
352
353 return errCode;
354 }
355
UpdateCursor(sqlite3_context * ctx,int argc,sqlite3_value ** argv)356 void SQLiteSingleVerRelationalStorageExecutor::UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv)
357 {
358 if (ctx == nullptr || argc != 0 || argv == nullptr) {
359 LOGW("[SqlSinRDBExe][UpdateCursor] invalid param=%d", argc);
360 return;
361 }
362 auto context = static_cast<UpdateCursorContext *>(sqlite3_user_data(ctx));
363 if (context == nullptr) {
364 LOGW("[SqlSinRDBExe][UpdateCursor] invalid context");
365 return;
366 }
367 context->cursor++;
368 sqlite3_result_int64(ctx, static_cast<sqlite3_int64>(context->cursor));
369 }
370
CreateFuncUpdateCursor(UpdateCursorContext & context,void (* updateCursor)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const371 int SQLiteSingleVerRelationalStorageExecutor::CreateFuncUpdateCursor(UpdateCursorContext &context,
372 void (*updateCursor)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
373 {
374 std::string sql = "update_cursor";
375 int errCode = sqlite3_create_function_v2(dbHandle_, sql.c_str(), 0, SQLITE_UTF8 | SQLITE_DIRECTONLY,
376 &context, updateCursor, nullptr, nullptr, nullptr);
377 if (errCode != SQLITE_OK) {
378 LOGE("[Storage Executor][UpdateCursor] Create func=updateCursor failed=%d", errCode);
379 return SQLiteUtils::MapSQLiteErrno(errCode);
380 }
381 return E_OK;
382 }
383
GetCursor(const std::string & tableName)384 int SQLiteSingleVerRelationalStorageExecutor::GetCursor(const std::string &tableName)
385 {
386 int cursor = -1;
387 std::string sql = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX + "metadata where key = ?;";
388 sqlite3_stmt *stmt = nullptr;
389 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
390 if (errCode != E_OK) {
391 LOGE("[Storage Executor]get cursor failed=%d", errCode);
392 return cursor;
393 }
394 ResFinalizer finalizer([stmt]() {
395 sqlite3_stmt *statement = stmt;
396 int ret = E_OK;
397 SQLiteUtils::ResetStatement(statement, true, ret);
398 if (ret != E_OK) {
399 LOGW("Reset stmt failed %d when get cursor", ret);
400 }
401 });
402 Key key;
403 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
404 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, key, false); // first arg.
405 if (errCode != E_OK) {
406 return cursor;
407 }
408 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
409 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
410 cursor = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
411 }
412 return cursor;
413 }
414
SetCursor(const std::string & tableName,int cursor)415 int SQLiteSingleVerRelationalStorageExecutor::SetCursor(const std::string &tableName, int cursor)
416 {
417 std::string sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + "metadata SET VALUE = ? where KEY = ?;";
418 sqlite3_stmt *stmt = nullptr;
419 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
420 if (errCode != E_OK) {
421 LOGE("Set cursor sql failed=%d", errCode);
422 return cursor;
423 }
424 ResFinalizer finalizer([stmt]() {
425 sqlite3_stmt *statement = stmt;
426 int ret = E_OK;
427 SQLiteUtils::ResetStatement(statement, true, ret);
428 if (ret != E_OK) {
429 LOGW("Reset stmt failed %d when set cursor", ret);
430 }
431 });
432 int index = 1;
433 errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, cursor);
434 if (errCode != E_OK) {
435 LOGE("Bind saved cursor failed:%d", errCode);
436 return errCode;
437 }
438 Key key;
439 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
440 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key, false);
441 if (errCode != E_OK) {
442 return cursor;
443 }
444 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
445 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
446 errCode = E_OK;
447 }
448 return errCode;
449 }
450
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)451 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
452 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
453 {
454 int errCode = E_OK;
455 for (size_t i = 0; i < tableNameList.size(); i++) {
456 std::string tableName = tableNameList[i];
457 std::string logTableName = DBCommon::GetLogTableName(tableName);
458 std::vector<int64_t> dataKeys;
459 errCode = GetCleanCloudDataKeys(logTableName, dataKeys, true);
460 if (errCode != E_OK) {
461 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
462 return errCode;
463 }
464
465 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
466 errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
467 if (errCode != E_OK) {
468 LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
469 return errCode;
470 }
471 if (isLogicDelete_) {
472 errCode = SetDataOnUserTablWithLogicDelete(tableName, logTableName);
473 } else {
474 errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName, localSchema);
475 }
476 if (errCode != E_OK) {
477 LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
478 return errCode;
479 }
480 }
481
482 return errCode;
483 }
484
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)485 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
486 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
487 {
488 int errCode = E_OK;
489 int ret = E_OK;
490 sqlite3_stmt *selectStmt = nullptr;
491 for (const auto &rowId : dataKeys) {
492 std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
493 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
494 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
495 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
496 LOGE("Get select asset statement failed, %d", errCode);
497 return errCode;
498 }
499 errCode = SQLiteUtils::StepWithRetry(selectStmt);
500 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
501 std::vector<uint8_t> blobValue;
502 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
503 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
504 LOGE("Get column blob value failed, %d", errCode);
505 goto END;
506 }
507 Asset asset;
508 errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
509 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
510 LOGE("Transfer blob to asset failed, %d", errCode);
511 goto END;
512 }
513 assets.push_back(asset);
514 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
515 errCode = E_OK;
516 Asset asset;
517 assets.push_back(asset);
518 }
519 SQLiteUtils::ResetStatement(selectStmt, true, ret);
520 }
521 return errCode != E_OK ? errCode : ret;
522 END:
523 SQLiteUtils::ResetStatement(selectStmt, true, ret);
524 return errCode != E_OK ? errCode : ret;
525 }
526
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)527 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
528 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
529 {
530 int errCode = E_OK;
531 int ret = E_OK;
532 sqlite3_stmt *selectStmt = nullptr;
533 for (const auto &rowId : dataKeys) {
534 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
535 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
536 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
537 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
538 LOGE("Get select assets statement failed, %d", errCode);
539 goto END;
540 }
541 errCode = SQLiteUtils::StepWithRetry(selectStmt);
542 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
543 std::vector<uint8_t> blobValue;
544 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
545 if (errCode != E_OK) {
546 goto END;
547 }
548 Assets tmpAssets;
549 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
550 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
551 goto END;
552 }
553 for (const auto &asset: tmpAssets) {
554 assets.push_back(asset);
555 }
556 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
557 errCode = E_OK;
558 }
559 SQLiteUtils::ResetStatement(selectStmt, true, ret);
560 }
561 return errCode != E_OK ? errCode : ret;
562 END:
563 SQLiteUtils::ResetStatement(selectStmt, true, ret);
564 return errCode != E_OK ? errCode : ret;
565 }
566
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)567 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
568 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
569 {
570 int errCode = E_OK;
571 for (const auto &fieldInfo: fieldInfos) {
572 if (fieldInfo.IsAssetType()) {
573 errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
574 if (errCode != E_OK) {
575 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
576 return errCode;
577 }
578 } else if (fieldInfo.IsAssetsType()) {
579 errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
580 if (errCode != E_OK) {
581 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
582 return errCode;
583 }
584 }
585 }
586 return errCode;
587 }
588
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData)589 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
590 const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData)
591 {
592 if (downloadData.data.size() != downloadData.opType.size()) {
593 LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
594 downloadData.opType.size());
595 return -E_CLOUD_ERROR;
596 }
597
598 int errCode = SetLogTriggerStatus(false);
599 if (errCode != E_OK) {
600 LOGE("Fail to set log trigger off, %d", errCode);
601 return errCode;
602 }
603
604 std::map<int, int> statisticMap = {};
605 errCode = ExecutePutCloudData(tableName, tableSchema, trackerTable, downloadData, statisticMap);
606 int ret = SetLogTriggerStatus(true);
607 if (ret != E_OK) {
608 LOGE("Fail to set log trigger on, %d", ret);
609 }
610 LOGI("save cloud data:%d, ins:%d, upd:%d, del:%d, only gid:%d, flag zero:%d, flag one:%d, upd timestamp:%d,"
611 "clear gid:%d, not handle:%d, lock:%d",
612 errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
613 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
614 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
615 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
616 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
617 statisticMap[static_cast<int>(OpType::NOT_HANDLE)], statisticMap[static_cast<int>(OpType::LOCKED_NOT_HANDLE)]);
618 return errCode == E_OK ? ret : errCode;
619 }
620
InsertCloudData(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,int64_t dataKey)621 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema,
622 const TrackerTable &trackerTable, int64_t dataKey)
623 {
624 int errCode = E_OK;
625 if (dataKey > 0) {
626 errCode = RemoveDataAndLog(tableSchema.name, dataKey);
627 if (errCode != E_OK) {
628 return errCode;
629 }
630 }
631 std::string sql = GetInsertSqlForCloudSync(tableSchema);
632 sqlite3_stmt *insertStmt = nullptr;
633 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
634 if (errCode != E_OK) {
635 LOGE("Get insert statement failed when save cloud data, %d", errCode);
636 return errCode;
637 }
638 if (putDataMode_ == PutDataMode::SYNC) {
639 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
640 }
641 errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
642 if (errCode != E_OK) {
643 SQLiteUtils::ResetStatement(insertStmt, true, errCode);
644 return errCode;
645 }
646 // insert data
647 errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
648 int ret = E_OK;
649 SQLiteUtils::ResetStatement(insertStmt, true, ret);
650 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
651 LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
652 return errCode;
653 }
654
655 // insert log
656 return InsertLogRecord(tableSchema, trackerTable, vBucket);
657 }
658
InsertLogRecord(const TableSchema & tableSchema,const TrackerTable & trackerTable,VBucket & vBucket)659 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema,
660 const TrackerTable &trackerTable, VBucket &vBucket)
661 {
662 if (putDataMode_ == PutDataMode::SYNC && !CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
663 // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
664 // so we need to delete the old log record according to the gid first
665 std::string gidStr;
666 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
667 if (errCode != E_OK || gidStr.empty()) {
668 LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
669 return errCode;
670 }
671 std::string sql = "DELETE FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE cloud_gid = '"
672 + gidStr + "';";
673 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
674 if (errCode != E_OK) {
675 LOGE("delete log record according gid fail, errCode = %d", errCode);
676 return errCode;
677 }
678 }
679
680 std::string sql = "INSERT OR REPLACE INTO " + DBCommon::GetLogTableName(tableSchema.name) +
681 " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, " + "CASE WHEN (SELECT status FROM " +
682 DBCommon::GetLogTableName(tableSchema.name) + " WHERE hash_key=?) IS NULL THEN 0 ELSE " +
683 "(SELECT status FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE hash_key=?) " + "END)";
684 sqlite3_stmt *insertLogStmt = nullptr;
685 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
686 if (errCode != E_OK) {
687 LOGE("Get insert log statement failed when save cloud data, %d", errCode);
688 return errCode;
689 }
690
691 errCode = BindValueToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
692 if (errCode != E_OK) {
693 SQLiteUtils::ResetStatement(insertLogStmt, true, errCode);
694 return errCode;
695 }
696
697 errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
698 int ret = E_OK;
699 SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
700 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
701 return ret;
702 } else {
703 LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
704 return errCode;
705 }
706 }
707
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)708 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
709 sqlite3_stmt *updateStmt)
710 {
711 auto it = bindCloudFieldFuncMap_.find(field.type);
712 if (it == bindCloudFieldFuncMap_.end()) {
713 LOGE("unknown cloud type when bind one field.");
714 return -E_CLOUD_ERROR;
715 }
716 return it->second(index, vBucket, field, updateStmt);
717 }
718
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)719 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
720 const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
721 {
722 int errCode = E_OK;
723 int index = 0;
724 for (const auto &field : fields) {
725 index++;
726 errCode = BindOneField(index, vBucket, field, upsertStmt);
727 if (errCode != E_OK) {
728 return errCode;
729 }
730 }
731 return errCode;
732 }
733
BindStatusSubQueryHashKeyStatement(sqlite3_stmt * insertLogStmt,std::vector<uint8_t> & hashKey)734 int SQLiteSingleVerRelationalStorageExecutor::BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt,
735 std::vector<uint8_t> &hashKey)
736 {
737 int errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 12, hashKey); // 12 is hash_key
738 if (errCode != E_OK) {
739 LOGE("Bind hash_key to status subQuery statement failed, %d", errCode);
740 return errCode;
741 }
742
743 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 13, hashKey); // 13 is hash_key
744 if (errCode != E_OK) {
745 LOGE("Bind hash_key to status subQuery2 statement failed, %d", errCode);
746 return errCode;
747 }
748 return errCode;
749 }
750
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)751 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
752 const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
753 {
754 std::vector<uint8_t> hashKey;
755 int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
756 if (errCode != E_OK) {
757 return errCode;
758 }
759 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 7, hashKey); // 7 is hash_key
760 if (errCode != E_OK) {
761 LOGE("Bind hash_key to insert log statement failed, %d", errCode);
762 return errCode;
763 }
764
765 std::string cloudGid;
766 if (putDataMode_ == PutDataMode::SYNC) {
767 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
768 if (errCode != E_OK) {
769 LOGE("get gid for insert log statement failed, %d", errCode);
770 return -E_CLOUD_ERROR;
771 }
772 }
773
774 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 8, cloudGid); // 8 is cloud_gid
775 if (errCode != E_OK) {
776 LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
777 return errCode;
778 }
779
780 if (trackerTable.GetExtendName().empty() || vBucket.find(trackerTable.GetExtendName()) == vBucket.end()) {
781 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 9, ""); // 9 is extend_field
782 } else {
783 Type extendValue = vBucket.at(trackerTable.GetExtendName());
784 errCode = SQLiteRelationalUtils::BindStatementByType(insertLogStmt, 9, extendValue); // 9 is extend_field
785 }
786 if (errCode != E_OK) {
787 LOGE("Bind extend_field to insert log statement failed, %d", errCode);
788 return errCode;
789 }
790
791 errCode = BindShareValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt);
792 if (errCode != E_OK) {
793 return errCode;
794 }
795 return BindStatusSubQueryHashKeyStatement(insertLogStmt, hashKey);
796 }
797
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)798 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
799 const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
800 {
801 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
802 int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 1, rowid);
803 if (errCode != E_OK) {
804 LOGE("Bind rowid to insert log statement failed, %d", errCode);
805 return errCode;
806 }
807
808 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 2, GetDev()); // 2 is device
809 if (errCode != E_OK) {
810 LOGE("Bind device to insert log statement failed, %d", errCode);
811 return errCode;
812 }
813
814 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 3, GetDev()); // 3 is ori_device
815 if (errCode != E_OK) {
816 LOGE("Bind ori_device to insert log statement failed, %d", errCode);
817 return errCode;
818 }
819
820 int64_t val = 0;
821 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
822 if (errCode != E_OK) {
823 LOGE("get modify time for insert log statement failed, %d", errCode);
824 return -E_CLOUD_ERROR;
825 }
826
827 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 4, val); // 4 is timestamp
828 if (errCode != E_OK) {
829 LOGE("Bind timestamp to insert log statement failed, %d", errCode);
830 return errCode;
831 }
832
833 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
834 if (errCode != E_OK) {
835 LOGE("get create time for insert log statement failed, %d", errCode);
836 return -E_CLOUD_ERROR;
837 }
838
839 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 5, val); // 5 is wtimestamp
840 if (errCode != E_OK) {
841 LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
842 return errCode;
843 }
844
845 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, 6, GetDataFlag())); // 6 is flag
846 if (errCode != E_OK) {
847 LOGE("Bind flag to insert log statement failed, %d", errCode);
848 return errCode;
849 }
850
851 vBucket[CloudDbConstant::ROW_ID_FIELD_NAME] = rowid; // fill rowid to cloud data to notify user
852 return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
853 }
854
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)855 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
856 const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
857 {
858 std::string where = " WHERE";
859 if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
860 where += " " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (SELECT data_key FROM " +
861 DBCommon::GetLogTableName(tableName) + " WHERE cloud_gid = '" + gidStr + "')";
862 }
863 if (!pkSet.empty() && queryByPk) {
864 if (!gidStr.empty()) {
865 where += " OR";
866 }
867 where += " (1 = 1";
868 for (const auto &pk : pkSet) {
869 where += (" AND " + pk + " = ?");
870 }
871 where += ");";
872 }
873 return where;
874 }
875
GetUpdateSqlForCloudSync(const std::vector<Field> & updateFields,const TableSchema & tableSchema,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)876 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields,
877 const TableSchema &tableSchema, const std::string &gidStr, const std::set<std::string> &pkSet,
878 std::string &updateSql)
879 {
880 if (pkSet.empty() && gidStr.empty()) {
881 LOGE("update data fail because both primary key and gid is empty.");
882 return -E_CLOUD_ERROR;
883 }
884 std::string sql = "UPDATE " + tableSchema.name + " SET";
885 for (const auto &field : updateFields) {
886 sql += " " + field.colName + " = ?,";
887 }
888 sql.pop_back();
889 sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
890 updateSql = sql;
891 return E_OK;
892 }
893
IsGidValid(const std::string & gidStr)894 static inline bool IsGidValid(const std::string &gidStr)
895 {
896 if (!gidStr.empty()) {
897 return gidStr.find("'") == std::string::npos;
898 }
899 return true;
900 }
901
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)902 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
903 const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
904 {
905 std::string gidStr;
906 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
907 if (errCode != E_OK) {
908 LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
909 return errCode;
910 }
911 if (!IsGidValid(gidStr)) {
912 LOGE("invalid char in cloud gid");
913 return -E_CLOUD_ERROR;
914 }
915
916 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
917 auto updateFields = GetUpdateField(vBucket, tableSchema);
918 std::string updateSql;
919 errCode = GetUpdateSqlForCloudSync(updateFields, tableSchema, gidStr, pkSet, updateSql);
920 if (errCode != E_OK) {
921 return errCode;
922 }
923
924 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
925 if (errCode != E_OK) {
926 LOGE("Get update statement failed when update cloud data, %d", errCode);
927 return errCode;
928 }
929
930 // bind value
931 if (!pkSet.empty()) {
932 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
933 updateFields.insert(updateFields.end(), pkFields.begin(), pkFields.end());
934 }
935 errCode = BindValueToUpsertStatement(vBucket, updateFields, updateStmt);
936 if (errCode != E_OK) {
937 LOGE("bind value to update statement failed when update cloud data, %d", errCode);
938 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
939 }
940 return errCode;
941 }
942
UpdateCloudData(VBucket & vBucket,const TableSchema & tableSchema)943 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema)
944 {
945 if (putDataMode_ == PutDataMode::SYNC) {
946 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
947 }
948 sqlite3_stmt *updateStmt = nullptr;
949 int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
950 if (errCode != E_OK) {
951 LOGE("Get update data table statement fail, %d", errCode);
952 return errCode;
953 }
954
955 // update data
956 errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
957 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
958 errCode = E_OK;
959 } else {
960 LOGE("update data failed when save cloud data:%d", errCode);
961 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
962 return errCode;
963 }
964 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
965
966 // update log
967 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
968 if (errCode != E_OK) {
969 LOGE("update log record failed when update cloud data, errCode = %d", errCode);
970 }
971 return errCode;
972 }
973
IsAllowWithPrimaryKey(OpType opType)974 static inline bool IsAllowWithPrimaryKey(OpType opType)
975 {
976 return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
977 opType == OpType::ONLY_UPDATE_GID || opType == OpType::LOCKED_NOT_HANDLE);
978 }
979
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)980 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
981 OpType opType)
982 {
983 sqlite3_stmt *updateLogStmt = nullptr;
984 std::vector<std::string> updateColName;
985 int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
986 if (errCode != E_OK) {
987 LOGE("Get update log statement failed, errCode = %d", errCode);
988 return errCode;
989 }
990
991 errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
992 updateLogStmt);
993 int ret = E_OK;
994 if (errCode != E_OK) {
995 LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
996 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
997 return errCode != E_OK ? errCode : ret;
998 }
999
1000 errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
1001 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1002 errCode = E_OK;
1003 } else {
1004 LOGE("update log record failed when update cloud data:%d", errCode);
1005 }
1006 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1007 return errCode != E_OK ? errCode : ret;
1008 }
1009
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)1010 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
1011 const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
1012 sqlite3_stmt *updateLogStmt)
1013 {
1014 int errCode = CloudStorageUtils::BindUpdateLogStmtFromVBucket(vBucket, tableSchema, colNames, updateLogStmt);
1015 if (errCode != E_OK) {
1016 return errCode;
1017 }
1018 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1019 if (pkMap.empty()) {
1020 return E_OK;
1021 }
1022
1023 std::vector<uint8_t> hashKey;
1024 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
1025 if (errCode != E_OK) {
1026 return errCode;
1027 }
1028 return SQLiteUtils::BindBlobToStatement(updateLogStmt, colNames.size() + 1, hashKey);
1029 }
1030
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)1031 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
1032 const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
1033 {
1034 std::string gidStr;
1035 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1036 if (errCode != E_OK) {
1037 LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
1038 return errCode;
1039 }
1040 if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
1041 LOGE("empty or invalid char in cloud gid");
1042 return -E_CLOUD_ERROR;
1043 }
1044
1045 bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
1046 std::string deleteSql = "DELETE FROM " + tableSchema.name;
1047 deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
1048 errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
1049 if (errCode != E_OK) {
1050 LOGE("Get delete statement failed when delete data, %d", errCode);
1051 return errCode;
1052 }
1053
1054 int ret = E_OK;
1055 if (!pkSet.empty() && queryByPk) {
1056 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1057 errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
1058 if (errCode != E_OK) {
1059 LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
1060 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1061 }
1062 }
1063 return errCode != E_OK ? errCode : ret;
1064 }
1065
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1066 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1067 const TableSchema &tableSchema, const TrackerTable &trackerTable)
1068 {
1069 if (isLogicDelete_) {
1070 return LogicDeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
1071 }
1072 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1073 sqlite3_stmt *deleteStmt = nullptr;
1074 int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
1075 if (errCode != E_OK) {
1076 return errCode;
1077 }
1078 errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
1079 int ret = E_OK;
1080 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1081 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1082 LOGE("delete data failed when sync with cloud:%d", errCode);
1083 return errCode;
1084 }
1085 if (ret != E_OK) {
1086 LOGE("reset delete statement failed:%d", ret);
1087 return ret;
1088 }
1089
1090 // update log
1091 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1092 if (errCode != E_OK) {
1093 LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
1094 }
1095 return errCode;
1096 }
1097
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1098 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
1099 const TableSchema &tableSchema, OpType opType)
1100 {
1101 return UpdateLogRecord(vBucket, tableSchema, opType);
1102 }
1103
DeleteTableTrigger(const std::string & missTable) const1104 int SQLiteSingleVerRelationalStorageExecutor::DeleteTableTrigger(const std::string &missTable) const
1105 {
1106 static const char *triggerEndName[] = {
1107 "_ON_INSERT",
1108 "_ON_UPDATE",
1109 "_ON_DELETE"
1110 };
1111 std::string logTableName = DBConstant::SYSTEM_TABLE_PREFIX + missTable;
1112 for (const auto &endName : triggerEndName) {
1113 std::string deleteSql = "DROP TRIGGER IF EXISTS " + logTableName + endName + ";";
1114 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1115 if (errCode != E_OK) {
1116 LOGE("[DeleteTableTrigger] Drop trigger failed. %d", errCode);
1117 return errCode;
1118 }
1119 }
1120 return E_OK;
1121 }
1122
SetLogicDelete(bool isLogicDelete)1123 void SQLiteSingleVerRelationalStorageExecutor::SetLogicDelete(bool isLogicDelete)
1124 {
1125 isLogicDelete_ = isLogicDelete;
1126 }
1127
UpdateRecordStatus(const std::string & tableName,const std::string & status,const Key & hashKey)1128 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordStatus(const std::string &tableName,
1129 const std::string &status, const Key &hashKey)
1130 {
1131 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " + status + " WHERE hash_key = ?;";
1132 sqlite3_stmt *stmt = nullptr;
1133 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1134 if (errCode != E_OK) {
1135 LOGE("[Storage Executor] Get stmt failed when update record status, %d", errCode);
1136 return errCode;
1137 }
1138 int ret = E_OK;
1139 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey); // 1 is bind index of hashKey
1140 if (errCode != E_OK) {
1141 LOGE("[Storage Executor] Bind hashKey to update record status stmt failed, %d", errCode);
1142 SQLiteUtils::ResetStatement(stmt, true, ret);
1143 return errCode;
1144 }
1145 errCode = SQLiteUtils::StepWithRetry(stmt);
1146 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1147 errCode = E_OK;
1148 } else {
1149 LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1150 }
1151 SQLiteUtils::ResetStatement(stmt, true, ret);
1152 return errCode == E_OK ? ret : errCode;
1153 }
1154
SetUploadConfig(int32_t maxUploadCount,int32_t maxUploadSize)1155 void SQLiteSingleVerRelationalStorageExecutor::SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize)
1156 {
1157 maxUploadCount_ = maxUploadCount;
1158 maxUploadSize_ = maxUploadSize;
1159 }
1160
LogicDeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1161 int SQLiteSingleVerRelationalStorageExecutor::LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1162 const TableSchema &tableSchema, const TrackerTable &trackerTable)
1163 {
1164 LOGD("[RDBExecutor] logic delete skip delete data");
1165 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, CloudStorageUtils::GetCursorIncSql(tableName));
1166 if (errCode != E_OK) {
1167 return errCode;
1168 }
1169 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1170 if (errCode != E_OK) {
1171 return errCode;
1172 }
1173 if (!trackerTable.IsEmpty()) {
1174 return SQLiteRelationalUtils::SelectServerObserver(dbHandle_, tableName, true);
1175 }
1176 return E_OK;
1177 }
1178
InitCursorToMeta(const std::string & tableName)1179 int SQLiteSingleVerRelationalStorageExecutor::InitCursorToMeta(const std::string &tableName)
1180 {
1181 Value key;
1182 Value cursor;
1183 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
1184 int errCode = GetKvData(key, cursor);
1185 if (errCode == -E_NOT_FOUND) {
1186 DBCommon::StringToVector(std::string("0"), cursor);
1187 errCode = PutKvData(key, cursor);
1188 if (errCode != E_OK) {
1189 LOGE("Init cursor to meta table failed. %d", errCode);
1190 }
1191 return errCode;
1192 }
1193 if (errCode != E_OK) {
1194 LOGE("Get cursor from meta table failed. %d", errCode);
1195 }
1196 return errCode;
1197 }
1198
SetTableSchema(const TableSchema & tableSchema)1199 void SQLiteSingleVerRelationalStorageExecutor::SetTableSchema(const TableSchema &tableSchema)
1200 {
1201 tableSchema_ = tableSchema;
1202 }
1203 } // namespace DistributedDB
1204 #endif
1205