• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include <algorithm>
19 #include <optional>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32 
33 namespace DistributedDB {
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)34 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
35     const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
36 {
37     std::string querySql;
38     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
39     std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
40     int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
41     if (errCode != E_OK) {
42         LOGE("Get query log sql fail, %d", errCode);
43         return errCode;
44     }
45     if (!pkSet.empty()) {
46         errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, dataInfoWithLog.logInfo.hashKey, true);
47         if (errCode != E_OK) {
48             LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
49             return errCode;
50         }
51     }
52     sqlite3_stmt *selectStmt = nullptr;
53     errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, dataInfoWithLog.logInfo.hashKey, selectStmt);
54     if (errCode != E_OK) {
55         LOGE("Get query log statement fail, %d", errCode);
56         return errCode;
57     }
58 
59     bool alreadyFound = false;
60     do {
61         errCode = SQLiteUtils::StepWithRetry(selectStmt);
62         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
63             if (alreadyFound) {
64                 LOGE("found more than one records in log table for one primary key or gid.");
65                 errCode = -E_CLOUD_ERROR;
66                 break;
67             }
68             alreadyFound = true;
69             std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
70             errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
71             if (errCode != E_OK) {
72                 LOGE("Get info by statement fail, %d", errCode);
73                 break;
74             }
75         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
76             errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
77             break;
78         } else {
79             LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
80             break;
81         }
82     } while (errCode == E_OK);
83 
84     int ret = E_OK;
85     SQLiteUtils::ResetStatement(selectStmt, true, ret);
86     return errCode != E_OK ? errCode : ret;
87 }
88 
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)89 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
90 {
91     int index = 0;
92     logInfo.dataKey = sqlite3_column_int64(statement, index++);
93     std::vector<uint8_t> device;
94     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, device);    // 1 is device
95     DBCommon::VectorToString(device, logInfo.device);
96     std::vector<uint8_t> originDev;
97     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, originDev); // 2 is originDev
98     DBCommon::VectorToString(originDev, logInfo.originDev);
99     logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 3 is timestamp
100     logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 4 is wtimestamp
101     logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, index++)); // 5 is flag
102     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, logInfo.hashKey); // 6 is hash_key
103     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.cloudGid); // 7 is cloud_gid
104     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.sharingResource); // 8 is sharing_resource
105     logInfo.status = static_cast<uint64_t>(sqlite3_column_int64(statement, index++)); // 9 is status
106     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.version); // 10 is version
107     return index;
108 }
109 
GetInfoByStatement(sqlite3_stmt * statement,const std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)110 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
111     const std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
112     VBucket &assetInfo)
113 {
114     int index = GetLogInfoByStatement(statement, dataInfoWithLog.logInfo); // start index of assetInfo or primary key
115     int errCode = E_OK;
116     for (const auto &field: assetFields) {
117         Type cloudValue;
118         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
119         if (errCode != E_OK) {
120             break;
121         }
122         errCode = PutVBucketByType(assetInfo, field, cloudValue);
123         if (errCode != E_OK) {
124             break;
125         }
126     }
127     if (errCode != E_OK) {
128         LOGE("set asset field failed, errCode = %d", errCode);
129         return errCode;
130     }
131 
132     // fill primary key
133     for (const auto &item : pkMap) {
134         Type cloudValue;
135         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
136         if (errCode != E_OK) {
137             break;
138         }
139         errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
140         if (errCode != E_OK) {
141             break;
142         }
143     }
144     return errCode;
145 }
146 
GetInsertSqlForCloudSync(const TableSchema & tableSchema)147 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
148 {
149     std::string sql = "insert into " + tableSchema.name + "(";
150     for (const auto &field : tableSchema.fields) {
151         sql += field.colName + ",";
152     }
153     sql.pop_back();
154     sql += ") values(";
155     for (size_t i = 0; i < tableSchema.fields.size(); i++) {
156         sql += "?,";
157     }
158     sql.pop_back();
159     sql += ");";
160     return sql;
161 }
162 
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)163 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
164     const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
165 {
166     int errCode = E_OK;
167     TableInfo localTable = localSchema_.GetTable(tableSchema.name);
168     // table name in cloud schema is in lower case
169     if (!DBCommon::CaseInsensitiveCompare(localTable.GetTableName(), tableSchema.name)) {
170         LOGE("localSchema doesn't contain table from cloud");
171         return -E_INTERNAL_ERROR;
172     }
173 
174     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
175     if (pkMap.size() == 0) {
176         int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
177         std::vector<uint8_t> value;
178         DBCommon::StringToVector(std::to_string(rowid), value);
179         errCode = DBCommon::CalcValueHash(value, hashValue);
180     } else {
181         std::tie(errCode, hashValue) = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(vBucket,
182             tableSchema, localTable, pkMap, allowEmpty);
183     }
184     return errCode;
185 }
186 
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,const Key & hashKey,sqlite3_stmt * & selectStmt)187 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
188     const VBucket &vBucket, const std::string &querySql, const Key &hashKey, sqlite3_stmt *&selectStmt)
189 {
190     int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
191     if (errCode != E_OK) {
192         LOGE("Get select log statement failed, %d", errCode);
193         return errCode;
194     }
195 
196     std::string cloudGid;
197     int ret = E_OK;
198     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
199     if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
200         SQLiteUtils::ResetStatement(selectStmt, true, ret);
201         LOGE("Get cloud gid fail when bind query log statement.");
202         return errCode;
203     }
204 
205     int index = 0;
206     if (!cloudGid.empty()) {
207         index++;
208         errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
209         if (errCode != E_OK) {
210             LOGE("Bind cloud gid to query log statement failed. %d", errCode);
211             SQLiteUtils::ResetStatement(selectStmt, true, errCode);
212             return errCode;
213         }
214     }
215 
216     index++;
217     errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashKey, true);
218     if (errCode != E_OK) {
219         LOGE("Bind hash key to query log statement failed. %d", errCode);
220         SQLiteUtils::ResetStatement(selectStmt, true, ret);
221     }
222     return errCode != E_OK ? errCode : ret;
223 }
224 
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,const std::set<std::string> & pkSet,std::string & querySql)225 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
226     const std::set<std::string> &pkSet, std::string &querySql)
227 {
228     std::string cloudGid;
229     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
230     if (errCode != E_OK) {
231         LOGE("Get cloud gid fail when query log table.");
232         return errCode;
233     }
234 
235     if (pkSet.empty() && cloudGid.empty()) {
236         LOGE("query log table failed because of both primary key and gid are empty.");
237         return -E_CLOUD_ERROR;
238     }
239     std::string sql = "SELECT data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid,"
240         " sharing_resource, status, version FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
241         "_log WHERE ";
242     if (!cloudGid.empty()) {
243         sql += "cloud_gid = ? OR ";
244     }
245     sql += "hash_key = ?";
246 
247     querySql = sql;
248     return E_OK;
249 }
250 
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData,std::map<int,int> & statisticMap)251 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
252     const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData,
253     std::map<int, int> &statisticMap)
254 {
255     int index = 0;
256     int errCode = E_OK;
257     for (OpType op : downloadData.opType) {
258         VBucket &vBucket = downloadData.data[index];
259         switch (op) {
260             case OpType::INSERT:
261                 errCode = InsertCloudData(vBucket, tableSchema, trackerTable, GetLocalDataKey(index, downloadData));
262                 break;
263             case OpType::UPDATE:
264                 errCode = UpdateCloudData(vBucket, tableSchema);
265                 break;
266             case OpType::DELETE:
267                 errCode = DeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
268                 break;
269             case OpType::ONLY_UPDATE_GID:
270             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
271             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
272             case OpType::UPDATE_TIMESTAMP:
273             case OpType::CLEAR_GID:
274             case OpType::LOCKED_NOT_HANDLE:
275                 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
276                 [[fallthrough]];
277             case OpType::NOT_HANDLE:
278                 errCode = errCode == E_OK ? OnlyUpdateAssetId(tableName, tableSchema, vBucket,
279                     GetLocalDataKey(index, downloadData), op) : errCode;
280                 break;
281             default:
282                 errCode = -E_CLOUD_ERROR;
283                 break;
284         }
285         if (errCode != E_OK) {
286             LOGE("put cloud sync data fail: %d", errCode);
287             return errCode;
288         }
289         statisticMap[static_cast<int>(op)]++;
290         index++;
291     }
292     return errCode;
293 }
294 
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets,std::vector<std::string> & notifyTableList)295 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
296     const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
297     std::vector<Asset> &assets, std::vector<std::string> &notifyTableList)
298 {
299     int errCode = SetLogTriggerStatus(false);
300     if (errCode != E_OK) {
301         LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
302         return errCode;
303     }
304     if (mode == FLAG_ONLY) {
305         errCode = DoCleanLogs(tableNameList, localSchema);
306         if (errCode != E_OK) {
307             LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
308             return errCode;
309         }
310         notifyTableList = tableNameList;
311     } else if (mode == FLAG_AND_DATA) {
312         errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
313         if (errCode != E_OK) {
314             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
315             return errCode;
316         }
317         notifyTableList = tableNameList;
318     } else if (mode == CLEAR_SHARED_TABLE) {
319         errCode = DoCleanShareTableDataAndLog(tableNameList);
320         if (errCode != E_OK) {
321             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
322             return errCode;
323         }
324         notifyTableList = tableNameList;
325     }
326     for (const auto &tableName: tableNameList) {
327         errCode = CleanDownloadingFlag(tableName);
328         if (errCode != E_OK) {
329             LOGE("Fail to clean downloading flag, %d, tableName:%s, length:%zu",
330                 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
331             return errCode;
332         }
333     }
334     errCode = SetLogTriggerStatus(true);
335     if (errCode != E_OK) {
336         LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
337     }
338 
339     return errCode;
340 }
341 
DoCleanLogs(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)342 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList,
343     const RelationalSchemaObject &localSchema)
344 {
345     int errCode = E_OK;
346     int i = 1;
347     for (const auto &tableName: tableNameList) {
348         std::string logTableName = DBCommon::GetLogTableName(tableName);
349         LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
350         errCode = DoCleanAssetId(tableName, localSchema);
351         if (errCode != E_OK) {
352             LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
353             return errCode;
354         }
355         errCode = CleanCloudDataOnLogTable(logTableName, FLAG_ONLY);
356         if (errCode != E_OK) {
357             LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
358             return errCode;
359         }
360         i++;
361     }
362 
363     return errCode;
364 }
365 
UpdateCursor(sqlite3_context * ctx,int argc,sqlite3_value ** argv)366 void SQLiteSingleVerRelationalStorageExecutor::UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv)
367 {
368     if (ctx == nullptr || argc != 0 || argv == nullptr) {
369         LOGW("[SqlSinRDBExe][UpdateCursor] invalid param=%d", argc);
370         return;
371     }
372     auto context = static_cast<UpdateCursorContext *>(sqlite3_user_data(ctx));
373     if (context == nullptr) {
374         LOGW("[SqlSinRDBExe][UpdateCursor] invalid context");
375         return;
376     }
377     context->cursor++;
378     sqlite3_result_int64(ctx, static_cast<sqlite3_int64>(context->cursor));
379 }
380 
CreateFuncUpdateCursor(UpdateCursorContext & context,void (* updateCursor)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const381 int SQLiteSingleVerRelationalStorageExecutor::CreateFuncUpdateCursor(UpdateCursorContext &context,
382     void(*updateCursor)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
383 {
384     std::string sql = "update_cursor";
385     int errCode = sqlite3_create_function_v2(dbHandle_, sql.c_str(), 0, SQLITE_UTF8 | SQLITE_DIRECTONLY,
386         &context, updateCursor, nullptr, nullptr, nullptr);
387     if (errCode != SQLITE_OK) {
388         LOGE("[Storage Executor][UpdateCursor] Create func=updateCursor failed=%d", errCode);
389         return SQLiteUtils::MapSQLiteErrno(errCode);
390     }
391     return E_OK;
392 }
393 
GetCursor(const std::string & tableName,uint64_t & cursor)394 int SQLiteSingleVerRelationalStorageExecutor::GetCursor(const std::string &tableName, uint64_t &cursor)
395 {
396     return SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, cursor);
397 }
398 
SetCursor(const std::string & tableName,uint64_t cursor)399 int SQLiteSingleVerRelationalStorageExecutor::SetCursor(const std::string &tableName, uint64_t cursor)
400 {
401     std::string sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata SET VALUE = ? where KEY = ?;";
402     sqlite3_stmt *stmt = nullptr;
403     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
404     if (errCode != E_OK) {
405         LOGE("Set cursor sql failed=%d", errCode);
406         return errCode;
407     }
408     ResFinalizer finalizer([stmt]() {
409         sqlite3_stmt *statement = stmt;
410         int ret = E_OK;
411         SQLiteUtils::ResetStatement(statement, true, ret);
412         if (ret != E_OK) {
413             LOGW("Reset stmt failed %d when set cursor", ret);
414         }
415     });
416     int index = 1;
417     errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, cursor);
418     if (errCode != E_OK) {
419         LOGE("Bind saved cursor failed:%d", errCode);
420         return errCode;
421     }
422     Key key;
423     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
424     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key, false);
425     if (errCode != E_OK) {
426         return errCode;
427     }
428     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
429     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
430         errCode = E_OK;
431     }
432     return errCode;
433 }
434 
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)435 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
436     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
437 {
438     int errCode = E_OK;
439     for (size_t i = 0; i < tableNameList.size(); i++) {
440         std::string tableName = tableNameList[i];
441         std::string logTableName = DBCommon::GetLogTableName(tableName);
442         std::vector<int64_t> dataKeys;
443         errCode = GetCleanCloudDataKeys(logTableName, dataKeys, true);
444         if (errCode != E_OK) {
445             LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
446             return errCode;
447         }
448 
449         std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
450         errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
451         if (errCode != E_OK) {
452             LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
453             return errCode;
454         }
455         if (isLogicDelete_) {
456             errCode = SetDataOnUserTableWithLogicDelete(tableName, logTableName);
457         } else {
458             errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName, localSchema);
459         }
460         if (errCode != E_OK) {
461             LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
462             return errCode;
463         }
464     }
465     return errCode;
466 }
467 
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)468 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
469     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
470 {
471     int errCode = E_OK;
472     int ret = E_OK;
473     sqlite3_stmt *selectStmt = nullptr;
474     for (const auto &rowId : dataKeys) {
475         std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
476             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
477         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
478         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
479             LOGE("Get select asset statement failed, %d", errCode);
480             return errCode;
481         }
482         errCode = SQLiteUtils::StepWithRetry(selectStmt);
483         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
484             std::vector<uint8_t> blobValue;
485             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
486             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
487                 LOGE("Get column blob value failed, %d", errCode);
488                 goto END;
489             }
490             if (blobValue.empty()) {
491                 SQLiteUtils::ResetStatement(selectStmt, true, ret);
492                 continue;
493             }
494             Asset asset;
495             errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
496             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
497                 LOGE("Transfer blob to asset failed, %d", errCode);
498                 goto END;
499             }
500             assets.push_back(asset);
501         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
502             errCode = E_OK;
503             Asset asset;
504             assets.push_back(asset);
505         }
506         SQLiteUtils::ResetStatement(selectStmt, true, ret);
507     }
508     return errCode != E_OK ? errCode : ret;
509 END:
510     SQLiteUtils::ResetStatement(selectStmt, true, ret);
511     return errCode != E_OK ? errCode : ret;
512 }
513 
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)514 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
515     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
516 {
517     int errCode = E_OK;
518     int ret = E_OK;
519     sqlite3_stmt *selectStmt = nullptr;
520     for (const auto &rowId : dataKeys) {
521         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
522             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
523         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
524         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
525             LOGE("Get select assets statement failed, %d", errCode);
526             goto END;
527         }
528         errCode = SQLiteUtils::StepWithRetry(selectStmt);
529         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
530             std::vector<uint8_t> blobValue;
531             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
532             if (errCode != E_OK) {
533                 goto END;
534             }
535             Assets tmpAssets;
536             errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
537             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
538                 goto END;
539             }
540             for (const auto &asset: tmpAssets) {
541                 assets.push_back(asset);
542             }
543         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
544             errCode = E_OK;
545         }
546         SQLiteUtils::ResetStatement(selectStmt, true, ret);
547     }
548     return errCode != E_OK ? errCode : ret;
549 END:
550     SQLiteUtils::ResetStatement(selectStmt, true, ret);
551     return errCode != E_OK ? errCode : ret;
552 }
553 
GetCloudDataCount(const std::string & tableName,DownloadData & downloadData,int64_t & count)554 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataCount(const std::string &tableName,
555     DownloadData &downloadData, int64_t &count)
556 {
557     if (downloadData.data.empty()) {
558         return E_OK;
559     }
560     int errCode = E_OK;
561     sqlite3_stmt *queryStmt = nullptr;
562     std::string querySql = "SELECT COUNT(*) FROM '" + DBCommon::GetLogTableName(tableName) +
563         "' WHERE FLAG & 0x02 = 0 AND CLOUD_GID IN (";
564     for (const VBucket &vBucket : downloadData.data) {
565         std::string cloudGid;
566         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
567         if (errCode != E_OK) {
568             LOGE("get gid for query log statement failed, %d", errCode);
569             return -E_CLOUD_ERROR;
570         }
571         querySql += "'" + cloudGid + "',";
572     }
573     querySql.pop_back();
574     querySql += ");";
575     errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, queryStmt);
576     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
577         LOGE("Get query count statement failed, %d", errCode);
578         return errCode;
579     }
580     errCode = SQLiteUtils::StepWithRetry(queryStmt);
581     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
582         count = static_cast<int64_t>(sqlite3_column_int64(queryStmt, 0));
583         errCode = E_OK;
584     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
585         errCode = E_OK;
586     }
587     int ret = E_OK;
588     SQLiteUtils::ResetStatement(queryStmt, true, ret);
589     return errCode != E_OK ? errCode : ret;
590 }
591 
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)592 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
593     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
594 {
595     int errCode = E_OK;
596     for (const auto &fieldInfo: fieldInfos) {
597         if (fieldInfo.IsAssetType()) {
598             errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
599             if (errCode != E_OK) {
600                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
601                 return errCode;
602             }
603         } else if (fieldInfo.IsAssetsType()) {
604             errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
605             if (errCode != E_OK) {
606                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
607                 return errCode;
608             }
609         }
610     }
611     return errCode;
612 }
613 
SetCursorIncFlag(bool flag)614 int SQLiteSingleVerRelationalStorageExecutor::SetCursorIncFlag(bool flag)
615 {
616     std::string sql = "INSERT OR REPLACE INTO " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata" +
617         " VALUES ('cursor_inc_flag', ";
618     if (flag) {
619         sql += "'true'";
620     } else {
621         sql += "'false'";
622     }
623     sql += ");";
624     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
625     if (errCode != E_OK) {
626         LOGE("set cursor inc flag fail, errCode = %d", errCode);
627     }
628     return errCode;
629 }
630 
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData)631 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
632     const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData)
633 {
634     if (downloadData.data.size() != downloadData.opType.size()) {
635         LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
636              downloadData.opType.size());
637         return -E_CLOUD_ERROR;
638     }
639 
640     int errCode = SetLogTriggerStatus(false);
641     if (errCode != E_OK) {
642         LOGE("Fail to set log trigger off, %d", errCode);
643         return errCode;
644     }
645 
646     std::map<int, int> statisticMap = {};
647     errCode = ExecutePutCloudData(tableName, tableSchema, trackerTable, downloadData, statisticMap);
648     if (errCode != E_OK) {
649         LOGE("ExecutePutCloudData failed, %d", errCode);
650     }
651     int ret = SetLogTriggerStatus(true);
652     if (ret != E_OK) {
653         LOGE("Fail to set log trigger on, %d", ret);
654     }
655     int64_t count = 0;
656     int errCodeCount = GetCloudDataCount(tableName, downloadData, count);
657     if (errCodeCount != E_OK) {
658         LOGW("get cloud data count failed, %d", errCodeCount);
659     }
660     LOGI("save cloud data of table %s [length %zu]:%d, cloud data count:%lld, ins:%d, upd:%d, del:%d, only gid:%d,"
661         "flag zero:%d, flag one:%d, upd timestamp:%d, clear gid:%d, not handle:%d, lock:%d",
662          DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode, count,
663          statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
664          statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
665          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
666          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
667          statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
668          statisticMap[static_cast<int>(OpType::NOT_HANDLE)], statisticMap[static_cast<int>(OpType::LOCKED_NOT_HANDLE)]);
669     return errCode == E_OK ? ret : errCode;
670 }
671 
UpdateAssetStatusForAssetOnly(const TableSchema & tableSchema,VBucket & vBucket)672 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetStatusForAssetOnly(
673     const TableSchema &tableSchema, VBucket &vBucket)
674 {
675     std::string cloudGid;
676     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
677     if (errCode != E_OK) {
678         LOGE("Miss gid when fill Asset %d.", errCode);
679         return errCode;
680     }
681     std::vector<Field> assetsField;
682     errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
683     if (errCode != E_OK) {
684         LOGE("No assets need to be filled when download assets only, err:%d.", errCode);
685         return errCode;
686     }
687 
688     sqlite3_stmt *stmt = nullptr;
689     errCode = GetFillDownloadAssetStatement(tableSchema.name, vBucket, assetsField, stmt);
690     if (errCode != E_OK) {
691         LOGE("can not get assetsField from tableSchema:%s err:%d when download assets only.",
692             tableSchema.name.c_str(), errCode);
693         return errCode;
694     }
695     return ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
696 }
697 
InsertCloudData(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,int64_t dataKey)698 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema,
699     const TrackerTable &trackerTable, int64_t dataKey)
700 {
701     int errCode = E_OK;
702     if (dataKey > 0) {
703         errCode = RemoveDataAndLog(tableSchema.name, dataKey);
704         if (errCode != E_OK) {
705             return errCode;
706         }
707     }
708     std::string sql = GetInsertSqlForCloudSync(tableSchema);
709     sqlite3_stmt *insertStmt = nullptr;
710     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
711     if (errCode != E_OK) {
712         LOGE("Get insert statement failed when save cloud data, %d", errCode);
713         return errCode;
714     }
715     if (putDataMode_ == PutDataMode::SYNC) {
716         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
717     }
718     errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
719     if (errCode != E_OK) {
720         SQLiteUtils::ResetStatement(insertStmt, true, errCode);
721         return errCode;
722     }
723     // insert data
724     errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
725     int ret = E_OK;
726     SQLiteUtils::ResetStatement(insertStmt, true, ret);
727     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
728         LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
729         return errCode;
730     }
731 
732     // insert log
733     return InsertLogRecord(tableSchema, trackerTable, vBucket);
734 }
735 
GetInsertLogSql(const std::string & logTableName,const std::set<std::string> & extendColNames)736 std::string GetInsertLogSql(const std::string &logTableName, const std::set<std::string> &extendColNames)
737 {
738     if (extendColNames.empty()) {
739         return "INSERT OR REPLACE INTO " + logTableName + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, " +
740             "CASE WHEN (SELECT status FROM " + logTableName + " WHERE hash_key=?) IS NULL THEN 0 ELSE " +
741             "(SELECT status FROM " + logTableName + " WHERE hash_key=?) " + "END)";
742     }
743     std::string sql = "INSERT OR REPLACE INTO " + logTableName + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, json_object(";
744     for (const auto &extendColName : extendColNames) {
745         sql += "'" + extendColName + "',?,";
746     }
747     sql.pop_back();
748     sql += "), 0, ?, ?, CASE WHEN (SELECT status FROM " + logTableName +
749         " WHERE hash_key=?) IS NULL THEN 0 ELSE " + "(SELECT status FROM " + logTableName +
750         " WHERE hash_key=?) " + "END)";
751     return sql;
752 }
753 
InsertLogRecord(const TableSchema & tableSchema,const TrackerTable & trackerTable,VBucket & vBucket)754 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema,
755     const TrackerTable &trackerTable, VBucket &vBucket)
756 {
757     if (putDataMode_ == PutDataMode::SYNC && !CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
758         // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
759         // so we need to delete the old log record according to the gid first
760         std::string gidStr;
761         int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
762         if (errCode != E_OK || gidStr.empty()) {
763             LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
764             return errCode;
765         }
766         std::string sql = "DELETE FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE cloud_gid = '"
767             + gidStr + "';";
768         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
769         if (errCode != E_OK) {
770             LOGE("delete log record according gid fail, errCode = %d", errCode);
771             return errCode;
772         }
773     }
774 
775     std::string sql = GetInsertLogSql(DBCommon::GetLogTableName(tableSchema.name), trackerTable.GetExtendNames());
776     sqlite3_stmt *insertLogStmt = nullptr;
777     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
778     if (errCode != E_OK) {
779         LOGE("Get insert log statement failed when save cloud data, %d", errCode);
780         return errCode;
781     }
782 
783     errCode = BindValueToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
784     if (errCode != E_OK) {
785         SQLiteUtils::ResetStatement(insertLogStmt, true, errCode);
786         return errCode;
787     }
788 
789     errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
790     int ret = E_OK;
791     SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
792     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
793         return ret;
794     } else {
795         LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
796         return errCode;
797     }
798 }
799 
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)800 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
801     sqlite3_stmt *updateStmt)
802 {
803     auto it = bindCloudFieldFuncMap_.find(field.type);
804     if (it == bindCloudFieldFuncMap_.end()) {
805         LOGE("unknown cloud type when bind one field.");
806         return -E_CLOUD_ERROR;
807     }
808     return it->second(index, vBucket, field, updateStmt);
809 }
810 
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)811 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
812     const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
813 {
814     int errCode = E_OK;
815     int index = 0;
816     for (const auto &field : fields) {
817         index++;
818         errCode = BindOneField(index, vBucket, field, upsertStmt);
819         if (errCode != E_OK) {
820             return errCode;
821         }
822     }
823     return errCode;
824 }
825 
BindStatusSubQueryHashKeyStatement(sqlite3_stmt * insertLogStmt,std::vector<uint8_t> & hashKey,int & index)826 int SQLiteSingleVerRelationalStorageExecutor::BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt,
827     std::vector<uint8_t> &hashKey, int &index)
828 {
829     int errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
830     if (errCode != E_OK) {
831         LOGE("Bind hash_key to status subQuery statement failed, %d", errCode);
832         return errCode;
833     }
834 
835     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
836     if (errCode != E_OK) {
837         LOGE("Bind hash_key to status subQuery2 statement failed, %d", errCode);
838         return errCode;
839     }
840     return errCode;
841 }
842 
BindExtendValue(const VBucket & vBucket,const TrackerTable & trackerTable,sqlite3_stmt * stmt,int & index)843 int BindExtendValue(const VBucket &vBucket, const TrackerTable &trackerTable, sqlite3_stmt *stmt, int &index)
844 {
845     const std::set<std::string> &extendColNames = trackerTable.GetExtendNames();
846     int errCode = E_OK;
847     int extendValueIndex = index;
848     if (extendColNames.empty()) {
849         return SQLiteUtils::BindTextToStatement(stmt, index++, "");
850     }
851     for (const auto &extendColName : extendColNames) {
852         if (vBucket.find(extendColName) == vBucket.end()) {
853             errCode = SQLiteUtils::BindTextToStatement(stmt, extendValueIndex++, "");
854         } else {
855             Type extendValue = vBucket.at(extendColName);
856             errCode = SQLiteRelationalUtils::BindStatementByType(stmt, extendValueIndex++, extendValue);
857         }
858         if (errCode != E_OK) {
859             const std::string tableName = DBCommon::StringMiddleMasking(trackerTable.GetTableName());
860             size_t nameLength = trackerTable.GetTableName().size();
861             LOGE("[%s [%zu]] Bind extend field failed: %d", tableName.c_str(), nameLength, errCode);
862             return errCode;
863         }
864     }
865     index = extendValueIndex;
866     return E_OK;
867 }
868 
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt,int & index)869 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
870     const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt, int &index)
871 {
872     std::vector<uint8_t> hashKey;
873     int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
874     if (errCode != E_OK) {
875         return errCode;
876     }
877     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
878     if (errCode != E_OK) {
879         LOGE("Bind hash_key to insert log statement failed, %d", errCode);
880         return errCode;
881     }
882 
883     std::string cloudGid;
884     if (putDataMode_ == PutDataMode::SYNC) {
885         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
886         if (errCode != E_OK) {
887             LOGE("get gid for insert log statement failed, %d", errCode);
888             return -E_CLOUD_ERROR;
889         }
890     }
891 
892     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, cloudGid); // next is cloud_gid
893     if (errCode != E_OK) {
894         LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
895         return errCode;
896     }
897 
898     errCode = BindExtendValue(vBucket, trackerTable, insertLogStmt, index); // next is extend_field
899     if (errCode != E_OK) {
900         LOGE("Bind extend_field to insert log statement failed, %d", errCode);
901         return errCode;
902     }
903 
904     errCode = BindShareValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt, index);
905     if (errCode != E_OK) {
906         return errCode;
907     }
908     return BindStatusSubQueryHashKeyStatement(insertLogStmt, hashKey, index);
909 }
910 
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)911 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
912     const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
913 {
914     int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
915     int bindIndex = 1; // 1 is rowid
916     int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, rowid);
917     if (errCode != E_OK) {
918         LOGE("Bind rowid to insert log statement failed, %d", errCode);
919         return errCode;
920     }
921 
922     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, bindIndex++, GetDev()); // next is device
923     if (errCode != E_OK) {
924         LOGE("Bind device to insert log statement failed, %d", errCode);
925         return errCode;
926     }
927 
928     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, bindIndex++, GetDev()); // next is ori_device
929     if (errCode != E_OK) {
930         LOGE("Bind ori_device to insert log statement failed, %d", errCode);
931         return errCode;
932     }
933 
934     int64_t val = 0;
935     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
936     if (errCode != E_OK) {
937         LOGE("get modify time for insert log statement failed, %d", errCode);
938         return -E_CLOUD_ERROR;
939     }
940 
941     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, val); // next is timestamp
942     if (errCode != E_OK) {
943         LOGE("Bind timestamp to insert log statement failed, %d", errCode);
944         return errCode;
945     }
946 
947     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
948     if (errCode != E_OK) {
949         LOGE("get create time for insert log statement failed, %d", errCode);
950         return -E_CLOUD_ERROR;
951     }
952 
953     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, val); // next is wtimestamp
954     if (errCode != E_OK) {
955         LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
956         return errCode;
957     }
958 
959     errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, bindIndex++, GetDataFlag())); // next is flag
960     if (errCode != E_OK) {
961         LOGE("Bind flag to insert log statement failed, %d", errCode);
962         return errCode;
963     }
964 
965     vBucket[DBConstant::ROWID] = rowid; // fill rowid to cloud data to notify user
966     return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt, bindIndex);
967 }
968 
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)969 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
970     const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
971 {
972     std::string where = " WHERE";
973     if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
974         where += " " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (SELECT data_key FROM " +
975             DBCommon::GetLogTableName(tableName) + " WHERE cloud_gid = '" + gidStr + "')";
976     }
977     if (!pkSet.empty() && queryByPk) {
978         if (!gidStr.empty()) {
979             where += " OR";
980         }
981         where += " (1 = 1";
982         for (const auto &pk : pkSet) {
983             where += (" AND " + pk + " = ?");
984         }
985         where += ");";
986     }
987     return where;
988 }
989 
GetUpdateSqlForCloudSync(const std::vector<Field> & updateFields,const TableSchema & tableSchema,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)990 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields,
991     const TableSchema &tableSchema, const std::string &gidStr, const std::set<std::string> &pkSet,
992     std::string &updateSql)
993 {
994     if (pkSet.empty() && gidStr.empty()) {
995         LOGE("update data fail because both primary key and gid is empty.");
996         return -E_CLOUD_ERROR;
997     }
998     std::string sql = "UPDATE " + tableSchema.name + " SET";
999     for (const auto &field : updateFields) {
1000         sql +=  " " + field.colName + " = ?,";
1001     }
1002     sql.pop_back();
1003     sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
1004     updateSql = sql;
1005     return E_OK;
1006 }
1007 
IsGidValid(const std::string & gidStr)1008 static inline bool IsGidValid(const std::string &gidStr)
1009 {
1010     if (!gidStr.empty()) {
1011         return gidStr.find("'") == std::string::npos;
1012     }
1013     return true;
1014 }
1015 
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)1016 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
1017     const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
1018 {
1019     std::string gidStr;
1020     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1021     if (errCode != E_OK) {
1022         LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
1023         return errCode;
1024     }
1025     if (!IsGidValid(gidStr)) {
1026         LOGE("invalid char in cloud gid");
1027         return -E_CLOUD_ERROR;
1028     }
1029 
1030     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1031     auto updateFields = GetUpdateField(vBucket, tableSchema);
1032     std::string updateSql;
1033     errCode = GetUpdateSqlForCloudSync(updateFields, tableSchema, gidStr, pkSet, updateSql);
1034     if (errCode != E_OK) {
1035         return errCode;
1036     }
1037 
1038     errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
1039     if (errCode != E_OK) {
1040         LOGE("Get update statement failed when update cloud data, %d", errCode);
1041         return errCode;
1042     }
1043 
1044     // bind value
1045     if (!pkSet.empty()) {
1046         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1047         updateFields.insert(updateFields.end(), pkFields.begin(), pkFields.end());
1048     }
1049     errCode = BindValueToUpsertStatement(vBucket, updateFields, updateStmt);
1050     if (errCode != E_OK) {
1051         LOGE("bind value to update statement failed when update cloud data, %d", errCode);
1052         SQLiteUtils::ResetStatement(updateStmt, true, errCode);
1053     }
1054     return errCode;
1055 }
1056 
UpdateCloudData(VBucket & vBucket,const TableSchema & tableSchema)1057 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema)
1058 {
1059     if (putDataMode_ == PutDataMode::SYNC) {
1060         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
1061     }
1062     sqlite3_stmt *updateStmt = nullptr;
1063     int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
1064     if (errCode != E_OK) {
1065         LOGE("Get update data table statement fail, %d", errCode);
1066         return errCode;
1067     }
1068 
1069     // update data
1070     errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
1071     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1072         errCode = E_OK;
1073     } else {
1074         LOGE("update data failed when save cloud data:%d", errCode);
1075         SQLiteUtils::ResetStatement(updateStmt, true, errCode);
1076         return errCode;
1077     }
1078     SQLiteUtils::ResetStatement(updateStmt, true, errCode);
1079 
1080     // update log
1081     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
1082     if (errCode != E_OK) {
1083         LOGE("update log record failed when update cloud data, errCode = %d", errCode);
1084     }
1085     return errCode;
1086 }
1087 
IsAllowWithPrimaryKey(OpType opType)1088 static inline bool IsAllowWithPrimaryKey(OpType opType)
1089 {
1090     return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
1091         opType == OpType::ONLY_UPDATE_GID || opType == OpType::LOCKED_NOT_HANDLE);
1092 }
1093 
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1094 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
1095     OpType opType)
1096 {
1097     sqlite3_stmt *updateLogStmt = nullptr;
1098     std::vector<std::string> updateColName;
1099     int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
1100     if (errCode != E_OK) {
1101         LOGE("Get update log statement failed, errCode = %d", errCode);
1102         return errCode;
1103     }
1104 
1105     errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
1106         updateLogStmt);
1107     int ret = E_OK;
1108     if (errCode != E_OK) {
1109         LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
1110         SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1111         return errCode != E_OK ? errCode : ret;
1112     }
1113 
1114     errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
1115     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1116         errCode = E_OK;
1117     } else {
1118         LOGE("update log record failed when update cloud data:%d", errCode);
1119     }
1120     SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1121     return errCode != E_OK ? errCode : ret;
1122 }
1123 
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)1124 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
1125     const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
1126     sqlite3_stmt *updateLogStmt)
1127 {
1128     int errCode = CloudStorageUtils::BindUpdateLogStmtFromVBucket(vBucket, tableSchema, colNames, updateLogStmt);
1129     if (errCode != E_OK) {
1130         return errCode;
1131     }
1132     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1133     if (pkMap.empty()) {
1134         return E_OK;
1135     }
1136 
1137     std::vector<uint8_t> hashKey;
1138     errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
1139     if (errCode != E_OK) {
1140         return errCode;
1141     }
1142     return SQLiteUtils::BindBlobToStatement(updateLogStmt, colNames.size() + 1, hashKey);
1143 }
1144 
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)1145 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
1146     const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
1147 {
1148     std::string gidStr;
1149     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1150     if (errCode != E_OK) {
1151         LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
1152         return errCode;
1153     }
1154     if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
1155         LOGE("empty or invalid char in cloud gid");
1156         return -E_CLOUD_ERROR;
1157     }
1158 
1159     bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
1160     std::string deleteSql = "DELETE FROM " + tableSchema.name;
1161     deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
1162     errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
1163     if (errCode != E_OK) {
1164         LOGE("Get delete statement failed when delete data, %d", errCode);
1165         return errCode;
1166     }
1167 
1168     int ret = E_OK;
1169     if (!pkSet.empty() && queryByPk) {
1170         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1171         errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
1172         if (errCode != E_OK) {
1173             LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
1174             SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1175         }
1176     }
1177     return errCode != E_OK ? errCode : ret;
1178 }
1179 
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1180 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1181     const TableSchema &tableSchema, const TrackerTable &trackerTable)
1182 {
1183     if (isLogicDelete_) {
1184         return LogicDeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
1185     }
1186     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1187     sqlite3_stmt *deleteStmt = nullptr;
1188     int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
1189     if (errCode != E_OK) {
1190         return errCode;
1191     }
1192     errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
1193     int ret = E_OK;
1194     SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1195     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1196         LOGE("delete data failed when sync with cloud:%d", errCode);
1197         return errCode;
1198     }
1199     if (ret != E_OK) {
1200         LOGE("reset delete statement failed:%d", ret);
1201         return ret;
1202     }
1203 
1204     // update log
1205     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1206     if (errCode != E_OK) {
1207         LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
1208     }
1209     return errCode;
1210 }
1211 
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1212 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
1213     const TableSchema &tableSchema, OpType opType)
1214 {
1215     return UpdateLogRecord(vBucket, tableSchema, opType);
1216 }
1217 
DeleteTableTrigger(const std::string & missTable) const1218 int SQLiteSingleVerRelationalStorageExecutor::DeleteTableTrigger(const std::string &missTable) const
1219 {
1220     static const char *triggerEndName[] = {
1221         "_ON_INSERT",
1222         "_ON_UPDATE",
1223         "_ON_DELETE"
1224     };
1225     std::string logTableName = DBConstant::SYSTEM_TABLE_PREFIX + missTable;
1226     for (const auto &endName : triggerEndName) {
1227         std::string deleteSql = "DROP TRIGGER IF EXISTS " + logTableName + endName + ";";
1228         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1229         if (errCode != E_OK) {
1230             LOGE("[DeleteTableTrigger] Drop trigger failed. %d", errCode);
1231             return errCode;
1232         }
1233     }
1234     return E_OK;
1235 }
1236 
SetLogicDelete(bool isLogicDelete)1237 void SQLiteSingleVerRelationalStorageExecutor::SetLogicDelete(bool isLogicDelete)
1238 {
1239     isLogicDelete_ = isLogicDelete;
1240 }
1241 
UpdateRecordStatus(const std::string & tableName,const std::string & status,const Key & hashKey)1242 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordStatus(const std::string &tableName,
1243     const std::string &status, const Key &hashKey)
1244 {
1245     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " + status + " WHERE hash_key = ?;";
1246     sqlite3_stmt *stmt = nullptr;
1247     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1248     if (errCode != E_OK) {
1249         LOGE("[Storage Executor] Get stmt failed when update record status, %d", errCode);
1250         return errCode;
1251     }
1252     int ret = E_OK;
1253     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey); // 1 is bind index of hashKey
1254     if (errCode != E_OK) {
1255         LOGE("[Storage Executor] Bind hashKey to update record status stmt failed, %d", errCode);
1256         SQLiteUtils::ResetStatement(stmt, true, ret);
1257         return errCode;
1258     }
1259     errCode = SQLiteUtils::StepWithRetry(stmt);
1260     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1261         errCode = E_OK;
1262     } else {
1263         LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1264     }
1265     SQLiteUtils::ResetStatement(stmt, true, ret);
1266     return errCode == E_OK ? ret : errCode;
1267 }
1268 
SetUploadConfig(int32_t maxUploadCount,int32_t maxUploadSize)1269 void SQLiteSingleVerRelationalStorageExecutor::SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize)
1270 {
1271     maxUploadCount_ = maxUploadCount;
1272     maxUploadSize_ = maxUploadSize;
1273 }
1274 
LogicDeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1275 int SQLiteSingleVerRelationalStorageExecutor::LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1276     const TableSchema &tableSchema, const TrackerTable &trackerTable)
1277 {
1278     LOGD("[RDBExecutor] logic delete skip delete data");
1279     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, CloudStorageUtils::GetCursorIncSql(tableName));
1280     if (errCode != E_OK) {
1281         return errCode;
1282     }
1283     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1284     if (errCode != E_OK) {
1285         return errCode;
1286     }
1287     if (!trackerTable.IsEmpty()) {
1288         return SQLiteRelationalUtils::SelectServerObserver(dbHandle_, tableName, true);
1289     }
1290     return E_OK;
1291 }
1292 
InitCursorToMeta(const std::string & tableName)1293 int SQLiteSingleVerRelationalStorageExecutor::InitCursorToMeta(const std::string &tableName)
1294 {
1295     Value key;
1296     Value cursor;
1297     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
1298     int errCode = GetKvData(key, cursor);
1299     if (errCode == -E_NOT_FOUND) {
1300         DBCommon::StringToVector(std::string("0"), cursor);
1301         errCode = PutKvData(key, cursor);
1302         if (errCode != E_OK) {
1303             LOGE("Init cursor to meta table failed. %d", errCode);
1304         }
1305         return errCode;
1306     }
1307     if (errCode != E_OK) {
1308         LOGE("Get cursor from meta table failed. %d", errCode);
1309     }
1310     return errCode;
1311 }
1312 
SetTableSchema(const TableSchema & tableSchema)1313 void SQLiteSingleVerRelationalStorageExecutor::SetTableSchema(const TableSchema &tableSchema)
1314 {
1315     tableSchema_ = tableSchema;
1316 }
1317 
ReviseLocalModTime(const std::string & tableName,const std::vector<ReviseModTimeInfo> & revisedData)1318 int SQLiteSingleVerRelationalStorageExecutor::ReviseLocalModTime(const std::string &tableName,
1319     const std::vector<ReviseModTimeInfo> &revisedData)
1320 {
1321     sqlite3_stmt *stmt = nullptr;
1322     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1323         " SET timestamp=? where hash_key=? AND timestamp=?";
1324     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1325     if (errCode != E_OK) {
1326         LOGE("[RDBExecutor][ReviseLocalModTime] Get stmt failed: %d", errCode);
1327         return errCode;
1328     }
1329     ResFinalizer finalizer([stmt]() {
1330         sqlite3_stmt *statement = stmt;
1331         int ret = E_OK;
1332         SQLiteUtils::ResetStatement(statement, true, ret);
1333         if (ret != E_OK) {
1334             LOGW("[RDBExecutor][ReviseLocalModTime] Reset stmt failed %d", ret);
1335         }
1336     });
1337     for (auto &data : revisedData) {
1338         int resetCode = E_OK;
1339         errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, data.curTime); // 1st bind modify time
1340         if (errCode != E_OK) {
1341             LOGE("[RDBExecutor][ReviseLocalModTime] Bind revised modify time failed: %d", errCode);
1342             return errCode;
1343         }
1344         errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, data.hashKey); // 2nd bind hash key
1345         if (errCode != E_OK) {
1346             LOGE("[RDBExecutor][ReviseLocalModTime] Bind hash key failed: %d", errCode);
1347             return errCode;
1348         }
1349         errCode = SQLiteUtils::BindInt64ToStatement(stmt, 3, data.invalidTime); // 3rd bind modify time
1350         if (errCode != E_OK) {
1351             LOGE("[RDBExecutor][ReviseLocalModTime] Bind modify time failed: %d", errCode);
1352             return errCode;
1353         }
1354         errCode = SQLiteUtils::StepWithRetry(stmt);
1355         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1356             LOGE("[RDBExecutor][ReviseLocalModTime] Revise failed: %d", errCode);
1357             return errCode;
1358         }
1359         LOGI("[RDBExecutor][ReviseLocalModTime] Local data mod time revised from %lld to %lld",
1360             data.invalidTime, data.curTime);
1361         SQLiteUtils::ResetStatement(stmt, false, resetCode);
1362         if (resetCode != E_OK) {
1363             LOGW("[RDBExecutor][ReviseLocalModTime] Reset stmt failed: %d", resetCode);
1364             return resetCode;
1365         }
1366     }
1367     return E_OK;
1368 }
1369 
IsNeedUpdateAssetIdInner(sqlite3_stmt * selectStmt,const VBucket & vBucket,const Field & field,VBucket & assetInfo,bool & isNotIncCursor)1370 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetIdInner(sqlite3_stmt *selectStmt,
1371     const VBucket &vBucket, const Field &field, VBucket &assetInfo, bool &isNotIncCursor)
1372 {
1373     if (field.type == TYPE_INDEX<Asset>) {
1374         Asset asset;
1375         UpdateLocalAssetId(vBucket, field.colName, asset);
1376         Asset *assetDBPtr = std::get_if<Asset>(&assetInfo[field.colName]);
1377         if (assetDBPtr == nullptr) {
1378             isNotIncCursor = true;
1379             return true;
1380         }
1381         const Asset &assetDB = *assetDBPtr;
1382         if (assetDB.assetId != asset.assetId || asset.status != AssetStatus::NORMAL) {
1383             isNotIncCursor = true;
1384             return true;
1385         }
1386     }
1387     if (field.type == TYPE_INDEX<Assets>) {
1388         Assets assets;
1389         UpdateLocalAssetsId(vBucket, field.colName, assets);
1390         Assets *assetsDBPtr = std::get_if<Assets>(&assetInfo[field.colName]);
1391         if (assetsDBPtr == nullptr) {
1392             isNotIncCursor = true;
1393             return true;
1394         }
1395         Assets &assetsDB = *assetsDBPtr;
1396         if (assets.size() != assetsDB.size()) {
1397             isNotIncCursor = true;
1398             return true;
1399         }
1400         for (uint32_t i = 0; i < assets.size(); ++i) {
1401             if (assets[i].assetId != assetsDB[i].assetId || assets[i].status != AssetStatus::NORMAL) {
1402                 isNotIncCursor = true;
1403                 return true;
1404             }
1405         }
1406     }
1407     return false;
1408 }
1409 
IsNeedUpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket,bool & isNotIncCursor)1410 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1411     const VBucket &vBucket, bool &isNotIncCursor)
1412 {
1413     std::vector<Field> assetFields;
1414     for (const auto &field : tableSchema.fields) {
1415         if (field.type == TYPE_INDEX<Asset>) {
1416             assetFields.push_back(field);
1417         }
1418         if (field.type == TYPE_INDEX<Assets>) {
1419             assetFields.push_back(field);
1420         }
1421     }
1422     if (assetFields.empty()) {
1423         return false;
1424     }
1425     sqlite3_stmt *selectStmt = nullptr;
1426     std::string queryAssetsSql = "SELECT ";
1427     for (const auto &field : assetFields) {
1428         queryAssetsSql += field.colName + ",";
1429     }
1430     queryAssetsSql.pop_back();
1431     queryAssetsSql += " FROM '" + tableSchema.name + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " +
1432         std::to_string(dataKey) + ";";
1433     int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1434     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1435         LOGE("Get select assets statement failed, %d.", errCode);
1436         return true;
1437     }
1438     ResFinalizer finalizer([selectStmt]() {
1439         sqlite3_stmt *statementInner = selectStmt;
1440         int ret = E_OK;
1441         SQLiteUtils::ResetStatement(statementInner, true, ret);
1442         if (ret != E_OK) {
1443             LOGW("Reset stmt failed %d when get asset", ret);
1444         }
1445     });
1446     VBucket assetInfo;
1447     errCode = GetAssetInfoOnTable(selectStmt, assetFields, assetInfo);
1448     if (errCode != E_OK) {
1449         return true;
1450     }
1451     return std::any_of(assetFields.begin(), assetFields.end(), [&](const Field &field) {
1452         return IsNeedUpdateAssetIdInner(selectStmt, vBucket, field, assetInfo, isNotIncCursor);
1453     });
1454 }
1455 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1456 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsConsistent(const std::string &tableName,
1457     const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1458 {
1459     if (downloadData.data.size() != downloadData.opType.size()) {
1460         LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1461         return -E_CLOUD_ERROR;
1462     }
1463     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1464         " SET flag=flag&(~0x20), " + CloudDbConstant::UNLOCKING_TO_UNLOCK + " WHERE cloud_gid=? and timestamp=?;";
1465     sqlite3_stmt *stmt = nullptr;
1466     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1467     if (errCode != E_OK) {
1468         LOGE("Get mark flag as consistent stmt failed, %d.", errCode);
1469         return errCode;
1470     }
1471     int ret = E_OK;
1472     int index = 0;
1473     for (const auto &data: downloadData.data) {
1474         SQLiteUtils::ResetStatement(stmt, false, ret);
1475         OpType opType = downloadData.opType[index++];
1476         if (opType == OpType::NOT_HANDLE || opType == OpType::LOCKED_NOT_HANDLE) {
1477             continue;
1478         }
1479         errCode = CloudStorageUtils::BindStepConsistentFlagStmt(stmt, data, gidFilters);
1480         if (errCode != E_OK) {
1481             break;
1482         }
1483     }
1484     SQLiteUtils::ResetStatement(stmt, true, ret);
1485     return errCode == E_OK ? ret : errCode;
1486 }
1487 
MarkFlagAsAssetAsyncDownload(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1488 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsAssetAsyncDownload(const std::string &tableName,
1489     const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1490 {
1491     if (downloadData.data.empty()) {
1492         return E_OK;
1493     }
1494     if (downloadData.data.size() != downloadData.opType.size()) {
1495         LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1496         return -E_CLOUD_ERROR;
1497     }
1498     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag|0x1000 WHERE cloud_gid=?;";
1499     sqlite3_stmt *stmt = nullptr;
1500     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1501     if (errCode != E_OK) {
1502         LOGE("[Storage Executor]Get mark flag as asset async download stmt failed, %d.", errCode);
1503         return errCode;
1504     }
1505     int ret = E_OK;
1506     for (const auto &gid : gidFilters) {
1507         SQLiteUtils::ResetStatement(stmt, false, ret);
1508         if (ret != E_OK) {
1509             LOGE("[Storage Executor]Reset stmt failed:%d", ret);
1510             break;
1511         }
1512         errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
1513         if (errCode != E_OK) {
1514             break;
1515         }
1516         errCode = SQLiteUtils::StepWithRetry(stmt);
1517         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1518             errCode = E_OK;
1519         } else {
1520             LOGW("[Storage Executor]Step mark flag as asset async download stmt failed %d gid %s",
1521                 errCode, gid.c_str());
1522         }
1523     }
1524     SQLiteUtils::ResetStatement(stmt, true, ret);
1525     return errCode == E_OK ? ret : errCode;
1526 }
1527 
FillCloudVersionForUpload(const std::string & tableName,const CloudSyncBatch & batchData)1528 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const std::string &tableName,
1529     const CloudSyncBatch &batchData)
1530 {
1531     if (batchData.extend.empty()) {
1532         return E_OK;
1533     }
1534     if (batchData.hashKey.empty() || batchData.extend.size() != batchData.hashKey.size()) {
1535         LOGE("invalid sync data for filling version.");
1536         return -E_INVALID_ARGS;
1537     }
1538     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(tableName) +
1539         "' SET version = ? WHERE hash_key = ? ";
1540     sqlite3_stmt *stmt = nullptr;
1541     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1542     if (errCode != E_OK) {
1543         return errCode;
1544     }
1545     int ret = E_OK;
1546     for (size_t i = 0; i < batchData.extend.size(); ++i) {
1547         errCode = BindUpdateVersionStatement(batchData.extend[i], batchData.hashKey[i], stmt);
1548         if (errCode != E_OK) {
1549             LOGE("bind update version stmt failed.");
1550             SQLiteUtils::ResetStatement(stmt, true, ret);
1551             return errCode;
1552         }
1553     }
1554     SQLiteUtils::ResetStatement(stmt, true, ret);
1555     return ret;
1556 }
1557 
QueryCount(const std::string & tableName,int64_t & count)1558 int SQLiteSingleVerRelationalStorageExecutor::QueryCount(const std::string &tableName, int64_t &count)
1559 {
1560     return SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, count);
1561 }
1562 
CheckInventoryData(const std::string & tableName)1563 int SQLiteSingleVerRelationalStorageExecutor::CheckInventoryData(const std::string &tableName)
1564 {
1565     int64_t dataCount = 0;
1566     int errCode = SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, dataCount);
1567     if (errCode != E_OK) {
1568         LOGE("Query count failed %d", errCode);
1569         return errCode;
1570     }
1571     return dataCount > 0 ? -E_WITH_INVENTORY_DATA : E_OK;
1572 }
1573 
GetUploadCountInner(const Timestamp & timestamp,SqliteQueryHelper & helper,std::string & sql,int64_t & count)1574 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCountInner(const Timestamp &timestamp,
1575     SqliteQueryHelper &helper, std::string &sql, int64_t &count)
1576 {
1577     sqlite3_stmt *stmt = nullptr;
1578     int errCode = helper.GetCloudQueryStatement(false, dbHandle_, sql, stmt);
1579     if (errCode != E_OK) {
1580         LOGE("failed to get count statement %d", errCode);
1581         return errCode;
1582     }
1583     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1584     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1585         count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1586         errCode = E_OK;
1587     } else {
1588         LOGE("Failed to get the count to be uploaded. %d", errCode);
1589     }
1590     SQLiteUtils::ResetStatement(stmt, true, errCode);
1591     return errCode;
1592 }
1593 
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1594 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp &timestamp, bool isCloudForcePush,
1595     bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1596 {
1597     int errCode;
1598     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1599     if (errCode != E_OK) {
1600         return errCode;
1601     }
1602     std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask,
1603         CloudWaterType::DELETE);
1604     return GetUploadCountInner(timestamp, helper, sql, count);
1605 }
1606 
GetAllUploadCount(const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1607 int SQLiteSingleVerRelationalStorageExecutor::GetAllUploadCount(const std::vector<Timestamp> &timestampVec,
1608     bool isCloudForcePush, bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1609 {
1610     std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1611     if (timestampVec.size() != typeVec.size()) {
1612         return -E_INVALID_ARGS;
1613     }
1614     int errCode;
1615     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1616     if (errCode != E_OK) {
1617         return errCode;
1618     }
1619     count = 0;
1620     for (size_t i = 0; i < typeVec.size(); i++) {
1621         std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask, typeVec[i]);
1622         int64_t tempCount = 0;
1623         helper.AppendCloudQueryToGetDiffData(sql, typeVec[i]);
1624         errCode = GetUploadCountInner(timestampVec[i], helper, sql, tempCount);
1625         if (errCode != E_OK) {
1626             return errCode;
1627         }
1628         count += tempCount;
1629     }
1630     return E_OK;
1631 }
1632 
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1633 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1634     bool ignoreEmptyGid)
1635 {
1636     if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1637         cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1638         return -E_INVALID_ARGS;
1639     }
1640     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1641         + "' SET cloud_gid = ? WHERE data_key = ? ";
1642     sqlite3_stmt *stmt = nullptr;
1643     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1644     if (errCode != E_OK) {
1645         return errCode;
1646     }
1647     errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1648     int resetCode = E_OK;
1649     SQLiteUtils::ResetStatement(stmt, true, resetCode);
1650     return errCode == E_OK ? resetCode : errCode;
1651 }
1652 
GetSyncCloudData(const CloudUploadRecorder & uploadRecorder,CloudSyncData & cloudDataResult,SQLiteSingleVerRelationalContinueToken & token)1653 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(const CloudUploadRecorder &uploadRecorder,
1654     CloudSyncData &cloudDataResult, SQLiteSingleVerRelationalContinueToken &token)
1655 {
1656     token.GetCloudTableSchema(tableSchema_);
1657     sqlite3_stmt *queryStmt = nullptr;
1658     bool isStepNext = false;
1659     int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult, queryStmt, isStepNext);
1660     if (errCode != E_OK) {
1661         (void)token.ReleaseCloudStatement();
1662         return errCode;
1663     }
1664     uint32_t totalSize = 0;
1665     uint32_t stepNum = -1;
1666     do {
1667         if (isStepNext) {
1668             errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1669             if (errCode != E_OK) {
1670                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1671                 break;
1672             }
1673         }
1674         isStepNext = true;
1675         errCode = GetCloudDataForSync(uploadRecorder, queryStmt, cloudDataResult, ++stepNum, totalSize);
1676     } while (errCode == E_OK);
1677     if (errCode != -E_UNFINISHED) {
1678         (void)token.ReleaseCloudStatement();
1679     }
1680     return errCode;
1681 }
1682 
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1683 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1684 {
1685     if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1686         Asset asset;
1687         int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1688         if (errCode != E_OK) {
1689             return errCode;
1690         }
1691         if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1692             return -E_CLOUD_ERROR;
1693         }
1694         vBucket.insert_or_assign(field.colName, asset);
1695     } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1696         Assets assets;
1697         int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1698         if (errCode != E_OK) {
1699             return errCode;
1700         }
1701         if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets) || !CloudStorageUtils::CheckAssetStatus(assets)) {
1702             return -E_CLOUD_ERROR;
1703         }
1704         vBucket.insert_or_assign(field.colName, assets);
1705     } else {
1706         vBucket.insert_or_assign(field.colName, cloudValue);
1707     }
1708     return E_OK;
1709 }
1710 
GetDownloadAsset(std::vector<VBucket> & assetsV,const Field & field,Type & cloudValue)1711 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAsset(std::vector<VBucket> &assetsV, const Field &field,
1712     Type &cloudValue)
1713 {
1714     if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1715         Asset asset;
1716         VBucket bucket;
1717         int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1718         if (errCode != E_OK) {
1719             return errCode;
1720         }
1721         if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1722             bucket.insert_or_assign(field.colName, asset);
1723             assetsV.push_back(bucket);
1724         }
1725     } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1726         Assets assets;
1727         int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1728         if (errCode != E_OK) {
1729             return errCode;
1730         }
1731         if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1732             return E_OK;
1733         }
1734         for (const auto &asset : assets) {
1735             if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1736                 VBucket bucket;
1737                 bucket.insert_or_assign(field.colName, asset);
1738                 assetsV.push_back(bucket);
1739             }
1740         }
1741     }
1742     return E_OK;
1743 }
1744 
GetAssetInfoOnTable(sqlite3_stmt * & stmt,const std::vector<Field> & assetFields,VBucket & assetInfo)1745 int SQLiteSingleVerRelationalStorageExecutor::GetAssetInfoOnTable(sqlite3_stmt *&stmt,
1746     const std::vector<Field> &assetFields, VBucket &assetInfo)
1747 {
1748     int errCode = SQLiteUtils::StepWithRetry(stmt);
1749     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1750         int index = 0;
1751         for (const auto &field : assetFields) {
1752             Type cloudValue;
1753             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1754             if (errCode != E_OK) {
1755                 break;
1756             }
1757             errCode = PutVBucketByType(assetInfo, field, cloudValue);
1758             if (errCode != E_OK) {
1759                 break;
1760             }
1761         }
1762     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1763         errCode = E_OK;
1764     } else {
1765         LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1766     }
1767     return errCode;
1768 }
1769 
GetLocalDataCount(const std::string & tableName,int & dataCount,int & logicDeleteDataCount)1770 int SQLiteSingleVerRelationalStorageExecutor::GetLocalDataCount(const std::string &tableName, int &dataCount,
1771     int &logicDeleteDataCount)
1772 {
1773     std::string dataCountSql = "select count(*) from " + DBCommon::GetLogTableName(tableName) + " where data_key != -1";
1774     int errCode = SQLiteUtils::GetCountBySql(dbHandle_, dataCountSql, dataCount);
1775     if (errCode != E_OK) {
1776         LOGE("[RDBExecutor] Query local data count failed: %d", errCode);
1777         return errCode;
1778     }
1779 
1780     std::string logicDeleteDataCountSql = "select count(*) from " + DBCommon::GetLogTableName(tableName) +
1781         " where flag&0x08!=0 and data_key != -1";
1782     errCode = SQLiteUtils::GetCountBySql(dbHandle_, logicDeleteDataCountSql, logicDeleteDataCount);
1783     if (errCode != E_OK) {
1784         LOGE("[RDBExecutor] Query local logic delete data count failed: %d", errCode);
1785     }
1786     return errCode;
1787 }
1788 
UpdateExtendField(const std::string & tableName,const std::set<std::string> & extendColNames)1789 int SQLiteSingleVerRelationalStorageExecutor::UpdateExtendField(const std::string &tableName,
1790     const std::set<std::string> &extendColNames)
1791 {
1792     bool isLogTableExist = false;
1793     int errCode = SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist);
1794     if (errCode == E_OK && !isLogTableExist) {
1795         LOGW("[RDBExecutor][UpdateExtendField] Log table of [%s [%zu]] not found!",
1796             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1797         return E_OK;
1798     }
1799     std::string sql = "update " + DBCommon::GetLogTableName(tableName) + " as log set extend_field = json_object(";
1800     for (const auto &extendColName : extendColNames) {
1801         sql += "'" + extendColName + "',data." + extendColName + ",";
1802     }
1803     sql.pop_back();
1804     sql += ") from " + tableName + " as data where log.data_key = data." + std::string(DBConstant::SQLITE_INNER_ROWID);
1805     sqlite3_stmt *stmt = nullptr;
1806     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1807     if (errCode != E_OK) {
1808         return errCode;
1809     }
1810 
1811     ResFinalizer finalizer([stmt]() {
1812         sqlite3_stmt *statement = stmt;
1813         int ret = E_OK;
1814         SQLiteUtils::ResetStatement(statement, true, ret);
1815         if (ret != E_OK) {
1816             LOGW("[RDBExecutor][UpdateExtendField] Reset stmt failed %d", ret);
1817         }
1818     });
1819     errCode = SQLiteUtils::StepWithRetry(stmt);
1820     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1821         LOGE("[RDBExecutor][UpdateExtendField] Update [%s [%zu]] extend field failed: %d",
1822             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1823         return errCode;
1824     }
1825     return E_OK;
1826 }
1827 
BuildJsonExtendField(const std::string & tableName,const std::string & lowVersionExtendColName,sqlite3 * db)1828 int BuildJsonExtendField(const std::string &tableName, const std::string &lowVersionExtendColName, sqlite3 *db)
1829 {
1830     std::string sql = "update " + DBCommon::GetLogTableName(tableName) + " set extend_field = json_object('" +
1831         lowVersionExtendColName + "',extend_field) where data_key = -1 and (json_valid(extend_field) = 0 or " +
1832         "json_extract(extend_field, '$." + lowVersionExtendColName +"') is null)";
1833     sqlite3_stmt *stmt = nullptr;
1834     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1835     if (errCode != E_OK) {
1836         return errCode;
1837     }
1838     errCode = SQLiteUtils::StepWithRetry(stmt);
1839     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1840         LOGE("[RDBExecutor][UpdateDeleteDataExtendField] Update [%s [%zu]] extend field non-JSON format failed: %d",
1841             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1842     } else {
1843         errCode = E_OK;
1844     }
1845     int ret = E_OK;
1846     SQLiteUtils::ResetStatement(stmt, true, ret);
1847     if (ret != E_OK) {
1848         LOGW("[RDBExecutor][UpdateExtendField] Reset stmt failed %d", ret);
1849     }
1850     return errCode;
1851 }
1852 
GetUpdateExtendFieldSql(const std::string & tableName,const std::set<std::string> & oldExtendColNames,const std::set<std::string> & extendColNames)1853 std::string GetUpdateExtendFieldSql(const std::string &tableName, const std::set<std::string> &oldExtendColNames,
1854     const std::set<std::string> &extendColNames)
1855 {
1856     std::string sql = "update " + DBCommon::GetLogTableName(tableName) +
1857         " set extend_field = json_insert(extend_field,";
1858     bool isContainNewCol = false;
1859     for (const auto &extendColName : extendColNames) {
1860         if (oldExtendColNames.find(extendColName) != oldExtendColNames.end()) {
1861             continue;
1862         }
1863         isContainNewCol = true;
1864         sql += "'$." + extendColName + "',null,";
1865     }
1866     if (!isContainNewCol) {
1867         return "";
1868     }
1869     sql.pop_back();
1870     sql += ") where data_key = -1 and json_valid(extend_field) = 1;";
1871     return sql;
1872 }
1873 
UpdateDeleteDataExtendField(const std::string & tableName,const std::string & lowVersionExtendColName,const std::set<std::string> & oldExtendColNames,const std::set<std::string> & extendColNames)1874 int SQLiteSingleVerRelationalStorageExecutor::UpdateDeleteDataExtendField(const std::string &tableName,
1875     const std::string &lowVersionExtendColName, const std::set<std::string> &oldExtendColNames,
1876     const std::set<std::string> &extendColNames)
1877 {
1878     bool isLogTableExist = false;
1879     if (SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist) == E_OK &&
1880         !isLogTableExist) {
1881         LOGW("[RDBExecutor][UpdateDeleteDataExtendField] Log table of [%s [%zu]] not found!",
1882             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1883         return E_OK;
1884     }
1885     int errCode = E_OK;
1886     if (!lowVersionExtendColName.empty()) {
1887         errCode = BuildJsonExtendField(tableName, lowVersionExtendColName, dbHandle_);
1888         if (errCode != E_OK) {
1889             LOGE("[UpdateDeleteDataExtendField] Update low version extend field of [%s [%zu]] to json failed: %d",
1890                 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1891             return errCode;
1892         }
1893     }
1894     std::string sql = GetUpdateExtendFieldSql(tableName, oldExtendColNames, extendColNames);
1895     if (sql.empty()) {
1896         return E_OK;
1897     }
1898 
1899     sqlite3_stmt *stmt = nullptr;
1900     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1901     if (errCode != E_OK) {
1902         return errCode;
1903     }
1904     errCode = SQLiteUtils::StepWithRetry(stmt);
1905     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1906         LOGE("[RDBExecutor][UpdateDeleteDataExtendField] Update extend field of [%s [%zu]] failed: %d",
1907             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1908     } else {
1909         errCode = E_OK;
1910     }
1911     int ret = E_OK;
1912     SQLiteUtils::ResetStatement(stmt, true, ret);
1913     if (ret != E_OK) {
1914         LOGW("[RDBExecutor][UpdateDeleteDataExtendField] Reset stmt failed %d", ret);
1915     }
1916     return errCode;
1917 }
1918 
GetDownloadAssetGid(const TableSchema & tableSchema,std::vector<std::string> & gids,int64_t beginTime,bool abortWithLimit)1919 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetGid(const TableSchema &tableSchema,
1920     std::vector<std::string> &gids, int64_t beginTime, bool abortWithLimit)
1921 {
1922     std::string sql = "SELECT cloud_gid FROM " + DBCommon::GetLogTableName(tableSchema.name) +
1923         " WHERE flag&0x1000=0x1000 AND timestamp >= ?;";
1924     sqlite3_stmt *stmt = nullptr;
1925     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1926     if (errCode != E_OK) {
1927         LOGE("[RDBExecutor]Get gid statement failed, %d", errCode);
1928         return errCode;
1929     }
1930     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, beginTime);
1931     if (errCode != E_OK) {
1932         LOGE("[RDBExecutor] bind time failed %d when get download asset gid", errCode);
1933         SQLiteUtils::ResetStatement(stmt, true, errCode);
1934         return errCode;
1935     }
1936     uint32_t count = 0;
1937     do {
1938         errCode = SQLiteUtils::StepWithRetry(stmt, false);
1939         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1940             errCode = E_OK;
1941             break;
1942         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1943             LOGE("[RDBExecutor]Get downloading assets gid failed. %d", errCode);
1944             break;
1945         }
1946         std::string gid;
1947         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1948         if (errCode != E_OK) {
1949             LOGW("[RDBExecutor]Get downloading assets gid failed %d when get col", errCode);
1950             continue;
1951         }
1952         gids.push_back(gid);
1953         if (AbortGetDownloadAssetGidIfNeed(tableSchema, gid, abortWithLimit, count)) {
1954             break;
1955         }
1956     } while (errCode == E_OK);
1957     int ret = E_OK;
1958     SQLiteUtils::ResetStatement(stmt, true, ret);
1959     return errCode == E_OK ? ret : errCode;
1960 }
1961 
GetDownloadAssetRecordsByGid(const TableSchema & tableSchema,const std::string gid,std::vector<VBucket> & assets)1962 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetRecordsByGid(const TableSchema &tableSchema,
1963     const std::string gid, std::vector<VBucket> &assets)
1964 {
1965     std::vector<Field> assetFields;
1966     std::string sql = "SELECT";
1967     for (const auto &field: tableSchema.fields) {
1968         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1969             assetFields.emplace_back(field);
1970             sql += " b." + field.colName + ",";
1971         }
1972     }
1973     if (assetFields.empty()) {
1974         return E_OK;
1975     }
1976     sql.pop_back(); // remove last ,
1977     sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE a.cloud_gid = ?;";
1978     sqlite3_stmt *stmt = nullptr;
1979     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1980     if (errCode != E_OK) {
1981         LOGE("Get downloading asset records statement failed, %d", errCode);
1982         return errCode;
1983     }
1984     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
1985     if (errCode != E_OK) {
1986         SQLiteUtils::ResetStatement(stmt, true, errCode);
1987         return errCode;
1988     }
1989     errCode = SQLiteUtils::StepWithRetry(stmt);
1990     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1991         int index = 0;
1992         for (const auto &field: assetFields) {
1993             Type value;
1994             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, value);
1995             if (errCode != E_OK) {
1996                 break;
1997             }
1998             errCode = GetDownloadAsset(assets, field, value);
1999             if (errCode != E_OK) {
2000                 break;
2001             }
2002         }
2003     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2004         errCode = E_OK;
2005     } else {
2006         LOGE("step get downloading asset records statement failed %d.", errCode);
2007     }
2008     int ret = E_OK;
2009     SQLiteUtils::ResetStatement(stmt, true, ret);
2010     return errCode == E_OK ? ret : errCode;
2011 }
2012 
GetDownloadingCount(const std::string & tableName,int32_t & count)2013 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadingCount(const std::string &tableName, int32_t &count)
2014 {
2015     std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName) + " WHERE flag&0x1000=0x1000;";
2016     int errCode = SQLiteUtils::GetCountBySql(dbHandle_, sql, count);
2017     if (errCode != E_OK) {
2018         LOGE("[RDBExecutor] Query local data count failed: %d", errCode);
2019     }
2020     return errCode;
2021 }
2022 
GetDownloadingAssetsCount(const TableSchema & tableSchema,int32_t & totalCount)2023 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadingAssetsCount(
2024     const TableSchema &tableSchema, int32_t &totalCount)
2025 {
2026     std::vector<std::string> gids;
2027     int errCode = GetDownloadAssetGid(tableSchema, gids);
2028     if (errCode != E_OK) {
2029         LOGE("[RDBExecutor]Get downloading assets gid failed: %d", errCode);
2030         return errCode;
2031     }
2032     for (const auto &gid : gids) {
2033         std::vector<VBucket> assets;
2034         errCode = GetDownloadAssetRecordsByGid(tableSchema, gid, assets);
2035         if (errCode != E_OK) {
2036             LOGE("[RDBExecutor]Get downloading assets records by gid failed: %d", errCode);
2037             return errCode;
2038         }
2039         totalCount += static_cast<int32_t>(assets.size());
2040     }
2041     return E_OK;
2042 }
2043 
GetDownloadAssetRecordsInner(const TableSchema & tableSchema,int64_t beginTime,std::vector<std::string> & gids)2044 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetRecordsInner(
2045     const TableSchema &tableSchema, int64_t beginTime, std::vector<std::string> &gids)
2046 {
2047     int errCode = GetDownloadAssetGid(tableSchema, gids, beginTime, true);
2048     if (errCode != E_OK) {
2049         LOGE("[RDBExecutor]Get downloading assets gid failed: %d", errCode);
2050     }
2051     return errCode;
2052 }
2053 
CleanDownloadingFlag(const std::string & tableName)2054 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadingFlag(const std::string &tableName)
2055 {
2056     std::string sql;
2057     sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag&(~0x1000);";
2058     sqlite3_stmt *stmt = nullptr;
2059     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
2060     if (errCode != E_OK) {
2061         LOGE("[RDBExecutor]Get stmt failed clean downloading flag: %d, tableName: %s, length: %zu",
2062             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
2063         return errCode;
2064     }
2065     errCode = SQLiteUtils::StepWithRetry(stmt);
2066     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2067         errCode = E_OK;
2068     } else {
2069         LOGE("[RDBExecutor]Clean downloading flag failed: %d, tableName: %s, length: %zu",
2070             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
2071     }
2072     int ret = E_OK;
2073     SQLiteUtils::ResetStatement(stmt, true, ret);
2074     if (ret != E_OK) {
2075         LOGE("[RDBExecutor]Reset stmt failed clean downloading flag: %d", ret);
2076     }
2077     return errCode != E_OK ? errCode : ret;
2078 }
2079 
AbortGetDownloadAssetGidIfNeed(const DistributedDB::TableSchema & tableSchema,const std::string & gid,bool abortWithLimit,uint32_t & count)2080 bool SQLiteSingleVerRelationalStorageExecutor::AbortGetDownloadAssetGidIfNeed(
2081     const DistributedDB::TableSchema &tableSchema, const std::string &gid, bool abortWithLimit,
2082     uint32_t &count)
2083 {
2084     if (!abortWithLimit) {
2085         return false;
2086     }
2087     std::vector<VBucket> assets;
2088     int errCode = GetDownloadAssetRecordsByGid(tableSchema, gid, assets);
2089     if (errCode != E_OK) {
2090         LOGW("[RDBExecutor]Get downloading assets failed %d gid %s", errCode, gid.c_str());
2091         return false;
2092     }
2093     count += assets.size();
2094     return count >= RuntimeContext::GetInstance()->GetAssetsDownloadManager()->GetMaxDownloadAssetsCount();
2095 }
2096 } // namespace DistributedDB
2097 #endif
2098