• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include "cloud/asset_operation_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "cloud/cloud_storage_utils.h"
21 #include "db_common.h"
22 #include "log_table_manager_factory.h"
23 #include "res_finalizer.h"
24 #include "runtime_context.h"
25 #include "simple_tracker_log_table_manager.h"
26 #include "sqlite_relational_utils.h"
27 
28 namespace DistributedDB {
// Offset added to the number of bound asset columns to obtain the bind position of the row id.
static constexpr int ROW_ID_INDEX = 1;
// Key under which a record's hash key is stored in a VBucket.
static constexpr const char *HASH_KEY = "HASH_KEY";
// SQL predicate that holds when the 0x08 bit of FLAG is clear, i.e. the record is not logically deleted.
static constexpr const char *FLAG_NOT_LOGIC_DELETE = "FLAG & 0x08 = 0";

// A pair of string lists.
using PairStringVector = std::pair<std::vector<std::string>, std::vector<std::string>>;

34 
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)35 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
36     std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
37 {
38     if (assetFields.empty() && pkSet.empty()) {
39         return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
40     }
41     std::string gid;
42     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
43     if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
44         LOGE("Get cloud gid fail when query log table.");
45         return errCode;
46     }
47 
48     if (pkSet.empty() && gid.empty()) {
49         LOGE("query log table failed because of both primary key and gid are empty.");
50         return -E_CLOUD_ERROR;
51     }
52     std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
53         " a.cloud_gid, a.sharing_resource, a.status, a.version";
54     for (const auto &field : assetFields) {
55         sql += ", b." + field.colName;
56     }
57     for (const auto &pk : pkSet) {
58         sql += ", b." + pk;
59     }
60     sql += CloudStorageUtils::GetLeftJoinLogSql(tableName) + " WHERE ";
61     if (!gid.empty()) {
62         sql += " a.cloud_gid = ? or ";
63     }
64     sql += "a.hash_key = ?";
65     querySql = sql;
66     return E_OK;
67 }
68 
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)69 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
70     const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
71 {
72     std::string sql = "UPDATE " + tableName + " SET ";
73     for (const auto &field: fields) {
74         sql += field.colName + " = ?,";
75     }
76     sql.pop_back();
77     sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (";
78     sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
79     sqlite3_stmt *stmt = nullptr;
80     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
81     if (errCode != E_OK) {
82         LOGE("Get fill asset statement failed, %d.", errCode);
83         return errCode;
84     }
85     for (size_t i = 0; i < fields.size(); ++i) {
86         errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
87         if (errCode != E_OK) {
88             SQLiteUtils::ResetStatement(stmt, true, errCode);
89             return errCode;
90         }
91     }
92     statement = stmt;
93     return errCode;
94 }
95 
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess)96 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
97     VBucket &vBucket, bool isDownloadSuccess)
98 {
99     std::string cloudGid;
100     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
101     if (errCode != E_OK) {
102         LOGE("Miss gid when fill Asset.");
103         return errCode;
104     }
105     std::vector<Field> assetsField;
106     errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
107     if (errCode != E_OK) {
108         LOGE("No assets need to be filled.");
109         return errCode;
110     }
111     CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
112 
113     Bytes hashKey;
114     (void)CloudStorageUtils::GetValueFromVBucket<Bytes>(HASH_KEY, vBucket, hashKey);
115     VBucket dbAssets;
116     std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, hashKey, dbAssets);
117     if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
118         LOGE("get assets by gid or hashkey failed %d.", errCode);
119         return errCode;
120     }
121     AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
122         AssetOperationUtils::CloudSyncAction::END_DOWNLOAD);
123 
124     if (isDownloadSuccess) {
125         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
126             CloudStorageUtils::FillAssetAfterDownload, CloudStorageUtils::FillAssetsAfterDownload);
127         errCode = IncreaseCursorOnAssetData(tableSchema.name, cloudGid);
128         if (errCode != E_OK) {
129             return errCode;
130         }
131     } else {
132         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
133             CloudStorageUtils::FillAssetAfterDownloadFail, CloudStorageUtils::FillAssetsAfterDownloadFail);
134     }
135 
136     sqlite3_stmt *stmt = nullptr;
137     errCode = GetFillDownloadAssetStatement(tableSchema.name, dbAssets, assetsField, stmt);
138     if (errCode != E_OK) {
139         return errCode;
140     }
141     errCode = ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
142     int ret = CleanDownloadChangedAssets(vBucket, assetOpType);
143     return errCode == E_OK ? ret : errCode;
144 }
145 
IncreaseCursorOnAssetData(const std::string & tableName,const std::string & gid)146 int SQLiteSingleVerRelationalStorageExecutor::IncreaseCursorOnAssetData(const std::string &tableName,
147     const std::string &gid)
148 {
149     int cursor = GetCursor(tableName);
150     cursor++;
151     std::string sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + tableName + "_log";
152     sql += " SET cursor = ? where cloud_gid = ?;";
153     sqlite3_stmt *statement = nullptr;
154     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
155     if (errCode != E_OK) {
156         LOGE("get update asset data cursor stmt failed %d.", errCode);
157         return errCode;
158     }
159     ResFinalizer finalizer([statement]() {
160         sqlite3_stmt *statementInner = statement;
161         int ret = E_OK;
162         SQLiteUtils::ResetStatement(statementInner, true, ret);
163         if (ret != E_OK) {
164             LOGW("Reset  stmt failed %d when increase cursor on asset data", ret);
165         }
166     });
167     int index = 1;
168     errCode = SQLiteUtils::BindInt64ToStatement(statement, index++, cursor);
169     if (errCode != E_OK) {
170         LOGE("bind cursor data stmt failed %d.", errCode);
171         return errCode;
172     }
173     errCode = SQLiteUtils::BindTextToStatement(statement, index, gid);
174     if (errCode != E_OK) {
175         LOGE("bind cursor gid data stmt failed %d.", errCode);
176         return errCode;
177     }
178     errCode = SQLiteUtils::StepWithRetry(statement, false);
179     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
180         LOGE("Fill upload asset failed:%d.", errCode);
181         return errCode;
182     }
183     LOGI("Upgrade cursor to %d after asset download success.", cursor);
184     errCode = SetCursor(tableName, cursor);
185     if (errCode != E_OK) {
186         LOGE("Upgrade cursor failed after asset download success %d.", errCode);
187     }
188     return errCode;
189 }
190 
FillCloudAssetForUpload(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data)191 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema,
192     const CloudSyncBatch &data)
193 {
194     int errCode = E_OK;
195     if (CloudStorageUtils::ChkFillCloudAssetParam(data, errCode)) {
196         return errCode;
197     }
198     errCode = SetLogTriggerStatus(false);
199     if (errCode != E_OK) {
200         LOGE("Fail to set log trigger off, %d.", errCode);
201         return errCode;
202     }
203     sqlite3_stmt *stmt = nullptr;
204     for (size_t i = 0; i < data.assets.size(); ++i) {
205         if (data.assets.at(i).empty()) {
206             continue;
207         }
208         if (DBCommon::IsRecordIgnored(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i]) ||
209             DBCommon::IsCloudRecordNotFound(data.extend[i]) || DBCommon::IsCloudRecordAlreadyExisted(data.extend[i])) {
210             continue;
211         }
212         errCode = InitFillUploadAssetStatement(opType, tableSchema, data, i, stmt);
213         if (errCode != E_OK) {
214             if (errCode == -E_NOT_FOUND) {
215                 errCode = E_OK;
216                 continue;
217             }
218             break;
219         }
220         errCode = SQLiteUtils::StepWithRetry(stmt, false);
221         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
222             LOGE("Fill upload asset failed:%d.", errCode);
223             break;
224         }
225         errCode = E_OK;
226         SQLiteUtils::ResetStatement(stmt, true, errCode);
227         stmt = nullptr;
228         if (errCode != E_OK) {
229             break;
230         }
231     }
232     int ret = E_OK;
233     SQLiteUtils::ResetStatement(stmt, true, ret);
234     int endCode = SetLogTriggerStatus(true);
235     if (endCode != E_OK) {
236         LOGE("Fail to set log trigger off, %d.", endCode);
237         return endCode;
238     }
239     return errCode != E_OK ? errCode : ret;
240 }
241 
FillCloudVersionForUpload(const OpType opType,const CloudSyncData & data)242 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data)
243 {
244     switch (opType) {
245         case OpType::UPDATE_VERSION:
246             return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.updData);
247         case OpType::INSERT_VERSION:
248             return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.insData);
249         default:
250             LOGE("Fill version with unknown type %d", static_cast<int>(opType));
251             return -E_INVALID_ARGS;
252     }
253 }
254 
BindUpdateVersionStatement(const VBucket & vBucket,const Bytes & hashKey,sqlite3_stmt * & stmt)255 int SQLiteSingleVerRelationalStorageExecutor::BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey,
256     sqlite3_stmt *&stmt)
257 {
258     int errCode = E_OK;
259     std::string version;
260     if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
261         vBucket, version) != E_OK) {
262         LOGW("get version from vBucket failed.");
263     }
264     if (hashKey.empty()) {
265         LOGE("hash key is empty when update version.");
266         return -E_CLOUD_ERROR;
267     }
268     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, version);
269     if (errCode != E_OK) {
270         return errCode;
271     }
272     errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, hashKey); // 2 means the second bind args
273     if (errCode != E_OK) {
274         return errCode;
275     }
276     errCode = SQLiteUtils::StepWithRetry(stmt, false);
277     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
278         errCode = E_OK;
279         SQLiteUtils::ResetStatement(stmt, false, errCode);
280     } else {
281         LOGE("step version stmt failed: %d.", errCode);
282     }
283     return errCode;
284 }
285 
InitFillUploadAssetStatement(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)286 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpType opType,
287     const TableSchema &tableSchema, const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
288 {
289     VBucket vBucket = data.assets.at(index);
290     VBucket dbAssets;
291     std::string cloudGid;
292     int errCode;
293     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
294     std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, data.hashKey.at(index), dbAssets);
295     if (errCode != E_OK && errCode != -E_CLOUD_GID_MISMATCH) {
296         return errCode;
297     }
298     AssetOperationUtils::CloudSyncAction action = opType == OpType::SET_UPLOADING ?
299         AssetOperationUtils::CloudSyncAction::START_UPLOAD : AssetOperationUtils::CloudSyncAction::END_UPLOAD;
300     AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
301         action);
302     if (action == AssetOperationUtils::CloudSyncAction::START_UPLOAD) {
303         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
304             CloudStorageUtils::FillAssetBeforeUpload, CloudStorageUtils::FillAssetsBeforeUpload);
305     } else {
306         if (DBCommon::IsRecordError(data.extend.at(index))) {
307             CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
308                 CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed);
309         } else {
310             CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
311                 CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload);
312         }
313     }
314 
315     errCode = GetAndBindFillUploadAssetStatement(tableSchema.name, dbAssets, statement);
316     if (errCode != E_OK) {
317         LOGE("get and bind asset failed %d.", errCode);
318         return errCode;
319     }
320     int64_t rowid = data.rowid[index];
321     return SQLiteUtils::BindInt64ToStatement(statement, dbAssets.size() + ROW_ID_INDEX, rowid);
322 }
323 
AnalysisTrackerTable(const TrackerTable & trackerTable,TableInfo & tableInfo)324 int SQLiteSingleVerRelationalStorageExecutor::AnalysisTrackerTable(const TrackerTable &trackerTable,
325     TableInfo &tableInfo)
326 {
327     return SQLiteRelationalUtils::AnalysisTrackerTable(dbHandle_, trackerTable, tableInfo);
328 }
329 
CreateTrackerTable(const TrackerTable & trackerTable,bool isUpgrade)330 int SQLiteSingleVerRelationalStorageExecutor::CreateTrackerTable(const TrackerTable &trackerTable, bool isUpgrade)
331 {
332     TableInfo table;
333     table.SetTableSyncType(TableSyncType::CLOUD_COOPERATION);
334     int errCode = AnalysisTrackerTable(trackerTable, table);
335     if (errCode != E_OK) {
336         return errCode;
337     }
338     auto tableManager = std::make_unique<SimpleTrackerLogTableManager>();
339     if (trackerTable.GetTrackerColNames().empty()) {
340         // drop trigger
341         return tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
342     }
343 
344     // create log table
345     errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
346     if (errCode != E_OK) {
347         return errCode;
348     }
349     // init cursor
350     errCode = InitCursorToMeta(table.GetTableName());
351     if (errCode != E_OK) {
352         return errCode;
353     }
354     std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, "");
355     if (isUpgrade) {
356         errCode = CleanExtendAndCursorForDeleteData(table.GetTableName());
357         if (errCode != E_OK) {
358             LOGE("clean tracker log info for deleted data failed %d.", errCode);
359             return errCode;
360         }
361     }
362     errCode = GeneLogInfoForExistedData(dbHandle_, trackerTable.GetTableName(), calPrimaryKeyHash, table);
363     if (errCode != E_OK) {
364         LOGE("general tracker log info for existed data failed %d.", errCode);
365         return errCode;
366     }
367     errCode = SetLogTriggerStatus(true);
368     if (errCode != E_OK) {
369         return errCode;
370     }
371     errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
372     if (errCode != E_OK) {
373         return errCode;
374     }
375     if (!isUpgrade) {
376         return CheckInventoryData(DBCommon::GetLogTableName(table.GetTableName()));
377     }
378     return E_OK;
379 }
380 
GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject & schema)381 int SQLiteSingleVerRelationalStorageExecutor::GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema)
382 {
383     if (!schema.ToSchemaString().empty()) {
384         return E_OK;
385     }
386     const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.begin(),
387         DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.end());
388     Value schemaVal;
389     int errCode = GetKvData(schemaKey, schemaVal); // save schema to meta_data
390     if (errCode != E_OK) {
391         return errCode;
392     }
393     if (schemaVal.empty()) {
394         return -E_NOT_FOUND;
395     }
396     std::string schemaStr;
397     DBCommon::VectorToString(schemaVal, schemaStr);
398     errCode = schema.ParseFromTrackerSchemaString(schemaStr);
399     if (errCode != E_OK) {
400         LOGE("Parse from tracker schema string err.");
401     }
402     return errCode;
403 }
404 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)405 int SQLiteSingleVerRelationalStorageExecutor::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
406 {
407     sqlite3_stmt *statement = nullptr;
408     int errCode = SQLiteUtils::GetStatement(dbHandle_, condition.sql, statement);
409     if (errCode != E_OK) {
410         LOGE("Execute sql failed when prepare stmt.");
411         return errCode;
412     }
413     size_t bindCount = static_cast<size_t>(sqlite3_bind_parameter_count(statement));
414     if (bindCount > condition.bindArgs.size() || bindCount < condition.bindArgs.size()) {
415         LOGE("Sql bind args mismatch.");
416         SQLiteUtils::ResetStatement(statement, true, errCode);
417         return -E_INVALID_ARGS;
418     }
419     for (size_t i = 0; i < condition.bindArgs.size(); i++) {
420         Type type = condition.bindArgs[i];
421         errCode = SQLiteRelationalUtils::BindStatementByType(statement, i + 1, type);
422         if (errCode != E_OK) {
423             int ret = E_OK;
424             SQLiteUtils::ResetStatement(statement, true, ret);
425             return errCode;
426         }
427     }
428     while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
429         VBucket bucket;
430         errCode = SQLiteRelationalUtils::GetSelectVBucket(statement, bucket);
431         if (errCode != E_OK) {
432             int ret = E_OK;
433             SQLiteUtils::ResetStatement(statement, true, ret);
434             return errCode;
435         }
436         records.push_back(std::move(bucket));
437     }
438     int ret = E_OK;
439     SQLiteUtils::ResetStatement(statement, true, ret);
440     return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
441 }
442 
GetClearWaterMarkTables(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema,std::set<std::string> & clearWaterMarkTables)443 int SQLiteSingleVerRelationalStorageExecutor::GetClearWaterMarkTables(
444     const std::vector<TableReferenceProperty> &tableReferenceProperty, const RelationalSchemaObject &schema,
445     std::set<std::string> &clearWaterMarkTables)
446 {
447     std::set<std::string> changeTables = schema.CompareReferenceProperty(tableReferenceProperty);
448     for (const auto &table : changeTables) {
449         std::string logTableName = DBCommon::GetLogTableName(table);
450         bool isExists = false;
451         int errCode = SQLiteUtils::CheckTableExists(dbHandle_, logTableName, isExists);
452         if (errCode != E_OK) {
453             LOGE("[GetClearWaterMarkTables] check table exists failed, errCode = %d.", errCode);
454             return errCode;
455         }
456         if (!isExists) { // table maybe dropped after set reference
457             LOGI("[GetClearWaterMarkTables] log table not exists, skip this table.");
458             continue;
459         }
460 
461         bool isEmpty = true;
462         errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, logTableName, isEmpty);
463         if (errCode != E_OK) {
464             LOGE("[GetClearWaterMarkTables] check table empty failed, errCode = %d.", errCode);
465             clearWaterMarkTables.clear();
466             return errCode;
467         }
468         if (!isEmpty) {
469             clearWaterMarkTables.insert(table);
470         }
471     }
472     LOGI("[GetClearWaterMarkTables] clearWaterMarkTables size = %zu.", clearWaterMarkTables.size());
473     return E_OK;
474 }
475 
UpgradedLogForExistedData(TableInfo & tableInfo,bool schemaChanged)476 int SQLiteSingleVerRelationalStorageExecutor::UpgradedLogForExistedData(TableInfo &tableInfo, bool schemaChanged)
477 {
478     if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
479         return E_OK;
480     }
481     std::string logTable = DBCommon::GetLogTableName(tableInfo.GetTableName());
482     if (schemaChanged) {
483         std::string markAsInconsistent = "UPDATE " + logTable + " SET flag=" +
484             "(CASE WHEN (cloud_gid='' and data_key=-1 and flag&0x02=0x02) then flag else flag|0x20 END)";
485         int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, markAsInconsistent);
486         if (ret != E_OK) {
487             LOGE("Mark upgrade log info as inconsistent failed:%d", ret);
488             return ret;
489         }
490     }
491     if (tableInfo.GetTrackerTable().IsEmpty()) {
492         return E_OK;
493     }
494     LOGI("Upgrade tracker table log, schemaChanged:%d.", schemaChanged);
495     int errCode = SetLogTriggerStatus(false);
496     if (errCode != E_OK) {
497         return errCode;
498     }
499     std::string sql = "UPDATE " + tableInfo.GetTableName() + " SET _rowid_=_rowid_";
500     TrackerTable trackerTable = tableInfo.GetTrackerTable();
501     errCode = trackerTable.ReBuildTempTrigger(dbHandle_, TriggerMode::TriggerModeEnum::UPDATE,
502         [this, &sql]() {
503         int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
504         if (ret != E_OK) {
505             LOGE("Upgrade log for extend field failed.");
506         }
507         return ret;
508     });
509     return SetLogTriggerStatus(true);
510 }
511 
CreateTempSyncTrigger(const TrackerTable & trackerTable)512 int SQLiteSingleVerRelationalStorageExecutor::CreateTempSyncTrigger(const TrackerTable &trackerTable)
513 {
514     int errCode = E_OK;
515     std::vector<std::string> dropSql = trackerTable.GetDropTempTriggerSql();
516     for (const auto &sql: dropSql) {
517         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
518         if (errCode != E_OK) {
519             LOGE("[RDBExecutor] execute drop sql failed %d.", errCode);
520             return errCode;
521         }
522     }
523     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempInsertTriggerSql());
524     if (errCode != E_OK) {
525         LOGE("[RDBExecutor] create temp insert trigger failed %d.", errCode);
526         return errCode;
527     }
528     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempUpdateTriggerSql());
529     if (errCode != E_OK) {
530         LOGE("[RDBExecutor] create temp update trigger failed %d.", errCode);
531         return errCode;
532     }
533     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempDeleteTriggerSql());
534     if (errCode != E_OK) {
535         LOGE("[RDBExecutor] create temp delete trigger failed %d.", errCode);
536     }
537     return errCode;
538 }
539 
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)540 int SQLiteSingleVerRelationalStorageExecutor::GetAndResetServerObserverData(const std::string &tableName,
541     ChangeProperties &changeProperties)
542 {
543     std::string fileName;
544     if (!SQLiteRelationalUtils::GetDbFileName(dbHandle_, fileName)) {
545         LOGE("get db file name failed.");
546         return -E_INVALID_DB;
547     }
548     SQLiteUtils::GetAndResetServerObserverData(fileName, tableName, changeProperties);
549     return E_OK;
550 }
551 
ClearAllTempSyncTrigger()552 int SQLiteSingleVerRelationalStorageExecutor::ClearAllTempSyncTrigger()
553 {
554     sqlite3_stmt *stmt = nullptr;
555     static const std::string sql = "SELECT name FROM sqlite_temp_master WHERE type = 'trigger';";
556     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
557     if (errCode != E_OK) {
558         LOGE("get clear all temp trigger stmt failed %d.", errCode);
559         return errCode;
560     }
561     int ret = E_OK;
562     while ((errCode = SQLiteUtils::StepNext(stmt, isMemDb_)) == E_OK) {
563         std::string str;
564         (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
565         if (errCode != E_OK) {
566             SQLiteUtils::ResetStatement(stmt, true, ret);
567             return errCode;
568         }
569         std::string dropSql = "DROP TRIGGER IF EXISTS '" + str + "';";
570         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, dropSql);
571         if (errCode != E_OK) {
572             LOGE("drop temp trigger failed %d.", errCode);
573             SQLiteUtils::ResetStatement(stmt, true, ret);
574             return errCode;
575         }
576     }
577     SQLiteUtils::ResetStatement(stmt, true, ret);
578     return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
579 }
580 
CleanTrackerData(const std::string & tableName,int64_t cursor,bool isOnlyTrackTable)581 int SQLiteSingleVerRelationalStorageExecutor::CleanTrackerData(const std::string &tableName, int64_t cursor,
582     bool isOnlyTrackTable)
583 {
584     std::string sql;
585     if (isOnlyTrackTable) {
586         sql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log";
587     } else {
588         sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + tableName + "_log SET extend_field = NULL";
589     }
590     sql += " where data_key = -1 and cursor <= ?;";
591     sqlite3_stmt *statement = nullptr;
592     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
593     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
594         LOGE("get clean tracker data stmt failed %d.", errCode);
595         return errCode;
596     }
597     errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, cursor);
598     int ret = E_OK;
599     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
600         LOGE("bind clean tracker data stmt failed %d.", errCode);
601         SQLiteUtils::ResetStatement(statement, true, ret);
602         return errCode;
603     }
604     errCode = SQLiteUtils::StepWithRetry(statement);
605     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
606         errCode = E_OK;
607     } else {
608         LOGE("clean tracker step failed: %d.", errCode);
609     }
610     SQLiteUtils::ResetStatement(statement, true, ret);
611     return errCode == E_OK ? ret : errCode;
612 }
613 
CreateSharedTable(const TableSchema & tableSchema)614 int SQLiteSingleVerRelationalStorageExecutor::CreateSharedTable(const TableSchema &tableSchema)
615 {
616     std::map<int32_t, std::string> cloudFieldTypeMap;
617     cloudFieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
618     cloudFieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
619     cloudFieldTypeMap[TYPE_INDEX<double>] = "REAL";
620     cloudFieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
621     cloudFieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
622     cloudFieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
623     cloudFieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
624     cloudFieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
625 
626     std::string createTableSql = "CREATE TABLE IF NOT EXISTS " + tableSchema.sharedTableName + "(";
627     std::string primaryKey = ", PRIMARY KEY (";
628     createTableSql += CloudDbConstant::CLOUD_OWNER;
629     createTableSql += " TEXT, ";
630     createTableSql += CloudDbConstant::CLOUD_PRIVILEGE;
631     createTableSql += " TEXT";
632     primaryKey += CloudDbConstant::CLOUD_OWNER;
633     bool hasPrimaryKey = false;
634     for (const auto &field : tableSchema.fields) {
635         createTableSql += ", " + field.colName + " ";
636         createTableSql += cloudFieldTypeMap[field.type];
637         createTableSql += field.nullable ? "" : " NOT NULL";
638         if (field.primary) {
639             primaryKey += ", " + field.colName;
640             hasPrimaryKey = true;
641         }
642     }
643     if (hasPrimaryKey) {
644         createTableSql += primaryKey + ")";
645     }
646     createTableSql += ");";
647     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, createTableSql);
648     if (errCode != E_OK) {
649         LOGE("Create shared table failed, %d.", errCode);
650     }
651     return errCode;
652 }
653 
DeleteTable(const std::vector<std::string> & tableNames)654 int SQLiteSingleVerRelationalStorageExecutor::DeleteTable(const std::vector<std::string> &tableNames)
655 {
656     for (const auto &tableName : tableNames) {
657         std::string deleteTableSql = "DROP TABLE IF EXISTS " + tableName + ";";
658         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteTableSql);
659         if (errCode != E_OK) {
660             LOGE("Delete table failed, %d.", errCode);
661             return errCode;
662         }
663     }
664     return E_OK;
665 }
666 
UpdateSharedTable(const std::map<std::string,std::vector<Field>> & updateTableNames)667 int SQLiteSingleVerRelationalStorageExecutor::UpdateSharedTable(
668     const std::map<std::string, std::vector<Field>> &updateTableNames)
669 {
670     int errCode = E_OK;
671     std::map<int32_t, std::string> fieldTypeMap;
672     fieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
673     fieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
674     fieldTypeMap[TYPE_INDEX<double>] = "REAL";
675     fieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
676     fieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
677     fieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
678     fieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
679     fieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
680     for (const auto &table : updateTableNames) {
681         if (table.second.empty()) {
682             continue;
683         }
684         std::string addColumnSql = "";
685         for (const auto &field : table.second) {
686             addColumnSql += "ALTER TABLE " + table.first + " ADD ";
687             addColumnSql += field.colName + " ";
688             addColumnSql += fieldTypeMap[field.type];
689             addColumnSql += field.primary ? " PRIMARY KEY" : "";
690             addColumnSql += field.nullable ? ";" : " NOT NULL;";
691         }
692         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, addColumnSql);
693         if (errCode != E_OK) {
694             LOGE("Shared table add column failed, %d.", errCode);
695             return errCode;
696         }
697     }
698     return errCode;
699 }
700 
AlterTableName(const std::map<std::string,std::string> & tableNames)701 int SQLiteSingleVerRelationalStorageExecutor::AlterTableName(const std::map<std::string, std::string> &tableNames)
702 {
703     for (const auto &tableName : tableNames) {
704         std::string alterTableSql = "ALTER TABLE " + tableName.first + " RENAME TO " + tableName.second + ";";
705         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, alterTableSql);
706         if (errCode != E_OK) {
707             LOGE("Alter table name failed, %d.", errCode);
708             return errCode;
709         }
710     }
711     return E_OK;
712 }
713 
AppendUpdateLogRecordWhereSqlCondition(const TableSchema & tableSchema,const VBucket & vBucket,std::string & sql)714 int SQLiteSingleVerRelationalStorageExecutor::AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema,
715     const VBucket &vBucket, std::string &sql)
716 {
717     std::string gidStr;
718     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
719     if (errCode != E_OK) {
720         LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d.", errCode);
721         return errCode;
722     }
723 
724     sql += " WHERE ";
725     if (!gidStr.empty()) {
726         sql += "cloud_gid = '" + gidStr + "'";
727     }
728     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
729     if (!pkMap.empty()) {
730         if (!gidStr.empty()) {
731             sql += " OR ";
732         }
733         sql += "(hash_key = ?);";
734     }
735     return E_OK;
736 }
737 
DoCleanShareTableDataAndLog(const std::vector<std::string> & tableNameList)738 int SQLiteSingleVerRelationalStorageExecutor::DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList)
739 {
740     int ret = E_OK;
741     int errCode = E_OK;
742     for (const auto &tableName: tableNameList) {
743         std::string delDataSql = "DELETE FROM '" + tableName + "';";
744         sqlite3_stmt *statement = nullptr;
745         errCode = SQLiteUtils::GetStatement(dbHandle_, delDataSql, statement);
746         if (errCode != E_OK) {
747             LOGE("get clean shared data stmt failed %d.", errCode);
748             return errCode;
749         }
750         errCode = SQLiteUtils::StepWithRetry(statement);
751         SQLiteUtils::ResetStatement(statement, true, ret);
752         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
753             errCode = E_OK;
754         } else {
755             LOGE("clean shared data failed: %d.", errCode);
756             break;
757         }
758         statement = nullptr;
759         std::string delLogSql = "DELETE FROM '" + DBConstant::RELATIONAL_PREFIX + tableName + "_log';";
760         errCode = SQLiteUtils::GetStatement(dbHandle_, delLogSql, statement);
761         if (errCode != E_OK) {
762             LOGE("get clean shared log stmt failed %d.", errCode);
763             return errCode;
764         }
765         errCode = SQLiteUtils::StepWithRetry(statement);
766         SQLiteUtils::ResetStatement(statement, true, ret);
767         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
768             errCode = E_OK;
769         } else {
770             LOGE("clean shared log failed: %d.", errCode);
771             break;
772         }
773     }
774     return errCode == E_OK ? ret : errCode;
775 }
776 
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,const std::map<std::string,std::vector<TableReferenceProperty>> & tableReference,std::map<int64_t,Entries> & referenceGid)777 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGid(const std::string &tableName,
778     const CloudSyncBatch &syncBatch, const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
779     std::map<int64_t, Entries> &referenceGid)
780 {
781     int errCode = E_OK;
782     for (const auto &[targetTable, targetReference] : tableReference) {
783         errCode = GetReferenceGidInner(tableName, targetTable, syncBatch, targetReference, referenceGid);
784         if (errCode != E_OK) {
785             LOGE("[RDBExecutor] get reference gid inner failed %d.", errCode);
786             return errCode;
787         }
788     }
789     return errCode;
790 }
791 
GetReferenceGidInner(const std::string & sourceTable,const std::string & targetTable,const CloudSyncBatch & syncBatch,const std::vector<TableReferenceProperty> & targetTableReference,std::map<int64_t,Entries> & referenceGid)792 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidInner(const std::string &sourceTable,
793     const std::string &targetTable, const CloudSyncBatch &syncBatch,
794     const std::vector<TableReferenceProperty> &targetTableReference, std::map<int64_t, Entries> &referenceGid)
795 {
796     auto [sourceFields, targetFields] = SplitReferenceByField(targetTableReference);
797     if (sourceFields.empty()) {
798         LOGD("[RDBExecutor] source field is empty.");
799         return E_OK;
800     }
801     if (sourceFields.size() != targetFields.size()) {
802         LOGE("[RDBExecutor] reference field size not equal.");
803         return -E_INTERNAL_ERROR;
804     }
805     std::string sql = GetReferenceGidSql(sourceTable, targetTable, sourceFields, targetFields);
806     sqlite3_stmt *statement = nullptr;
807     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
808     if (errCode != E_OK) {
809         LOGE("[RDBExecutor] get ref gid data stmt failed. %d", errCode);
810         return errCode;
811     }
812     errCode = GetReferenceGidByStmt(statement, syncBatch, targetTable, referenceGid);
813     int ret = E_OK;
814     SQLiteUtils::ResetStatement(statement, true, ret);
815     return errCode == E_OK ? ret : errCode;
816 }
817 
GetReferenceGidSql(const std::string & sourceTable,const std::string & targetTable,const std::vector<std::string> & sourceFields,const std::vector<std::string> & targetFields)818 std::string SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidSql(const std::string &sourceTable,
819     const std::string &targetTable, const std::vector<std::string> &sourceFields,
820     const std::vector<std::string> &targetFields)
821 {
822     // sql like this:
823     // SELECT naturalbase_rdb_aux_parent_log.cloud_gid FROM naturalbase_rdb_aux_parent_log,
824     //   (SELECT parent._rowid_ AS rowid_b FROM parent,
825     //     (SELECT child._rowid_, name FROM child, naturalbase_rdb_aux_child_log
826     //     WHERE child._rowid_ = ? AND naturalbase_rdb_aux_child_log.timestamp = ? ) source_a
827     //   WHERE parent.name = source_a.name ) temp_table
828     // WHERE naturalbase_rdb_aux_parent_log.data_key = temp_table.rowid_b
829     std::string logTargetTable = DBCommon::GetLogTableName(targetTable);
830     std::string logSourceTable = DBCommon::GetLogTableName(sourceTable);
831     std::string sql;
832     sql += "SELECT " + logTargetTable + ".cloud_gid" + " FROM " + logTargetTable + ", ";
833     sql += "(";
834     sql += "SELECT " + targetTable + "._rowid_ AS rowid_b FROM " + targetTable
835            + ", ";
836     sql += "(SELECT " + sourceTable + "._rowid_,";
837     std::set<std::string> sourceFieldSet;
838     for (const auto &item : sourceFields) {
839         sourceFieldSet.insert(item);
840     }
841     for (const auto &sourceField : sourceFieldSet) {
842         sql += sourceField + ",";
843     }
844     sql.pop_back();
845     sql += " FROM " + sourceTable + ", " + logSourceTable;
846     sql +=" WHERE " + sourceTable + "._rowid_ = ? AND " + logSourceTable + ".timestamp = ? ";
847     sql += " AND " + logSourceTable + ".flag&0x08=0x00) source_a";
848     sql += " WHERE ";
849     for (size_t i = 0u; i < sourceFields.size(); ++i) {
850         if (i != 0u) {
851             sql += " AND ";
852         }
853         sql += targetTable + "." + targetFields[i] + " = source_a." + sourceFields[i];
854     }
855     sql += ") temp_table ";
856     sql += "WHERE " + logTargetTable + ".data_key = temp_table.rowid_b";
857     sql += " AND " + logTargetTable + ".flag&0x08=0x00";
858     return sql;
859 }
860 
GetReferenceGidByStmt(sqlite3_stmt * statement,const CloudSyncBatch & syncBatch,const std::string & targetTable,std::map<int64_t,Entries> & referenceGid)861 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidByStmt(sqlite3_stmt *statement,
862     const CloudSyncBatch &syncBatch, const std::string &targetTable, std::map<int64_t, Entries> &referenceGid)
863 {
864     int errCode = E_OK;
865     if (syncBatch.rowid.size() != syncBatch.timestamp.size()) {
866         LOGE("[RDBExecutor] rowid size [%zu] not equal to timestamp size [%zu].", syncBatch.rowid.size(),
867             syncBatch.timestamp.size());
868         return -E_INVALID_ARGS;
869     }
870     int matchCount = 0;
871     for (size_t i = 0u; i < syncBatch.rowid.size(); i++) {
872         errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, syncBatch.rowid[i]); // 1 is rowid index
873         if (errCode != E_OK) {
874             LOGE("[RDBExecutor] bind rowid to stmt failed %d.", errCode);
875             break;
876         }
877         errCode = SQLiteUtils::BindInt64ToStatement(statement, 2, syncBatch.timestamp[i]); // 2 is timestamp index
878         if (errCode != E_OK) {
879             LOGE("[RDBExecutor] bind timestamp to stmt failed %d.", errCode);
880             break;
881         }
882         while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
883             std::string gid;
884             (void)SQLiteUtils::GetColumnTextValue(statement, 0, gid);
885             if (gid.empty()) {
886                 LOGE("[RDBExecutor] reference data don't contain gid.");
887                 errCode = -E_CLOUD_ERROR;
888                 break;
889             }
890             referenceGid[syncBatch.rowid[i]][targetTable] = gid;
891             matchCount++;
892         }
893         if (errCode == -E_FINISHED) {
894             errCode = E_OK;
895         }
896         if (errCode != E_OK) {
897             LOGE("[RDBExecutor] step stmt failed %d.", errCode);
898             break;
899         }
900         SQLiteUtils::ResetStatement(statement, false, errCode);
901         if (errCode != E_OK) {
902             LOGE("[RDBExecutor] reset stmt failed %d.", errCode);
903             break;
904         }
905     }
906     if (matchCount != 0) {
907         LOGD("[RDBExecutor] get reference gid match %d.", matchCount);
908     }
909     return errCode;
910 }
911 
SplitReferenceByField(const std::vector<TableReferenceProperty> & targetTableReference)912 PairStringVector SQLiteSingleVerRelationalStorageExecutor::SplitReferenceByField(
913     const std::vector<TableReferenceProperty> &targetTableReference)
914 {
915     PairStringVector sourceTargetFiled;
916     for (const auto &reference : targetTableReference) {
917         for (const auto &column : reference.columns) {
918             sourceTargetFiled.first.push_back(column.first);
919             sourceTargetFiled.second.push_back(column.second);
920         }
921     }
922     return sourceTargetFiled;
923 }
924 
BindStmtWithCloudGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid,sqlite3_stmt * & stmt)925 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSyncData &cloudDataResult,
926     bool ignoreEmptyGid, sqlite3_stmt *&stmt)
927 {
928     int fillGidCount = 0;
929     int errCode = E_OK;
930     for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
931         auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
932         if (gidEntry == cloudDataResult.insData.extend[i].end()) {
933             bool isSkipAssetsMissRecord = false;
934             if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) {
935                 LOGI("[RDBExecutor] Local assets missing and skip filling assets.");
936                 isSkipAssetsMissRecord = true;
937             }
938             if (ignoreEmptyGid || isSkipAssetsMissRecord) {
939                 continue;
940             }
941             errCode = -E_INVALID_ARGS;
942             LOGE("[RDBExecutor] Extend not contain gid.");
943             break;
944         }
945         bool containError = DBCommon::IsRecordError(cloudDataResult.insData.extend[i]);
946         if (ignoreEmptyGid && containError) {
947             continue;
948         }
949         std::string val;
950         if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
951             cloudDataResult.insData.extend[i], val) != E_OK) {
952             errCode = -E_INVALID_DATA;
953             LOGE("[RDBExecutor] Can't get string gid from extend.");
954             break;
955         }
956         if (val.empty()) {
957             errCode = -E_CLOUD_ERROR;
958             LOGE("[RDBExecutor] Get empty gid from extend.");
959             break;
960         }
961         errCode = BindStmtWithCloudGidInner(val, cloudDataResult.insData.rowid[i], stmt, fillGidCount);
962         if (errCode != E_OK) {
963             LOGE("[RDBExecutor] Bind stmt error %d.", errCode);
964             break;
965         }
966     }
967     LOGD("[RDBExecutor] Fill gid count %d.", fillGidCount);
968     return errCode;
969 }
970 
CleanExtendAndCursorForDeleteData(const std::string & tableName)971 int SQLiteSingleVerRelationalStorageExecutor::CleanExtendAndCursorForDeleteData(const std::string &tableName)
972 {
973     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
974     std::string sql = "DELETE FROM " + logTable + " where flag&0x01=0x01;";
975     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
976     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
977         LOGE("update extend field and cursor failed %d.", errCode);
978     }
979     return errCode;
980 }
981 
CheckIfExistUserTable(const std::string & tableName)982 int SQLiteSingleVerRelationalStorageExecutor::CheckIfExistUserTable(const std::string &tableName)
983 {
984     std::string sql = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?";
985     sqlite3_stmt *statement = nullptr;
986     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
987     if (errCode != E_OK) {
988         LOGE("[RDBExecutor] Prepare the sql statement error: %d.", errCode);
989         return errCode;
990     }
991     errCode = SQLiteUtils::BindTextToStatement(statement, 1, tableName);
992     if (errCode != E_OK) {
993         LOGE("[RDBExecutor] Bind table name failed: %d.", errCode);
994         SQLiteUtils::ResetStatement(statement, true, errCode);
995         return errCode;
996     }
997     if (SQLiteUtils::StepWithRetry(statement) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
998         LOGE("[RDBExecutor] local exists user table which shared table name is same as.");
999         SQLiteUtils::ResetStatement(statement, true, errCode);
1000         return -E_INVALID_ARGS;
1001     }
1002     SQLiteUtils::ResetStatement(statement, true, errCode);
1003     return E_OK;
1004 }
1005 
GetCloudDeleteSql(const std::string & logTable)1006 std::string SQLiteSingleVerRelationalStorageExecutor::GetCloudDeleteSql(const std::string &logTable)
1007 {
1008     std::string sql;
1009     sql += " cloud_gid = '', version = '', ";
1010     if (isLogicDelete_) {
1011         // 1001 which is logicDelete|cloudForcePush|local|delete
1012         sql += "flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1013             std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE) |
1014             static_cast<uint32_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) + ", cursor = (SELECT CASE WHEN (MAX(cursor) is "
1015             "null) THEN 1 ELSE MAX(cursor) + 1 END FROM " + logTable + ")";
1016     } else {
1017         sql += "data_key = -1, flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1018             std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) + ", sharing_resource = ''";
1019     }
1020     return sql;
1021 }
1022 
RemoveDataAndLog(const std::string & tableName,int64_t dataKey)1023 int SQLiteSingleVerRelationalStorageExecutor::RemoveDataAndLog(const std::string &tableName, int64_t dataKey)
1024 {
1025     int errCode = E_OK;
1026     std::string removeDataSql = "DELETE FROM " + tableName + " WHERE " + DBConstant::SQLITE_INNER_ROWID + " = " +
1027         std::to_string(dataKey);
1028     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeDataSql);
1029     if (errCode != E_OK) {
1030         LOGE("[RDBExecutor] remove data failed %d", errCode);
1031         return errCode;
1032     }
1033     std::string removeLogSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key = " +
1034         std::to_string(dataKey);
1035     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeLogSql);
1036     if (errCode != E_OK) {
1037         LOGE("[RDBExecutor] remove log failed %d", errCode);
1038     }
1039     return errCode;
1040 }
1041 
GetLocalDataKey(size_t index,const DownloadData & downloadData)1042 int64_t SQLiteSingleVerRelationalStorageExecutor::GetLocalDataKey(size_t index,
1043     const DownloadData &downloadData)
1044 {
1045     if (index >= downloadData.existDataKey.size()) {
1046         LOGW("[RDBExecutor] index out of range when get local data key."); // should not happen
1047         return -1; // -1 means not exist
1048     }
1049     return downloadData.existDataKey[index];
1050 }
1051 
BindStmtWithCloudGidInner(const std::string & gid,int64_t rowid,sqlite3_stmt * & stmt,int & fillGidCount)1052 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
1053     sqlite3_stmt *&stmt, int &fillGidCount)
1054 {
1055     int errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid); // 1 means the gid index
1056     if (errCode != E_OK) {
1057         return errCode;
1058     }
1059     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means rowid
1060     if (errCode != E_OK) {
1061         return errCode;
1062     }
1063     errCode = SQLiteUtils::StepWithRetry(stmt, false);
1064     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1065         errCode = E_OK;
1066         fillGidCount++;
1067         SQLiteUtils::ResetStatement(stmt, false, errCode);
1068     } else {
1069         LOGE("[RDBExecutor] Update cloud log failed: %d.", errCode);
1070     }
1071     return errCode;
1072 }
1073 
RenewTableTrigger(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType)1074 int SQLiteSingleVerRelationalStorageExecutor::RenewTableTrigger(DistributedTableMode mode,
1075     const TableInfo &tableInfo, TableSyncType syncType)
1076 {
1077     auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
1078     return tableManager->AddRelationalLogTableTrigger(dbHandle_, tableInfo, "");
1079 }
1080 
DoCleanAssetId(const std::string & tableName,const RelationalSchemaObject & localSchema)1081 int SQLiteSingleVerRelationalStorageExecutor::DoCleanAssetId(const std::string &tableName,
1082     const RelationalSchemaObject &localSchema)
1083 {
1084     std::vector<int64_t> dataKeys;
1085     std::string logTableName = DBCommon::GetLogTableName(tableName);
1086     int errCode = GetCleanCloudDataKeys(logTableName, dataKeys, false);
1087     if (errCode != E_OK) {
1088         LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
1089         return errCode;
1090     }
1091     std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
1092     errCode = CleanAssetId(tableName, fieldInfos, dataKeys);
1093     if (errCode != E_OK) {
1094         LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d.", errCode);
1095     }
1096     return errCode;
1097 }
1098 
CleanAssetId(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys)1099 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetId(const std::string &tableName,
1100     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys)
1101 {
1102     int errCode = E_OK;
1103     for (const auto &fieldInfo : fieldInfos) {
1104         if (fieldInfo.IsAssetType()) {
1105             Assets assets;
1106             errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1107             if (errCode != E_OK) {
1108                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
1109                 return errCode;
1110             }
1111             errCode = UpdateAssetIdOnUserTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1112             if (errCode != E_OK) {
1113                 LOGE("[Storage Executor] failed to save clean asset id on table, %d.", errCode);
1114                 return errCode;
1115             }
1116         } else if (fieldInfo.IsAssetsType()) {
1117             errCode = GetAssetsAndUpdateAssetsId(tableName, fieldInfo.GetFieldName(), dataKeys);
1118             if (errCode != E_OK) {
1119                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
1120                 return errCode;
1121             }
1122         }
1123     }
1124     return errCode;
1125 }
1126 
UpdateAssetIdOnUserTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)1127 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetIdOnUserTable(const std::string &tableName,
1128     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
1129 {
1130     if (assets.empty()) { // LCOV_EXCL_BR_LINE
1131         return E_OK;
1132     }
1133     int errCode = E_OK;
1134     int ret = E_OK;
1135     sqlite3_stmt *stmt = nullptr;
1136     size_t index = 0;
1137     for (const auto &rowId : dataKeys) {
1138         if (rowId == -1) { // -1 means data is deleted
1139             continue;
1140         }
1141         if (assets[index].name.empty()) { // LCOV_EXCL_BR_LINE
1142             index++;
1143             continue;
1144         }
1145         std::string cleanAssetIdSql = "UPDATE " + tableName  + " SET " + fieldName + " = ? WHERE " +
1146             std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1147         errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1148         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1149             LOGE("Get statement failed, %d", errCode);
1150             return errCode;
1151         }
1152         assets[index].assetId = "";
1153         assets[index].status &= ~AssetStatus::UPLOADING;
1154         errCode = BindAssetToBlobStatement(assets[index], 1, stmt); // 1 means sqlite statement index
1155         index++;
1156         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1157             LOGE("Bind asset to blob statement failed, %d", errCode);
1158             goto END;
1159         }
1160         errCode = SQLiteUtils::StepWithRetry(stmt);
1161         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1162             errCode = E_OK;
1163         } else {
1164             LOGE("Step statement failed, %d", errCode);
1165             goto END;
1166         }
1167         SQLiteUtils::ResetStatement(stmt, true, ret);
1168     }
1169     return errCode != E_OK ? errCode : ret;
1170 END:
1171     SQLiteUtils::ResetStatement(stmt, true, ret);
1172     return errCode != E_OK ? errCode : ret;
1173 }
1174 
GetAssetsAndUpdateAssetsId(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys)1175 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsAndUpdateAssetsId(const std::string &tableName,
1176     const std::string &fieldName, const std::vector<int64_t> &dataKeys)
1177 {
1178     int errCode = E_OK;
1179     int ret = E_OK;
1180     sqlite3_stmt *selectStmt = nullptr;
1181     for (const auto &rowId : dataKeys) {
1182         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1183             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1184         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1185         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1186             LOGE("Get select assets statement failed, %d.", errCode);
1187             goto END;
1188         }
1189         Assets assets;
1190         errCode = GetAssetsByRowId(selectStmt, assets);
1191         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1192             LOGE("Get assets by rowId failed, %d.", errCode);
1193             goto END;
1194         }
1195         SQLiteUtils::ResetStatement(selectStmt, true, ret);
1196         if (assets.empty()) { // LCOV_EXCL_BR_LINE
1197             continue;
1198         }
1199         for (auto &asset : assets) {
1200             asset.assetId = "";
1201             asset.status &= ~AssetStatus::UPLOADING;
1202         }
1203         std::vector<uint8_t> assetsValue;
1204         errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsValue);
1205         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1206             LOGE("[CleanAssetsIdOnUserTable] failed to transfer assets to blob, %d.", errCode);
1207             return errCode;
1208         }
1209         errCode = CleanAssetsIdOnUserTable(tableName, fieldName, rowId, assetsValue);
1210         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1211             LOGE("[CleanAssetsIdOnUserTable] clean assets id on user table failed, %d", errCode);
1212             return errCode;
1213         }
1214     }
1215     return errCode != E_OK ? errCode : ret;
1216 END:
1217     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1218     return errCode != E_OK ? errCode : ret;
1219 }
1220 
CleanAssetsIdOnUserTable(const std::string & tableName,const std::string & fieldName,const int64_t rowId,const std::vector<uint8_t> & assetsValue)1221 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetsIdOnUserTable(const std::string &tableName,
1222     const std::string &fieldName, const int64_t rowId, const std::vector<uint8_t> &assetsValue)
1223 {
1224     std::string cleanAssetIdSql = "UPDATE " + tableName  + " SET " + fieldName + " = ? WHERE " +
1225         std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1226     sqlite3_stmt *stmt = nullptr;
1227     int errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1228     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1229         LOGE("Get statement failed, %d", errCode);
1230         SQLiteUtils::ResetStatement(stmt, true, errCode);
1231         return errCode;
1232     }
1233     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, assetsValue, false);
1234     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1235         SQLiteUtils::ResetStatement(stmt, true, errCode);
1236         return errCode;
1237     }
1238     errCode = SQLiteUtils::StepWithRetry(stmt);
1239     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1240         errCode = E_OK;
1241     }
1242     SQLiteUtils::ResetStatement(stmt, true, errCode);
1243     return errCode;
1244 }
1245 
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1246 std::pair<int, uint32_t> SQLiteSingleVerRelationalStorageExecutor::GetAssetsByGidOrHashKey(
1247     const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
1248 {
1249     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
1250     auto &[errCode, status] = res;
1251     std::vector<Field> assetFields;
1252     std::string sql = "SELECT";
1253     for (const auto &field: tableSchema.fields) {
1254         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1255             assetFields.emplace_back(field);
1256             sql += " b." + field.colName + ",";
1257         }
1258     }
1259     if (assetFields.empty()) {
1260         return { -E_NOT_FOUND, status };
1261     }
1262     sql += "a.cloud_gid, a.status ";
1263     sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE (a." + FLAG_NOT_LOGIC_DELETE + ") AND (" +
1264         (gid.empty() ? "a.hash_key = ?);" : " a.cloud_gid = ? OR  a.hash_key = ?);");
1265     sqlite3_stmt *stmt = nullptr;
1266     errCode = InitGetAssetStmt(sql, gid, hashKey, stmt);
1267     if (errCode != E_OK) {
1268         return res;
1269     }
1270     errCode = SQLiteUtils::StepWithRetry(stmt);
1271     int index = 0;
1272     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1273         for (const auto &field: assetFields) {
1274             Type cloudValue;
1275             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1276             if (errCode != E_OK) {
1277                 break;
1278             }
1279             errCode = PutVBucketByType(assets, field, cloudValue);
1280             if (errCode != E_OK) {
1281                 break;
1282             }
1283         }
1284         std::string curGid;
1285         errCode = SQLiteUtils::GetColumnTextValue(stmt, index++, curGid);
1286         if (errCode == E_OK && CloudStorageUtils::IsCloudGidMismatch(gid, curGid)) {
1287             // Gid is different, there may be duplicate primary keys in the cloud
1288             errCode = -E_CLOUD_GID_MISMATCH;
1289         }
1290         status = static_cast<uint32_t>(sqlite3_column_int(stmt, index++));
1291     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1292         errCode = -E_NOT_FOUND;
1293     } else {
1294         LOGE("step get asset stmt failed %d.", errCode);
1295     }
1296     SQLiteUtils::ResetStatement(stmt, true, errCode);
1297     return res;
1298 }
1299 
InitGetAssetStmt(const std::string & sql,const std::string & gid,const Bytes & hashKey,sqlite3_stmt * & stmt)1300 int SQLiteSingleVerRelationalStorageExecutor::InitGetAssetStmt(const std::string &sql, const std::string &gid,
1301     const Bytes &hashKey, sqlite3_stmt *&stmt)
1302 {
1303     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1304     if (errCode != E_OK) {
1305         LOGE("Get asset statement failed, %d.", errCode);
1306         return errCode;
1307     }
1308     int index = 1;
1309     if (!gid.empty()) {
1310         errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
1311         if (errCode != E_OK) {
1312             LOGE("bind gid failed %d.", errCode);
1313             SQLiteUtils::ResetStatement(stmt, true, errCode);
1314             return errCode;
1315         }
1316     }
1317     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, hashKey);
1318     if (errCode != E_OK) {
1319         LOGE("bind hash failed %d.", errCode);
1320         SQLiteUtils::ResetStatement(stmt, true, errCode);
1321     }
1322     return errCode;
1323 }
1324 
FillHandleWithOpType(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid,const TableSchema & tableSchema)1325 int SQLiteSingleVerRelationalStorageExecutor::FillHandleWithOpType(const OpType opType, const CloudSyncData &data,
1326     bool fillAsset, bool ignoreEmptyGid, const TableSchema &tableSchema)
1327 {
1328     int errCode = E_OK;
1329     switch (opType) {
1330         case OpType::UPDATE_VERSION: // fallthrough
1331         case OpType::INSERT_VERSION: {
1332             errCode = FillCloudVersionForUpload(opType, data);
1333             break;
1334         }
1335         case OpType::SET_UPLOADING: {
1336             errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1337             if (errCode != E_OK) {
1338                 LOGE("Failed to set uploading for ins data, %d.", errCode);
1339                 return errCode;
1340             }
1341             errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1342             break;
1343         }
1344         case OpType::INSERT: {
1345             errCode = UpdateCloudLogGid(data, ignoreEmptyGid);
1346             if (errCode != E_OK) {
1347                 LOGE("Failed to fill cloud log gid, %d.", errCode);
1348                 return errCode;
1349             }
1350             if (fillAsset) {
1351                 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1352                 if (errCode != E_OK) {
1353                     LOGE("Failed to fill asset for ins, %d.", errCode);
1354                     return errCode;
1355                 }
1356             }
1357             errCode = FillCloudVersionForUpload(OpType::INSERT_VERSION, data);
1358             break;
1359         }
1360         case OpType::UPDATE: {
1361             if (fillAsset && !data.updData.assets.empty()) {
1362                 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1363                 if (errCode != E_OK) {
1364                     LOGE("Failed to fill asset for upd, %d.", errCode);
1365                     return errCode;
1366                 }
1367             }
1368             errCode = FillCloudVersionForUpload(OpType::UPDATE_VERSION, data);
1369             break;
1370         }
1371         default:
1372             break;
1373     }
1374     return errCode;
1375 }
1376 
GetAssetsByRowId(sqlite3_stmt * & selectStmt,Assets & assets)1377 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets)
1378 {
1379     int errCode = SQLiteUtils::StepWithRetry(selectStmt);
1380     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1381         std::vector<uint8_t> blobValue;
1382         errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
1383         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1384             LOGE("Get column blob value failed %d.", errCode);
1385             return errCode;
1386         }
1387         errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1388         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1389             LOGE("Transfer blob to assets failed %d", errCode);
1390         }
1391         return errCode;
1392     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1393         return E_OK;
1394     } else {
1395         LOGE("Step select statement failed %d.", errCode);
1396         return errCode;
1397     }
1398 }
1399 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1400 void SQLiteSingleVerRelationalStorageExecutor::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1401 {
1402     assetLoader_ = loader;
1403 }
1404 
ExecuteFillDownloadAssetStatement(sqlite3_stmt * & stmt,int beginIndex,const std::string & cloudGid)1405 int SQLiteSingleVerRelationalStorageExecutor::ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt,
1406     int beginIndex, const std::string &cloudGid)
1407 {
1408     int errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, cloudGid);
1409     if (errCode != E_OK) {
1410         LOGE("Bind cloud gid to statement failed %d.", errCode);
1411         int ret = E_OK;
1412         SQLiteUtils::ResetStatement(stmt, true, ret);
1413         return errCode;
1414     }
1415     errCode = SQLiteUtils::StepWithRetry(stmt);
1416     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1417         errCode = E_OK;
1418     } else {
1419         LOGE("Fill cloud asset failed: %d.", errCode);
1420     }
1421     int ret = E_OK;
1422     SQLiteUtils::ResetStatement(stmt, true, ret);
1423     return errCode != E_OK ? errCode : ret;
1424 }
1425 
CleanDownloadChangedAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType)1426 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadChangedAssets(
1427     const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType)
1428 {
1429     if (assetLoader_ == nullptr) {
1430         LOGE("assetLoader may be not set.");
1431         return -E_NOT_SET;
1432     }
1433     std::vector<Asset> toDeleteAssets;
1434     CloudStorageUtils::GetToBeRemoveAssets(vBucket, assetOpType, toDeleteAssets);
1435     if (toDeleteAssets.empty()) {
1436         return E_OK;
1437     }
1438     DBStatus ret = assetLoader_->RemoveLocalAssets(toDeleteAssets);
1439     if (ret != OK) {
1440         LOGE("remove local assets failed %d.", ret);
1441         return -E_REMOVE_ASSETS_FAILED;
1442     }
1443     return E_OK;
1444 }
1445 
GetAndBindFillUploadAssetStatement(const std::string & tableName,const VBucket & assets,sqlite3_stmt * & statement)1446 int SQLiteSingleVerRelationalStorageExecutor::GetAndBindFillUploadAssetStatement(const std::string &tableName,
1447     const VBucket &assets, sqlite3_stmt *&statement)
1448 {
1449     std::string sql = "UPDATE '" + tableName + "' SET ";
1450     for (const auto &item: assets) {
1451         sql += item.first + " = ?,";
1452     }
1453     sql.pop_back();
1454     sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = ?;";
1455     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1456     if (errCode != E_OK) {
1457         return errCode;
1458     }
1459     int bindIndex = 1;
1460     for (const auto &item: assets) {
1461         Field field = {
1462             .colName = item.first, .type = static_cast<int32_t>(item.second.index())
1463         };
1464         errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](bindIndex++, assets, field, statement);
1465         if (errCode != E_OK) {
1466             return errCode;
1467         }
1468     }
1469     return errCode;
1470 }
1471 
OnlyUpdateAssetId(const std::string & tableName,const TableSchema & tableSchema,const VBucket & vBucket,int64_t dataKey,OpType opType)1472 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateAssetId(const std::string &tableName,
1473     const TableSchema &tableSchema, const VBucket &vBucket, int64_t dataKey, OpType opType)
1474 {
1475     if (opType != OpType::ONLY_UPDATE_GID && opType != OpType::NOT_HANDLE &&
1476         opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1477         return E_OK;
1478     }
1479     if (CloudStorageUtils::IsSharedTable(tableSchema)) {
1480         // this is shared table, not need to update asset id.
1481         return E_OK;
1482     }
1483     int errCode = UpdateAssetId(tableSchema, dataKey, vBucket);
1484     if (errCode != E_OK) {
1485         LOGE("[Storage Executor] failed to update assetId on table, %d.", errCode);
1486     }
1487     return errCode;
1488 }
1489 
UpdateLocalAssetId(const VBucket & vBucket,const std::string & fieldName,Asset & asset)1490 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName,
1491     Asset &asset)
1492 {
1493     for (const auto &[col, value] : vBucket) {
1494         if (value.index() == TYPE_INDEX<Asset> && col == fieldName) {
1495             asset = std::get<Asset>(value);
1496         }
1497     }
1498 }
1499 
UpdateLocalAssetsId(const VBucket & vBucket,const std::string & fieldName,Assets & assets)1500 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName,
1501     Assets &assets)
1502 {
1503     for (const auto &[col, value] : vBucket) {
1504         if (value.index() == TYPE_INDEX<Assets> && col == fieldName) {
1505             assets = std::get<Assets>(value);
1506         }
1507     }
1508 }
1509 
UpdateLocalAssetsIdInner(const Assets & cloudAssets,Assets & assets)1510 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsIdInner(const Assets &cloudAssets, Assets &assets)
1511 {
1512     for (const auto &cloudAsset : cloudAssets) {
1513         for (auto &asset : assets) {
1514             if (asset.name == cloudAsset.name) {
1515                 asset.assetId = cloudAsset.assetId;
1516             }
1517         }
1518     }
1519 }
1520 
BindAssetToBlobStatement(const Asset & asset,int index,sqlite3_stmt * & stmt)1521 int SQLiteSingleVerRelationalStorageExecutor::BindAssetToBlobStatement(const Asset &asset, int index,
1522     sqlite3_stmt *&stmt)
1523 {
1524     std::vector<uint8_t> blobValue;
1525     int errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, blobValue);
1526     if (errCode != E_OK) {
1527         LOGE("Transfer asset to blob failed, %d.", errCode);
1528         return errCode;
1529     }
1530     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1531     if (errCode != E_OK) {
1532         LOGE("Bind asset blob to statement failed, %d.", errCode);
1533     }
1534     return errCode;
1535 }
1536 
BindAssetsToBlobStatement(const Assets & assets,int index,sqlite3_stmt * & stmt)1537 int SQLiteSingleVerRelationalStorageExecutor::BindAssetsToBlobStatement(const Assets &assets, int index,
1538     sqlite3_stmt *&stmt)
1539 {
1540     std::vector<uint8_t> blobValue;
1541     int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blobValue);
1542     if (errCode != E_OK) {
1543         LOGE("Transfer asset to blob failed, %d.", errCode);
1544         return errCode;
1545     }
1546     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1547     if (errCode != E_OK) {
1548         LOGE("Bind asset blob to statement failed, %d.", errCode);
1549     }
1550     return errCode;
1551 }
1552 
GetAssetOnTableInner(sqlite3_stmt * & stmt,Asset & asset)1553 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTableInner(sqlite3_stmt *&stmt, Asset &asset)
1554 {
1555     int errCode = SQLiteUtils::StepWithRetry(stmt);
1556     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1557         std::vector<uint8_t> blobValue;
1558         errCode = SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue);
1559         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1560             LOGE("[RDBExecutor][GetAssetOnTableInner] Get column blob value failed, %d.", errCode);
1561             return errCode;
1562         }
1563         errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
1564         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1565             LOGE("[RDBExecutor] Transfer blob to asset failed, %d.", errCode);
1566         }
1567     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1568         errCode = E_OK;
1569     } else {
1570         LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1571     }
1572     return errCode;
1573 }
1574 
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const int64_t dataKey,Asset & asset)1575 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
1576     const std::string &fieldName, const int64_t dataKey, Asset &asset)
1577 {
1578     sqlite3_stmt *selectStmt = nullptr;
1579     std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
1580         "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1581     int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
1582     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1583         LOGE("Get select asset statement failed, %d.", errCode);
1584         return errCode;
1585     }
1586     errCode = GetAssetOnTableInner(selectStmt, asset);
1587     int ret = E_OK;
1588     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1589     return errCode != E_OK ? errCode : ret;
1590 }
1591 
GetAssetsOnTableInner(sqlite3_stmt * & stmt,Assets & assets)1592 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsOnTableInner(sqlite3_stmt *&stmt, Assets &assets)
1593 {
1594     int errCode = SQLiteUtils::StepWithRetry(stmt);
1595     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1596         std::vector<uint8_t> blobValue;
1597         errCode = SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue);
1598         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1599             LOGE("[RDBExecutor][GetAssetsOnTableInner] Get column blob value failed, %d.", errCode);
1600             return errCode;
1601         }
1602         errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1603         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1604             LOGE("[RDBExecutor] Transfer blob to assets failed, %d.", errCode);
1605         }
1606     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1607         errCode = E_OK;
1608     } else {
1609         LOGE("[RDBExecutor] Step failed when get assets from table, errCode = %d.", errCode);
1610     }
1611     return errCode;
1612 }
1613 
GetAssetsOnTable(const std::string & tableName,const std::string & fieldName,const int64_t dataKey,Assets & assets)1614 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsOnTable(const std::string &tableName,
1615     const std::string &fieldName, const int64_t dataKey, Assets &assets)
1616 {
1617     sqlite3_stmt *selectStmt = nullptr;
1618     std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1619         "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1620     int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1621     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1622         LOGE("Get select assets statement failed, %d.", errCode);
1623         return errCode;
1624     }
1625     errCode = GetAssetsOnTableInner(selectStmt, assets);
1626     int ret = E_OK;
1627     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1628     return errCode != E_OK ? errCode : ret;
1629 }
1630 
BindAssetFiledToBlobStatement(const TableSchema & tableSchema,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord,sqlite3_stmt * & stmt)1631 int SQLiteSingleVerRelationalStorageExecutor::BindAssetFiledToBlobStatement(const TableSchema &tableSchema,
1632     const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt)
1633 {
1634     int assetIndex = 0;
1635     int assetsIndex = 0;
1636     for (const auto &field : tableSchema.fields) {
1637         if (field.type == TYPE_INDEX<Asset>) {
1638             if (assetOfOneRecord[assetIndex].name.empty()) {
1639                 continue;
1640             }
1641             int errCode = BindAssetToBlobStatement(assetOfOneRecord[assetIndex], assetIndex + assetsIndex + 1, stmt);
1642             if (errCode != E_OK) {
1643                 LOGE("Bind asset to blob statement failed, %d.", errCode);
1644                 return errCode;
1645             }
1646             assetIndex++;
1647         } else if (field.type == TYPE_INDEX<Assets>) {
1648             if (assetsOfOneRecord[assetsIndex].empty()) {
1649                 continue;
1650             }
1651             int errCode = BindAssetsToBlobStatement(assetsOfOneRecord[assetsIndex], assetIndex + assetsIndex + 1, stmt);
1652             if (errCode != E_OK) {
1653                 LOGE("Bind assets to blob statement failed, %d.", errCode);
1654                 return errCode;
1655             }
1656             assetsIndex++;
1657         }
1658     }
1659     return E_OK;
1660 }
1661 
UpdateAssetsIdForOneRecord(const TableSchema & tableSchema,const std::string & sql,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord)1662 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetsIdForOneRecord(const TableSchema &tableSchema,
1663     const std::string &sql, const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord)
1664 {
1665     int errCode = E_OK;
1666     int ret = E_OK;
1667     sqlite3_stmt *stmt = nullptr;
1668     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1669     if (errCode != E_OK) {
1670         LOGE("Get update asset statement failed, %d.", errCode);
1671         return errCode;
1672     }
1673     errCode = BindAssetFiledToBlobStatement(tableSchema, assetOfOneRecord, assetsOfOneRecord, stmt);
1674     if (errCode != E_OK) {
1675         LOGE("Asset field Bind asset to blob statement failed, %d.", errCode);
1676         SQLiteUtils::ResetStatement(stmt, true, ret);
1677         return errCode != E_OK ? errCode : ret;
1678     }
1679     errCode = SQLiteUtils::StepWithRetry(stmt);
1680     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1681         errCode = E_OK;
1682     } else {
1683         LOGE("Step statement failed, %d", errCode);
1684     }
1685     SQLiteUtils::ResetStatement(stmt, true, ret);
1686     return errCode != E_OK ? errCode : ret;
1687 }
1688 
UpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1689 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1690     const VBucket &vBucket)
1691 {
1692     int errCode = E_OK;
1693     std::vector<Asset> assetOfOneRecord;
1694     std::vector<Assets> assetsOfOneRecord;
1695     std::string updateAssetIdSql = "UPDATE " + tableSchema.name  + " SET";
1696     for (const auto &field : tableSchema.fields) {
1697         if (field.type == TYPE_INDEX<Asset>) {
1698             Asset asset;
1699             UpdateLocalAssetId(vBucket, field.colName, asset);
1700             assetOfOneRecord.push_back(asset);
1701             if (!asset.name.empty()) {
1702                 updateAssetIdSql += " " + field.colName + " = ?,";
1703             }
1704         }
1705         if (field.type == TYPE_INDEX<Assets>) {
1706             Assets assets;
1707             UpdateLocalAssetsId(vBucket, field.colName, assets);
1708             assetsOfOneRecord.push_back(assets);
1709             if (!assets.empty()) {
1710                 updateAssetIdSql += " " + field.colName + " = ?,";
1711             }
1712         }
1713     }
1714     if (updateAssetIdSql == "UPDATE " + tableSchema.name  + " SET") {
1715         return E_OK;
1716     }
1717     updateAssetIdSql.pop_back();
1718     updateAssetIdSql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1719     errCode = UpdateAssetsIdForOneRecord(tableSchema, updateAssetIdSql, assetOfOneRecord, assetsOfOneRecord);
1720     if (errCode != E_OK) {
1721         LOGE("[Storage Executor] failed to update asset id on table, %d.", errCode);
1722     }
1723     return errCode;
1724 }
1725 
SetPutDataMode(PutDataMode mode)1726 void SQLiteSingleVerRelationalStorageExecutor::SetPutDataMode(PutDataMode mode)
1727 {
1728     putDataMode_ = mode;
1729 }
1730 
SetMarkFlagOption(MarkFlagOption option)1731 void SQLiteSingleVerRelationalStorageExecutor::SetMarkFlagOption(MarkFlagOption option)
1732 {
1733     markFlagOption_ = option;
1734 }
1735 
GetDataFlag()1736 int64_t SQLiteSingleVerRelationalStorageExecutor::GetDataFlag()
1737 {
1738     if (putDataMode_ != PutDataMode::USER) {
1739         return static_cast<int64_t>(LogInfoFlag::FLAG_CLOUD) |
1740             static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_CONSISTENCY);
1741     }
1742     uint32_t flag = static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
1743     if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1744         flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1745     }
1746     flag |= static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_CONSISTENCY);
1747     return static_cast<int64_t>(flag);
1748 }
1749 
GetUpdateDataFlagSql()1750 std::string SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataFlagSql()
1751 {
1752     if (putDataMode_ == PutDataMode::SYNC) {
1753         return UPDATE_FLAG_CLOUD;
1754     }
1755     if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1756         return UPDATE_FLAG_WAIT_COMPENSATED_SYNC;
1757     }
1758     return UPDATE_FLAG_CLOUD;
1759 }
1760 
GetDev()1761 std::string SQLiteSingleVerRelationalStorageExecutor::GetDev()
1762 {
1763     return putDataMode_ == PutDataMode::SYNC ? "cloud" : "";
1764 }
1765 
GetUpdateField(const VBucket & vBucket,const TableSchema & tableSchema)1766 std::vector<Field> SQLiteSingleVerRelationalStorageExecutor::GetUpdateField(const VBucket &vBucket,
1767     const TableSchema &tableSchema)
1768 {
1769     std::set<std::string> useFields;
1770     std::vector<Field> fields;
1771     if (putDataMode_ == PutDataMode::SYNC) {
1772         for (const auto &field : tableSchema.fields) {
1773             useFields.insert(field.colName);
1774         }
1775         fields = tableSchema.fields;
1776     } else {
1777         for (const auto &field : vBucket) {
1778             if (field.first.empty() || field.first[0] == '#') {
1779                 continue;
1780             }
1781             useFields.insert(field.first);
1782         }
1783         for (const auto &field : tableSchema.fields) {
1784             if (useFields.find(field.colName) == useFields.end()) {
1785                 continue;
1786             }
1787             fields.push_back(field);
1788         }
1789     }
1790     return fields;
1791 }
1792 
UpdateRecordFlag(const std::string & tableName,const std::string & sql,const LogInfo & logInfo)1793 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordFlag(const std::string &tableName, const std::string &sql,
1794     const LogInfo &logInfo)
1795 {
1796     bool useHashKey = false;
1797     if (logInfo.cloudGid.empty() && logInfo.dataKey == DBConstant::DEFAULT_ROW_ID) {
1798         if (logInfo.hashKey.empty()) {
1799             LOGE("[RDBExecutor] Update record flag failed with invalid args!");
1800             return -E_INVALID_ARGS;
1801         }
1802         useHashKey = true;
1803     }
1804     sqlite3_stmt *stmt = nullptr;
1805     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1806     if (errCode != E_OK) {
1807         LOGE("[Storage Executor] Get stmt failed when update record flag, %d", errCode);
1808         return errCode;
1809     }
1810     int ret = E_OK;
1811     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, logInfo.timestamp); // 1 is timestamp
1812     if (errCode != E_OK) {
1813         LOGE("[Storage Executor] Bind timestamp to update record flag stmt failed, %d", errCode);
1814         SQLiteUtils::ResetStatement(stmt, true, ret);
1815         return errCode;
1816     }
1817     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, logInfo.timestamp); // 2 is timestamp
1818     if (errCode != E_OK) {
1819         LOGE("[Storage Executor] Bind timestamp to update record status stmt failed, %d", errCode);
1820         SQLiteUtils::ResetStatement(stmt, true, ret);
1821         return errCode;
1822     }
1823     if (useHashKey) {
1824         errCode = SQLiteUtils::BindBlobToStatement(stmt, 3, logInfo.hashKey); // 3 is hash_key
1825         if (errCode != E_OK) {
1826             LOGE("[Storage Executor] Bind hashKey to update record flag stmt failed, %d", errCode);
1827             SQLiteUtils::ResetStatement(stmt, true, ret);
1828             return errCode;
1829         }
1830     }
1831     errCode = SQLiteUtils::StepWithRetry(stmt);
1832     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1833         errCode = E_OK;
1834     } else {
1835         LOGE("[Storage Executor]Step update record flag stmt failed, %d", errCode);
1836     }
1837     SQLiteUtils::ResetStatement(stmt, true, ret);
1838     return errCode == E_OK ? ret : errCode;
1839 }
1840 
MarkFlagAsUploadFinished(const std::string & tableName,const Key & hashKey,Timestamp timestamp)1841 void SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsUploadFinished(const std::string &tableName,
1842     const Key &hashKey, Timestamp timestamp)
1843 {
1844     sqlite3_stmt *stmt = nullptr;
1845     int errCode = SQLiteUtils::GetStatement(dbHandle_, CloudStorageUtils::GetUpdateUploadFinishedSql(tableName),
1846         stmt);
1847     int index = 1;
1848     errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, timestamp);
1849     if (errCode != E_OK) {
1850         SQLiteUtils::ResetStatement(stmt, true, errCode);
1851         LOGW("[Storage Executor] Bind timestamp to update record flag for upload finished stmt failed, %d", errCode);
1852         return;
1853     }
1854     errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, hashKey);
1855     if (errCode != E_OK) {
1856         SQLiteUtils::ResetStatement(stmt, true, errCode);
1857         LOGW("[Storage Executor] Bind hashKey to update record flag for upload finished stmt failed, %d", errCode);
1858         return;
1859     }
1860     errCode = SQLiteUtils::StepWithRetry(stmt);
1861     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1862         errCode = E_OK;
1863     } else {
1864         LOGE("[Storage Executor]Step update record flag for upload finished stmt failed, %d", errCode);
1865     }
1866     SQLiteUtils::ResetStatement(stmt, true, errCode);
1867 }
1868 
GetWaitCompensatedSyncDataPk(const TableSchema & table,std::vector<VBucket> & data)1869 int SQLiteSingleVerRelationalStorageExecutor::GetWaitCompensatedSyncDataPk(const TableSchema &table,
1870     std::vector<VBucket> &data)
1871 {
1872     std::string sql = "SELECT ";
1873     std::vector<Field> pkFields;
1874     for (const auto &field : table.fields) {
1875         if (!field.primary) {
1876             continue;
1877         }
1878         sql += "b." + field.colName + ",";
1879         pkFields.push_back(field);
1880     }
1881     if (pkFields.empty()) {
1882         // ignore no pk table
1883         return E_OK;
1884     }
1885     sql.pop_back();
1886     sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " + FLAG_IS_WAIT_COMPENSATED_SYNC;
1887     sqlite3_stmt *stmt = nullptr;
1888     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1889     if (errCode != E_OK) {
1890         LOGE("[RDBExecutor] Get stmt failed when get wait compensated sync pk! errCode = %d..", errCode);
1891         return errCode;
1892     }
1893     do {
1894         errCode = SQLiteUtils::StepWithRetry(stmt);
1895         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1896             VBucket pkData;
1897             errCode = GetRecordFromStmt(stmt, pkFields, 0, pkData);
1898             if (errCode != E_OK) {
1899                 LOGE("[RDBExecutor] Get record failed when get wait compensated sync pk! errCode = %d.", errCode);
1900                 break;
1901             }
1902             data.push_back(pkData);
1903         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1904             errCode = E_OK;
1905             break;
1906         } else {
1907             LOGE("[RDBExecutor] Step failed when get wait compensated sync pk! errCode = %d.", errCode);
1908             break;
1909         }
1910     } while (errCode == E_OK);
1911     int ret = E_OK;
1912     SQLiteUtils::ResetStatement(stmt, true, ret);
1913     return errCode == E_OK ? ret : errCode;
1914 }
1915 
GetRecordFromStmt(sqlite3_stmt * stmt,const std::vector<Field> & fields,int startIndex,VBucket & record)1916 int SQLiteSingleVerRelationalStorageExecutor::GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields,
1917     int startIndex, VBucket &record)
1918 {
1919     int errCode = E_OK;
1920     for (const auto &field : fields) {
1921         Type cloudValue;
1922         errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, startIndex, cloudValue);
1923         if (errCode != E_OK) {
1924             break;
1925         }
1926         errCode = PutVBucketByType(record, field, cloudValue);
1927         if (errCode != E_OK) {
1928             break;
1929         }
1930         startIndex++;
1931     }
1932     return errCode;
1933 }
1934 
BindShareValueToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt)1935 int SQLiteSingleVerRelationalStorageExecutor::BindShareValueToInsertLogStatement(const VBucket &vBucket,
1936     const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt)
1937 {
1938     int errCode = E_OK;
1939     std::string version;
1940     if (putDataMode_ == PutDataMode::SYNC) {
1941         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, vBucket, version);
1942         if ((errCode != E_OK && errCode != -E_NOT_FOUND)) {
1943             LOGE("get version for insert log statement failed, %d", errCode);
1944             return -E_CLOUD_ERROR;
1945         }
1946     }
1947     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 10, version); // 10 is version
1948     if (errCode != E_OK) {
1949         LOGE("Bind version to insert log statement failed, %d", errCode);
1950         return errCode;
1951     }
1952 
1953     std::string shareUri;
1954     if (putDataMode_ == PutDataMode::SYNC) {
1955         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
1956             vBucket, shareUri);
1957         if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1958             LOGE("get shareUri for insert log statement failed, %d", errCode);
1959             return -E_CLOUD_ERROR;
1960         }
1961     }
1962 
1963     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 11, shareUri); // 11 is sharing_resource
1964     if (errCode != E_OK) {
1965         LOGE("Bind shareUri to insert log statement failed, %d", errCode);
1966     }
1967     return errCode;
1968 }
1969 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1970 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsConsistent(const std::string &tableName,
1971     const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1972 {
1973     if (downloadData.data.size() != downloadData.opType.size()) {
1974         LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1975         return -E_CLOUD_ERROR;
1976     }
1977     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1978         " SET flag=flag&(~0x20), " + CloudDbConstant::UNLOCKING_TO_UNLOCK + " WHERE cloud_gid=? and timestamp=?;";
1979     sqlite3_stmt *stmt = nullptr;
1980     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1981     if (errCode != E_OK) {
1982         LOGE("Get mark flag as consistent stmt failed, %d.", errCode);
1983         return errCode;
1984     }
1985     int ret = E_OK;
1986     int index = 0;
1987     for (const auto &data: downloadData.data) {
1988         SQLiteUtils::ResetStatement(stmt, false, ret);
1989         OpType opType = downloadData.opType[index++];
1990         if (opType == OpType::NOT_HANDLE || opType == OpType::LOCKED_NOT_HANDLE) {
1991             continue;
1992         }
1993         errCode = CloudStorageUtils::BindStepConsistentFlagStmt(stmt, data, gidFilters);
1994         if (errCode != E_OK) {
1995             break;
1996         }
1997     }
1998     SQLiteUtils::ResetStatement(stmt, true, ret);
1999     return errCode == E_OK ? ret : errCode;
2000 }
2001 
FillCloudVersionForUpload(const std::string & tableName,const CloudSyncBatch & batchData)2002 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const std::string &tableName,
2003     const CloudSyncBatch &batchData)
2004 {
2005     if (batchData.extend.empty()) {
2006         return E_OK;
2007     }
2008     if (batchData.hashKey.empty() || batchData.extend.size() != batchData.hashKey.size()) {
2009         LOGE("invalid sync data for filling version.");
2010         return -E_INVALID_ARGS;
2011     }
2012     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(tableName) +
2013         "' SET version = ? WHERE hash_key = ? ";
2014     sqlite3_stmt *stmt = nullptr;
2015     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
2016     if (errCode != E_OK) {
2017         return errCode;
2018     }
2019     int ret = E_OK;
2020     for (size_t i = 0; i < batchData.extend.size(); ++i) {
2021         errCode = BindUpdateVersionStatement(batchData.extend[i], batchData.hashKey[i], stmt);
2022         if (errCode != E_OK) {
2023             LOGE("bind update version stmt failed.");
2024             SQLiteUtils::ResetStatement(stmt, true, ret);
2025             return errCode;
2026         }
2027     }
2028     SQLiteUtils::ResetStatement(stmt, true, ret);
2029     return ret;
2030 }
2031 
QueryCount(const std::string & tableName,int64_t & count)2032 int SQLiteSingleVerRelationalStorageExecutor::QueryCount(const std::string &tableName, int64_t &count)
2033 {
2034     return SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, count);
2035 }
2036 
CheckInventoryData(const std::string & tableName)2037 int SQLiteSingleVerRelationalStorageExecutor::CheckInventoryData(const std::string &tableName)
2038 {
2039     int64_t dataCount = 0;
2040     int errCode = SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, dataCount);
2041     if (errCode != E_OK) {
2042         LOGE("Query count failed.", errCode);
2043         return errCode;
2044     }
2045     return dataCount > 0 ? -E_WITH_INVENTORY_DATA : E_OK;
2046 }
2047 } // namespace DistributedDB
2048 #endif