• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include "cloud/asset_operation_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "cloud/cloud_storage_utils.h"
21 #include "db_common.h"
22 #include "log_table_manager_factory.h"
23 #include "res_finalizer.h"
24 #include "runtime_context.h"
25 #include "simple_tracker_log_table_manager.h"
26 #include "sqlite_relational_utils.h"
27 
28 namespace DistributedDB {
// Offset added to the number of bound asset columns to obtain the bind index of the rowid argument.
static constexpr const int ROW_ID_INDEX = 1;
// Key under which the hash key is stored in a VBucket.
static constexpr const char *HASH_KEY = "HASH_KEY";
static constexpr const char *FLAG_NOT_LOGIC_DELETE = "FLAG & 0x08 = 0"; // holds when the 0x08 (logic delete) bit of flag is not set

// Pair of string lists.
using PairStringVector = std::pair<std::vector<std::string>, std::vector<std::string>>;
34 
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)35 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
36     std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
37 {
38     if (assetFields.empty() && pkSet.empty()) {
39         return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
40     }
41     std::string gid;
42     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
43     if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
44         LOGE("Get cloud gid fail when query log table.");
45         return errCode;
46     }
47 
48     if (pkSet.empty() && gid.empty()) {
49         LOGE("query log table failed because of both primary key and gid are empty.");
50         return -E_CLOUD_ERROR;
51     }
52     std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
53         " a.cloud_gid, a.sharing_resource, a.status, a.version";
54     for (const auto &field : assetFields) {
55         sql += ", b." + field.colName;
56     }
57     for (const auto &pk : pkSet) {
58         sql += ", b." + pk;
59     }
60     sql += CloudStorageUtils::GetLeftJoinLogSql(tableName) + " WHERE ";
61     if (!gid.empty()) {
62         sql += " a.cloud_gid = ? or ";
63     }
64     sql += "a.hash_key = ?";
65     querySql = sql;
66     return E_OK;
67 }
68 
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)69 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
70     const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
71 {
72     std::string sql = "UPDATE " + tableName + " SET ";
73     for (const auto &field: fields) {
74         sql += field.colName + " = ?,";
75     }
76     sql.pop_back();
77     sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (";
78     sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
79     sqlite3_stmt *stmt = nullptr;
80     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
81     if (errCode != E_OK) {
82         LOGE("Get fill asset statement failed, %d.", errCode);
83         return errCode;
84     }
85     int ret = E_OK;
86     for (size_t i = 0; i < fields.size(); ++i) {
87         errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
88         if (errCode != E_OK) {
89             SQLiteUtils::ResetStatement(stmt, true, ret);
90             return errCode;
91         }
92     }
93     statement = stmt;
94     return errCode;
95 }
96 
CleanDownloadingFlagByGid(const std::string & tableName,const std::string & gid,VBucket dbAssets)97 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadingFlagByGid(const std::string &tableName,
98     const std::string &gid, VBucket dbAssets)
99 {
100     if (CloudStorageUtils::IsAssetsContainDownloadRecord(dbAssets)) {
101         return E_OK;
102     }
103     std::string sql;
104     sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag&(~0x1000) where cloud_gid = ?;";
105     sqlite3_stmt *stmt = nullptr;
106     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
107     if (errCode != E_OK) {
108         LOGE("[RDBExecutor]Get stmt failed clean downloading flag:%d, tableName:%s, length:%zu",
109             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
110         return errCode;
111     }
112     int ret = E_OK;
113     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
114     if (errCode != E_OK) {
115         LOGE("[RDBExecutor]bind gid failed when clean downloading flag:%d, tableName:%s, length:%zu",
116             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
117         SQLiteUtils::ResetStatement(stmt, true, ret);
118         return errCode;
119     }
120     errCode = SQLiteUtils::StepWithRetry(stmt);
121     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
122         errCode = E_OK;
123     } else {
124         LOGE("[RDBExecutor]clean downloading flag failed:%d, tableName:%s, length:%zu",
125             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
126     }
127     SQLiteUtils::ResetStatement(stmt, true, ret);
128     if (ret != E_OK) {
129         LOGE("[RDBExecutor]reset stmt when clean downloading flag:%d, tableName:%s, length:%zu",
130             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
131     }
132     return errCode != E_OK ? errCode : ret;
133 }
134 
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess,uint64_t & currCursor)135 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
136     VBucket &vBucket, bool isDownloadSuccess, uint64_t &currCursor)
137 {
138     std::string cloudGid;
139     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
140     if (errCode != E_OK) {
141         LOGE("Miss gid when fill Asset.");
142         return errCode;
143     }
144     std::vector<Field> assetsField;
145     errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
146     if (errCode != E_OK) {
147         LOGE("No assets need to be filled.");
148         return errCode;
149     }
150     CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
151 
152     Bytes hashKey;
153     (void)CloudStorageUtils::GetValueFromVBucket<Bytes>(HASH_KEY, vBucket, hashKey);
154     VBucket dbAssets;
155     std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, hashKey, dbAssets);
156     if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
157         LOGE("get assets by gid or hashkey failed %d.", errCode);
158         return errCode;
159     }
160     AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
161         AssetOperationUtils::CloudSyncAction::END_DOWNLOAD);
162 
163     if (isDownloadSuccess) {
164         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
165             CloudStorageUtils::FillAssetAfterDownload, CloudStorageUtils::FillAssetsAfterDownload);
166         errCode = IncreaseCursorOnAssetData(tableSchema.name, cloudGid, currCursor);
167         if (errCode != E_OK) {
168             return errCode;
169         }
170     } else {
171         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
172             CloudStorageUtils::FillAssetAfterDownloadFail, CloudStorageUtils::FillAssetsAfterDownloadFail);
173     }
174     sqlite3_stmt *stmt = nullptr;
175     errCode = GetFillDownloadAssetStatement(tableSchema.name, dbAssets, assetsField, stmt);
176     if (errCode != E_OK) {
177         return errCode;
178     }
179     errCode = ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
180     if (errCode != E_OK) {
181         return errCode;
182     }
183     errCode = CleanDownloadingFlagByGid(tableSchema.name, cloudGid, dbAssets);
184     int ret = CleanDownloadChangedAssets(vBucket, assetOpType);
185     return errCode == E_OK ? ret : errCode;
186 }
187 
IncreaseCursorOnAssetData(const std::string & tableName,const std::string & gid,uint64_t & currCursor)188 int SQLiteSingleVerRelationalStorageExecutor::IncreaseCursorOnAssetData(const std::string &tableName,
189     const std::string &gid, uint64_t &currCursor)
190 {
191     uint64_t cursor = DBConstant::INVALID_CURSOR;
192     int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, cursor);
193     if (errCode != E_OK) {
194         LOGE("Get cursor of table[%s length[%u]] failed when increase cursor: %d, gid[%s]",
195             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), errCode, gid.c_str());
196         return errCode;
197     }
198     cursor++;
199     std::string sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log";
200     sql += " SET cursor = ? where cloud_gid = ?;";
201     sqlite3_stmt *statement = nullptr;
202     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
203     if (errCode != E_OK) {
204         LOGE("get update asset data cursor stmt failed %d.", errCode);
205         return errCode;
206     }
207     ResFinalizer finalizer([statement]() {
208         sqlite3_stmt *statementInner = statement;
209         int ret = E_OK;
210         SQLiteUtils::ResetStatement(statementInner, true, ret);
211         if (ret != E_OK) {
212             LOGW("Reset stmt failed %d when increase cursor on asset data", ret);
213         }
214     });
215     int index = 1;
216     errCode = SQLiteUtils::BindInt64ToStatement(statement, index++, cursor);
217     if (errCode != E_OK) {
218         LOGE("bind cursor data stmt failed %d.", errCode);
219         return errCode;
220     }
221     errCode = SQLiteUtils::BindTextToStatement(statement, index, gid);
222     if (errCode != E_OK) {
223         LOGE("bind cursor gid data stmt failed %d.", errCode);
224         return errCode;
225     }
226     errCode = SQLiteUtils::StepWithRetry(statement, false);
227     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
228         LOGE("Fill upload asset failed:%d.", errCode);
229         return errCode;
230     }
231     currCursor = cursor;
232     errCode = SetCursor(tableName, cursor);
233     if (errCode != E_OK) {
234         LOGE("Upgrade cursor failed after asset download success %d.", errCode);
235     }
236     return errCode;
237 }
238 
FillCloudAssetForUpload(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data)239 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema,
240     const CloudSyncBatch &data)
241 {
242     int errCode = E_OK;
243     if (CloudStorageUtils::ChkFillCloudAssetParam(data, errCode)) {
244         return errCode;
245     }
246     errCode = SetLogTriggerStatus(false);
247     if (errCode != E_OK) {
248         LOGE("Fail to set log trigger off, %d.", errCode);
249         return errCode;
250     }
251     sqlite3_stmt *stmt = nullptr;
252     for (size_t i = 0; i < data.assets.size(); ++i) {
253         if (data.assets.at(i).empty()) {
254             continue;
255         }
256         if (DBCommon::IsRecordIgnored(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i]) ||
257             DBCommon::IsCloudRecordNotFound(data.extend[i]) || DBCommon::IsCloudRecordAlreadyExisted(data.extend[i])) {
258             continue;
259         }
260         errCode = InitFillUploadAssetStatement(opType, tableSchema, data, i, stmt);
261         if (errCode != E_OK) {
262             if (errCode == -E_NOT_FOUND) {
263                 errCode = E_OK;
264                 continue;
265             }
266             break;
267         }
268         errCode = SQLiteUtils::StepWithRetry(stmt, false);
269         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
270             LOGE("Fill upload asset failed:%d.", errCode);
271             break;
272         }
273         errCode = E_OK;
274         SQLiteUtils::ResetStatement(stmt, true, errCode);
275         stmt = nullptr;
276         if (errCode != E_OK) {
277             break;
278         }
279     }
280     int ret = E_OK;
281     SQLiteUtils::ResetStatement(stmt, true, ret);
282     int endCode = SetLogTriggerStatus(true);
283     if (endCode != E_OK) {
284         LOGE("Fail to set log trigger off, %d.", endCode);
285         return endCode;
286     }
287     return errCode != E_OK ? errCode : ret;
288 }
289 
FillCloudVersionForUpload(const OpType opType,const CloudSyncData & data)290 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data)
291 {
292     switch (opType) {
293         case OpType::UPDATE_VERSION:
294             return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.updData);
295         case OpType::INSERT_VERSION:
296             return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.insData);
297         default:
298             LOGE("Fill version with unknown type %d", static_cast<int>(opType));
299             return -E_INVALID_ARGS;
300     }
301 }
302 
BindUpdateVersionStatement(const VBucket & vBucket,const Bytes & hashKey,sqlite3_stmt * & stmt)303 int SQLiteSingleVerRelationalStorageExecutor::BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey,
304     sqlite3_stmt *&stmt)
305 {
306     int errCode = E_OK;
307     std::string version;
308     if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
309         vBucket, version) != E_OK) {
310         LOGW("get version from vBucket failed.");
311     }
312     if (hashKey.empty()) {
313         LOGE("hash key is empty when update version.");
314         return -E_CLOUD_ERROR;
315     }
316     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, version);
317     if (errCode != E_OK) {
318         return errCode;
319     }
320     errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, hashKey); // 2 means the second bind args
321     if (errCode != E_OK) {
322         return errCode;
323     }
324     errCode = SQLiteUtils::StepWithRetry(stmt, false);
325     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
326         errCode = E_OK;
327         SQLiteUtils::ResetStatement(stmt, false, errCode);
328     } else {
329         LOGE("step version stmt failed: %d.", errCode);
330     }
331     return errCode;
332 }
333 
InitFillUploadAssetStatement(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)334 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpType opType,
335     const TableSchema &tableSchema, const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
336 {
337     VBucket vBucket = data.assets.at(index);
338     VBucket dbAssets;
339     std::string cloudGid;
340     int errCode;
341     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
342     std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, data.hashKey.at(index), dbAssets);
343     if (errCode != E_OK && errCode != -E_CLOUD_GID_MISMATCH) {
344         return errCode;
345     }
346     AssetOperationUtils::CloudSyncAction action = opType == OpType::SET_UPLOADING ?
347         AssetOperationUtils::CloudSyncAction::START_UPLOAD : AssetOperationUtils::CloudSyncAction::END_UPLOAD;
348     AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
349         action);
350     if (action == AssetOperationUtils::CloudSyncAction::START_UPLOAD) {
351         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
352             CloudStorageUtils::FillAssetBeforeUpload, CloudStorageUtils::FillAssetsBeforeUpload);
353     } else {
354         if (DBCommon::IsRecordError(data.extend.at(index)) || DBCommon::IsRecordAssetsMissing(data.extend.at(index))) {
355             CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
356                 CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed);
357         } else {
358             CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
359                 CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload);
360         }
361     }
362 
363     errCode = GetAndBindFillUploadAssetStatement(tableSchema.name, dbAssets, statement);
364     if (errCode != E_OK) {
365         LOGE("get and bind asset failed %d.", errCode);
366         return errCode;
367     }
368     int64_t rowid = data.rowid[index];
369     return SQLiteUtils::BindInt64ToStatement(statement, dbAssets.size() + ROW_ID_INDEX, rowid);
370 }
371 
AnalysisTrackerTable(const TrackerTable & trackerTable,TableInfo & tableInfo)372 int SQLiteSingleVerRelationalStorageExecutor::AnalysisTrackerTable(const TrackerTable &trackerTable,
373     TableInfo &tableInfo)
374 {
375     return SQLiteRelationalUtils::AnalysisTrackerTable(dbHandle_, trackerTable, tableInfo);
376 }
377 
CreateTrackerTable(const TrackerTable & trackerTable,const TableInfo & table,bool checkData)378 int SQLiteSingleVerRelationalStorageExecutor::CreateTrackerTable(const TrackerTable &trackerTable,
379     const TableInfo &table, bool checkData)
380 {
381     std::unique_ptr<SqliteLogTableManager> tableManager = std::make_unique<SimpleTrackerLogTableManager>();
382     if (trackerTable.IsEmpty()) {
383         // drop trigger
384         return tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
385     }
386 
387     // create log table
388     int errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
389     if (errCode != E_OK) {
390         return errCode;
391     }
392     // init cursor
393     errCode = InitCursorToMeta(table.GetTableName());
394     if (errCode != E_OK) {
395         return errCode;
396     }
397     errCode = GeneLogInfoForExistedData(dbHandle_, "", table, tableManager, true);
398     if (errCode != E_OK) {
399         LOGE("general tracker log info for existed data failed %d.", errCode);
400         return errCode;
401     }
402     errCode = SetLogTriggerStatus(true);
403     if (errCode != E_OK) {
404         return errCode;
405     }
406     errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
407     if (errCode != E_OK) {
408         return errCode;
409     }
410     if (checkData) {
411         return CheckInventoryData(DBCommon::GetLogTableName(table.GetTableName()));
412     }
413     return E_OK;
414 }
415 
GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject & schema)416 int SQLiteSingleVerRelationalStorageExecutor::GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema)
417 {
418     if (!schema.ToSchemaString().empty()) {
419         return E_OK;
420     }
421     const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY,
422         DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY + strlen(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY));
423     Value schemaVal;
424     int errCode = GetKvData(schemaKey, schemaVal); // save schema to meta_data
425     if (errCode != E_OK) {
426         return errCode;
427     }
428     if (schemaVal.empty()) {
429         return -E_NOT_FOUND;
430     }
431     std::string schemaStr;
432     DBCommon::VectorToString(schemaVal, schemaStr);
433     errCode = schema.ParseFromTrackerSchemaString(schemaStr);
434     if (errCode != E_OK) {
435         LOGE("Parse from tracker schema string err.");
436     }
437     return errCode;
438 }
439 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)440 int SQLiteSingleVerRelationalStorageExecutor::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
441 {
442     sqlite3_stmt *statement = nullptr;
443     int errCode = SQLiteUtils::GetStatement(dbHandle_, condition.sql, statement);
444     if (errCode != E_OK) {
445         LOGE("Execute sql failed when prepare stmt.");
446         return errCode;
447     }
448     if (condition.readOnly && !SQLiteUtils::IsStmtReadOnly(statement)) {
449         LOGW("[ExecuteSql] The condition is read-only, but SQL is not read-only");
450     }
451     size_t bindCount = static_cast<size_t>(sqlite3_bind_parameter_count(statement));
452     if (bindCount > condition.bindArgs.size() || bindCount < condition.bindArgs.size()) {
453         LOGE("Sql bind args mismatch.");
454         SQLiteUtils::ResetStatement(statement, true, errCode);
455         return -E_INVALID_ARGS;
456     }
457     int ret = E_OK;
458     for (size_t i = 0; i < condition.bindArgs.size(); i++) {
459         Type type = condition.bindArgs[i];
460         errCode = SQLiteRelationalUtils::BindStatementByType(statement, i + 1, type);
461         if (errCode != E_OK) {
462             SQLiteUtils::ResetStatement(statement, true, ret);
463             return errCode;
464         }
465     }
466     while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
467         VBucket bucket;
468         errCode = SQLiteRelationalUtils::GetSelectVBucket(statement, bucket);
469         if (errCode != E_OK) {
470             SQLiteUtils::ResetStatement(statement, true, ret);
471             return errCode;
472         }
473         records.push_back(std::move(bucket));
474     }
475     SQLiteUtils::ResetStatement(statement, true, ret);
476     return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
477 }
478 
GetClearWaterMarkTables(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema,std::set<std::string> & clearWaterMarkTables)479 int SQLiteSingleVerRelationalStorageExecutor::GetClearWaterMarkTables(
480     const std::vector<TableReferenceProperty> &tableReferenceProperty, const RelationalSchemaObject &schema,
481     std::set<std::string> &clearWaterMarkTables)
482 {
483     bool isRefNotSet = false;
484     std::set<std::string> changeTables = schema.CompareReferenceProperty(tableReferenceProperty, isRefNotSet);
485     for (const auto &table : changeTables) {
486         std::string logTableName = DBCommon::GetLogTableName(table);
487         bool isExists = false;
488         int errCode = SQLiteUtils::CheckTableExists(dbHandle_, logTableName, isExists);
489         if (errCode != E_OK) {
490             LOGE("[GetClearWaterMarkTables] check table exists failed, errCode = %d.", errCode);
491             return errCode;
492         }
493         if (!isExists) { // table maybe dropped after set reference
494             LOGI("[GetClearWaterMarkTables] log table not exists, skip this table.");
495             continue;
496         }
497 
498         bool isEmpty = true;
499         errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, logTableName, isEmpty);
500         if (errCode != E_OK) {
501             LOGE("[GetClearWaterMarkTables] check table empty failed, errCode = %d.", errCode);
502             clearWaterMarkTables.clear();
503             return errCode;
504         }
505         if (!isEmpty) {
506             clearWaterMarkTables.insert(table);
507         }
508     }
509     LOGI("[GetClearWaterMarkTables] clearWaterMarkTables size = %zu, changeTables size = %zu, isFirst:%d.",
510         clearWaterMarkTables.size(), changeTables.size(), isRefNotSet);
511     // If reference set for the first time and has no data, or the reference has not changed, return E_OK.
512     return ((isRefNotSet && clearWaterMarkTables.empty()) || changeTables.empty()) ? E_OK : -E_TABLE_REFERENCE_CHANGED;
513 }
514 
UpgradedLogForExistedData(const TableInfo & tableInfo,bool schemaChanged)515 int SQLiteSingleVerRelationalStorageExecutor::UpgradedLogForExistedData(const TableInfo &tableInfo, bool schemaChanged)
516 {
517     std::string logTable = DBCommon::GetLogTableName(tableInfo.GetTableName());
518     if (schemaChanged) {
519         std::string markAsInconsistent = "UPDATE " + logTable + " SET flag=" +
520             "(CASE WHEN (cloud_gid='' and data_key=-1 and flag&0x02=0x02) then flag else flag|0x20 END)";
521         int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, markAsInconsistent);
522         if (ret != E_OK) {
523             LOGE("Mark upgrade log info as inconsistent failed:%d", ret);
524             return ret;
525         }
526     }
527     if (tableInfo.GetTrackerTable().IsEmpty()) {
528         return E_OK;
529     }
530     LOGI("Upgrade tracker table log, schemaChanged:%d.", schemaChanged);
531     int errCode = SetLogTriggerStatus(false);
532     if (errCode != E_OK) {
533         return errCode;
534     }
535     std::string sql = "UPDATE " + tableInfo.GetTableName() + " SET _rowid_=_rowid_";
536     TrackerTable trackerTable = tableInfo.GetTrackerTable();
537     errCode = trackerTable.ReBuildTempTrigger(dbHandle_, TriggerMode::TriggerModeEnum::UPDATE,
538         [this, &sql]() {
539         int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
540         if (ret != E_OK) {
541             LOGE("Upgrade log for extend field failed.");
542         }
543         return ret;
544     });
545     return SetLogTriggerStatus(true);
546 }
547 
CreateTempSyncTrigger(const TrackerTable & trackerTable,bool flag)548 int SQLiteSingleVerRelationalStorageExecutor::CreateTempSyncTrigger(const TrackerTable &trackerTable, bool flag)
549 {
550     int errCode = E_OK;
551     std::vector<std::string> dropSql = trackerTable.GetDropTempTriggerSql();
552     for (const auto &sql: dropSql) {
553         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
554         if (errCode != E_OK) {
555             LOGE("[RDBExecutor] execute drop sql failed %d.", errCode);
556             return errCode;
557         }
558     }
559     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempInsertTriggerSql(flag));
560     if (errCode != E_OK) {
561         LOGE("[RDBExecutor] create temp insert trigger failed %d.", errCode);
562         return errCode;
563     }
564     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempUpdateTriggerSql(flag));
565     if (errCode != E_OK) {
566         LOGE("[RDBExecutor] create temp update trigger failed %d.", errCode);
567         return errCode;
568     }
569     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempDeleteTriggerSql(flag));
570     if (errCode != E_OK) {
571         LOGE("[RDBExecutor] create temp delete trigger failed %d.", errCode);
572     }
573     return errCode;
574 }
575 
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)576 int SQLiteSingleVerRelationalStorageExecutor::GetAndResetServerObserverData(const std::string &tableName,
577     ChangeProperties &changeProperties)
578 {
579     std::string fileName;
580     if (!SQLiteRelationalUtils::GetDbFileName(dbHandle_, fileName)) {
581         LOGE("get db file name failed.");
582         return -E_INVALID_DB;
583     }
584     SQLiteUtils::GetAndResetServerObserverData(fileName, tableName, changeProperties);
585     return E_OK;
586 }
587 
ClearAllTempSyncTrigger()588 int SQLiteSingleVerRelationalStorageExecutor::ClearAllTempSyncTrigger()
589 {
590     sqlite3_stmt *stmt = nullptr;
591     static const std::string sql = "SELECT name FROM sqlite_temp_master WHERE type = 'trigger';";
592     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
593     if (errCode != E_OK) {
594         LOGE("get clear all temp trigger stmt failed %d.", errCode);
595         return errCode;
596     }
597     int ret = E_OK;
598     while ((errCode = SQLiteUtils::StepNext(stmt, isMemDb_)) == E_OK) {
599         std::string str;
600         (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
601         if (errCode != E_OK) {
602             SQLiteUtils::ResetStatement(stmt, true, ret);
603             return errCode;
604         }
605         std::string dropSql = "DROP TRIGGER IF EXISTS '" + str + "';";
606         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, dropSql);
607         if (errCode != E_OK) {
608             LOGE("drop temp trigger failed %d.", errCode);
609             SQLiteUtils::ResetStatement(stmt, true, ret);
610             return errCode;
611         }
612     }
613     SQLiteUtils::ResetStatement(stmt, true, ret);
614     return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
615 }
616 
CleanTrackerData(const std::string & tableName,int64_t cursor,bool isOnlyTrackTable)617 int SQLiteSingleVerRelationalStorageExecutor::CleanTrackerData(const std::string &tableName, int64_t cursor,
618     bool isOnlyTrackTable)
619 {
620     return SQLiteRelationalUtils::CleanTrackerData(dbHandle_, tableName, cursor, isOnlyTrackTable);
621 }
622 
CreateSharedTable(const TableSchema & tableSchema)623 int SQLiteSingleVerRelationalStorageExecutor::CreateSharedTable(const TableSchema &tableSchema)
624 {
625     LOGI("Create shared table[%s length[%u]]", DBCommon::StringMiddleMasking(tableSchema.name).c_str(),
626         tableSchema.name.length());
627     std::map<int32_t, std::string> cloudFieldTypeMap;
628     cloudFieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
629     cloudFieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
630     cloudFieldTypeMap[TYPE_INDEX<double>] = "REAL";
631     cloudFieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
632     cloudFieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
633     cloudFieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
634     cloudFieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
635     cloudFieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
636 
637     std::string createTableSql = "CREATE TABLE IF NOT EXISTS " + tableSchema.sharedTableName + "(";
638     std::string primaryKey = ", PRIMARY KEY (";
639     createTableSql += CloudDbConstant::CLOUD_OWNER;
640     createTableSql += " TEXT, ";
641     createTableSql += CloudDbConstant::CLOUD_PRIVILEGE;
642     createTableSql += " TEXT";
643     primaryKey += CloudDbConstant::CLOUD_OWNER;
644     bool hasPrimaryKey = false;
645     for (const auto &field : tableSchema.fields) {
646         createTableSql += ", " + field.colName + " ";
647         createTableSql += cloudFieldTypeMap[field.type];
648         createTableSql += field.nullable ? "" : " NOT NULL";
649         if (field.primary) {
650             primaryKey += ", " + field.colName;
651             hasPrimaryKey = true;
652         }
653     }
654     if (hasPrimaryKey) {
655         createTableSql += primaryKey + ")";
656     }
657     createTableSql += ");";
658     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, createTableSql);
659     if (errCode != E_OK) {
660         LOGE("Create shared table failed, %d.", errCode);
661     }
662     return errCode;
663 }
664 
DeleteTable(const std::vector<std::string> & tableNames)665 int SQLiteSingleVerRelationalStorageExecutor::DeleteTable(const std::vector<std::string> &tableNames)
666 {
667     for (const auto &tableName : tableNames) {
668         std::string deleteTableSql = "DROP TABLE IF EXISTS " + tableName + ";";
669         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteTableSql);
670         if (errCode != E_OK) {
671             LOGE("Delete table failed, %d.", errCode);
672             return errCode;
673         }
674     }
675     return E_OK;
676 }
677 
UpdateSharedTable(const std::map<std::string,std::vector<Field>> & updateTableNames)678 int SQLiteSingleVerRelationalStorageExecutor::UpdateSharedTable(
679     const std::map<std::string, std::vector<Field>> &updateTableNames)
680 {
681     int errCode = E_OK;
682     std::map<int32_t, std::string> fieldTypeMap;
683     fieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
684     fieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
685     fieldTypeMap[TYPE_INDEX<double>] = "REAL";
686     fieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
687     fieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
688     fieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
689     fieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
690     fieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
691     for (const auto &table : updateTableNames) {
692         if (table.second.empty()) {
693             continue;
694         }
695         std::string addColumnSql = "";
696         for (const auto &field : table.second) {
697             addColumnSql += "ALTER TABLE " + table.first + " ADD ";
698             addColumnSql += field.colName + " ";
699             addColumnSql += fieldTypeMap[field.type];
700             addColumnSql += field.primary ? " PRIMARY KEY" : "";
701             addColumnSql += field.nullable ? ";" : " NOT NULL;";
702         }
703         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, addColumnSql);
704         if (errCode != E_OK) {
705             LOGE("Shared table add column failed, %d.", errCode);
706             return errCode;
707         }
708     }
709     return errCode;
710 }
711 
AlterTableName(const std::map<std::string,std::string> & tableNames)712 int SQLiteSingleVerRelationalStorageExecutor::AlterTableName(const std::map<std::string, std::string> &tableNames)
713 {
714     for (const auto &tableName : tableNames) {
715         std::string alterTableSql = "ALTER TABLE " + tableName.first + " RENAME TO " + tableName.second + ";";
716         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, alterTableSql);
717         if (errCode != E_OK) {
718             LOGE("Alter table name failed, %d.", errCode);
719             return errCode;
720         }
721     }
722     return E_OK;
723 }
724 
AppendUpdateLogRecordWhereSqlCondition(const TableSchema & tableSchema,const VBucket & vBucket,std::string & sql)725 int SQLiteSingleVerRelationalStorageExecutor::AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema,
726     const VBucket &vBucket, std::string &sql)
727 {
728     std::string gidStr;
729     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
730     if (errCode != E_OK) {
731         LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d.", errCode);
732         return errCode;
733     }
734 
735     sql += " WHERE ";
736     if (!gidStr.empty()) {
737         sql += "cloud_gid = '" + gidStr + "'";
738     }
739     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
740     if (!pkMap.empty()) {
741         if (!gidStr.empty()) {
742             sql += " OR ";
743         }
744         sql += "(hash_key = ?);";
745     }
746     return E_OK;
747 }
748 
DoCleanShareTableDataAndLog(const std::vector<std::string> & tableNameList)749 int SQLiteSingleVerRelationalStorageExecutor::DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList)
750 {
751     int errCode = E_OK;
752     for (const auto &tableName: tableNameList) {
753         int32_t count = 0;
754         std::string logTableName = DBCommon::GetLogTableName(tableName);
755         (void)GetFlagIsLocalCount(logTableName, count);
756         LOGI("[DoCleanShareTableDataAndLog]flag is local in table:%s, len:%zu, count:%d, before remove device data.",
757             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
758         if (isLogicDelete_) {
759             errCode = SetDataOnShareTableWithLogicDelete(tableName, logTableName);
760         } else {
761             errCode = CleanShareTable(tableName);
762         }
763         if (errCode != E_OK) {
764             LOGE("clean share table failed at table:%s, length:%d, deleteType:%d, errCode:%d.",
765                 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), isLogicDelete_ ? 1 : 0, errCode);
766             return errCode;
767         }
768         (void)GetFlagIsLocalCount(logTableName, count);
769         LOGI("[DoCleanShareTableDataAndLog]flag is local in table:%s, len:%zu, count:%d, after remove device data.",
770             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
771     }
772     return errCode;
773 }
774 
CleanShareTable(const std::string & tableName)775 int SQLiteSingleVerRelationalStorageExecutor::CleanShareTable(const std::string &tableName)
776 {
777     int ret = E_OK;
778     int errCode = E_OK;
779     std::string delDataSql = "DELETE FROM '" + tableName + "';";
780     sqlite3_stmt *statement = nullptr;
781     errCode = SQLiteUtils::GetStatement(dbHandle_, delDataSql, statement);
782     if (errCode != E_OK) {
783         LOGE("get clean shared data stmt failed %d.", errCode);
784         return errCode;
785     }
786     errCode = SQLiteUtils::StepWithRetry(statement);
787     SQLiteUtils::ResetStatement(statement, true, ret);
788     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
789         errCode = E_OK;
790     } else {
791         LOGE("clean shared data failed: %d.", errCode);
792         return errCode;
793     }
794     statement = nullptr;
795     std::string delLogSql = "DELETE FROM '" + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log';";
796     errCode = SQLiteUtils::GetStatement(dbHandle_, delLogSql, statement);
797     if (errCode != E_OK) {
798         LOGE("get clean shared log stmt failed %d.", errCode);
799         return errCode;
800     }
801     errCode = SQLiteUtils::StepWithRetry(statement);
802     SQLiteUtils::ResetStatement(statement, true, ret);
803     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
804         errCode = E_OK;
805     } else {
806         LOGE("clean shared log failed: %d.", errCode);
807         return errCode;
808     }
809     // here errCode must be E_OK, just return ret
810     return ret;
811 }
812 
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,const std::map<std::string,std::vector<TableReferenceProperty>> & tableReference,std::map<int64_t,Entries> & referenceGid)813 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGid(const std::string &tableName,
814     const CloudSyncBatch &syncBatch, const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
815     std::map<int64_t, Entries> &referenceGid)
816 {
817     int errCode = E_OK;
818     for (const auto &[targetTable, targetReference] : tableReference) {
819         errCode = GetReferenceGidInner(tableName, targetTable, syncBatch, targetReference, referenceGid);
820         if (errCode != E_OK) {
821             LOGE("[RDBExecutor] get reference gid inner failed %d.", errCode);
822             return errCode;
823         }
824     }
825     return errCode;
826 }
827 
GetReferenceGidInner(const std::string & sourceTable,const std::string & targetTable,const CloudSyncBatch & syncBatch,const std::vector<TableReferenceProperty> & targetTableReference,std::map<int64_t,Entries> & referenceGid)828 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidInner(const std::string &sourceTable,
829     const std::string &targetTable, const CloudSyncBatch &syncBatch,
830     const std::vector<TableReferenceProperty> &targetTableReference, std::map<int64_t, Entries> &referenceGid)
831 {
832     auto [sourceFields, targetFields] = SplitReferenceByField(targetTableReference);
833     if (sourceFields.empty()) {
834         LOGD("[RDBExecutor] source field is empty.");
835         return E_OK;
836     }
837     if (sourceFields.size() != targetFields.size()) {
838         LOGE("[RDBExecutor] reference field size not equal.");
839         return -E_INTERNAL_ERROR;
840     }
841     std::string sql = GetReferenceGidSql(sourceTable, targetTable, sourceFields, targetFields);
842     sqlite3_stmt *statement = nullptr;
843     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
844     if (errCode != E_OK) {
845         LOGE("[RDBExecutor] get ref gid data stmt failed. %d", errCode);
846         return errCode;
847     }
848     errCode = GetReferenceGidByStmt(statement, syncBatch, targetTable, referenceGid);
849     int ret = E_OK;
850     SQLiteUtils::ResetStatement(statement, true, ret);
851     return errCode == E_OK ? ret : errCode;
852 }
853 
GetReferenceGidSql(const std::string & sourceTable,const std::string & targetTable,const std::vector<std::string> & sourceFields,const std::vector<std::string> & targetFields)854 std::string SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidSql(const std::string &sourceTable,
855     const std::string &targetTable, const std::vector<std::string> &sourceFields,
856     const std::vector<std::string> &targetFields)
857 {
858     // sql like this:
859     // SELECT naturalbase_rdb_aux_parent_log.cloud_gid FROM naturalbase_rdb_aux_parent_log,
860     //   (SELECT parent._rowid_ AS rowid_b FROM parent,
861     //     (SELECT child._rowid_, name FROM child, naturalbase_rdb_aux_child_log
862     //     WHERE child._rowid_ = ? AND naturalbase_rdb_aux_child_log.timestamp = ? ) source_a
863     //   WHERE parent.name = source_a.name ) temp_table
864     // WHERE naturalbase_rdb_aux_parent_log.data_key = temp_table.rowid_b
865     std::string logTargetTable = DBCommon::GetLogTableName(targetTable);
866     std::string logSourceTable = DBCommon::GetLogTableName(sourceTable);
867     std::string sql;
868     sql += "SELECT " + logTargetTable + ".cloud_gid" + " FROM " + logTargetTable + ", ";
869     sql += "(";
870     sql += "SELECT " + targetTable + "._rowid_ AS rowid_b FROM " + targetTable
871            + ", ";
872     sql += "(SELECT " + sourceTable + "._rowid_,";
873     std::set<std::string> sourceFieldSet;
874     for (const auto &item : sourceFields) {
875         sourceFieldSet.insert(item);
876     }
877     for (const auto &sourceField : sourceFieldSet) {
878         sql += sourceField + ",";
879     }
880     sql.pop_back();
881     sql += " FROM " + sourceTable + ", " + logSourceTable;
882     sql +=" WHERE " + sourceTable + "._rowid_ = ? AND " + logSourceTable + ".timestamp = ? ";
883     sql += " AND " + logSourceTable + ".flag&0x08=0x00) source_a";
884     sql += " WHERE ";
885     for (size_t i = 0u; i < sourceFields.size(); ++i) {
886         if (i != 0u) {
887             sql += " AND ";
888         }
889         sql += targetTable + "." + targetFields[i] + " = source_a." + sourceFields[i];
890     }
891     sql += ") temp_table ";
892     sql += "WHERE " + logTargetTable + ".data_key = temp_table.rowid_b";
893     sql += " AND " + logTargetTable + ".flag&0x08=0x00";
894     return sql;
895 }
896 
GetReferenceGidByStmt(sqlite3_stmt * statement,const CloudSyncBatch & syncBatch,const std::string & targetTable,std::map<int64_t,Entries> & referenceGid)897 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidByStmt(sqlite3_stmt *statement,
898     const CloudSyncBatch &syncBatch, const std::string &targetTable, std::map<int64_t, Entries> &referenceGid)
899 {
900     int errCode = E_OK;
901     if (syncBatch.rowid.size() != syncBatch.timestamp.size()) {
902         LOGE("[RDBExecutor] rowid size [%zu] not equal to timestamp size [%zu].", syncBatch.rowid.size(),
903             syncBatch.timestamp.size());
904         return -E_INVALID_ARGS;
905     }
906     int matchCount = 0;
907     for (size_t i = 0u; i < syncBatch.rowid.size(); i++) {
908         errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, syncBatch.rowid[i]); // 1 is rowid index
909         if (errCode != E_OK) {
910             LOGE("[RDBExecutor] bind rowid to stmt failed %d.", errCode);
911             break;
912         }
913         errCode = SQLiteUtils::BindInt64ToStatement(statement, 2, syncBatch.timestamp[i]); // 2 is timestamp index
914         if (errCode != E_OK) {
915             LOGE("[RDBExecutor] bind timestamp to stmt failed %d.", errCode);
916             break;
917         }
918         while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
919             std::string gid;
920             (void)SQLiteUtils::GetColumnTextValue(statement, 0, gid);
921             if (gid.empty()) {
922                 LOGE("[RDBExecutor] reference data don't contain gid.");
923                 errCode = -E_CLOUD_ERROR;
924                 break;
925             }
926             referenceGid[syncBatch.rowid[i]][targetTable] = gid;
927             matchCount++;
928         }
929         if (errCode == -E_FINISHED) {
930             errCode = E_OK;
931         }
932         if (errCode != E_OK) {
933             LOGE("[RDBExecutor] step stmt failed %d.", errCode);
934             break;
935         }
936         SQLiteUtils::ResetStatement(statement, false, errCode);
937         if (errCode != E_OK) {
938             LOGE("[RDBExecutor] reset stmt failed %d.", errCode);
939             break;
940         }
941     }
942     if (matchCount != 0) {
943         LOGD("[RDBExecutor] get reference gid match %d.", matchCount);
944     }
945     return errCode;
946 }
947 
SplitReferenceByField(const std::vector<TableReferenceProperty> & targetTableReference)948 PairStringVector SQLiteSingleVerRelationalStorageExecutor::SplitReferenceByField(
949     const std::vector<TableReferenceProperty> &targetTableReference)
950 {
951     PairStringVector sourceTargetFiled;
952     for (const auto &reference : targetTableReference) {
953         for (const auto &column : reference.columns) {
954             sourceTargetFiled.first.push_back(column.first);
955             sourceTargetFiled.second.push_back(column.second);
956         }
957     }
958     return sourceTargetFiled;
959 }
960 
BindStmtWithCloudGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid,sqlite3_stmt * & stmt)961 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSyncData &cloudDataResult,
962     bool ignoreEmptyGid, sqlite3_stmt *&stmt)
963 {
964     int fillGidCount = 0;
965     int errCode = E_OK;
966     for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
967         auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
968         if (gidEntry == cloudDataResult.insData.extend[i].end()) {
969             bool isSkipAssetsMissRecord = false;
970             if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) {
971                 LOGI("[RDBExecutor] Local assets missing and skip filling assets.");
972                 isSkipAssetsMissRecord = true;
973             }
974             if (ignoreEmptyGid || isSkipAssetsMissRecord) {
975                 continue;
976             }
977             errCode = -E_INVALID_ARGS;
978             LOGE("[RDBExecutor] Extend not contain gid.");
979             break;
980         }
981         bool containError = DBCommon::IsRecordError(cloudDataResult.insData.extend[i]);
982         if (ignoreEmptyGid && containError) {
983             continue;
984         }
985         std::string val;
986         if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
987             cloudDataResult.insData.extend[i], val) != E_OK) {
988             errCode = -E_CLOUD_ERROR;
989             LOGE("[RDBExecutor] Can't get string gid from extend.");
990             break;
991         }
992         if (val.empty()) {
993             errCode = -E_CLOUD_ERROR;
994             LOGE("[RDBExecutor] Get empty gid from extend.");
995             break;
996         }
997         errCode = BindStmtWithCloudGidInner(val, cloudDataResult.insData.rowid[i], stmt, fillGidCount);
998         if (errCode != E_OK) {
999             LOGE("[RDBExecutor] Bind stmt error %d.", errCode);
1000             break;
1001         }
1002     }
1003     LOGD("[RDBExecutor] Fill gid count %d.", fillGidCount);
1004     return errCode;
1005 }
1006 
CleanExtendAndCursorForDeleteData(const std::string & tableName)1007 int SQLiteSingleVerRelationalStorageExecutor::CleanExtendAndCursorForDeleteData(const std::string &tableName)
1008 {
1009     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1010     std::string sql = "DELETE FROM " + logTable + " where flag&0x01=0x01;";
1011     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1012     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1013         LOGE("update extend field and cursor failed %d.", errCode);
1014     }
1015     return errCode;
1016 }
1017 
CheckIfExistUserTable(const std::string & tableName)1018 int SQLiteSingleVerRelationalStorageExecutor::CheckIfExistUserTable(const std::string &tableName)
1019 {
1020     std::string sql = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?";
1021     sqlite3_stmt *statement = nullptr;
1022     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1023     if (errCode != E_OK) {
1024         LOGE("[RDBExecutor] Prepare the sql statement error: %d.", errCode);
1025         return errCode;
1026     }
1027     errCode = SQLiteUtils::BindTextToStatement(statement, 1, tableName);
1028     if (errCode != E_OK) {
1029         LOGE("[RDBExecutor] Bind table name failed: %d.", errCode);
1030         int ret = E_OK;
1031         SQLiteUtils::ResetStatement(statement, true, ret);
1032         return errCode;
1033     }
1034     if (SQLiteUtils::StepWithRetry(statement) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1035         LOGE("[RDBExecutor] local exists user table which shared table name is same as.");
1036         SQLiteUtils::ResetStatement(statement, true, errCode);
1037         return -E_INVALID_ARGS;
1038     }
1039     SQLiteUtils::ResetStatement(statement, true, errCode);
1040     return E_OK;
1041 }
1042 
GetCloudDeleteSql(const std::string & table,std::string & sql)1043 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDeleteSql(const std::string &table, std::string &sql)
1044 {
1045     uint64_t cursor = DBConstant::INVALID_CURSOR;
1046     int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, table, cursor);
1047     if (errCode != E_OK) {
1048         LOGE("[GetCloudDeleteSql] Get cursor of table[%s length[%u]] failed: %d",
1049             DBCommon::StringMiddleMasking(table).c_str(), table.length(), errCode);
1050         return errCode;
1051     }
1052     sql += " cloud_gid = '', version = '', ";
1053     if (isLogicDelete_) {
1054         // cursor already increased by DeleteCloudData, can be assigned directly here
1055         // 1001 which is logicDelete|cloudForcePush|local|delete
1056         sql += "flag = (flag&" + std::string(CONSISTENT_FLAG) + "|" +
1057                std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE) |
1058                               static_cast<uint32_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) +
1059                ")&" + std::to_string(~static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL)) +
1060                ", cursor = " + std::to_string(cursor) + " ";
1061     } else {
1062         sql += "data_key = -1, flag = (flag&" + std::string(CONSISTENT_FLAG) + "|" +
1063                std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) + ")&" +
1064                std::to_string(~static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL)) + ", sharing_resource = ''";
1065         errCode = SetCursor(table, cursor + 1);
1066         if (errCode == E_OK) {
1067             sql += ", cursor = " + std::to_string(cursor + 1) + " ";
1068         } else {
1069             LOGE("[RDBExecutor] Increase cursor failed when delete log: %d.", errCode);
1070             return errCode;
1071         }
1072     }
1073     return E_OK;
1074 }
1075 
RemoveDataAndLog(const std::string & tableName,int64_t dataKey)1076 int SQLiteSingleVerRelationalStorageExecutor::RemoveDataAndLog(const std::string &tableName, int64_t dataKey)
1077 {
1078     int errCode = E_OK;
1079     std::string removeDataSql = "DELETE FROM " + tableName + " WHERE " + DBConstant::SQLITE_INNER_ROWID + " = " +
1080         std::to_string(dataKey);
1081     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeDataSql);
1082     if (errCode != E_OK) {
1083         LOGE("[RDBExecutor] remove data failed %d", errCode);
1084         return errCode;
1085     }
1086     std::string removeLogSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key = " +
1087         std::to_string(dataKey);
1088     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeLogSql);
1089     if (errCode != E_OK) {
1090         LOGE("[RDBExecutor] remove log failed %d", errCode);
1091     }
1092     return errCode;
1093 }
1094 
GetLocalDataKey(size_t index,const DownloadData & downloadData)1095 int64_t SQLiteSingleVerRelationalStorageExecutor::GetLocalDataKey(size_t index,
1096     const DownloadData &downloadData)
1097 {
1098     if (index >= downloadData.existDataKey.size()) {
1099         LOGW("[RDBExecutor] index out of range when get local data key."); // should not happen
1100         return -1; // -1 means not exist
1101     }
1102     return downloadData.existDataKey[index];
1103 }
1104 
BindStmtWithCloudGidInner(const std::string & gid,int64_t rowid,sqlite3_stmt * & stmt,int & fillGidCount)1105 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
1106     sqlite3_stmt *&stmt, int &fillGidCount)
1107 {
1108     int errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid); // 1 means the gid index
1109     if (errCode != E_OK) {
1110         return errCode;
1111     }
1112     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means rowid
1113     if (errCode != E_OK) {
1114         return errCode;
1115     }
1116     errCode = SQLiteUtils::StepWithRetry(stmt, false);
1117     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1118         errCode = E_OK;
1119         fillGidCount++;
1120         SQLiteUtils::ResetStatement(stmt, false, errCode);
1121     } else {
1122         LOGE("[RDBExecutor] Update cloud log failed: %d.", errCode);
1123     }
1124     return errCode;
1125 }
1126 
RenewTableTrigger(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType,const std::string & localIdentity)1127 int SQLiteSingleVerRelationalStorageExecutor::RenewTableTrigger(DistributedTableMode mode,
1128     const TableInfo &tableInfo, TableSyncType syncType, const std::string &localIdentity)
1129 {
1130     auto tableManager = LogTableManagerFactory::GetTableManager(tableInfo, mode, syncType);
1131     return tableManager->AddRelationalLogTableTrigger(dbHandle_, tableInfo, localIdentity);
1132 }
1133 
DoCleanAssetId(const std::string & tableName,const RelationalSchemaObject & localSchema)1134 int SQLiteSingleVerRelationalStorageExecutor::DoCleanAssetId(const std::string &tableName,
1135     const RelationalSchemaObject &localSchema)
1136 {
1137     std::vector<int64_t> dataKeys;
1138     std::string logTableName = DBCommon::GetLogTableName(tableName);
1139     int errCode = GetCleanCloudDataKeys(logTableName, dataKeys, false);
1140     if (errCode != E_OK) {
1141         LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
1142         return errCode;
1143     }
1144     std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
1145     errCode = CleanAssetId(tableName, fieldInfos, dataKeys);
1146     if (errCode != E_OK) {
1147         LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d.", errCode);
1148     }
1149     return errCode;
1150 }
1151 
CleanAssetId(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys)1152 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetId(const std::string &tableName,
1153     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys)
1154 {
1155     int errCode = E_OK;
1156     for (const auto &fieldInfo : fieldInfos) {
1157         if (fieldInfo.IsAssetType()) {
1158             Assets assets;
1159             errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1160             if (errCode != E_OK) {
1161                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
1162                 return errCode;
1163             }
1164             errCode = UpdateAssetIdOnUserTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1165             if (errCode != E_OK) {
1166                 LOGE("[Storage Executor] failed to save clean asset id on table, %d.", errCode);
1167                 return errCode;
1168             }
1169         } else if (fieldInfo.IsAssetsType()) {
1170             errCode = GetAssetsAndUpdateAssetsId(tableName, fieldInfo.GetFieldName(), dataKeys);
1171             if (errCode != E_OK) {
1172                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
1173                 return errCode;
1174             }
1175         }
1176     }
1177     return errCode;
1178 }
1179 
UpdateAssetIdOnUserTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)1180 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetIdOnUserTable(const std::string &tableName,
1181     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
1182 {
1183     if (assets.empty()) { // LCOV_EXCL_BR_LINE
1184         return E_OK;
1185     }
1186     int errCode = E_OK;
1187     int ret = E_OK;
1188     sqlite3_stmt *stmt = nullptr;
1189     size_t index = 0;
1190     for (const auto &rowId : dataKeys) {
1191         if (rowId == -1) { // -1 means data is deleted
1192             continue;
1193         }
1194         if (assets[index].name.empty()) { // LCOV_EXCL_BR_LINE
1195             index++;
1196             continue;
1197         }
1198         std::string cleanAssetIdSql = "UPDATE " + tableName  + " SET " + fieldName + " = ? WHERE " +
1199             std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1200         errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1201         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1202             LOGE("Get statement failed, %d", errCode);
1203             return errCode;
1204         }
1205         assets[index].assetId = "";
1206         assets[index].status &= ~AssetStatus::UPLOADING;
1207         errCode = BindAssetToBlobStatement(assets[index], 1, stmt); // 1 means sqlite statement index
1208         index++;
1209         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1210             LOGE("Bind asset to blob statement failed, %d", errCode);
1211             goto END;
1212         }
1213         errCode = SQLiteUtils::StepWithRetry(stmt);
1214         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1215             errCode = E_OK;
1216         } else {
1217             LOGE("Step statement failed, %d", errCode);
1218             goto END;
1219         }
1220         SQLiteUtils::ResetStatement(stmt, true, ret);
1221     }
1222     return errCode != E_OK ? errCode : ret;
1223 END:
1224     SQLiteUtils::ResetStatement(stmt, true, ret);
1225     return errCode != E_OK ? errCode : ret;
1226 }
1227 
GetAssetsAndUpdateAssetsId(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys)1228 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsAndUpdateAssetsId(const std::string &tableName,
1229     const std::string &fieldName, const std::vector<int64_t> &dataKeys)
1230 {
1231     int errCode = E_OK;
1232     int ret = E_OK;
1233     sqlite3_stmt *selectStmt = nullptr;
1234     for (const auto &rowId : dataKeys) {
1235         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1236             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1237         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1238         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1239             LOGE("Get select assets statement failed, %d.", errCode);
1240             goto END;
1241         }
1242         Assets assets;
1243         errCode = GetAssetsByRowId(selectStmt, assets);
1244         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1245             LOGE("Get assets by rowId failed, %d.", errCode);
1246             goto END;
1247         }
1248         SQLiteUtils::ResetStatement(selectStmt, true, ret);
1249         if (assets.empty()) { // LCOV_EXCL_BR_LINE
1250             continue;
1251         }
1252         for (auto &asset : assets) {
1253             asset.assetId = "";
1254             asset.status &= ~AssetStatus::UPLOADING;
1255         }
1256         std::vector<uint8_t> assetsValue;
1257         errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsValue);
1258         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1259             LOGE("[CleanAssetsIdOnUserTable] failed to transfer assets to blob, %d.", errCode);
1260             return errCode;
1261         }
1262         errCode = CleanAssetsIdOnUserTable(tableName, fieldName, rowId, assetsValue);
1263         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1264             LOGE("[CleanAssetsIdOnUserTable] clean assets id on user table failed, %d", errCode);
1265             return errCode;
1266         }
1267     }
1268     return errCode != E_OK ? errCode : ret;
1269 END:
1270     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1271     return errCode != E_OK ? errCode : ret;
1272 }
1273 
CleanAssetsIdOnUserTable(const std::string & tableName,const std::string & fieldName,const int64_t rowId,const std::vector<uint8_t> & assetsValue)1274 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetsIdOnUserTable(const std::string &tableName,
1275     const std::string &fieldName, const int64_t rowId, const std::vector<uint8_t> &assetsValue)
1276 {
1277     std::string cleanAssetIdSql = "UPDATE " + tableName  + " SET " + fieldName + " = ? WHERE " +
1278         std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1279     sqlite3_stmt *stmt = nullptr;
1280     int ret = E_OK;
1281     int errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1282     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1283         LOGE("Get statement failed, %d", errCode);
1284         SQLiteUtils::ResetStatement(stmt, true, ret);
1285         return errCode;
1286     }
1287     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, assetsValue, false);
1288     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1289         SQLiteUtils::ResetStatement(stmt, true, ret);
1290         return errCode;
1291     }
1292     errCode = SQLiteUtils::StepWithRetry(stmt);
1293     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1294         errCode = E_OK;
1295     }
1296     SQLiteUtils::ResetStatement(stmt, true, ret);
1297     return errCode != E_OK ? errCode : ret;
1298 }
1299 
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1300 std::pair<int, uint32_t> SQLiteSingleVerRelationalStorageExecutor::GetAssetsByGidOrHashKey(
1301     const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
1302 {
1303     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
1304     auto &[errCode, status] = res;
1305     std::vector<Field> assetFields;
1306     std::string sql = "SELECT";
1307     for (const auto &field: tableSchema.fields) {
1308         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1309             assetFields.emplace_back(field);
1310             sql += " b." + field.colName + ",";
1311         }
1312     }
1313     if (assetFields.empty()) {
1314         return { -E_NOT_FOUND, status };
1315     }
1316     sql += "a.cloud_gid, a.status ";
1317     sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE (a." + FLAG_NOT_LOGIC_DELETE + ") AND (" +
1318         (gid.empty() ? "a.hash_key = ?);" : " a.cloud_gid = ? OR  a.hash_key = ?);");
1319     sqlite3_stmt *stmt = nullptr;
1320     errCode = InitGetAssetStmt(sql, gid, hashKey, stmt);
1321     if (errCode != E_OK) {
1322         return res;
1323     }
1324     errCode = SQLiteUtils::StepWithRetry(stmt);
1325     int index = 0;
1326     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1327         for (const auto &field: assetFields) {
1328             Type cloudValue;
1329             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1330             if (errCode != E_OK) {
1331                 break;
1332             }
1333             errCode = PutVBucketByType(assets, field, cloudValue);
1334             if (errCode != E_OK) {
1335                 break;
1336             }
1337         }
1338         std::string curGid;
1339         errCode = SQLiteUtils::GetColumnTextValue(stmt, index++, curGid);
1340         if (errCode == E_OK && CloudStorageUtils::IsCloudGidMismatch(gid, curGid)) {
1341             // Gid is different, there may be duplicate primary keys in the cloud
1342             errCode = -E_CLOUD_GID_MISMATCH;
1343         }
1344         status = static_cast<uint32_t>(sqlite3_column_int(stmt, index++));
1345     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1346         errCode = -E_NOT_FOUND;
1347     } else {
1348         LOGE("step get asset stmt failed %d.", errCode);
1349     }
1350     SQLiteUtils::ResetStatement(stmt, true, errCode);
1351     return res;
1352 }
1353 
InitGetAssetStmt(const std::string & sql,const std::string & gid,const Bytes & hashKey,sqlite3_stmt * & stmt)1354 int SQLiteSingleVerRelationalStorageExecutor::InitGetAssetStmt(const std::string &sql, const std::string &gid,
1355     const Bytes &hashKey, sqlite3_stmt *&stmt)
1356 {
1357     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1358     if (errCode != E_OK) {
1359         LOGE("Get asset statement failed, %d.", errCode);
1360         return errCode;
1361     }
1362     int index = 1;
1363     int ret = E_OK;
1364     if (!gid.empty()) {
1365         errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
1366         if (errCode != E_OK) {
1367             LOGE("bind gid failed %d.", errCode);
1368             SQLiteUtils::ResetStatement(stmt, true, ret);
1369             return errCode;
1370         }
1371     }
1372     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, hashKey);
1373     if (errCode != E_OK) {
1374         LOGE("bind hash failed %d.", errCode);
1375         SQLiteUtils::ResetStatement(stmt, true, ret);
1376     }
1377     return errCode;
1378 }
1379 
FillHandleWithOpType(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid,const TableSchema & tableSchema)1380 int SQLiteSingleVerRelationalStorageExecutor::FillHandleWithOpType(const OpType opType, const CloudSyncData &data,
1381     bool fillAsset, bool ignoreEmptyGid, const TableSchema &tableSchema)
1382 {
1383     int errCode = E_OK;
1384     switch (opType) {
1385         case OpType::UPDATE_VERSION: // fallthrough
1386         case OpType::INSERT_VERSION: {
1387             errCode = FillCloudVersionForUpload(opType, data);
1388             break;
1389         }
1390         case OpType::SET_UPLOADING: {
1391             errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1392             if (errCode != E_OK) {
1393                 LOGE("Failed to set uploading for ins data, %d.", errCode);
1394                 return errCode;
1395             }
1396             errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1397             break;
1398         }
1399         case OpType::INSERT: {
1400             errCode = UpdateCloudLogGid(data, ignoreEmptyGid);
1401             if (errCode != E_OK) {
1402                 LOGE("Failed to fill cloud log gid, %d.", errCode);
1403                 return errCode;
1404             }
1405             if (fillAsset) {
1406                 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1407                 if (errCode != E_OK) {
1408                     LOGE("Failed to fill asset for ins, %d.", errCode);
1409                     return errCode;
1410                 }
1411             }
1412             errCode = FillCloudVersionForUpload(OpType::INSERT_VERSION, data);
1413             break;
1414         }
1415         case OpType::UPDATE: {
1416             if (fillAsset && !data.updData.assets.empty()) {
1417                 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1418                 if (errCode != E_OK) {
1419                     LOGE("Failed to fill asset for upd, %d.", errCode);
1420                     return errCode;
1421                 }
1422             }
1423             errCode = FillCloudVersionForUpload(OpType::UPDATE_VERSION, data);
1424             break;
1425         }
1426         default:
1427             break;
1428     }
1429     return errCode;
1430 }
1431 
GetAssetsByRowId(sqlite3_stmt * & selectStmt,Assets & assets)1432 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets)
1433 {
1434     int errCode = SQLiteUtils::StepWithRetry(selectStmt);
1435     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1436         std::vector<uint8_t> blobValue;
1437         errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
1438         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1439             LOGE("Get column blob value failed %d.", errCode);
1440             return errCode;
1441         }
1442         errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1443         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1444             LOGE("Transfer blob to assets failed %d", errCode);
1445         }
1446         return errCode;
1447     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1448         return E_OK;
1449     } else {
1450         LOGE("Step select statement failed %d.", errCode);
1451         return errCode;
1452     }
1453 }
1454 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1455 void SQLiteSingleVerRelationalStorageExecutor::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1456 {
1457     assetLoader_ = loader;
1458 }
1459 
ExecuteFillDownloadAssetStatement(sqlite3_stmt * & stmt,int beginIndex,const std::string & cloudGid)1460 int SQLiteSingleVerRelationalStorageExecutor::ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt,
1461     int beginIndex, const std::string &cloudGid)
1462 {
1463     int errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, cloudGid);
1464     if (errCode != E_OK) {
1465         LOGE("Bind cloud gid to statement failed %d.", errCode);
1466         int ret = E_OK;
1467         SQLiteUtils::ResetStatement(stmt, true, ret);
1468         return errCode;
1469     }
1470     errCode = SQLiteUtils::StepWithRetry(stmt);
1471     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1472         errCode = E_OK;
1473     } else {
1474         LOGE("Fill cloud asset failed: %d.", errCode);
1475     }
1476     int ret = E_OK;
1477     SQLiteUtils::ResetStatement(stmt, true, ret);
1478     return errCode != E_OK ? errCode : ret;
1479 }
1480 
CleanDownloadChangedAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType)1481 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadChangedAssets(
1482     const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType)
1483 {
1484     if (assetLoader_ == nullptr) {
1485         LOGE("assetLoader may be not set.");
1486         return -E_NOT_SET;
1487     }
1488     std::vector<Asset> toDeleteAssets;
1489     CloudStorageUtils::GetToBeRemoveAssets(vBucket, assetOpType, toDeleteAssets);
1490     if (toDeleteAssets.empty()) {
1491         return E_OK;
1492     }
1493     DBStatus ret = assetLoader_->RemoveLocalAssets(toDeleteAssets);
1494     if (ret != OK) {
1495         LOGE("remove local assets failed %d.", ret);
1496         return -E_REMOVE_ASSETS_FAILED;
1497     }
1498     return E_OK;
1499 }
1500 
GetAndBindFillUploadAssetStatement(const std::string & tableName,const VBucket & assets,sqlite3_stmt * & statement)1501 int SQLiteSingleVerRelationalStorageExecutor::GetAndBindFillUploadAssetStatement(const std::string &tableName,
1502     const VBucket &assets, sqlite3_stmt *&statement)
1503 {
1504     std::string sql = "UPDATE '" + tableName + "' SET ";
1505     for (const auto &item: assets) {
1506         sql += item.first + " = ?,";
1507     }
1508     sql.pop_back();
1509     sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = ?;";
1510     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1511     if (errCode != E_OK) {
1512         return errCode;
1513     }
1514     int bindIndex = 1;
1515     for (const auto &item: assets) {
1516         Field field = {
1517             .colName = item.first, .type = static_cast<int32_t>(item.second.index())
1518         };
1519         errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](bindIndex++, assets, field, statement);
1520         if (errCode != E_OK) {
1521             return errCode;
1522         }
1523     }
1524     return errCode;
1525 }
1526 
OnlyUpdateAssetId(const std::string & tableName,const TableSchema & tableSchema,const VBucket & vBucket,int64_t dataKey,OpType opType)1527 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateAssetId(const std::string &tableName,
1528     const TableSchema &tableSchema, const VBucket &vBucket, int64_t dataKey, OpType opType)
1529 {
1530     if (opType != OpType::ONLY_UPDATE_GID && opType != OpType::NOT_HANDLE &&
1531         opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1532         return E_OK;
1533     }
1534     if (CloudStorageUtils::IsSharedTable(tableSchema)) {
1535         // this is shared table, not need to update asset id.
1536         return E_OK;
1537     }
1538     bool isNotIncCursor = false; // if isNotIncCursor is false, will increase cursor
1539     if (!IsNeedUpdateAssetId(tableSchema, dataKey, vBucket, isNotIncCursor)) {
1540         return E_OK;
1541     }
1542     int error = SetCursorIncFlag(!isNotIncCursor);
1543     if (error != E_OK) {
1544         LOGE("[Storage Executor] failed to set cursor_inc_flag, %d.", error);
1545         return error;
1546     }
1547     int errCode = UpdateAssetId(tableSchema, dataKey, vBucket);
1548     if (errCode != E_OK) {
1549         LOGE("[Storage Executor] failed to update assetId on table, %d.", errCode);
1550     }
1551     if (isNotIncCursor) {
1552         error = SetCursorIncFlag(true);
1553         if (error != E_OK) {
1554             LOGE("[Storage Executor] failed to set cursor_inc_flag true, %d.", error);
1555             return error;
1556         }
1557     }
1558     return errCode;
1559 }
1560 
UpdateLocalAssetId(const VBucket & vBucket,const std::string & fieldName,Asset & asset)1561 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName,
1562     Asset &asset)
1563 {
1564     for (const auto &[col, value] : vBucket) {
1565         if (value.index() == TYPE_INDEX<Asset> && col == fieldName) {
1566             asset = std::get<Asset>(value);
1567         }
1568     }
1569 }
1570 
UpdateLocalAssetsId(const VBucket & vBucket,const std::string & fieldName,Assets & assets)1571 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName,
1572     Assets &assets)
1573 {
1574     for (const auto &[col, value] : vBucket) {
1575         if (value.index() == TYPE_INDEX<Assets> && col == fieldName) {
1576             assets = std::get<Assets>(value);
1577         }
1578     }
1579 }
1580 
BindAssetToBlobStatement(const Asset & asset,int index,sqlite3_stmt * & stmt)1581 int SQLiteSingleVerRelationalStorageExecutor::BindAssetToBlobStatement(const Asset &asset, int index,
1582     sqlite3_stmt *&stmt)
1583 {
1584     std::vector<uint8_t> blobValue;
1585     int errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, blobValue);
1586     if (errCode != E_OK) {
1587         LOGE("Transfer asset to blob failed, %d.", errCode);
1588         return errCode;
1589     }
1590     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1591     if (errCode != E_OK) {
1592         LOGE("Bind asset blob to statement failed, %d.", errCode);
1593     }
1594     return errCode;
1595 }
1596 
BindAssetsToBlobStatement(const Assets & assets,int index,sqlite3_stmt * & stmt)1597 int SQLiteSingleVerRelationalStorageExecutor::BindAssetsToBlobStatement(const Assets &assets, int index,
1598     sqlite3_stmt *&stmt)
1599 {
1600     std::vector<uint8_t> blobValue;
1601     int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blobValue);
1602     if (errCode != E_OK) {
1603         LOGE("Transfer asset to blob failed, %d.", errCode);
1604         return errCode;
1605     }
1606     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1607     if (errCode != E_OK) {
1608         LOGE("Bind asset blob to statement failed, %d.", errCode);
1609     }
1610     return errCode;
1611 }
1612 
BindAssetFiledToBlobStatement(const TableSchema & tableSchema,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord,sqlite3_stmt * & stmt)1613 int SQLiteSingleVerRelationalStorageExecutor::BindAssetFiledToBlobStatement(const TableSchema &tableSchema,
1614     const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt)
1615 {
1616     int assetIndex = 0;
1617     int assetsIndex = 0;
1618     for (const auto &field : tableSchema.fields) {
1619         if (field.type == TYPE_INDEX<Asset>) {
1620             if (assetOfOneRecord[assetIndex].name.empty()) {
1621                 continue;
1622             }
1623             int errCode = BindAssetToBlobStatement(assetOfOneRecord[assetIndex], assetIndex + assetsIndex + 1, stmt);
1624             if (errCode != E_OK) {
1625                 LOGE("Bind asset to blob statement failed, %d.", errCode);
1626                 return errCode;
1627             }
1628             assetIndex++;
1629         } else if (field.type == TYPE_INDEX<Assets>) {
1630             if (assetsOfOneRecord[assetsIndex].empty()) {
1631                 continue;
1632             }
1633             int errCode = BindAssetsToBlobStatement(assetsOfOneRecord[assetsIndex], assetIndex + assetsIndex + 1, stmt);
1634             if (errCode != E_OK) {
1635                 LOGE("Bind assets to blob statement failed, %d.", errCode);
1636                 return errCode;
1637             }
1638             assetsIndex++;
1639         }
1640     }
1641     return E_OK;
1642 }
1643 
UpdateAssetsIdForOneRecord(const TableSchema & tableSchema,const std::string & sql,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord)1644 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetsIdForOneRecord(const TableSchema &tableSchema,
1645     const std::string &sql, const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord)
1646 {
1647     int errCode = E_OK;
1648     int ret = E_OK;
1649     sqlite3_stmt *stmt = nullptr;
1650     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1651     if (errCode != E_OK) {
1652         LOGE("Get update asset statement failed, %d.", errCode);
1653         return errCode;
1654     }
1655     errCode = BindAssetFiledToBlobStatement(tableSchema, assetOfOneRecord, assetsOfOneRecord, stmt);
1656     if (errCode != E_OK) {
1657         LOGE("Asset field Bind asset to blob statement failed, %d.", errCode);
1658         SQLiteUtils::ResetStatement(stmt, true, ret);
1659         return errCode != E_OK ? errCode : ret;
1660     }
1661     errCode = SQLiteUtils::StepWithRetry(stmt);
1662     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1663         errCode = E_OK;
1664     } else {
1665         LOGE("Step statement failed, %d", errCode);
1666     }
1667     SQLiteUtils::ResetStatement(stmt, true, ret);
1668     return errCode != E_OK ? errCode : ret;
1669 }
1670 
UpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1671 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1672     const VBucket &vBucket)
1673 {
1674     int errCode = E_OK;
1675     std::vector<Asset> assetOfOneRecord;
1676     std::vector<Assets> assetsOfOneRecord;
1677     std::string updateAssetIdSql = "UPDATE " + tableSchema.name  + " SET";
1678     for (const auto &field : tableSchema.fields) {
1679         if (field.type == TYPE_INDEX<Asset>) {
1680             Asset asset;
1681             UpdateLocalAssetId(vBucket, field.colName, asset);
1682             assetOfOneRecord.push_back(asset);
1683             if (!asset.name.empty()) {
1684                 updateAssetIdSql += " " + field.colName + " = ?,";
1685             }
1686         }
1687         if (field.type == TYPE_INDEX<Assets>) {
1688             Assets assets;
1689             UpdateLocalAssetsId(vBucket, field.colName, assets);
1690             assetsOfOneRecord.push_back(assets);
1691             if (!assets.empty()) {
1692                 updateAssetIdSql += " " + field.colName + " = ?,";
1693             }
1694         }
1695     }
1696     if (updateAssetIdSql == "UPDATE " + tableSchema.name  + " SET") {
1697         return E_OK;
1698     }
1699     updateAssetIdSql.pop_back();
1700     updateAssetIdSql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1701     errCode = UpdateAssetsIdForOneRecord(tableSchema, updateAssetIdSql, assetOfOneRecord, assetsOfOneRecord);
1702     if (errCode != E_OK) {
1703         LOGE("[Storage Executor] failed to update asset id on table, %d.", errCode);
1704     }
1705     return errCode;
1706 }
1707 
SetPutDataMode(PutDataMode mode)1708 void SQLiteSingleVerRelationalStorageExecutor::SetPutDataMode(PutDataMode mode)
1709 {
1710     putDataMode_ = mode;
1711 }
1712 
SetMarkFlagOption(MarkFlagOption option)1713 void SQLiteSingleVerRelationalStorageExecutor::SetMarkFlagOption(MarkFlagOption option)
1714 {
1715     markFlagOption_ = option;
1716 }
1717 
GetDataFlag()1718 int64_t SQLiteSingleVerRelationalStorageExecutor::GetDataFlag()
1719 {
1720     if (putDataMode_ != PutDataMode::USER) {
1721         return static_cast<int64_t>(LogInfoFlag::FLAG_CLOUD) |
1722             static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1723     }
1724     uint32_t flag = static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
1725     if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1726         flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1727     }
1728     flag |= static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1729     return static_cast<int64_t>(flag);
1730 }
1731 
GetUpdateDataFlagSql(const VBucket & data)1732 std::string SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataFlagSql(const VBucket &data)
1733 {
1734     std::string retentionFlag = "flag = (flag & " +
1735                                 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY) |
1736                                                static_cast<uint32_t>(LogInfoFlag::FLAG_ASSET_DOWNLOADING_FOR_ASYNC)) +
1737                                 ") | " + std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL));
1738     if (putDataMode_ == PutDataMode::SYNC) {
1739         if (CloudStorageUtils::IsAssetsContainDownloadRecord(data)) {
1740             return retentionFlag;
1741         }
1742         return UPDATE_FLAG_CLOUD;
1743     }
1744     if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1745         return UPDATE_FLAG_WAIT_COMPENSATED_SYNC;
1746     }
1747     return retentionFlag;
1748 }
1749 
GetDev()1750 std::string SQLiteSingleVerRelationalStorageExecutor::GetDev()
1751 {
1752     return putDataMode_ == PutDataMode::SYNC ? "cloud" : "";
1753 }
1754 
GetUpdateField(const VBucket & vBucket,const TableSchema & tableSchema)1755 std::vector<Field> SQLiteSingleVerRelationalStorageExecutor::GetUpdateField(const VBucket &vBucket,
1756     const TableSchema &tableSchema)
1757 {
1758     std::set<std::string> useFields;
1759     std::vector<Field> fields;
1760     if (putDataMode_ == PutDataMode::SYNC) {
1761         for (const auto &field : tableSchema.fields) {
1762             useFields.insert(field.colName);
1763         }
1764         fields = tableSchema.fields;
1765     } else {
1766         for (const auto &field : vBucket) {
1767             if (field.first.empty() || field.first[0] == '#') {
1768                 continue;
1769             }
1770             useFields.insert(field.first);
1771         }
1772         for (const auto &field : tableSchema.fields) {
1773             if (useFields.find(field.colName) == useFields.end()) {
1774                 continue;
1775             }
1776             fields.push_back(field);
1777         }
1778     }
1779     return fields;
1780 }
1781 
UpdateRecordFlag(const std::string & tableName,const std::string & sql,const LogInfo & logInfo)1782 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordFlag(const std::string &tableName, const std::string &sql,
1783     const LogInfo &logInfo)
1784 {
1785     bool useHashKey = false;
1786     if (logInfo.cloudGid.empty() && logInfo.dataKey == DBConstant::DEFAULT_ROW_ID) {
1787         if (logInfo.hashKey.empty()) {
1788             LOGE("[RDBExecutor] Update record flag failed with invalid args!");
1789             return -E_INVALID_ARGS;
1790         }
1791         useHashKey = true;
1792     }
1793     sqlite3_stmt *stmt = nullptr;
1794     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1795     if (errCode != E_OK) {
1796         LOGE("[Storage Executor] Get stmt failed when update record flag, %d", errCode);
1797         return errCode;
1798     }
1799     int ret = E_OK;
1800     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, logInfo.timestamp); // 1 is timestamp
1801     if (errCode != E_OK) {
1802         LOGE("[Storage Executor] Bind timestamp to update record flag stmt failed, %d", errCode);
1803         SQLiteUtils::ResetStatement(stmt, true, ret);
1804         return errCode;
1805     }
1806     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, logInfo.timestamp); // 2 is timestamp
1807     if (errCode != E_OK) {
1808         LOGE("[Storage Executor] Bind timestamp to update record status stmt failed, %d", errCode);
1809         SQLiteUtils::ResetStatement(stmt, true, ret);
1810         return errCode;
1811     }
1812     if (useHashKey) {
1813         errCode = SQLiteUtils::BindBlobToStatement(stmt, 3, logInfo.hashKey); // 3 is hash_key
1814         if (errCode != E_OK) {
1815             LOGE("[Storage Executor] Bind hashKey to update record flag stmt failed, %d", errCode);
1816             SQLiteUtils::ResetStatement(stmt, true, ret);
1817             return errCode;
1818         }
1819     }
1820     errCode = SQLiteUtils::StepWithRetry(stmt);
1821     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1822         errCode = E_OK;
1823     } else {
1824         LOGE("[Storage Executor]Step update record flag stmt failed, %d", errCode);
1825     }
1826     SQLiteUtils::ResetStatement(stmt, true, ret);
1827     return errCode == E_OK ? ret : errCode;
1828 }
1829 
MarkFlagAsUploadFinished(const std::string & tableName,const Key & hashKey,Timestamp timestamp,bool isExistAssetsDownload)1830 void SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsUploadFinished(const std::string &tableName,
1831     const Key &hashKey, Timestamp timestamp, bool isExistAssetsDownload)
1832 {
1833     sqlite3_stmt *stmt = nullptr;
1834     int errCode = SQLiteUtils::GetStatement(dbHandle_, CloudStorageUtils::GetUpdateUploadFinishedSql(tableName,
1835         isExistAssetsDownload), stmt);
1836     if (errCode != E_OK) {
1837         LOGW("[Storage Executor] Get statement fail! errCode:%d", errCode);
1838         return;
1839     }
1840     int index = 1;
1841     errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, timestamp);
1842     if (errCode != E_OK) {
1843         SQLiteUtils::ResetStatement(stmt, true, errCode);
1844         LOGW("[Storage Executor] Bind timestamp to update record flag for upload finished stmt failed, %d", errCode);
1845         return;
1846     }
1847     errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, hashKey);
1848     if (errCode != E_OK) {
1849         SQLiteUtils::ResetStatement(stmt, true, errCode);
1850         LOGW("[Storage Executor] Bind hashKey to update record flag for upload finished stmt failed, %d", errCode);
1851         return;
1852     }
1853     errCode = SQLiteUtils::StepWithRetry(stmt);
1854     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1855         errCode = E_OK;
1856     } else {
1857         LOGE("[Storage Executor]Step update record flag for upload finished stmt failed, %d", errCode);
1858     }
1859     SQLiteUtils::ResetStatement(stmt, true, errCode);
1860 }
1861 
GetWaitCompensatedSyncDataPk(const TableSchema & table,std::vector<VBucket> & data,bool isQueryDownloadRecords)1862 int SQLiteSingleVerRelationalStorageExecutor::GetWaitCompensatedSyncDataPk(const TableSchema &table,
1863     std::vector<VBucket> &data, bool isQueryDownloadRecords)
1864 {
1865     std::string sql = "SELECT ";
1866     std::vector<Field> pkFields;
1867     for (const auto &field : table.fields) {
1868         if (!field.primary) {
1869             continue;
1870         }
1871         sql += "b." + field.colName + ",";
1872         pkFields.push_back(field);
1873     }
1874     if (pkFields.empty()) {
1875         // ignore no pk table
1876         return E_OK;
1877     }
1878     sql.pop_back();
1879     if (isQueryDownloadRecords) {
1880         sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " +
1881             FLAG_IS_WAIT_COMPENSATED_CONTAIN_DOWNLOAD_SYNC;
1882     } else {
1883         sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " + FLAG_IS_WAIT_COMPENSATED_SYNC;
1884     }
1885     sqlite3_stmt *stmt = nullptr;
1886     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1887     if (errCode != E_OK) {
1888         LOGE("[RDBExecutor] Get stmt failed when get wait compensated sync pk! errCode = %d..", errCode);
1889         return errCode;
1890     }
1891     do {
1892         errCode = SQLiteUtils::StepWithRetry(stmt);
1893         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1894             VBucket pkData;
1895             errCode = GetRecordFromStmt(stmt, pkFields, 0, pkData);
1896             if (errCode != E_OK) {
1897                 LOGE("[RDBExecutor] Get record failed when get wait compensated sync pk! errCode = %d.", errCode);
1898                 break;
1899             }
1900             data.push_back(pkData);
1901         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1902             errCode = E_OK;
1903             break;
1904         } else {
1905             LOGE("[RDBExecutor] Step failed when get wait compensated sync pk! errCode = %d.", errCode);
1906             break;
1907         }
1908     } while (errCode == E_OK);
1909     int ret = E_OK;
1910     SQLiteUtils::ResetStatement(stmt, true, ret);
1911     return errCode == E_OK ? ret : errCode;
1912 }
1913 
ClearUnLockingStatus(const std::string & tableName)1914 int SQLiteSingleVerRelationalStorageExecutor::ClearUnLockingStatus(const std::string &tableName)
1915 {
1916     std::string sql;
1917     sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET status = (CASE WHEN status == 1 "+
1918         "AND (cloud_gid = '' AND flag & 0x01 != 0) THEN 0 ELSE status END);";
1919     sqlite3_stmt *stmt = nullptr;
1920     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1921     if (errCode != E_OK) {
1922         LOGE("[RDBExecutor] Get stmt failed when clear unlocking status errCode = %d.", errCode);
1923         return errCode;
1924     }
1925     int ret = E_OK;
1926     errCode = SQLiteUtils::StepWithRetry(stmt);
1927     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1928         errCode = E_OK;
1929     } else {
1930         LOGE("[Storage Executor] Step update record status stmt failed, %d.", errCode);
1931     }
1932     SQLiteUtils::ResetStatement(stmt, true, ret);
1933     return errCode == E_OK ? ret : errCode;
1934 }
1935 
GetRecordFromStmt(sqlite3_stmt * stmt,const std::vector<Field> & fields,int startIndex,VBucket & record)1936 int SQLiteSingleVerRelationalStorageExecutor::GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields,
1937     int startIndex, VBucket &record)
1938 {
1939     int errCode = E_OK;
1940     for (const auto &field : fields) {
1941         Type cloudValue;
1942         errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, startIndex, cloudValue);
1943         if (errCode != E_OK) {
1944             break;
1945         }
1946         errCode = PutVBucketByType(record, field, cloudValue);
1947         if (errCode != E_OK) {
1948             break;
1949         }
1950         startIndex++;
1951     }
1952     return errCode;
1953 }
1954 
BindShareValueToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt,int & index)1955 int SQLiteSingleVerRelationalStorageExecutor::BindShareValueToInsertLogStatement(const VBucket &vBucket,
1956     const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt, int &index)
1957 {
1958     int errCode = E_OK;
1959     std::string version;
1960     if (putDataMode_ == PutDataMode::SYNC) {
1961         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, vBucket, version);
1962         if ((errCode != E_OK && errCode != -E_NOT_FOUND)) {
1963             LOGE("get version for insert log statement failed, %d", errCode);
1964             return -E_CLOUD_ERROR;
1965         }
1966     }
1967     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, version); // next is version
1968     if (errCode != E_OK) {
1969         LOGE("Bind version to insert log statement failed, %d", errCode);
1970         return errCode;
1971     }
1972 
1973     std::string shareUri;
1974     if (putDataMode_ == PutDataMode::SYNC) {
1975         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
1976             vBucket, shareUri);
1977         if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1978             LOGE("get shareUri for insert log statement failed, %d", errCode);
1979             return -E_CLOUD_ERROR;
1980         }
1981     }
1982 
1983     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, shareUri); // next is sharing_resource
1984     if (errCode != E_OK) {
1985         LOGE("Bind shareUri to insert log statement failed, %d", errCode);
1986     }
1987     return errCode;
1988 }
1989 
CheckAndCreateTrigger(const TableInfo & table)1990 void SQLiteSingleVerRelationalStorageExecutor::CheckAndCreateTrigger(const TableInfo &table)
1991 {
1992     auto tableManager = std::make_unique<SimpleTrackerLogTableManager>();
1993     tableManager->CheckAndCreateTrigger(dbHandle_, table, "");
1994 }
1995 
ClearLogOfMismatchedData(const std::string & tableName)1996 void SQLiteSingleVerRelationalStorageExecutor::ClearLogOfMismatchedData(const std::string &tableName)
1997 {
1998     bool isLogTableExist = false;
1999     int errCode = SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist);
2000     if (!isLogTableExist) {
2001         return;
2002     }
2003     std::string sql = "SELECT data_key from " + DBCommon::GetLogTableName(tableName) + " WHERE data_key NOT IN " +
2004         "(SELECT _rowid_ FROM " + tableName + ") AND data_key != -1";
2005     sqlite3_stmt *stmt = nullptr;
2006     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
2007     if (errCode != E_OK) {
2008         LOGW("[RDBExecutor][ClearMisLog] Get stmt failed, %d", errCode);
2009         return;
2010     }
2011     std::vector<int64_t> dataKeys;
2012     std::string misDataKeys = "(";
2013     errCode = SQLiteUtils::StepWithRetry(stmt, false);
2014     while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2015         int dataKey = sqlite3_column_int64(stmt, 0);
2016         dataKeys.push_back(dataKey);
2017         misDataKeys += std::to_string(dataKey) + ",";
2018         errCode = SQLiteUtils::StepWithRetry(stmt, false);
2019     }
2020     SQLiteUtils::ResetStatement(stmt, true, errCode);
2021     stmt = nullptr;
2022     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2023         LOGW("[RDBExecutor][ClearMisLog] Step failed. %d", errCode);
2024         return;
2025     }
2026     if (dataKeys.empty()) {
2027         return;
2028     }
2029     misDataKeys.pop_back();
2030     misDataKeys += ")";
2031     LOGW("[RDBExecutor][ClearMisLog] Mismatched:%s", misDataKeys.c_str());
2032     std::string delSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key IN " + misDataKeys;
2033     errCode = SQLiteUtils::GetStatement(dbHandle_, delSql, stmt);
2034     if (errCode != E_OK) {
2035         LOGW("[RDBExecutor][ClearMisLog] Get del stmt failed, %d", errCode);
2036         return;
2037     }
2038     errCode = SQLiteUtils::StepWithRetry(stmt, false);
2039     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2040         LOGW("[RDBExecutor][ClearMisLog] Step del failed, %d", errCode);
2041     }
2042     SQLiteUtils::ResetStatement(stmt, true, errCode);
2043 }
2044 
RecoverNullExtendLog(const TrackerSchema & trackerSchema,const TrackerTable & table)2045 void SQLiteSingleVerRelationalStorageExecutor::RecoverNullExtendLog(const TrackerSchema &trackerSchema,
2046     const TrackerTable &table)
2047 {
2048     if (trackerSchema.extendColNames.empty()) {
2049         return;
2050     }
2051     std::string sql = "SELECT COUNT(1) FROM " + DBCommon::GetLogTableName(trackerSchema.tableName) +
2052         " WHERE (json_valid(extend_field) = 0 OR json_type(extend_field) IS NOT 'object') AND data_key != -1";
2053     if (!SQLiteRelationalUtils::ExecuteCheckSql(dbHandle_, sql).second) {
2054         return;
2055     }
2056     auto errCode = SQLiteUtils::BeginTransaction(dbHandle_, TransactType::IMMEDIATE);
2057     if (errCode != E_OK) {
2058         LOGE("[RDBExecutor][RecoverTracker] Begin transaction fail %d", errCode);
2059         return;
2060     }
2061     errCode = RecoverNullExtendLogInner(trackerSchema, table);
2062     int changeRow = 0;
2063     if (errCode == E_OK) {
2064         changeRow = sqlite3_changes(dbHandle_);
2065         errCode = SQLiteUtils::CommitTransaction(dbHandle_);
2066     } else {
2067         (void)SQLiteUtils::RollbackTransaction(dbHandle_);
2068     }
2069     LOGI("[RDBExecutor][RecoverTracker] Recover tracker[%s][%zu] finished[%d] changeRow[%d]",
2070         DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(),
2071         errCode, changeRow);
2072 }
2073 
RecoverNullExtendLogInner(const TrackerSchema & trackerSchema,const TrackerTable & table)2074 int SQLiteSingleVerRelationalStorageExecutor::RecoverNullExtendLogInner(const TrackerSchema &trackerSchema,
2075     const TrackerTable &table)
2076 {
2077     std::vector<std::function<int()>> actions;
2078     actions.emplace_back([this]() {
2079         return SetLogTriggerStatus(false);
2080     });
2081     actions.emplace_back([this, &table]() {
2082         return SQLiteUtils::ExecuteRawSQL(dbHandle_, table.GetTempUpdateLogCursorTriggerSql());
2083     });
2084     actions.emplace_back([this, &trackerSchema]() {
2085         return UpdateExtendField(trackerSchema.tableName, trackerSchema.extendColNames,
2086             " AND (json_valid(log.extend_field) = 0 OR json_type(log.extend_field) IS NOT 'object')");
2087     });
2088     actions.emplace_back([this]() {
2089         return ClearAllTempSyncTrigger();
2090     });
2091     actions.emplace_back([this]() {
2092         return SetLogTriggerStatus(true);
2093     });
2094     return SQLiteRelationalUtils::ExecuteListAction(actions);
2095 }
2096 } // namespace DistributedDB
2097 #endif
2098