• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include <algorithm>
19 #include <optional>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32 
33 namespace DistributedDB {
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)34 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
35     const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
36 {
37     std::string querySql;
38     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
39     std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
40     int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
41     if (errCode != E_OK) {
42         LOGE("Get query log sql fail, %d", errCode);
43         return errCode;
44     }
45     if (!pkSet.empty()) {
46         errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, dataInfoWithLog.logInfo.hashKey, true);
47         if (errCode != E_OK) {
48             LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
49             return errCode;
50         }
51     }
52     sqlite3_stmt *selectStmt = nullptr;
53     errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, dataInfoWithLog.logInfo.hashKey, selectStmt);
54     if (errCode != E_OK) {
55         LOGE("Get query log statement fail, %d", errCode);
56         return errCode;
57     }
58 
59     bool alreadyFound = false;
60     do {
61         errCode = SQLiteUtils::StepWithRetry(selectStmt);
62         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
63             if (alreadyFound) {
64                 LOGE("found more than one records in log table for one primary key or gid.");
65                 errCode = -E_CLOUD_ERROR;
66                 break;
67             }
68             alreadyFound = true;
69             std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
70             errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
71             if (errCode != E_OK) {
72                 LOGE("Get info by statement fail, %d", errCode);
73                 break;
74             }
75         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
76             errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
77             break;
78         } else {
79             LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
80             break;
81         }
82     } while (errCode == E_OK);
83 
84     int ret = E_OK;
85     SQLiteUtils::ResetStatement(selectStmt, true, ret);
86     return errCode != E_OK ? errCode : ret;
87 }
88 
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)89 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
90 {
91     int index = 0;
92     logInfo.dataKey = sqlite3_column_int64(statement, index++);
93     std::vector<uint8_t> device;
94     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, device);    // 1 is device
95     DBCommon::VectorToString(device, logInfo.device);
96     std::vector<uint8_t> originDev;
97     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, originDev); // 2 is originDev
98     DBCommon::VectorToString(originDev, logInfo.originDev);
99     logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 3 is timestamp
100     logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 4 is wtimestamp
101     logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, index++)); // 5 is flag
102     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, logInfo.hashKey); // 6 is hash_key
103     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.cloudGid); // 7 is cloud_gid
104     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.sharingResource); // 8 is sharing_resource
105     logInfo.status = static_cast<uint64_t>(sqlite3_column_int64(statement, index++)); // 9 is status
106     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.version); // 10 is version
107     return index;
108 }
109 
GetInfoByStatement(sqlite3_stmt * statement,const std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)110 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
111     const std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
112     VBucket &assetInfo)
113 {
114     int index = GetLogInfoByStatement(statement, dataInfoWithLog.logInfo); // start index of assetInfo or primary key
115     int errCode = E_OK;
116     for (const auto &field: assetFields) {
117         Type cloudValue;
118         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
119         if (errCode != E_OK) {
120             break;
121         }
122         errCode = PutVBucketByType(assetInfo, field, cloudValue);
123         if (errCode != E_OK) {
124             break;
125         }
126     }
127     if (errCode != E_OK) {
128         LOGE("set asset field failed, errCode = %d", errCode);
129         return errCode;
130     }
131 
132     // fill primary key
133     for (const auto &item : pkMap) {
134         Type cloudValue;
135         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
136         if (errCode != E_OK) {
137             break;
138         }
139         errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
140         if (errCode != E_OK) {
141             break;
142         }
143     }
144     return errCode;
145 }
146 
GetInsertSqlForCloudSync(const TableSchema & tableSchema)147 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
148 {
149     std::string sql = "insert into " + tableSchema.name + "(";
150     for (const auto &field : tableSchema.fields) {
151         sql += field.colName + ",";
152     }
153     sql.pop_back();
154     sql += ") values(";
155     for (size_t i = 0; i < tableSchema.fields.size(); i++) {
156         sql += "?,";
157     }
158     sql.pop_back();
159     sql += ");";
160     return sql;
161 }
162 
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)163 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
164     const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
165 {
166     int errCode = E_OK;
167     TableInfo localTable = localSchema_.GetTable(tableSchema.name);
168     // table name in cloud schema is in lower case
169     if (!DBCommon::CaseInsensitiveCompare(localTable.GetTableName(), tableSchema.name)) {
170         LOGE("localSchema doesn't contain table from cloud");
171         return -E_INTERNAL_ERROR;
172     }
173 
174     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
175     if (pkMap.size() == 0) {
176         int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
177         std::vector<uint8_t> value;
178         DBCommon::StringToVector(std::to_string(rowid), value);
179         errCode = DBCommon::CalcValueHash(value, hashValue);
180     } else {
181         std::tie(errCode, hashValue) = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(vBucket,
182             tableSchema, localTable, pkMap, allowEmpty);
183     }
184     return errCode;
185 }
186 
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,const Key & hashKey,sqlite3_stmt * & selectStmt)187 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
188     const VBucket &vBucket, const std::string &querySql, const Key &hashKey, sqlite3_stmt *&selectStmt)
189 {
190     int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
191     if (errCode != E_OK) {
192         LOGE("Get select log statement failed, %d", errCode);
193         return errCode;
194     }
195 
196     std::string cloudGid;
197     int ret = E_OK;
198     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
199     if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
200         SQLiteUtils::ResetStatement(selectStmt, true, ret);
201         LOGE("Get cloud gid fail when bind query log statement.");
202         return errCode;
203     }
204 
205     int index = 0;
206     if (!cloudGid.empty()) {
207         index++;
208         errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
209         if (errCode != E_OK) {
210             LOGE("Bind cloud gid to query log statement failed. %d", errCode);
211             SQLiteUtils::ResetStatement(selectStmt, true, ret);
212             return errCode;
213         }
214     }
215 
216     index++;
217     errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashKey, true);
218     if (errCode != E_OK) {
219         LOGE("Bind hash key to query log statement failed. %d", errCode);
220         SQLiteUtils::ResetStatement(selectStmt, true, ret);
221     }
222     return errCode != E_OK ? errCode : ret;
223 }
224 
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,const std::set<std::string> & pkSet,std::string & querySql)225 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
226     const std::set<std::string> &pkSet, std::string &querySql)
227 {
228     std::string cloudGid;
229     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
230     if (errCode != E_OK) {
231         LOGE("Get cloud gid fail when query log table.");
232         return errCode;
233     }
234 
235     if (pkSet.empty() && cloudGid.empty()) {
236         LOGE("query log table failed because of both primary key and gid are empty.");
237         return -E_CLOUD_ERROR;
238     }
239     std::string sql = "SELECT data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid,"
240         " sharing_resource, status, version FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
241         "_log WHERE ";
242     if (!cloudGid.empty()) {
243         sql += "cloud_gid = ? OR ";
244     }
245     sql += "hash_key = ?";
246 
247     querySql = sql;
248     return E_OK;
249 }
250 
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData,std::map<int,int> & statisticMap)251 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
252     const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData,
253     std::map<int, int> &statisticMap)
254 {
255     int index = 0;
256     int errCode = E_OK;
257     for (OpType op : downloadData.opType) {
258         VBucket &vBucket = downloadData.data[index];
259         switch (op) {
260             case OpType::INSERT:
261                 errCode = InsertCloudData(vBucket, tableSchema, trackerTable, GetLocalDataKey(index, downloadData));
262                 break;
263             case OpType::UPDATE:
264                 errCode = UpdateCloudData(vBucket, tableSchema);
265                 break;
266             case OpType::DELETE:
267                 errCode = DeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
268                 break;
269             case OpType::ONLY_UPDATE_GID:
270             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
271             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
272             case OpType::UPDATE_TIMESTAMP:
273             case OpType::CLEAR_GID:
274             case OpType::LOCKED_NOT_HANDLE:
275                 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
276                 [[fallthrough]];
277             case OpType::NOT_HANDLE:
278                 errCode = errCode == E_OK ? OnlyUpdateAssetId(tableName, tableSchema, vBucket,
279                     GetLocalDataKey(index, downloadData), op) : errCode;
280                 break;
281             default:
282                 errCode = -E_CLOUD_ERROR;
283                 break;
284         }
285         if (errCode != E_OK) {
286             LOGE("put cloud sync data fail: %d", errCode);
287             return errCode;
288         }
289         statisticMap[static_cast<int>(op)]++;
290         index++;
291     }
292     return errCode;
293 }
294 
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets,std::vector<std::string> & notifyTableList)295 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
296     const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
297     std::vector<Asset> &assets, std::vector<std::string> &notifyTableList)
298 {
299     int errCode = SetLogTriggerStatus(false);
300     if (errCode != E_OK) {
301         LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
302         return errCode;
303     }
304     if (mode == FLAG_ONLY) {
305         errCode = DoCleanLogs(tableNameList, localSchema);
306         if (errCode != E_OK) {
307             LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
308             return errCode;
309         }
310         notifyTableList = tableNameList;
311     } else if (mode == FLAG_AND_DATA) {
312         errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
313         if (errCode != E_OK) {
314             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
315             return errCode;
316         }
317         notifyTableList = tableNameList;
318     } else if (mode == CLEAR_SHARED_TABLE) {
319         errCode = DoCleanShareTableDataAndLog(tableNameList);
320         if (errCode != E_OK) {
321             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
322             return errCode;
323         }
324         notifyTableList = tableNameList;
325     }
326     for (const auto &tableName: tableNameList) {
327         errCode = CleanDownloadingFlag(tableName);
328         if (errCode != E_OK) {
329             LOGE("Fail to clean downloading flag, %d, tableName:%s, length:%zu",
330                 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
331             return errCode;
332         }
333     }
334     errCode = SetLogTriggerStatus(true);
335     if (errCode != E_OK) {
336         LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
337     }
338 
339     return errCode;
340 }
341 
DoClearCloudLogVersion(const std::vector<std::string> & tableNameList)342 int SQLiteSingleVerRelationalStorageExecutor::DoClearCloudLogVersion(const std::vector<std::string> &tableNameList)
343 {
344     int errCode = E_OK;
345     for (const auto &tableName: tableNameList) {
346         std::string logTableName = DBCommon::GetLogTableName(tableName);
347         LOGD("[Storage Executor] Start clear version on log table.");
348         errCode = ClearVersionOnLogTable(logTableName);
349         if (errCode != E_OK) {
350             std::string maskedName = DBCommon::StringMiddleMasking(tableName);
351             LOGE("[Storage Executor] failed to clear version of cloud data on log table %s (name length is %zu), %d",
352                 maskedName.c_str(), maskedName.length(), errCode);
353             return errCode;
354         }
355     }
356     return errCode;
357 }
358 
DoCleanLogs(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)359 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList,
360     const RelationalSchemaObject &localSchema)
361 {
362     int errCode = E_OK;
363     int i = 1;
364     for (const auto &tableName: tableNameList) {
365         std::string logTableName = DBCommon::GetLogTableName(tableName);
366         LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
367         errCode = DoCleanAssetId(tableName, localSchema);
368         if (errCode != E_OK) {
369             LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
370             return errCode;
371         }
372         int32_t count = 0;
373         (void)GetFlagIsLocalCount(logTableName, count);
374         LOGI("[DoCleanLogs]flag is local in table:%s, len:%zu, count:%d, before remove device data.",
375             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
376         errCode = CleanCloudDataOnLogTable(logTableName, FLAG_ONLY);
377         if (errCode != E_OK) {
378             LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
379             return errCode;
380         }
381         (void)GetFlagIsLocalCount(logTableName, count);
382         LOGI("[DoCleanLogs]flag is local in table:%s, len:%zu, count:%d, after remove device data.",
383             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
384         i++;
385     }
386 
387     return errCode;
388 }
389 
UpdateCursor(sqlite3_context * ctx,int argc,sqlite3_value ** argv)390 void SQLiteSingleVerRelationalStorageExecutor::UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv)
391 {
392     if (ctx == nullptr || argc != 0 || argv == nullptr) {
393         LOGW("[SqlSinRDBExe][UpdateCursor] invalid param=%d", argc);
394         return;
395     }
396     auto context = static_cast<UpdateCursorContext *>(sqlite3_user_data(ctx));
397     if (context == nullptr) {
398         LOGW("[SqlSinRDBExe][UpdateCursor] invalid context");
399         return;
400     }
401     context->cursor++;
402     sqlite3_result_int64(ctx, static_cast<sqlite3_int64>(context->cursor));
403 }
404 
CreateFuncUpdateCursor(UpdateCursorContext & context,void (* updateCursor)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const405 int SQLiteSingleVerRelationalStorageExecutor::CreateFuncUpdateCursor(UpdateCursorContext &context,
406     void(*updateCursor)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
407 {
408     std::string sql = "update_cursor";
409     int errCode = sqlite3_create_function_v2(dbHandle_, sql.c_str(), 0, SQLITE_UTF8 | SQLITE_DIRECTONLY,
410         &context, updateCursor, nullptr, nullptr, nullptr);
411     if (errCode != SQLITE_OK) {
412         LOGE("[Storage Executor][UpdateCursor] Create func=updateCursor failed=%d", errCode);
413         return SQLiteUtils::MapSQLiteErrno(errCode);
414     }
415     return E_OK;
416 }
417 
GetCursor(const std::string & tableName,uint64_t & cursor)418 int SQLiteSingleVerRelationalStorageExecutor::GetCursor(const std::string &tableName, uint64_t &cursor)
419 {
420     return SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, cursor);
421 }
422 
SetCursor(const std::string & tableName,uint64_t cursor)423 int SQLiteSingleVerRelationalStorageExecutor::SetCursor(const std::string &tableName, uint64_t cursor)
424 {
425     std::string sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata SET VALUE = ? where KEY = ?;";
426     sqlite3_stmt *stmt = nullptr;
427     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
428     if (errCode != E_OK) {
429         LOGE("Set cursor sql failed=%d", errCode);
430         return errCode;
431     }
432     ResFinalizer finalizer([stmt]() {
433         sqlite3_stmt *statement = stmt;
434         int ret = E_OK;
435         SQLiteUtils::ResetStatement(statement, true, ret);
436         if (ret != E_OK) {
437             LOGW("Reset stmt failed %d when set cursor", ret);
438         }
439     });
440     int index = 1;
441     errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, cursor);
442     if (errCode != E_OK) {
443         LOGE("Bind saved cursor failed:%d", errCode);
444         return errCode;
445     }
446     Key key;
447     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
448     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key, false);
449     if (errCode != E_OK) {
450         return errCode;
451     }
452     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
453     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
454         errCode = E_OK;
455     }
456     return errCode;
457 }
458 
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)459 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
460     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
461 {
462     int errCode = E_OK;
463     for (size_t i = 0; i < tableNameList.size(); i++) {
464         std::string tableName = tableNameList[i];
465         std::string logTableName = DBCommon::GetLogTableName(tableName);
466         std::vector<int64_t> dataKeys;
467         errCode = GetCleanCloudDataKeys(logTableName, dataKeys, true);
468         if (errCode != E_OK) {
469             LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
470             return errCode;
471         }
472 
473         std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
474         errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
475         if (errCode != E_OK) {
476             LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
477             return errCode;
478         }
479         int32_t count = 0;
480         (void)GetFlagIsLocalCount(logTableName, count);
481         LOGI("[DoCleanLogAndData]flag is local in table:%s, len:%zu, count:%d, before remove device data.",
482             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
483         if (isLogicDelete_) {
484             errCode = SetDataOnUserTableWithLogicDelete(tableName, logTableName);
485         } else {
486             errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName, localSchema);
487         }
488         if (errCode != E_OK) {
489             LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
490             return errCode;
491         }
492         (void)GetFlagIsLocalCount(logTableName, count);
493         LOGI("[DoCleanLogAndData]flag is local in table:%s, len:%zu, count:%d, after remove device data.",
494             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
495     }
496     return errCode;
497 }
498 
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)499 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
500     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
501 {
502     int errCode = E_OK;
503     int ret = E_OK;
504     sqlite3_stmt *selectStmt = nullptr;
505     for (const auto &rowId : dataKeys) {
506         std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
507             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
508         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
509         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
510             LOGE("Get select asset statement failed, %d", errCode);
511             return errCode;
512         }
513         errCode = SQLiteUtils::StepWithRetry(selectStmt);
514         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
515             std::vector<uint8_t> blobValue;
516             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
517             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
518                 LOGE("Get column blob value failed, %d", errCode);
519                 goto END;
520             }
521             if (blobValue.empty()) {
522                 SQLiteUtils::ResetStatement(selectStmt, true, ret);
523                 continue;
524             }
525             Asset asset;
526             errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
527             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
528                 LOGE("Transfer blob to asset failed, %d", errCode);
529                 goto END;
530             }
531             assets.push_back(asset);
532         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
533             errCode = E_OK;
534             Asset asset;
535             assets.push_back(asset);
536         }
537         SQLiteUtils::ResetStatement(selectStmt, true, ret);
538     }
539     return errCode != E_OK ? errCode : ret;
540 END:
541     SQLiteUtils::ResetStatement(selectStmt, true, ret);
542     return errCode != E_OK ? errCode : ret;
543 }
544 
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)545 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
546     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
547 {
548     int errCode = E_OK;
549     int ret = E_OK;
550     sqlite3_stmt *selectStmt = nullptr;
551     for (const auto &rowId : dataKeys) {
552         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
553             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
554         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
555         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
556             LOGE("Get select assets statement failed, %d", errCode);
557             goto END;
558         }
559         errCode = SQLiteUtils::StepWithRetry(selectStmt);
560         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
561             std::vector<uint8_t> blobValue;
562             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
563             if (errCode != E_OK) {
564                 goto END;
565             }
566             Assets tmpAssets;
567             errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
568             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
569                 goto END;
570             }
571             for (const auto &asset: tmpAssets) {
572                 assets.push_back(asset);
573             }
574         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
575             errCode = E_OK;
576         }
577         SQLiteUtils::ResetStatement(selectStmt, true, ret);
578     }
579     return errCode != E_OK ? errCode : ret;
580 END:
581     SQLiteUtils::ResetStatement(selectStmt, true, ret);
582     return errCode != E_OK ? errCode : ret;
583 }
584 
GetCloudDataCount(const std::string & tableName,DownloadData & downloadData,int64_t & count)585 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataCount(const std::string &tableName,
586     DownloadData &downloadData, int64_t &count)
587 {
588     if (downloadData.data.empty()) {
589         return E_OK;
590     }
591     int errCode = E_OK;
592     sqlite3_stmt *queryStmt = nullptr;
593     std::string querySql = "SELECT COUNT(*) FROM '" + DBCommon::GetLogTableName(tableName) +
594         "' WHERE FLAG & 0x02 = 0 AND CLOUD_GID IN (";
595     for (const VBucket &vBucket : downloadData.data) {
596         std::string cloudGid;
597         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
598         if (errCode != E_OK) {
599             LOGE("get gid for query log statement failed, %d", errCode);
600             return -E_CLOUD_ERROR;
601         }
602         querySql += "'" + cloudGid + "',";
603     }
604     querySql.pop_back();
605     querySql += ");";
606     errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, queryStmt);
607     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
608         LOGE("Get query count statement failed, %d", errCode);
609         return errCode;
610     }
611     errCode = SQLiteUtils::StepWithRetry(queryStmt);
612     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
613         count = static_cast<int64_t>(sqlite3_column_int64(queryStmt, 0));
614         errCode = E_OK;
615     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
616         errCode = E_OK;
617     }
618     int ret = E_OK;
619     SQLiteUtils::ResetStatement(queryStmt, true, ret);
620     return errCode != E_OK ? errCode : ret;
621 }
622 
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)623 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
624     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
625 {
626     int errCode = E_OK;
627     for (const auto &fieldInfo: fieldInfos) {
628         if (fieldInfo.IsAssetType()) {
629             errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
630             if (errCode != E_OK) {
631                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
632                 return errCode;
633             }
634         } else if (fieldInfo.IsAssetsType()) {
635             errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
636             if (errCode != E_OK) {
637                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
638                 return errCode;
639             }
640         }
641     }
642     return errCode;
643 }
644 
SetCursorIncFlag(bool flag)645 int SQLiteSingleVerRelationalStorageExecutor::SetCursorIncFlag(bool flag)
646 {
647     std::string sql = "INSERT OR REPLACE INTO " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata" +
648         " VALUES ('cursor_inc_flag', ";
649     if (flag) {
650         sql += "'true'";
651     } else {
652         sql += "'false'";
653     }
654     sql += ");";
655     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
656     if (errCode != E_OK) {
657         LOGE("set cursor inc flag fail, errCode = %d", errCode);
658     }
659     return errCode;
660 }
661 
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData)662 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
663     const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData)
664 {
665     if (downloadData.data.size() != downloadData.opType.size()) {
666         LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
667              downloadData.opType.size());
668         return -E_CLOUD_ERROR;
669     }
670 
671     int errCode = SetLogTriggerStatus(false);
672     if (errCode != E_OK) {
673         LOGE("Fail to set log trigger off, %d", errCode);
674         return errCode;
675     }
676 
677     std::map<int, int> statisticMap = {};
678     errCode = ExecutePutCloudData(tableName, tableSchema, trackerTable, downloadData, statisticMap);
679     if (errCode != E_OK) {
680         LOGE("ExecutePutCloudData failed, %d", errCode);
681     }
682     int ret = SetLogTriggerStatus(true);
683     if (ret != E_OK) {
684         LOGE("Fail to set log trigger on, %d", ret);
685     }
686     int64_t count = 0;
687     int errCodeCount = GetCloudDataCount(tableName, downloadData, count);
688     if (errCodeCount != E_OK) {
689         LOGW("get cloud data count failed, %d", errCodeCount);
690     }
691     LOGI("save cloud data of table %s [length %zu]:%d, cloud data count:%lld, ins:%d, upd:%d, del:%d, only gid:%d,"
692         "flag zero:%d, flag one:%d, upd timestamp:%d, clear gid:%d, not handle:%d, lock:%d",
693          DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode, count,
694          statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
695          statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
696          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
697          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
698          statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
699          statisticMap[static_cast<int>(OpType::NOT_HANDLE)], statisticMap[static_cast<int>(OpType::LOCKED_NOT_HANDLE)]);
700     return errCode == E_OK ? ret : errCode;
701 }
702 
UpdateAssetStatusForAssetOnly(const TableSchema & tableSchema,VBucket & vBucket)703 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetStatusForAssetOnly(
704     const TableSchema &tableSchema, VBucket &vBucket)
705 {
706     std::string cloudGid;
707     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
708     if (errCode != E_OK) {
709         LOGE("Miss gid when fill Asset %d.", errCode);
710         return errCode;
711     }
712     std::vector<Field> assetsField;
713     errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
714     if (errCode != E_OK) {
715         LOGE("No assets need to be filled when download assets only, err:%d.", errCode);
716         return errCode;
717     }
718 
719     sqlite3_stmt *stmt = nullptr;
720     errCode = GetFillDownloadAssetStatement(tableSchema.name, vBucket, assetsField, stmt);
721     if (errCode != E_OK) {
722         LOGE("can not get assetsField from tableSchema err:%d when download assets only.", errCode);
723         return errCode;
724     }
725     return ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
726 }
727 
InsertCloudData(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,int64_t dataKey)728 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema,
729     const TrackerTable &trackerTable, int64_t dataKey)
730 {
731     int errCode = E_OK;
732     if (dataKey > 0) {
733         errCode = RemoveDataAndLog(tableSchema.name, dataKey);
734         if (errCode != E_OK) {
735             return errCode;
736         }
737     }
738     std::string sql = GetInsertSqlForCloudSync(tableSchema);
739     sqlite3_stmt *insertStmt = nullptr;
740     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
741     if (errCode != E_OK) {
742         LOGE("Get insert statement failed when save cloud data, %d", errCode);
743         return errCode;
744     }
745     if (putDataMode_ == PutDataMode::SYNC) {
746         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
747     }
748     int ret = E_OK;
749     errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
750     if (errCode != E_OK) {
751         SQLiteUtils::ResetStatement(insertStmt, true, ret);
752         return errCode;
753     }
754     // insert data
755     errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
756     SQLiteUtils::ResetStatement(insertStmt, true, ret);
757     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
758         LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
759         return errCode;
760     }
761 
762     // insert log
763     return InsertLogRecord(tableSchema, trackerTable, vBucket);
764 }
765 
// Builds the "INSERT OR REPLACE" statement text for a log table row.
// The ninth value is a single placeholder when `extendColNames` is empty, otherwise a
// json_object('<name>',?,...) call with one placeholder per extend column (in set order).
// The last value keeps the status of an existing row with the same hash_key and falls back to 0
// when no such row exists; it consumes two hash_key placeholders.
std::string GetInsertLogSql(const std::string &logTableName, const std::set<std::string> &extendColNames)
{
    const std::string statusExpr = "CASE WHEN (SELECT status FROM " + logTableName +
        " WHERE hash_key=?) IS NULL THEN 0 ELSE (SELECT status FROM " + logTableName + " WHERE hash_key=?) END)";

    std::string sql = "INSERT OR REPLACE INTO " + logTableName + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ";
    if (extendColNames.empty()) {
        sql += "?";
    } else {
        sql += "json_object(";
        bool isFirst = true;
        for (const auto &colName : extendColNames) {
            if (!isFirst) {
                sql += ",";
            }
            sql += "'" + colName + "',?";
            isFirst = false;
        }
        sql += ")";
    }
    sql += ", 0, ?, ?, " + statusExpr;
    return sql;
}
783 
InsertLogRecord(const TableSchema & tableSchema,const TrackerTable & trackerTable,VBucket & vBucket)784 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema,
785     const TrackerTable &trackerTable, VBucket &vBucket)
786 {
787     if (putDataMode_ == PutDataMode::SYNC && !CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
788         // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
789         // so we need to delete the old log record according to the gid first
790         std::string gidStr;
791         int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
792         if (errCode != E_OK || gidStr.empty()) {
793             LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
794             return errCode;
795         }
796         std::string sql = "DELETE FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE cloud_gid = '"
797             + gidStr + "';";
798         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
799         if (errCode != E_OK) {
800             LOGE("delete log record according gid fail, errCode = %d", errCode);
801             return errCode;
802         }
803     }
804 
805     std::string sql = GetInsertLogSql(DBCommon::GetLogTableName(tableSchema.name), trackerTable.GetExtendNames());
806     sqlite3_stmt *insertLogStmt = nullptr;
807     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
808     if (errCode != E_OK) {
809         LOGE("Get insert log statement failed when save cloud data, %d", errCode);
810         return errCode;
811     }
812 
813     int ret = E_OK;
814     errCode = BindValueToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
815     if (errCode != E_OK) {
816         SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
817         return errCode;
818     }
819 
820     errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
821     SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
822     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
823         return ret;
824     } else {
825         LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
826         return errCode;
827     }
828 }
829 
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)830 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
831     sqlite3_stmt *updateStmt)
832 {
833     auto it = bindCloudFieldFuncMap_.find(field.type);
834     if (it == bindCloudFieldFuncMap_.end()) {
835         LOGE("unknown cloud type when bind one field.");
836         return -E_CLOUD_ERROR;
837     }
838     return it->second(index, vBucket, field, updateStmt);
839 }
840 
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)841 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
842     const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
843 {
844     int errCode = E_OK;
845     int index = 0;
846     for (const auto &field : fields) {
847         index++;
848         errCode = BindOneField(index, vBucket, field, upsertStmt);
849         if (errCode != E_OK) {
850             return errCode;
851         }
852     }
853     return errCode;
854 }
855 
BindStatusSubQueryHashKeyStatement(sqlite3_stmt * insertLogStmt,std::vector<uint8_t> & hashKey,int & index)856 int SQLiteSingleVerRelationalStorageExecutor::BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt,
857     std::vector<uint8_t> &hashKey, int &index)
858 {
859     int errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
860     if (errCode != E_OK) {
861         LOGE("Bind hash_key to status subQuery statement failed, %d", errCode);
862         return errCode;
863     }
864 
865     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
866     if (errCode != E_OK) {
867         LOGE("Bind hash_key to status subQuery2 statement failed, %d", errCode);
868         return errCode;
869     }
870     return errCode;
871 }
872 
BindExtendValue(const VBucket & vBucket,const TrackerTable & trackerTable,sqlite3_stmt * stmt,int & index)873 int BindExtendValue(const VBucket &vBucket, const TrackerTable &trackerTable, sqlite3_stmt *stmt, int &index)
874 {
875     const std::set<std::string> &extendColNames = trackerTable.GetExtendNames();
876     int errCode = E_OK;
877     int extendValueIndex = index;
878     if (extendColNames.empty()) {
879         return SQLiteUtils::BindTextToStatement(stmt, index++, "");
880     }
881     for (const auto &extendColName : extendColNames) {
882         if (vBucket.find(extendColName) == vBucket.end()) {
883             errCode = SQLiteUtils::BindTextToStatement(stmt, extendValueIndex++, "");
884         } else {
885             Type extendValue = vBucket.at(extendColName);
886             errCode = SQLiteRelationalUtils::BindStatementByType(stmt, extendValueIndex++, extendValue);
887         }
888         if (errCode != E_OK) {
889             const std::string tableName = DBCommon::StringMiddleMasking(trackerTable.GetTableName());
890             size_t nameLength = trackerTable.GetTableName().size();
891             LOGE("[%s [%zu]] Bind extend field failed: %d", tableName.c_str(), nameLength, errCode);
892             return errCode;
893         }
894     }
895     index = extendValueIndex;
896     return E_OK;
897 }
898 
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt,int & index)899 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
900     const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt, int &index)
901 {
902     std::vector<uint8_t> hashKey;
903     int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
904     if (errCode != E_OK) {
905         return errCode;
906     }
907     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
908     if (errCode != E_OK) {
909         LOGE("Bind hash_key to insert log statement failed, %d", errCode);
910         return errCode;
911     }
912 
913     std::string cloudGid;
914     if (putDataMode_ == PutDataMode::SYNC) {
915         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
916         if (errCode != E_OK) {
917             LOGE("get gid for insert log statement failed, %d", errCode);
918             return -E_CLOUD_ERROR;
919         }
920     }
921 
922     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, cloudGid); // next is cloud_gid
923     if (errCode != E_OK) {
924         LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
925         return errCode;
926     }
927 
928     errCode = BindExtendValue(vBucket, trackerTable, insertLogStmt, index); // next is extend_field
929     if (errCode != E_OK) {
930         LOGE("Bind extend_field to insert log statement failed, %d", errCode);
931         return errCode;
932     }
933 
934     errCode = BindShareValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt, index);
935     if (errCode != E_OK) {
936         return errCode;
937     }
938     return BindStatusSubQueryHashKeyStatement(insertLogStmt, hashKey, index);
939 }
940 
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)941 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
942     const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
943 {
944     int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
945     int bindIndex = 1; // 1 is rowid
946     int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, rowid);
947     if (errCode != E_OK) {
948         LOGE("Bind rowid to insert log statement failed, %d", errCode);
949         return errCode;
950     }
951 
952     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, bindIndex++, GetDev()); // next is device
953     if (errCode != E_OK) {
954         LOGE("Bind device to insert log statement failed, %d", errCode);
955         return errCode;
956     }
957 
958     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, bindIndex++, GetDev()); // next is ori_device
959     if (errCode != E_OK) {
960         LOGE("Bind ori_device to insert log statement failed, %d", errCode);
961         return errCode;
962     }
963 
964     int64_t val = 0;
965     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
966     if (errCode != E_OK) {
967         LOGE("get modify time for insert log statement failed, %d", errCode);
968         return -E_CLOUD_ERROR;
969     }
970 
971     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, val); // next is timestamp
972     if (errCode != E_OK) {
973         LOGE("Bind timestamp to insert log statement failed, %d", errCode);
974         return errCode;
975     }
976 
977     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
978     if (errCode != E_OK) {
979         LOGE("get create time for insert log statement failed, %d", errCode);
980         return -E_CLOUD_ERROR;
981     }
982 
983     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, val); // next is wtimestamp
984     if (errCode != E_OK) {
985         LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
986         return errCode;
987     }
988 
989     errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, bindIndex++, GetDataFlag())); // next is flag
990     if (errCode != E_OK) {
991         LOGE("Bind flag to insert log statement failed, %d", errCode);
992         return errCode;
993     }
994 
995     vBucket[DBConstant::ROWID] = rowid; // fill rowid to cloud data to notify user
996     return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt, bindIndex);
997 }
998 
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)999 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
1000     const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
1001 {
1002     std::string where = " WHERE";
1003     if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
1004         where += " " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (SELECT data_key FROM " +
1005             DBCommon::GetLogTableName(tableName) + " WHERE cloud_gid = '" + gidStr + "')";
1006     }
1007     if (!pkSet.empty() && queryByPk) {
1008         if (!gidStr.empty()) {
1009             where += " OR";
1010         }
1011         where += " (1 = 1";
1012         for (const auto &pk : pkSet) {
1013             where += (" AND " + pk + " = ?");
1014         }
1015         where += ");";
1016     }
1017     return where;
1018 }
1019 
GetUpdateSqlForCloudSync(const std::vector<Field> & updateFields,const TableSchema & tableSchema,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)1020 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields,
1021     const TableSchema &tableSchema, const std::string &gidStr, const std::set<std::string> &pkSet,
1022     std::string &updateSql)
1023 {
1024     if (pkSet.empty() && gidStr.empty()) {
1025         LOGE("update data fail because both primary key and gid is empty.");
1026         return -E_CLOUD_ERROR;
1027     }
1028     std::string sql = "UPDATE " + tableSchema.name + " SET";
1029     for (const auto &field : updateFields) {
1030         sql +=  " " + field.colName + " = ?,";
1031     }
1032     sql.pop_back();
1033     sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
1034     updateSql = sql;
1035     return E_OK;
1036 }
1037 
// A gid is accepted when it contains no single quote. An empty gid is accepted as well: it has no
// quote either, so one search covers both cases.
static inline bool IsGidValid(const std::string &gidStr)
{
    return gidStr.find('\'') == std::string::npos;
}
1045 
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)1046 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
1047     const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
1048 {
1049     std::string gidStr;
1050     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1051     if (errCode != E_OK) {
1052         LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
1053         return errCode;
1054     }
1055     if (!IsGidValid(gidStr)) {
1056         LOGE("invalid char in cloud gid");
1057         return -E_CLOUD_ERROR;
1058     }
1059 
1060     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1061     auto updateFields = GetUpdateField(vBucket, tableSchema);
1062     std::string updateSql;
1063     errCode = GetUpdateSqlForCloudSync(updateFields, tableSchema, gidStr, pkSet, updateSql);
1064     if (errCode != E_OK) {
1065         return errCode;
1066     }
1067 
1068     errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
1069     if (errCode != E_OK) {
1070         LOGE("Get update statement failed when update cloud data, %d", errCode);
1071         return errCode;
1072     }
1073 
1074     // bind value
1075     if (!pkSet.empty()) {
1076         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1077         updateFields.insert(updateFields.end(), pkFields.begin(), pkFields.end());
1078     }
1079     errCode = BindValueToUpsertStatement(vBucket, updateFields, updateStmt);
1080     if (errCode != E_OK) {
1081         LOGE("bind value to update statement failed when update cloud data, %d", errCode);
1082         int ret = E_OK;
1083         SQLiteUtils::ResetStatement(updateStmt, true, ret);
1084     }
1085     return errCode;
1086 }
1087 
UpdateCloudData(VBucket & vBucket,const TableSchema & tableSchema)1088 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema)
1089 {
1090     if (putDataMode_ == PutDataMode::SYNC) {
1091         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
1092     }
1093     sqlite3_stmt *updateStmt = nullptr;
1094     int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
1095     if (errCode != E_OK) {
1096         LOGE("Get update data table statement fail, %d", errCode);
1097         return errCode;
1098     }
1099 
1100     // update data
1101     int ret = E_OK;
1102     errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
1103     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1104         errCode = E_OK;
1105     } else {
1106         LOGE("update data failed when save cloud data:%d", errCode);
1107         SQLiteUtils::ResetStatement(updateStmt, true, ret);
1108         return errCode;
1109     }
1110     SQLiteUtils::ResetStatement(updateStmt, true, ret);
1111 
1112     // update log
1113     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
1114     if (errCode != E_OK) {
1115         LOGE("update log record failed when update cloud data, errCode = %d", errCode);
1116     }
1117     return errCode;
1118 }
1119 
IsAllowWithPrimaryKey(OpType opType)1120 static inline bool IsAllowWithPrimaryKey(OpType opType)
1121 {
1122     return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
1123         opType == OpType::ONLY_UPDATE_GID || opType == OpType::LOCKED_NOT_HANDLE);
1124 }
1125 
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1126 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
1127     OpType opType)
1128 {
1129     sqlite3_stmt *updateLogStmt = nullptr;
1130     std::vector<std::string> updateColName;
1131     int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
1132     if (errCode != E_OK) {
1133         LOGE("Get update log statement failed, errCode = %d", errCode);
1134         return errCode;
1135     }
1136 
1137     errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
1138         updateLogStmt);
1139     int ret = E_OK;
1140     if (errCode != E_OK) {
1141         LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
1142         SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1143         return errCode != E_OK ? errCode : ret;
1144     }
1145 
1146     errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
1147     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1148         errCode = E_OK;
1149     } else {
1150         LOGE("update log record failed when update cloud data:%d", errCode);
1151     }
1152     SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1153     return errCode != E_OK ? errCode : ret;
1154 }
1155 
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)1156 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
1157     const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
1158     sqlite3_stmt *updateLogStmt)
1159 {
1160     int errCode = CloudStorageUtils::BindUpdateLogStmtFromVBucket(vBucket, tableSchema, colNames, updateLogStmt);
1161     if (errCode != E_OK) {
1162         return errCode;
1163     }
1164     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1165     if (pkMap.empty()) {
1166         return E_OK;
1167     }
1168 
1169     std::vector<uint8_t> hashKey;
1170     errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
1171     if (errCode != E_OK) {
1172         return errCode;
1173     }
1174     return SQLiteUtils::BindBlobToStatement(updateLogStmt, colNames.size() + 1, hashKey);
1175 }
1176 
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)1177 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
1178     const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
1179 {
1180     std::string gidStr;
1181     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1182     if (errCode != E_OK) {
1183         LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
1184         return errCode;
1185     }
1186     if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
1187         LOGE("empty or invalid char in cloud gid");
1188         return -E_CLOUD_ERROR;
1189     }
1190 
1191     bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
1192     std::string deleteSql = "DELETE FROM " + tableSchema.name;
1193     deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
1194     errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
1195     if (errCode != E_OK) {
1196         LOGE("Get delete statement failed when delete data, %d", errCode);
1197         return errCode;
1198     }
1199 
1200     int ret = E_OK;
1201     if (!pkSet.empty() && queryByPk) {
1202         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1203         errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
1204         if (errCode != E_OK) {
1205             LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
1206             SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1207         }
1208     }
1209     return errCode != E_OK ? errCode : ret;
1210 }
1211 
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1212 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1213     const TableSchema &tableSchema, const TrackerTable &trackerTable)
1214 {
1215     if (isLogicDelete_) {
1216         return LogicDeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
1217     }
1218     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1219     sqlite3_stmt *deleteStmt = nullptr;
1220     int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
1221     if (errCode != E_OK) {
1222         return errCode;
1223     }
1224     errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
1225     int ret = E_OK;
1226     SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1227     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1228         LOGE("delete data failed when sync with cloud:%d", errCode);
1229         return errCode;
1230     }
1231     if (ret != E_OK) {
1232         LOGE("reset delete statement failed:%d", ret);
1233         return ret;
1234     }
1235 
1236     // update log
1237     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1238     if (errCode != E_OK) {
1239         LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
1240     }
1241     return errCode;
1242 }
1243 
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1244 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
1245     const TableSchema &tableSchema, OpType opType)
1246 {
1247     return UpdateLogRecord(vBucket, tableSchema, opType);
1248 }
1249 
DeleteTableTrigger(const std::string & missTable) const1250 int SQLiteSingleVerRelationalStorageExecutor::DeleteTableTrigger(const std::string &missTable) const
1251 {
1252     static const char *triggerEndName[] = {
1253         "_ON_INSERT",
1254         "_ON_UPDATE",
1255         "_ON_DELETE"
1256     };
1257     std::string logTableName = DBConstant::SYSTEM_TABLE_PREFIX + missTable;
1258     for (const auto &endName : triggerEndName) {
1259         std::string deleteSql = "DROP TRIGGER IF EXISTS " + logTableName + endName + ";";
1260         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1261         if (errCode != E_OK) {
1262             LOGE("[DeleteTableTrigger] Drop trigger failed. %d", errCode);
1263             return errCode;
1264         }
1265     }
1266     return E_OK;
1267 }
1268 
SetLogicDelete(bool isLogicDelete)1269 void SQLiteSingleVerRelationalStorageExecutor::SetLogicDelete(bool isLogicDelete)
1270 {
1271     isLogicDelete_ = isLogicDelete;
1272 }
1273 
UpdateRecordStatus(const std::string & tableName,const std::string & status,const Key & hashKey)1274 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordStatus(const std::string &tableName,
1275     const std::string &status, const Key &hashKey)
1276 {
1277     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " + status + " WHERE hash_key = ?;";
1278     sqlite3_stmt *stmt = nullptr;
1279     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1280     if (errCode != E_OK) {
1281         LOGE("[Storage Executor] Get stmt failed when update record status, %d", errCode);
1282         return errCode;
1283     }
1284     int ret = E_OK;
1285     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey); // 1 is bind index of hashKey
1286     if (errCode != E_OK) {
1287         LOGE("[Storage Executor] Bind hashKey to update record status stmt failed, %d", errCode);
1288         SQLiteUtils::ResetStatement(stmt, true, ret);
1289         return errCode;
1290     }
1291     errCode = SQLiteUtils::StepWithRetry(stmt);
1292     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1293         errCode = E_OK;
1294     } else {
1295         LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1296     }
1297     SQLiteUtils::ResetStatement(stmt, true, ret);
1298     return errCode == E_OK ? ret : errCode;
1299 }
1300 
SetUploadConfig(int32_t maxUploadCount,int32_t maxUploadSize)1301 void SQLiteSingleVerRelationalStorageExecutor::SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize)
1302 {
1303     maxUploadCount_ = maxUploadCount;
1304     maxUploadSize_ = maxUploadSize;
1305 }
1306 
LogicDeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1307 int SQLiteSingleVerRelationalStorageExecutor::LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1308     const TableSchema &tableSchema, const TrackerTable &trackerTable)
1309 {
1310     LOGD("[RDBExecutor] logic delete skip delete data");
1311     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, CloudStorageUtils::GetCursorIncSql(tableName));
1312     if (errCode != E_OK) {
1313         return errCode;
1314     }
1315     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1316     if (errCode != E_OK) {
1317         return errCode;
1318     }
1319     if (!trackerTable.IsEmpty()) {
1320         return SQLiteRelationalUtils::SelectServerObserver(dbHandle_, tableName, true);
1321     }
1322     return E_OK;
1323 }
1324 
InitCursorToMeta(const std::string & tableName) const1325 int SQLiteSingleVerRelationalStorageExecutor::InitCursorToMeta(const std::string &tableName) const
1326 {
1327     return SQLiteRelationalUtils::InitCursorToMeta(dbHandle_, isMemDb_, tableName);
1328 }
1329 
SetTableSchema(const TableSchema & tableSchema)1330 void SQLiteSingleVerRelationalStorageExecutor::SetTableSchema(const TableSchema &tableSchema)
1331 {
1332     tableSchema_ = tableSchema;
1333 }
1334 
ReviseLocalModTime(const std::string & tableName,const std::vector<ReviseModTimeInfo> & revisedData)1335 int SQLiteSingleVerRelationalStorageExecutor::ReviseLocalModTime(const std::string &tableName,
1336     const std::vector<ReviseModTimeInfo> &revisedData)
1337 {
1338     sqlite3_stmt *stmt = nullptr;
1339     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1340         " SET timestamp=? where hash_key=? AND timestamp=?";
1341     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1342     if (errCode != E_OK) {
1343         LOGE("[RDBExecutor][ReviseLocalModTime] Get stmt failed: %d", errCode);
1344         return errCode;
1345     }
1346     ResFinalizer finalizer([stmt]() {
1347         sqlite3_stmt *statement = stmt;
1348         int ret = E_OK;
1349         SQLiteUtils::ResetStatement(statement, true, ret);
1350         if (ret != E_OK) {
1351             LOGW("[RDBExecutor][ReviseLocalModTime] Reset stmt failed %d", ret);
1352         }
1353     });
1354     for (auto &data : revisedData) {
1355         int resetCode = E_OK;
1356         errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, data.curTime); // 1st bind modify time
1357         if (errCode != E_OK) {
1358             LOGE("[RDBExecutor][ReviseLocalModTime] Bind revised modify time failed: %d", errCode);
1359             return errCode;
1360         }
1361         errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, data.hashKey); // 2nd bind hash key
1362         if (errCode != E_OK) {
1363             LOGE("[RDBExecutor][ReviseLocalModTime] Bind hash key failed: %d", errCode);
1364             return errCode;
1365         }
1366         errCode = SQLiteUtils::BindInt64ToStatement(stmt, 3, data.invalidTime); // 3rd bind modify time
1367         if (errCode != E_OK) {
1368             LOGE("[RDBExecutor][ReviseLocalModTime] Bind modify time failed: %d", errCode);
1369             return errCode;
1370         }
1371         errCode = SQLiteUtils::StepWithRetry(stmt);
1372         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1373             LOGE("[RDBExecutor][ReviseLocalModTime] Revise failed: %d", errCode);
1374             return errCode;
1375         }
1376         LOGI("[RDBExecutor][ReviseLocalModTime] Local data mod time revised from %lld to %lld",
1377             data.invalidTime, data.curTime);
1378         SQLiteUtils::ResetStatement(stmt, false, resetCode);
1379         if (resetCode != E_OK) {
1380             LOGW("[RDBExecutor][ReviseLocalModTime] Reset stmt failed: %d", resetCode);
1381             return resetCode;
1382         }
1383     }
1384     return E_OK;
1385 }
1386 
IsNeedUpdateAssetIdInner(sqlite3_stmt * selectStmt,const VBucket & vBucket,const Field & field,VBucket & assetInfo,bool & isNotIncCursor)1387 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetIdInner(sqlite3_stmt *selectStmt,
1388     const VBucket &vBucket, const Field &field, VBucket &assetInfo, bool &isNotIncCursor)
1389 {
1390     if (field.type == TYPE_INDEX<Asset>) {
1391         Asset asset;
1392         UpdateLocalAssetId(vBucket, field.colName, asset);
1393         Asset *assetDBPtr = std::get_if<Asset>(&assetInfo[field.colName]);
1394         if (assetDBPtr == nullptr) {
1395             isNotIncCursor = true;
1396             return true;
1397         }
1398         const Asset &assetDB = *assetDBPtr;
1399         if (assetDB.assetId != asset.assetId || asset.status != AssetStatus::NORMAL) {
1400             isNotIncCursor = true;
1401             return true;
1402         }
1403     }
1404     if (field.type == TYPE_INDEX<Assets>) {
1405         Assets assets;
1406         UpdateLocalAssetsId(vBucket, field.colName, assets);
1407         Assets *assetsDBPtr = std::get_if<Assets>(&assetInfo[field.colName]);
1408         if (assetsDBPtr == nullptr) {
1409             isNotIncCursor = true;
1410             return true;
1411         }
1412         Assets &assetsDB = *assetsDBPtr;
1413         if (assets.size() != assetsDB.size()) {
1414             isNotIncCursor = true;
1415             return true;
1416         }
1417         for (uint32_t i = 0; i < assets.size(); ++i) {
1418             if (assets[i].assetId != assetsDB[i].assetId || assets[i].status != AssetStatus::NORMAL) {
1419                 isNotIncCursor = true;
1420                 return true;
1421             }
1422         }
1423     }
1424     return false;
1425 }
1426 
IsNeedUpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket,bool & isNotIncCursor)1427 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1428     const VBucket &vBucket, bool &isNotIncCursor)
1429 {
1430     std::vector<Field> assetFields;
1431     for (const auto &field : tableSchema.fields) {
1432         if (field.type == TYPE_INDEX<Asset>) {
1433             assetFields.push_back(field);
1434         }
1435         if (field.type == TYPE_INDEX<Assets>) {
1436             assetFields.push_back(field);
1437         }
1438     }
1439     if (assetFields.empty()) {
1440         return false;
1441     }
1442     sqlite3_stmt *selectStmt = nullptr;
1443     std::string queryAssetsSql = "SELECT ";
1444     for (const auto &field : assetFields) {
1445         queryAssetsSql += field.colName + ",";
1446     }
1447     queryAssetsSql.pop_back();
1448     queryAssetsSql += " FROM '" + tableSchema.name + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " +
1449         std::to_string(dataKey) + ";";
1450     int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1451     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1452         LOGE("Get select assets statement failed, %d.", errCode);
1453         return true;
1454     }
1455     ResFinalizer finalizer([selectStmt]() {
1456         sqlite3_stmt *statementInner = selectStmt;
1457         int ret = E_OK;
1458         SQLiteUtils::ResetStatement(statementInner, true, ret);
1459         if (ret != E_OK) {
1460             LOGW("Reset stmt failed %d when get asset", ret);
1461         }
1462     });
1463     VBucket assetInfo;
1464     errCode = GetAssetInfoOnTable(selectStmt, assetFields, assetInfo);
1465     if (errCode != E_OK) {
1466         return true;
1467     }
1468     return std::any_of(assetFields.begin(), assetFields.end(), [&](const Field &field) {
1469         return IsNeedUpdateAssetIdInner(selectStmt, vBucket, field, assetInfo, isNotIncCursor);
1470     });
1471 }
1472 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1473 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsConsistent(const std::string &tableName,
1474     const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1475 {
1476     if (downloadData.data.size() != downloadData.opType.size()) {
1477         LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1478         return -E_CLOUD_ERROR;
1479     }
1480     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1481         " SET flag=flag&(~0x20), " + CloudDbConstant::UNLOCKING_TO_UNLOCK + " WHERE cloud_gid=? and timestamp=?;";
1482     sqlite3_stmt *stmt = nullptr;
1483     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1484     if (errCode != E_OK) {
1485         LOGE("Get mark flag as consistent stmt failed, %d.", errCode);
1486         return errCode;
1487     }
1488     int ret = E_OK;
1489     int index = 0;
1490     for (const auto &data: downloadData.data) {
1491         SQLiteUtils::ResetStatement(stmt, false, ret);
1492         OpType opType = downloadData.opType[index++];
1493         if (opType == OpType::NOT_HANDLE || opType == OpType::LOCKED_NOT_HANDLE) {
1494             continue;
1495         }
1496         errCode = CloudStorageUtils::BindStepConsistentFlagStmt(stmt, data, gidFilters);
1497         if (errCode != E_OK) {
1498             break;
1499         }
1500     }
1501     SQLiteUtils::ResetStatement(stmt, true, ret);
1502     return errCode == E_OK ? ret : errCode;
1503 }
1504 
MarkFlagAsAssetAsyncDownload(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1505 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsAssetAsyncDownload(const std::string &tableName,
1506     const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1507 {
1508     if (downloadData.data.empty()) {
1509         return E_OK;
1510     }
1511     if (downloadData.data.size() != downloadData.opType.size()) {
1512         LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1513         return -E_CLOUD_ERROR;
1514     }
1515     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag|0x1000 WHERE cloud_gid=?;";
1516     sqlite3_stmt *stmt = nullptr;
1517     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1518     if (errCode != E_OK) {
1519         LOGE("[Storage Executor]Get mark flag as asset async download stmt failed, %d.", errCode);
1520         return errCode;
1521     }
1522     int ret = E_OK;
1523     for (const auto &gid : gidFilters) {
1524         SQLiteUtils::ResetStatement(stmt, false, ret);
1525         if (ret != E_OK) {
1526             LOGE("[Storage Executor]Reset stmt failed:%d", ret);
1527             break;
1528         }
1529         errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
1530         if (errCode != E_OK) {
1531             break;
1532         }
1533         errCode = SQLiteUtils::StepWithRetry(stmt);
1534         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1535             errCode = E_OK;
1536         } else {
1537             LOGW("[Storage Executor]Step mark flag as asset async download stmt failed %d gid %s",
1538                 errCode, gid.c_str());
1539         }
1540     }
1541     SQLiteUtils::ResetStatement(stmt, true, ret);
1542     return errCode == E_OK ? ret : errCode;
1543 }
1544 
FillCloudVersionForUpload(const std::string & tableName,const CloudSyncBatch & batchData)1545 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const std::string &tableName,
1546     const CloudSyncBatch &batchData)
1547 {
1548     if (batchData.extend.empty()) {
1549         return E_OK;
1550     }
1551     if (batchData.hashKey.empty() || batchData.extend.size() != batchData.hashKey.size()) {
1552         LOGE("invalid sync data for filling version.");
1553         return -E_INVALID_ARGS;
1554     }
1555     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(tableName) +
1556         "' SET version = ? WHERE hash_key = ? ";
1557     sqlite3_stmt *stmt = nullptr;
1558     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1559     if (errCode != E_OK) {
1560         return errCode;
1561     }
1562     int ret = E_OK;
1563     for (size_t i = 0; i < batchData.extend.size(); ++i) {
1564         errCode = BindUpdateVersionStatement(batchData.extend[i], batchData.hashKey[i], stmt);
1565         if (errCode != E_OK) {
1566             LOGE("bind update version stmt failed.");
1567             SQLiteUtils::ResetStatement(stmt, true, ret);
1568             return errCode;
1569         }
1570     }
1571     SQLiteUtils::ResetStatement(stmt, true, ret);
1572     return ret;
1573 }
1574 
QueryCount(const std::string & tableName,int64_t & count)1575 int SQLiteSingleVerRelationalStorageExecutor::QueryCount(const std::string &tableName, int64_t &count)
1576 {
1577     return SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, count);
1578 }
1579 
CheckInventoryData(const std::string & tableName)1580 int SQLiteSingleVerRelationalStorageExecutor::CheckInventoryData(const std::string &tableName)
1581 {
1582     int64_t dataCount = 0;
1583     int errCode = SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, dataCount);
1584     if (errCode != E_OK) {
1585         LOGE("Query count failed %d", errCode);
1586         return errCode;
1587     }
1588     return dataCount > 0 ? -E_WITH_INVENTORY_DATA : E_OK;
1589 }
1590 
GetUploadCountInner(const Timestamp & timestamp,SqliteQueryHelper & helper,std::string & sql,int64_t & count)1591 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCountInner(const Timestamp &timestamp,
1592     SqliteQueryHelper &helper, std::string &sql, int64_t &count)
1593 {
1594     sqlite3_stmt *stmt = nullptr;
1595     int errCode = helper.GetCloudQueryStatement(false, dbHandle_, sql, stmt);
1596     if (errCode != E_OK) {
1597         LOGE("failed to get count statement %d", errCode);
1598         return errCode;
1599     }
1600     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1601     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1602         count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1603         errCode = E_OK;
1604     } else {
1605         LOGE("Failed to get the count to be uploaded. %d", errCode);
1606     }
1607     int ret = E_OK;
1608     SQLiteUtils::ResetStatement(stmt, true, ret);
1609     return errCode != E_OK ? errCode : ret;
1610 }
1611 
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1612 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp &timestamp, bool isCloudForcePush,
1613     bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1614 {
1615     int errCode;
1616     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1617     if (errCode != E_OK) {
1618         return errCode;
1619     }
1620     std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask,
1621         CloudWaterType::DELETE);
1622     return GetUploadCountInner(timestamp, helper, sql, count);
1623 }
1624 
GetAllUploadCount(const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1625 int SQLiteSingleVerRelationalStorageExecutor::GetAllUploadCount(const std::vector<Timestamp> &timestampVec,
1626     bool isCloudForcePush, bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1627 {
1628     std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1629     if (timestampVec.size() != typeVec.size()) {
1630         return -E_INVALID_ARGS;
1631     }
1632     int errCode;
1633     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1634     if (errCode != E_OK) {
1635         return errCode;
1636     }
1637     count = 0;
1638     for (size_t i = 0; i < typeVec.size(); i++) {
1639         std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask, typeVec[i]);
1640         int64_t tempCount = 0;
1641         helper.AppendCloudQueryToGetDiffData(sql, typeVec[i]);
1642         errCode = GetUploadCountInner(timestampVec[i], helper, sql, tempCount);
1643         if (errCode != E_OK) {
1644             return errCode;
1645         }
1646         count += tempCount;
1647     }
1648     return E_OK;
1649 }
1650 
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1651 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1652     bool ignoreEmptyGid)
1653 {
1654     if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1655         cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1656         return -E_INVALID_ARGS;
1657     }
1658     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1659         + "' SET cloud_gid = ? WHERE data_key = ? ";
1660     sqlite3_stmt *stmt = nullptr;
1661     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1662     if (errCode != E_OK) {
1663         return errCode;
1664     }
1665     errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1666     int resetCode = E_OK;
1667     SQLiteUtils::ResetStatement(stmt, true, resetCode);
1668     return errCode == E_OK ? resetCode : errCode;
1669 }
1670 
GetSyncCloudData(const CloudUploadRecorder & uploadRecorder,CloudSyncData & cloudDataResult,SQLiteSingleVerRelationalContinueToken & token)1671 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(const CloudUploadRecorder &uploadRecorder,
1672     CloudSyncData &cloudDataResult, SQLiteSingleVerRelationalContinueToken &token)
1673 {
1674     token.GetCloudTableSchema(tableSchema_);
1675     sqlite3_stmt *queryStmt = nullptr;
1676     bool isStepNext = false;
1677     int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult, queryStmt, isStepNext);
1678     if (errCode != E_OK) {
1679         (void)token.ReleaseCloudStatement();
1680         return errCode;
1681     }
1682     uint32_t totalSize = 0;
1683     uint32_t stepNum = -1;
1684     do {
1685         if (isStepNext) {
1686             errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1687             if (errCode != E_OK) {
1688                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1689                 break;
1690             }
1691         }
1692         isStepNext = true;
1693         errCode = GetCloudDataForSync(uploadRecorder, queryStmt, cloudDataResult, ++stepNum, totalSize);
1694     } while (errCode == E_OK);
1695     if (errCode != -E_UNFINISHED) {
1696         (void)token.ReleaseCloudStatement();
1697     }
1698     return errCode;
1699 }
1700 
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1701 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1702 {
1703     if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1704         Asset asset;
1705         int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1706         if (errCode != E_OK) {
1707             return errCode;
1708         }
1709         if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1710             return -E_CLOUD_ERROR;
1711         }
1712         vBucket.insert_or_assign(field.colName, asset);
1713     } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1714         Assets assets;
1715         int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1716         if (errCode != E_OK) {
1717             return errCode;
1718         }
1719         if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets) || !CloudStorageUtils::CheckAssetStatus(assets)) {
1720             return -E_CLOUD_ERROR;
1721         }
1722         vBucket.insert_or_assign(field.colName, assets);
1723     } else {
1724         vBucket.insert_or_assign(field.colName, cloudValue);
1725     }
1726     return E_OK;
1727 }
1728 
GetDownloadAsset(std::vector<VBucket> & assetsV,const Field & field,Type & cloudValue)1729 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAsset(std::vector<VBucket> &assetsV, const Field &field,
1730     Type &cloudValue)
1731 {
1732     if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1733         Asset asset;
1734         VBucket bucket;
1735         int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1736         if (errCode != E_OK) {
1737             return errCode;
1738         }
1739         if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1740             bucket.insert_or_assign(field.colName, asset);
1741             assetsV.push_back(bucket);
1742         }
1743     } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1744         Assets assets;
1745         int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1746         if (errCode != E_OK) {
1747             return errCode;
1748         }
1749         if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1750             return E_OK;
1751         }
1752         for (const auto &asset : assets) {
1753             if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1754                 VBucket bucket;
1755                 bucket.insert_or_assign(field.colName, asset);
1756                 assetsV.push_back(bucket);
1757             }
1758         }
1759     }
1760     return E_OK;
1761 }
1762 
GetAssetInfoOnTable(sqlite3_stmt * & stmt,const std::vector<Field> & assetFields,VBucket & assetInfo)1763 int SQLiteSingleVerRelationalStorageExecutor::GetAssetInfoOnTable(sqlite3_stmt *&stmt,
1764     const std::vector<Field> &assetFields, VBucket &assetInfo)
1765 {
1766     int errCode = SQLiteUtils::StepWithRetry(stmt);
1767     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1768         int index = 0;
1769         for (const auto &field : assetFields) {
1770             Type cloudValue;
1771             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1772             if (errCode != E_OK) {
1773                 break;
1774             }
1775             errCode = PutVBucketByType(assetInfo, field, cloudValue);
1776             if (errCode != E_OK) {
1777                 break;
1778             }
1779         }
1780     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1781         errCode = E_OK;
1782     } else {
1783         LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1784     }
1785     return errCode;
1786 }
1787 
GetLocalDataCount(const std::string & tableName,int & dataCount,int & logicDeleteDataCount)1788 int SQLiteSingleVerRelationalStorageExecutor::GetLocalDataCount(const std::string &tableName, int &dataCount,
1789     int &logicDeleteDataCount)
1790 {
1791     std::string dataCountSql = "select count(*) from " + DBCommon::GetLogTableName(tableName) + " where data_key != -1";
1792     int errCode = SQLiteUtils::GetCountBySql(dbHandle_, dataCountSql, dataCount);
1793     if (errCode != E_OK) {
1794         LOGE("[RDBExecutor] Query local data count failed: %d", errCode);
1795         return errCode;
1796     }
1797 
1798     std::string logicDeleteDataCountSql = "select count(*) from " + DBCommon::GetLogTableName(tableName) +
1799         " where flag&0x08!=0 and data_key != -1";
1800     errCode = SQLiteUtils::GetCountBySql(dbHandle_, logicDeleteDataCountSql, logicDeleteDataCount);
1801     if (errCode != E_OK) {
1802         LOGE("[RDBExecutor] Query local logic delete data count failed: %d", errCode);
1803     }
1804     return errCode;
1805 }
1806 
UpdateExtendField(const std::string & tableName,const std::set<std::string> & extendColNames,const std::string & condition)1807 int SQLiteSingleVerRelationalStorageExecutor::UpdateExtendField(const std::string &tableName,
1808     const std::set<std::string> &extendColNames, const std::string &condition)
1809 {
1810     bool isLogTableExist = false;
1811     int errCode = SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist);
1812     if (errCode == E_OK && !isLogTableExist) {
1813         LOGW("[RDBExecutor][UpdateExtendField] Log table of [%s [%zu]] not found!",
1814             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1815         return E_OK;
1816     }
1817     std::string sql = "update " + DBCommon::GetLogTableName(tableName) + " as log set extend_field = json_object(";
1818     for (const auto &extendColName : extendColNames) {
1819         sql += "'" + extendColName + "',data." + extendColName + ",";
1820     }
1821     sql.pop_back();
1822     sql += ") from " + tableName + " as data where log.data_key = data." + std::string(DBConstant::SQLITE_INNER_ROWID);
1823     sql += condition;
1824     sqlite3_stmt *stmt = nullptr;
1825     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1826     if (errCode != E_OK) {
1827         return errCode;
1828     }
1829 
1830     ResFinalizer finalizer([stmt]() {
1831         sqlite3_stmt *statement = stmt;
1832         int ret = E_OK;
1833         SQLiteUtils::ResetStatement(statement, true, ret);
1834         if (ret != E_OK) {
1835             LOGW("[RDBExecutor][UpdateExtendField] Reset stmt failed %d", ret);
1836         }
1837     });
1838     errCode = SQLiteUtils::StepWithRetry(stmt);
1839     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1840         LOGE("[RDBExecutor][UpdateExtendField] Update [%s [%zu]] extend field failed: %d",
1841             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1842         return errCode;
1843     }
1844     return E_OK;
1845 }
1846 
BuildJsonExtendField(const std::string & tableName,const std::string & lowVersionExtendColName,sqlite3 * db)1847 int BuildJsonExtendField(const std::string &tableName, const std::string &lowVersionExtendColName, sqlite3 *db)
1848 {
1849     std::string sql = "update " + DBCommon::GetLogTableName(tableName) + " set extend_field = json_object('" +
1850         lowVersionExtendColName + "',extend_field) where data_key = -1 and (json_valid(extend_field) = 0 or " +
1851         "json_extract(extend_field, '$." + lowVersionExtendColName +"') is null)";
1852     sqlite3_stmt *stmt = nullptr;
1853     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1854     if (errCode != E_OK) {
1855         return errCode;
1856     }
1857     errCode = SQLiteUtils::StepWithRetry(stmt);
1858     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1859         LOGE("[RDBExecutor][UpdateDeleteDataExtendField] Update [%s [%zu]] extend field non-JSON format failed: %d",
1860             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1861     } else {
1862         errCode = E_OK;
1863     }
1864     int ret = E_OK;
1865     SQLiteUtils::ResetStatement(stmt, true, ret);
1866     if (ret != E_OK) {
1867         LOGW("[RDBExecutor][UpdateExtendField] Reset stmt failed %d", ret);
1868     }
1869     return errCode;
1870 }
1871 
GetUpdateExtendFieldSql(const std::string & tableName,const std::set<std::string> & oldExtendColNames,const std::set<std::string> & extendColNames)1872 std::string GetUpdateExtendFieldSql(const std::string &tableName, const std::set<std::string> &oldExtendColNames,
1873     const std::set<std::string> &extendColNames)
1874 {
1875     std::string sql = "update " + DBCommon::GetLogTableName(tableName) +
1876         " set extend_field = json_insert(extend_field,";
1877     bool isContainNewCol = false;
1878     for (const auto &extendColName : extendColNames) {
1879         if (oldExtendColNames.find(extendColName) != oldExtendColNames.end()) {
1880             continue;
1881         }
1882         isContainNewCol = true;
1883         sql += "'$." + extendColName + "',null,";
1884     }
1885     if (!isContainNewCol) {
1886         return "";
1887     }
1888     sql.pop_back();
1889     sql += ") where data_key = -1 and json_valid(extend_field) = 1;";
1890     return sql;
1891 }
1892 
UpdateDeleteDataExtendField(const std::string & tableName,const std::string & lowVersionExtendColName,const std::set<std::string> & oldExtendColNames,const std::set<std::string> & extendColNames)1893 int SQLiteSingleVerRelationalStorageExecutor::UpdateDeleteDataExtendField(const std::string &tableName,
1894     const std::string &lowVersionExtendColName, const std::set<std::string> &oldExtendColNames,
1895     const std::set<std::string> &extendColNames)
1896 {
1897     bool isLogTableExist = false;
1898     if (SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist) == E_OK &&
1899         !isLogTableExist) {
1900         LOGW("[RDBExecutor][UpdateDeleteDataExtendField] Log table of [%s [%zu]] not found!",
1901             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1902         return E_OK;
1903     }
1904     int errCode = E_OK;
1905     if (!lowVersionExtendColName.empty()) {
1906         errCode = BuildJsonExtendField(tableName, lowVersionExtendColName, dbHandle_);
1907         if (errCode != E_OK) {
1908             LOGE("[UpdateDeleteDataExtendField] Update low version extend field of [%s [%zu]] to json failed: %d",
1909                 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1910             return errCode;
1911         }
1912     }
1913     std::string sql = GetUpdateExtendFieldSql(tableName, oldExtendColNames, extendColNames);
1914     if (sql.empty()) {
1915         return E_OK;
1916     }
1917 
1918     sqlite3_stmt *stmt = nullptr;
1919     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1920     if (errCode != E_OK) {
1921         return errCode;
1922     }
1923     errCode = SQLiteUtils::StepWithRetry(stmt);
1924     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1925         LOGE("[RDBExecutor][UpdateDeleteDataExtendField] Update extend field of [%s [%zu]] failed: %d",
1926             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1927     } else {
1928         errCode = E_OK;
1929     }
1930     int ret = E_OK;
1931     SQLiteUtils::ResetStatement(stmt, true, ret);
1932     if (ret != E_OK) {
1933         LOGW("[RDBExecutor][UpdateDeleteDataExtendField] Reset stmt failed %d", ret);
1934     }
1935     return errCode;
1936 }
1937 
GetDownloadAssetGid(const TableSchema & tableSchema,std::vector<std::string> & gids,int64_t beginTime,bool abortWithLimit)1938 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetGid(const TableSchema &tableSchema,
1939     std::vector<std::string> &gids, int64_t beginTime, bool abortWithLimit)
1940 {
1941     std::string sql = "SELECT cloud_gid FROM " + DBCommon::GetLogTableName(tableSchema.name) +
1942         " WHERE flag&0x1000=0x1000 AND timestamp > ?;";
1943     sqlite3_stmt *stmt = nullptr;
1944     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1945     if (errCode != E_OK) {
1946         LOGE("[RDBExecutor]Get gid statement failed, %d", errCode);
1947         return errCode;
1948     }
1949     int ret = E_OK;
1950     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, beginTime);
1951     if (errCode != E_OK) {
1952         LOGE("[RDBExecutor] bind time failed %d when get download asset gid", errCode);
1953         SQLiteUtils::ResetStatement(stmt, true, ret);
1954         return errCode;
1955     }
1956     uint32_t count = 0;
1957     do {
1958         errCode = SQLiteUtils::StepWithRetry(stmt, false);
1959         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1960             errCode = E_OK;
1961             break;
1962         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1963             LOGE("[RDBExecutor]Get downloading assets gid failed. %d", errCode);
1964             break;
1965         }
1966         std::string gid;
1967         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1968         if (errCode != E_OK) {
1969             LOGW("[RDBExecutor]Get downloading assets gid failed %d when get col", errCode);
1970             continue;
1971         }
1972         gids.push_back(gid);
1973         if (AbortGetDownloadAssetGidIfNeed(tableSchema, gid, abortWithLimit, count)) {
1974             break;
1975         }
1976     } while (errCode == E_OK);
1977     SQLiteUtils::ResetStatement(stmt, true, ret);
1978     return errCode == E_OK ? ret : errCode;
1979 }
1980 
GetDownloadAssetRecordsByGid(const TableSchema & tableSchema,const std::string gid,std::vector<VBucket> & assets)1981 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetRecordsByGid(const TableSchema &tableSchema,
1982     const std::string gid, std::vector<VBucket> &assets)
1983 {
1984     std::vector<Field> assetFields;
1985     std::string sql = "SELECT";
1986     for (const auto &field: tableSchema.fields) {
1987         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1988             assetFields.emplace_back(field);
1989             sql += " b." + field.colName + ",";
1990         }
1991     }
1992     if (assetFields.empty()) {
1993         return E_OK;
1994     }
1995     sql.pop_back(); // remove last ,
1996     sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE a.cloud_gid = ?;";
1997     sqlite3_stmt *stmt = nullptr;
1998     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1999     if (errCode != E_OK) {
2000         LOGE("Get downloading asset records statement failed, %d", errCode);
2001         return errCode;
2002     }
2003     int ret = E_OK;
2004     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
2005     if (errCode != E_OK) {
2006         SQLiteUtils::ResetStatement(stmt, true, ret);
2007         return errCode;
2008     }
2009     errCode = SQLiteUtils::StepWithRetry(stmt);
2010     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2011         int index = 0;
2012         for (const auto &field: assetFields) {
2013             Type value;
2014             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, value);
2015             if (errCode != E_OK) {
2016                 break;
2017             }
2018             errCode = GetDownloadAsset(assets, field, value);
2019             if (errCode != E_OK) {
2020                 break;
2021             }
2022         }
2023     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2024         errCode = E_OK;
2025     } else {
2026         LOGE("step get downloading asset records statement failed %d.", errCode);
2027     }
2028     SQLiteUtils::ResetStatement(stmt, true, ret);
2029     return errCode == E_OK ? ret : errCode;
2030 }
2031 
GetDownloadingCount(const std::string & tableName,int32_t & count)2032 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadingCount(const std::string &tableName, int32_t &count)
2033 {
2034     std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName) + " WHERE flag&0x1000=0x1000;";
2035     int errCode = SQLiteUtils::GetCountBySql(dbHandle_, sql, count);
2036     if (errCode != E_OK) {
2037         LOGE("[RDBExecutor] Query local data count failed: %d", errCode);
2038     }
2039     return errCode;
2040 }
2041 
GetDownloadingAssetsCount(const TableSchema & tableSchema,int32_t & totalCount)2042 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadingAssetsCount(
2043     const TableSchema &tableSchema, int32_t &totalCount)
2044 {
2045     std::vector<std::string> gids;
2046     int errCode = GetDownloadAssetGid(tableSchema, gids);
2047     if (errCode != E_OK) {
2048         LOGE("[RDBExecutor]Get downloading assets gid failed: %d", errCode);
2049         return errCode;
2050     }
2051     for (const auto &gid : gids) {
2052         std::vector<VBucket> assets;
2053         errCode = GetDownloadAssetRecordsByGid(tableSchema, gid, assets);
2054         if (errCode != E_OK) {
2055             LOGE("[RDBExecutor]Get downloading assets records by gid failed: %d", errCode);
2056             return errCode;
2057         }
2058         totalCount += static_cast<int32_t>(assets.size());
2059     }
2060     return E_OK;
2061 }
2062 
GetDownloadAssetRecordsInner(const TableSchema & tableSchema,int64_t beginTime,std::vector<std::string> & gids)2063 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetRecordsInner(
2064     const TableSchema &tableSchema, int64_t beginTime, std::vector<std::string> &gids)
2065 {
2066     int errCode = GetDownloadAssetGid(tableSchema, gids, beginTime, true);
2067     if (errCode != E_OK) {
2068         LOGE("[RDBExecutor]Get downloading assets gid failed: %d", errCode);
2069     }
2070     return errCode;
2071 }
2072 
CleanDownloadingFlag(const std::string & tableName)2073 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadingFlag(const std::string &tableName)
2074 {
2075     std::string sql;
2076     sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag&(~0x1000);";
2077     sqlite3_stmt *stmt = nullptr;
2078     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
2079     if (errCode != E_OK) {
2080         LOGE("[RDBExecutor]Get stmt failed clean downloading flag: %d, tableName: %s, length: %zu",
2081             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
2082         return errCode;
2083     }
2084     errCode = SQLiteUtils::StepWithRetry(stmt);
2085     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2086         errCode = E_OK;
2087     } else {
2088         LOGE("[RDBExecutor]Clean downloading flag failed: %d, tableName: %s, length: %zu",
2089             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
2090     }
2091     int ret = E_OK;
2092     SQLiteUtils::ResetStatement(stmt, true, ret);
2093     if (ret != E_OK) {
2094         LOGE("[RDBExecutor]Reset stmt failed clean downloading flag: %d", ret);
2095     }
2096     return errCode != E_OK ? errCode : ret;
2097 }
2098 
AbortGetDownloadAssetGidIfNeed(const DistributedDB::TableSchema & tableSchema,const std::string & gid,bool abortWithLimit,uint32_t & count)2099 bool SQLiteSingleVerRelationalStorageExecutor::AbortGetDownloadAssetGidIfNeed(
2100     const DistributedDB::TableSchema &tableSchema, const std::string &gid, bool abortWithLimit,
2101     uint32_t &count)
2102 {
2103     if (!abortWithLimit) {
2104         return false;
2105     }
2106     std::vector<VBucket> assets;
2107     int errCode = GetDownloadAssetRecordsByGid(tableSchema, gid, assets);
2108     if (errCode != E_OK) {
2109         LOGW("[RDBExecutor]Get downloading assets failed %d gid %s", errCode, gid.c_str());
2110         return false;
2111     }
2112     count += assets.size();
2113     return count >= RuntimeContext::GetInstance()->GetAssetsDownloadManager()->GetMaxDownloadAssetsCount();
2114 }
2115 } // namespace DistributedDB
2116 #endif
2117