• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include <algorithm>
19 #include <optional>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32 
33 namespace DistributedDB {
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)34 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
35     const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
36 {
37     std::string querySql;
38     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
39     std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
40     int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
41     if (errCode != E_OK) {
42         LOGE("Get query log sql fail, %d", errCode);
43         return errCode;
44     }
45     if (!pkSet.empty()) {
46         errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, dataInfoWithLog.logInfo.hashKey, true);
47         if (errCode != E_OK) {
48             LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
49             return errCode;
50         }
51     }
52     sqlite3_stmt *selectStmt = nullptr;
53     errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, dataInfoWithLog.logInfo.hashKey, selectStmt);
54     if (errCode != E_OK) {
55         LOGE("Get query log statement fail, %d", errCode);
56         return errCode;
57     }
58 
59     bool alreadyFound = false;
60     do {
61         errCode = SQLiteUtils::StepWithRetry(selectStmt);
62         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
63             if (alreadyFound) {
64                 LOGE("found more than one records in log table for one primary key or gid.");
65                 errCode = -E_CLOUD_ERROR;
66                 break;
67             }
68             alreadyFound = true;
69             std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
70             errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
71             if (errCode != E_OK) {
72                 LOGE("Get info by statement fail, %d", errCode);
73                 break;
74             }
75         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
76             errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
77             break;
78         } else {
79             LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
80             break;
81         }
82     } while (errCode == E_OK);
83 
84     int ret = E_OK;
85     SQLiteUtils::ResetStatement(selectStmt, true, ret);
86     return errCode != E_OK ? errCode : ret;
87 }
88 
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)89 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
90 {
91     int index = 0;
92     logInfo.dataKey = sqlite3_column_int64(statement, index++);
93     std::vector<uint8_t> device;
94     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, device);    // 1 is device
95     DBCommon::VectorToString(device, logInfo.device);
96     std::vector<uint8_t> originDev;
97     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, originDev); // 2 is originDev
98     DBCommon::VectorToString(originDev, logInfo.originDev);
99     logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 3 is timestamp
100     logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 4 is wtimestamp
101     logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, index++)); // 5 is flag
102     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, logInfo.hashKey); // 6 is hash_key
103     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.cloudGid); // 7 is cloud_gid
104     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.sharingResource); // 8 is sharing_resource
105     logInfo.status = static_cast<uint64_t>(sqlite3_column_int64(statement, index++)); // 9 is status
106     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.version); // 10 is version
107     return index;
108 }
109 
GetInfoByStatement(sqlite3_stmt * statement,const std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)110 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
111     const std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
112     VBucket &assetInfo)
113 {
114     int index = GetLogInfoByStatement(statement, dataInfoWithLog.logInfo); // start index of assetInfo or primary key
115     int errCode = E_OK;
116     for (const auto &field: assetFields) {
117         Type cloudValue;
118         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
119         if (errCode != E_OK) {
120             break;
121         }
122         errCode = PutVBucketByType(assetInfo, field, cloudValue);
123         if (errCode != E_OK) {
124             break;
125         }
126     }
127     if (errCode != E_OK) {
128         LOGE("set asset field failed, errCode = %d", errCode);
129         return errCode;
130     }
131 
132     // fill primary key
133     for (const auto &item : pkMap) {
134         Type cloudValue;
135         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
136         if (errCode != E_OK) {
137             break;
138         }
139         errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
140         if (errCode != E_OK) {
141             break;
142         }
143     }
144     return errCode;
145 }
146 
GetInsertSqlForCloudSync(const TableSchema & tableSchema)147 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
148 {
149     std::string sql = "insert into " + tableSchema.name + "(";
150     for (const auto &field : tableSchema.fields) {
151         sql += field.colName + ",";
152     }
153     sql.pop_back();
154     sql += ") values(";
155     for (size_t i = 0; i < tableSchema.fields.size(); i++) {
156         sql += "?,";
157     }
158     sql.pop_back();
159     sql += ");";
160     return sql;
161 }
162 
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)163 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
164     const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
165 {
166     int errCode = E_OK;
167     TableInfo localTable = localSchema_.GetTable(tableSchema.name);
168     // table name in cloud schema is in lower case
169     if (!DBCommon::CaseInsensitiveCompare(localTable.GetTableName(), tableSchema.name)) {
170         LOGE("localSchema doesn't contain table from cloud");
171         return -E_INTERNAL_ERROR;
172     }
173 
174     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
175     if (pkMap.size() == 0) {
176         int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
177         std::vector<uint8_t> value;
178         DBCommon::StringToVector(std::to_string(rowid), value);
179         errCode = DBCommon::CalcValueHash(value, hashValue);
180     } else {
181         std::tie(errCode, hashValue) = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(vBucket,
182             tableSchema, localTable, pkMap, allowEmpty);
183     }
184     return errCode;
185 }
186 
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,const Key & hashKey,sqlite3_stmt * & selectStmt)187 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
188     const VBucket &vBucket, const std::string &querySql, const Key &hashKey, sqlite3_stmt *&selectStmt)
189 {
190     int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
191     if (errCode != E_OK) {
192         LOGE("Get select log statement failed, %d", errCode);
193         return errCode;
194     }
195 
196     std::string cloudGid;
197     int ret = E_OK;
198     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
199     if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
200         SQLiteUtils::ResetStatement(selectStmt, true, ret);
201         LOGE("Get cloud gid fail when bind query log statement.");
202         return errCode;
203     }
204 
205     int index = 0;
206     if (!cloudGid.empty()) {
207         index++;
208         errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
209         if (errCode != E_OK) {
210             LOGE("Bind cloud gid to query log statement failed. %d", errCode);
211             SQLiteUtils::ResetStatement(selectStmt, true, errCode);
212             return errCode;
213         }
214     }
215 
216     index++;
217     errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashKey, true);
218     if (errCode != E_OK) {
219         LOGE("Bind hash key to query log statement failed. %d", errCode);
220         SQLiteUtils::ResetStatement(selectStmt, true, ret);
221     }
222     return errCode != E_OK ? errCode : ret;
223 }
224 
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,const std::set<std::string> & pkSet,std::string & querySql)225 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
226     const std::set<std::string> &pkSet, std::string &querySql)
227 {
228     std::string cloudGid;
229     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
230     if (errCode != E_OK) {
231         LOGE("Get cloud gid fail when query log table.");
232         return errCode;
233     }
234 
235     if (pkSet.empty() && cloudGid.empty()) {
236         LOGE("query log table failed because of both primary key and gid are empty.");
237         return -E_CLOUD_ERROR;
238     }
239     std::string sql = "SELECT data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid,"
240         " sharing_resource, status, version FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE ";
241     if (!cloudGid.empty()) {
242         sql += "cloud_gid = ? OR ";
243     }
244     sql += "hash_key = ?";
245 
246     querySql = sql;
247     return E_OK;
248 }
249 
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData,std::map<int,int> & statisticMap)250 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
251     const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData,
252     std::map<int, int> &statisticMap)
253 {
254     int index = 0;
255     int errCode = E_OK;
256     for (OpType op : downloadData.opType) {
257         VBucket &vBucket = downloadData.data[index];
258         switch (op) {
259             case OpType::INSERT:
260                 errCode = InsertCloudData(vBucket, tableSchema, trackerTable, GetLocalDataKey(index, downloadData));
261                 break;
262             case OpType::UPDATE:
263                 errCode = UpdateCloudData(vBucket, tableSchema);
264                 break;
265             case OpType::DELETE:
266                 errCode = DeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
267                 break;
268             case OpType::ONLY_UPDATE_GID:
269             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
270             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
271             case OpType::UPDATE_TIMESTAMP:
272             case OpType::CLEAR_GID:
273             case OpType::LOCKED_NOT_HANDLE:
274                 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
275                 [[fallthrough]];
276             case OpType::NOT_HANDLE:
277                 errCode = errCode == E_OK ? OnlyUpdateAssetId(tableName, tableSchema, vBucket,
278                     GetLocalDataKey(index, downloadData), op) : errCode;
279                 break;
280             default:
281                 errCode = -E_CLOUD_ERROR;
282                 break;
283         }
284         if (errCode != E_OK) {
285             LOGE("put cloud sync data fail: %d", errCode);
286             return errCode;
287         }
288         statisticMap[static_cast<int>(op)]++;
289         index++;
290     }
291     return errCode;
292 }
293 
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets,std::vector<std::string> & notifyTableList)294 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
295     const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
296     std::vector<Asset> &assets, std::vector<std::string> &notifyTableList)
297 {
298     int errCode = SetLogTriggerStatus(false);
299     if (errCode != E_OK) {
300         LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
301         return errCode;
302     }
303     if (mode == FLAG_ONLY) {
304         errCode = DoCleanLogs(tableNameList, localSchema);
305         if (errCode != E_OK) {
306             LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
307             return errCode;
308         }
309         notifyTableList = tableNameList;
310     } else if (mode == FLAG_AND_DATA) {
311         errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
312         if (errCode != E_OK) {
313             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
314             return errCode;
315         }
316         notifyTableList = tableNameList;
317     } else if (mode == CLEAR_SHARED_TABLE) {
318         errCode = DoCleanShareTableDataAndLog(tableNameList);
319         if (errCode != E_OK) {
320             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
321             return errCode;
322         }
323     }
324     errCode = SetLogTriggerStatus(true);
325     if (errCode != E_OK) {
326         LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
327     }
328 
329     return errCode;
330 }
331 
DoCleanLogs(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)332 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList,
333     const RelationalSchemaObject &localSchema)
334 {
335     int errCode = E_OK;
336     int i = 1;
337     for (const auto &tableName: tableNameList) {
338         std::string logTableName = DBCommon::GetLogTableName(tableName);
339         LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
340         errCode = DoCleanAssetId(tableName, localSchema);
341         if (errCode != E_OK) {
342             LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
343             return errCode;
344         }
345         errCode = CleanCloudDataOnLogTable(logTableName, FLAG_ONLY);
346         if (errCode != E_OK) {
347             LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
348             return errCode;
349         }
350         i++;
351     }
352 
353     return errCode;
354 }
355 
UpdateCursor(sqlite3_context * ctx,int argc,sqlite3_value ** argv)356 void SQLiteSingleVerRelationalStorageExecutor::UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv)
357 {
358     if (ctx == nullptr || argc != 0 || argv == nullptr) {
359         LOGW("[SqlSinRDBExe][UpdateCursor] invalid param=%d", argc);
360         return;
361     }
362     auto context = static_cast<UpdateCursorContext *>(sqlite3_user_data(ctx));
363     if (context == nullptr) {
364         LOGW("[SqlSinRDBExe][UpdateCursor] invalid context");
365         return;
366     }
367     context->cursor++;
368     sqlite3_result_int64(ctx, static_cast<sqlite3_int64>(context->cursor));
369 }
370 
CreateFuncUpdateCursor(UpdateCursorContext & context,void (* updateCursor)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const371 int SQLiteSingleVerRelationalStorageExecutor::CreateFuncUpdateCursor(UpdateCursorContext &context,
372     void (*updateCursor)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
373 {
374     std::string sql = "update_cursor";
375     int errCode = sqlite3_create_function_v2(dbHandle_, sql.c_str(), 0, SQLITE_UTF8 | SQLITE_DIRECTONLY,
376         &context, updateCursor, nullptr, nullptr, nullptr);
377     if (errCode != SQLITE_OK) {
378         LOGE("[Storage Executor][UpdateCursor] Create func=updateCursor failed=%d", errCode);
379         return SQLiteUtils::MapSQLiteErrno(errCode);
380     }
381     return E_OK;
382 }
383 
GetCursor(const std::string & tableName)384 int SQLiteSingleVerRelationalStorageExecutor::GetCursor(const std::string &tableName)
385 {
386     int cursor = -1;
387     std::string sql = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX + "metadata where key = ?;";
388     sqlite3_stmt *stmt = nullptr;
389     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
390     if (errCode != E_OK) {
391         LOGE("[Storage Executor]get cursor failed=%d", errCode);
392         return cursor;
393     }
394     ResFinalizer finalizer([stmt]() {
395         sqlite3_stmt *statement = stmt;
396         int ret = E_OK;
397         SQLiteUtils::ResetStatement(statement, true, ret);
398         if (ret != E_OK) {
399             LOGW("Reset stmt failed %d when get cursor", ret);
400         }
401     });
402     Key key;
403     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
404     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, key, false); // first arg.
405     if (errCode != E_OK) {
406         return cursor;
407     }
408     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
409     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
410         cursor = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
411     }
412     return cursor;
413 }
414 
SetCursor(const std::string & tableName,int cursor)415 int SQLiteSingleVerRelationalStorageExecutor::SetCursor(const std::string &tableName, int cursor)
416 {
417     std::string sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + "metadata SET VALUE = ? where KEY = ?;";
418     sqlite3_stmt *stmt = nullptr;
419     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
420     if (errCode != E_OK) {
421         LOGE("Set cursor sql failed=%d", errCode);
422         return cursor;
423     }
424     ResFinalizer finalizer([stmt]() {
425         sqlite3_stmt *statement = stmt;
426         int ret = E_OK;
427         SQLiteUtils::ResetStatement(statement, true, ret);
428         if (ret != E_OK) {
429             LOGW("Reset stmt failed %d when set cursor", ret);
430         }
431     });
432     int index = 1;
433     errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, cursor);
434     if (errCode != E_OK) {
435         LOGE("Bind saved cursor failed:%d", errCode);
436         return errCode;
437     }
438     Key key;
439     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
440     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key, false);
441     if (errCode != E_OK) {
442         return cursor;
443     }
444     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
445     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
446         errCode = E_OK;
447     }
448     return errCode;
449 }
450 
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)451 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
452     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
453 {
454     int errCode = E_OK;
455     for (size_t i = 0; i < tableNameList.size(); i++) {
456         std::string tableName = tableNameList[i];
457         std::string logTableName = DBCommon::GetLogTableName(tableName);
458         std::vector<int64_t> dataKeys;
459         errCode = GetCleanCloudDataKeys(logTableName, dataKeys, true);
460         if (errCode != E_OK) {
461             LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
462             return errCode;
463         }
464 
465         std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
466         errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
467         if (errCode != E_OK) {
468             LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
469             return errCode;
470         }
471         if (isLogicDelete_) {
472             errCode = SetDataOnUserTablWithLogicDelete(tableName, logTableName);
473         } else {
474             errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName, localSchema);
475         }
476         if (errCode != E_OK) {
477             LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
478             return errCode;
479         }
480     }
481 
482     return errCode;
483 }
484 
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)485 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
486     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
487 {
488     int errCode = E_OK;
489     int ret = E_OK;
490     sqlite3_stmt *selectStmt = nullptr;
491     for (const auto &rowId : dataKeys) {
492         std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
493             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
494         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
495         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
496             LOGE("Get select asset statement failed, %d", errCode);
497             return errCode;
498         }
499         errCode = SQLiteUtils::StepWithRetry(selectStmt);
500         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
501             std::vector<uint8_t> blobValue;
502             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
503             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
504                 LOGE("Get column blob value failed, %d", errCode);
505                 goto END;
506             }
507             Asset asset;
508             errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
509             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
510                 LOGE("Transfer blob to asset failed, %d", errCode);
511                 goto END;
512             }
513             assets.push_back(asset);
514         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
515             errCode = E_OK;
516             Asset asset;
517             assets.push_back(asset);
518         }
519         SQLiteUtils::ResetStatement(selectStmt, true, ret);
520     }
521     return errCode != E_OK ? errCode : ret;
522 END:
523     SQLiteUtils::ResetStatement(selectStmt, true, ret);
524     return errCode != E_OK ? errCode : ret;
525 }
526 
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)527 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
528     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
529 {
530     int errCode = E_OK;
531     int ret = E_OK;
532     sqlite3_stmt *selectStmt = nullptr;
533     for (const auto &rowId : dataKeys) {
534         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
535             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
536         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
537         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
538             LOGE("Get select assets statement failed, %d", errCode);
539             goto END;
540         }
541         errCode = SQLiteUtils::StepWithRetry(selectStmt);
542         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
543             std::vector<uint8_t> blobValue;
544             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
545             if (errCode != E_OK) {
546                 goto END;
547             }
548             Assets tmpAssets;
549             errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
550             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
551                 goto END;
552             }
553             for (const auto &asset: tmpAssets) {
554                 assets.push_back(asset);
555             }
556         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
557             errCode = E_OK;
558         }
559         SQLiteUtils::ResetStatement(selectStmt, true, ret);
560     }
561     return errCode != E_OK ? errCode : ret;
562 END:
563     SQLiteUtils::ResetStatement(selectStmt, true, ret);
564     return errCode != E_OK ? errCode : ret;
565 }
566 
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)567 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
568     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
569 {
570     int errCode = E_OK;
571     for (const auto &fieldInfo: fieldInfos) {
572         if (fieldInfo.IsAssetType()) {
573             errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
574             if (errCode != E_OK) {
575                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
576                 return errCode;
577             }
578         } else if (fieldInfo.IsAssetsType()) {
579             errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
580             if (errCode != E_OK) {
581                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
582                 return errCode;
583             }
584         }
585     }
586     return errCode;
587 }
588 
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData)589 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
590     const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData)
591 {
592     if (downloadData.data.size() != downloadData.opType.size()) {
593         LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
594              downloadData.opType.size());
595         return -E_CLOUD_ERROR;
596     }
597 
598     int errCode = SetLogTriggerStatus(false);
599     if (errCode != E_OK) {
600         LOGE("Fail to set log trigger off, %d", errCode);
601         return errCode;
602     }
603 
604     std::map<int, int> statisticMap = {};
605     errCode = ExecutePutCloudData(tableName, tableSchema, trackerTable, downloadData, statisticMap);
606     int ret = SetLogTriggerStatus(true);
607     if (ret != E_OK) {
608         LOGE("Fail to set log trigger on, %d", ret);
609     }
610     LOGI("save cloud data:%d, ins:%d, upd:%d, del:%d, only gid:%d, flag zero:%d, flag one:%d, upd timestamp:%d,"
611          "clear gid:%d, not handle:%d, lock:%d",
612          errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
613          statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
614          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
615          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
616          statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
617          statisticMap[static_cast<int>(OpType::NOT_HANDLE)], statisticMap[static_cast<int>(OpType::LOCKED_NOT_HANDLE)]);
618     return errCode == E_OK ? ret : errCode;
619 }
620 
InsertCloudData(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,int64_t dataKey)621 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema,
622     const TrackerTable &trackerTable, int64_t dataKey)
623 {
624     int errCode = E_OK;
625     if (dataKey > 0) {
626         errCode = RemoveDataAndLog(tableSchema.name, dataKey);
627         if (errCode != E_OK) {
628             return errCode;
629         }
630     }
631     std::string sql = GetInsertSqlForCloudSync(tableSchema);
632     sqlite3_stmt *insertStmt = nullptr;
633     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
634     if (errCode != E_OK) {
635         LOGE("Get insert statement failed when save cloud data, %d", errCode);
636         return errCode;
637     }
638     if (putDataMode_ == PutDataMode::SYNC) {
639         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
640     }
641     errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
642     if (errCode != E_OK) {
643         SQLiteUtils::ResetStatement(insertStmt, true, errCode);
644         return errCode;
645     }
646     // insert data
647     errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
648     int ret = E_OK;
649     SQLiteUtils::ResetStatement(insertStmt, true, ret);
650     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
651         LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
652         return errCode;
653     }
654 
655     // insert log
656     return InsertLogRecord(tableSchema, trackerTable, vBucket);
657 }
658 
InsertLogRecord(const TableSchema & tableSchema,const TrackerTable & trackerTable,VBucket & vBucket)659 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema,
660     const TrackerTable &trackerTable, VBucket &vBucket)
661 {
662     if (putDataMode_ == PutDataMode::SYNC && !CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
663         // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
664         // so we need to delete the old log record according to the gid first
665         std::string gidStr;
666         int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
667         if (errCode != E_OK || gidStr.empty()) {
668             LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
669             return errCode;
670         }
671         std::string sql = "DELETE FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE cloud_gid = '"
672             + gidStr + "';";
673         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
674         if (errCode != E_OK) {
675             LOGE("delete log record according gid fail, errCode = %d", errCode);
676             return errCode;
677         }
678     }
679 
680     std::string sql = "INSERT OR REPLACE INTO " + DBCommon::GetLogTableName(tableSchema.name) +
681         " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, " + "CASE WHEN (SELECT status FROM " +
682         DBCommon::GetLogTableName(tableSchema.name) + " WHERE hash_key=?) IS NULL THEN 0 ELSE " +
683         "(SELECT status FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE hash_key=?) " + "END)";
684     sqlite3_stmt *insertLogStmt = nullptr;
685     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
686     if (errCode != E_OK) {
687         LOGE("Get insert log statement failed when save cloud data, %d", errCode);
688         return errCode;
689     }
690 
691     errCode = BindValueToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
692     if (errCode != E_OK) {
693         SQLiteUtils::ResetStatement(insertLogStmt, true, errCode);
694         return errCode;
695     }
696 
697     errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
698     int ret = E_OK;
699     SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
700     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
701         return ret;
702     } else {
703         LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
704         return errCode;
705     }
706 }
707 
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)708 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
709     sqlite3_stmt *updateStmt)
710 {
711     auto it = bindCloudFieldFuncMap_.find(field.type);
712     if (it == bindCloudFieldFuncMap_.end()) {
713         LOGE("unknown cloud type when bind one field.");
714         return -E_CLOUD_ERROR;
715     }
716     return it->second(index, vBucket, field, updateStmt);
717 }
718 
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)719 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
720     const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
721 {
722     int errCode = E_OK;
723     int index = 0;
724     for (const auto &field : fields) {
725         index++;
726         errCode = BindOneField(index, vBucket, field, upsertStmt);
727         if (errCode != E_OK) {
728             return errCode;
729         }
730     }
731     return errCode;
732 }
733 
BindStatusSubQueryHashKeyStatement(sqlite3_stmt * insertLogStmt,std::vector<uint8_t> & hashKey)734 int SQLiteSingleVerRelationalStorageExecutor::BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt,
735     std::vector<uint8_t> &hashKey)
736 {
737     int errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 12, hashKey); // 12 is hash_key
738     if (errCode != E_OK) {
739         LOGE("Bind hash_key to status subQuery statement failed, %d", errCode);
740         return errCode;
741     }
742 
743     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 13, hashKey); // 13 is hash_key
744     if (errCode != E_OK) {
745         LOGE("Bind hash_key to status subQuery2 statement failed, %d", errCode);
746         return errCode;
747     }
748     return errCode;
749 }
750 
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)751 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
752     const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
753 {
754     std::vector<uint8_t> hashKey;
755     int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
756     if (errCode != E_OK) {
757         return errCode;
758     }
759     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 7, hashKey); // 7 is hash_key
760     if (errCode != E_OK) {
761         LOGE("Bind hash_key to insert log statement failed, %d", errCode);
762         return errCode;
763     }
764 
765     std::string cloudGid;
766     if (putDataMode_ == PutDataMode::SYNC) {
767         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
768         if (errCode != E_OK) {
769             LOGE("get gid for insert log statement failed, %d", errCode);
770             return -E_CLOUD_ERROR;
771         }
772     }
773 
774     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 8, cloudGid); // 8 is cloud_gid
775     if (errCode != E_OK) {
776         LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
777         return errCode;
778     }
779 
780     if (trackerTable.GetExtendName().empty() || vBucket.find(trackerTable.GetExtendName()) == vBucket.end()) {
781         errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 9, ""); // 9 is extend_field
782     } else {
783         Type extendValue = vBucket.at(trackerTable.GetExtendName());
784         errCode = SQLiteRelationalUtils::BindStatementByType(insertLogStmt, 9, extendValue); // 9 is extend_field
785     }
786     if (errCode != E_OK) {
787         LOGE("Bind extend_field to insert log statement failed, %d", errCode);
788         return errCode;
789     }
790 
791     errCode = BindShareValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt);
792     if (errCode != E_OK) {
793         return errCode;
794     }
795     return BindStatusSubQueryHashKeyStatement(insertLogStmt, hashKey);
796 }
797 
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)798 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
799     const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
800 {
801     int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
802     int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 1, rowid);
803     if (errCode != E_OK) {
804         LOGE("Bind rowid to insert log statement failed, %d", errCode);
805         return errCode;
806     }
807 
808     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 2, GetDev()); // 2 is device
809     if (errCode != E_OK) {
810         LOGE("Bind device to insert log statement failed, %d", errCode);
811         return errCode;
812     }
813 
814     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 3, GetDev()); // 3 is ori_device
815     if (errCode != E_OK) {
816         LOGE("Bind ori_device to insert log statement failed, %d", errCode);
817         return errCode;
818     }
819 
820     int64_t val = 0;
821     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
822     if (errCode != E_OK) {
823         LOGE("get modify time for insert log statement failed, %d", errCode);
824         return -E_CLOUD_ERROR;
825     }
826 
827     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 4, val); // 4 is timestamp
828     if (errCode != E_OK) {
829         LOGE("Bind timestamp to insert log statement failed, %d", errCode);
830         return errCode;
831     }
832 
833     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
834     if (errCode != E_OK) {
835         LOGE("get create time for insert log statement failed, %d", errCode);
836         return -E_CLOUD_ERROR;
837     }
838 
839     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 5, val); // 5 is wtimestamp
840     if (errCode != E_OK) {
841         LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
842         return errCode;
843     }
844 
845     errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, 6, GetDataFlag())); // 6 is flag
846     if (errCode != E_OK) {
847         LOGE("Bind flag to insert log statement failed, %d", errCode);
848         return errCode;
849     }
850 
851     vBucket[CloudDbConstant::ROW_ID_FIELD_NAME] = rowid; // fill rowid to cloud data to notify user
852     return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
853 }
854 
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)855 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
856     const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
857 {
858     std::string where = " WHERE";
859     if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
860         where += " " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (SELECT data_key FROM " +
861             DBCommon::GetLogTableName(tableName) + " WHERE cloud_gid = '" + gidStr + "')";
862     }
863     if (!pkSet.empty() && queryByPk) {
864         if (!gidStr.empty()) {
865             where += " OR";
866         }
867         where += " (1 = 1";
868         for (const auto &pk : pkSet) {
869             where += (" AND " + pk + " = ?");
870         }
871         where += ");";
872     }
873     return where;
874 }
875 
GetUpdateSqlForCloudSync(const std::vector<Field> & updateFields,const TableSchema & tableSchema,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)876 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields,
877     const TableSchema &tableSchema, const std::string &gidStr, const std::set<std::string> &pkSet,
878     std::string &updateSql)
879 {
880     if (pkSet.empty() && gidStr.empty()) {
881         LOGE("update data fail because both primary key and gid is empty.");
882         return -E_CLOUD_ERROR;
883     }
884     std::string sql = "UPDATE " + tableSchema.name + " SET";
885     for (const auto &field : updateFields) {
886         sql +=  " " + field.colName + " = ?,";
887     }
888     sql.pop_back();
889     sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
890     updateSql = sql;
891     return E_OK;
892 }
893 
// A gid is valid when it contains no single quote; an empty gid is accepted as well.
static inline bool IsGidValid(const std::string &gidStr)
{
    return gidStr.empty() || gidStr.find('\'') == std::string::npos;
}
901 
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)902 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
903     const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
904 {
905     std::string gidStr;
906     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
907     if (errCode != E_OK) {
908         LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
909         return errCode;
910     }
911     if (!IsGidValid(gidStr)) {
912         LOGE("invalid char in cloud gid");
913         return -E_CLOUD_ERROR;
914     }
915 
916     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
917     auto updateFields = GetUpdateField(vBucket, tableSchema);
918     std::string updateSql;
919     errCode = GetUpdateSqlForCloudSync(updateFields, tableSchema, gidStr, pkSet, updateSql);
920     if (errCode != E_OK) {
921         return errCode;
922     }
923 
924     errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
925     if (errCode != E_OK) {
926         LOGE("Get update statement failed when update cloud data, %d", errCode);
927         return errCode;
928     }
929 
930     // bind value
931     if (!pkSet.empty()) {
932         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
933         updateFields.insert(updateFields.end(), pkFields.begin(), pkFields.end());
934     }
935     errCode = BindValueToUpsertStatement(vBucket, updateFields, updateStmt);
936     if (errCode != E_OK) {
937         LOGE("bind value to update statement failed when update cloud data, %d", errCode);
938         SQLiteUtils::ResetStatement(updateStmt, true, errCode);
939     }
940     return errCode;
941 }
942 
UpdateCloudData(VBucket & vBucket,const TableSchema & tableSchema)943 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema)
944 {
945     if (putDataMode_ == PutDataMode::SYNC) {
946         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
947     }
948     sqlite3_stmt *updateStmt = nullptr;
949     int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
950     if (errCode != E_OK) {
951         LOGE("Get update data table statement fail, %d", errCode);
952         return errCode;
953     }
954 
955     // update data
956     errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
957     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
958         errCode = E_OK;
959     } else {
960         LOGE("update data failed when save cloud data:%d", errCode);
961         SQLiteUtils::ResetStatement(updateStmt, true, errCode);
962         return errCode;
963     }
964     SQLiteUtils::ResetStatement(updateStmt, true, errCode);
965 
966     // update log
967     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
968     if (errCode != E_OK) {
969         LOGE("update log record failed when update cloud data, errCode = %d", errCode);
970     }
971     return errCode;
972 }
973 
IsAllowWithPrimaryKey(OpType opType)974 static inline bool IsAllowWithPrimaryKey(OpType opType)
975 {
976     return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
977         opType == OpType::ONLY_UPDATE_GID || opType == OpType::LOCKED_NOT_HANDLE);
978 }
979 
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)980 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
981     OpType opType)
982 {
983     sqlite3_stmt *updateLogStmt = nullptr;
984     std::vector<std::string> updateColName;
985     int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
986     if (errCode != E_OK) {
987         LOGE("Get update log statement failed, errCode = %d", errCode);
988         return errCode;
989     }
990 
991     errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
992         updateLogStmt);
993     int ret = E_OK;
994     if (errCode != E_OK) {
995         LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
996         SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
997         return errCode != E_OK ? errCode : ret;
998     }
999 
1000     errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
1001     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1002         errCode = E_OK;
1003     } else {
1004         LOGE("update log record failed when update cloud data:%d", errCode);
1005     }
1006     SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1007     return errCode != E_OK ? errCode : ret;
1008 }
1009 
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)1010 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
1011     const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
1012     sqlite3_stmt *updateLogStmt)
1013 {
1014     int errCode = CloudStorageUtils::BindUpdateLogStmtFromVBucket(vBucket, tableSchema, colNames, updateLogStmt);
1015     if (errCode != E_OK) {
1016         return errCode;
1017     }
1018     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1019     if (pkMap.empty()) {
1020         return E_OK;
1021     }
1022 
1023     std::vector<uint8_t> hashKey;
1024     errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
1025     if (errCode != E_OK) {
1026         return errCode;
1027     }
1028     return SQLiteUtils::BindBlobToStatement(updateLogStmt, colNames.size() + 1, hashKey);
1029 }
1030 
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)1031 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
1032     const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
1033 {
1034     std::string gidStr;
1035     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1036     if (errCode != E_OK) {
1037         LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
1038         return errCode;
1039     }
1040     if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
1041         LOGE("empty or invalid char in cloud gid");
1042         return -E_CLOUD_ERROR;
1043     }
1044 
1045     bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
1046     std::string deleteSql = "DELETE FROM " + tableSchema.name;
1047     deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
1048     errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
1049     if (errCode != E_OK) {
1050         LOGE("Get delete statement failed when delete data, %d", errCode);
1051         return errCode;
1052     }
1053 
1054     int ret = E_OK;
1055     if (!pkSet.empty() && queryByPk) {
1056         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1057         errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
1058         if (errCode != E_OK) {
1059             LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
1060             SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1061         }
1062     }
1063     return errCode != E_OK ? errCode : ret;
1064 }
1065 
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1066 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1067     const TableSchema &tableSchema, const TrackerTable &trackerTable)
1068 {
1069     if (isLogicDelete_) {
1070         return LogicDeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
1071     }
1072     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1073     sqlite3_stmt *deleteStmt = nullptr;
1074     int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
1075     if (errCode != E_OK) {
1076         return errCode;
1077     }
1078     errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
1079     int ret = E_OK;
1080     SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1081     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1082         LOGE("delete data failed when sync with cloud:%d", errCode);
1083         return errCode;
1084     }
1085     if (ret != E_OK) {
1086         LOGE("reset delete statement failed:%d", ret);
1087         return ret;
1088     }
1089 
1090     // update log
1091     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1092     if (errCode != E_OK) {
1093         LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
1094     }
1095     return errCode;
1096 }
1097 
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1098 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
1099     const TableSchema &tableSchema, OpType opType)
1100 {
1101     return UpdateLogRecord(vBucket, tableSchema, opType);
1102 }
1103 
DeleteTableTrigger(const std::string & missTable) const1104 int SQLiteSingleVerRelationalStorageExecutor::DeleteTableTrigger(const std::string &missTable) const
1105 {
1106     static const char *triggerEndName[] = {
1107         "_ON_INSERT",
1108         "_ON_UPDATE",
1109         "_ON_DELETE"
1110     };
1111     std::string logTableName = DBConstant::SYSTEM_TABLE_PREFIX + missTable;
1112     for (const auto &endName : triggerEndName) {
1113         std::string deleteSql = "DROP TRIGGER IF EXISTS " + logTableName + endName + ";";
1114         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1115         if (errCode != E_OK) {
1116             LOGE("[DeleteTableTrigger] Drop trigger failed. %d", errCode);
1117             return errCode;
1118         }
1119     }
1120     return E_OK;
1121 }
1122 
SetLogicDelete(bool isLogicDelete)1123 void SQLiteSingleVerRelationalStorageExecutor::SetLogicDelete(bool isLogicDelete)
1124 {
1125     isLogicDelete_ = isLogicDelete;
1126 }
1127 
UpdateRecordStatus(const std::string & tableName,const std::string & status,const Key & hashKey)1128 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordStatus(const std::string &tableName,
1129     const std::string &status, const Key &hashKey)
1130 {
1131     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " + status + " WHERE hash_key = ?;";
1132     sqlite3_stmt *stmt = nullptr;
1133     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1134     if (errCode != E_OK) {
1135         LOGE("[Storage Executor] Get stmt failed when update record status, %d", errCode);
1136         return errCode;
1137     }
1138     int ret = E_OK;
1139     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey); // 1 is bind index of hashKey
1140     if (errCode != E_OK) {
1141         LOGE("[Storage Executor] Bind hashKey to update record status stmt failed, %d", errCode);
1142         SQLiteUtils::ResetStatement(stmt, true, ret);
1143         return errCode;
1144     }
1145     errCode = SQLiteUtils::StepWithRetry(stmt);
1146     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1147         errCode = E_OK;
1148     } else {
1149         LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1150     }
1151     SQLiteUtils::ResetStatement(stmt, true, ret);
1152     return errCode == E_OK ? ret : errCode;
1153 }
1154 
SetUploadConfig(int32_t maxUploadCount,int32_t maxUploadSize)1155 void SQLiteSingleVerRelationalStorageExecutor::SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize)
1156 {
1157     maxUploadCount_ = maxUploadCount;
1158     maxUploadSize_ = maxUploadSize;
1159 }
1160 
LogicDeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1161 int SQLiteSingleVerRelationalStorageExecutor::LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1162     const TableSchema &tableSchema, const TrackerTable &trackerTable)
1163 {
1164     LOGD("[RDBExecutor] logic delete skip delete data");
1165     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, CloudStorageUtils::GetCursorIncSql(tableName));
1166     if (errCode != E_OK) {
1167         return errCode;
1168     }
1169     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1170     if (errCode != E_OK) {
1171         return errCode;
1172     }
1173     if (!trackerTable.IsEmpty()) {
1174         return SQLiteRelationalUtils::SelectServerObserver(dbHandle_, tableName, true);
1175     }
1176     return E_OK;
1177 }
1178 
InitCursorToMeta(const std::string & tableName)1179 int SQLiteSingleVerRelationalStorageExecutor::InitCursorToMeta(const std::string &tableName)
1180 {
1181     Value key;
1182     Value cursor;
1183     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
1184     int errCode = GetKvData(key, cursor);
1185     if (errCode == -E_NOT_FOUND) {
1186         DBCommon::StringToVector(std::string("0"), cursor);
1187         errCode = PutKvData(key, cursor);
1188         if (errCode != E_OK) {
1189             LOGE("Init cursor to meta table failed. %d", errCode);
1190         }
1191         return errCode;
1192     }
1193     if (errCode != E_OK) {
1194         LOGE("Get cursor from meta table failed. %d", errCode);
1195     }
1196     return errCode;
1197 }
1198 
SetTableSchema(const TableSchema & tableSchema)1199 void SQLiteSingleVerRelationalStorageExecutor::SetTableSchema(const TableSchema &tableSchema)
1200 {
1201     tableSchema_ = tableSchema;
1202 }
1203 } // namespace DistributedDB
1204 #endif
1205