• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include "cloud/asset_operation_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "cloud/cloud_storage_utils.h"
21 #include "db_common.h"
22 #include "log_table_manager_factory.h"
23 #include "res_finalizer.h"
24 #include "runtime_context.h"
25 #include "simple_tracker_log_table_manager.h"
26 #include "sqlite_relational_utils.h"
27 
28 namespace DistributedDB {
// Offset added to the number of bound asset columns to obtain the bind index of the rowid.
static constexpr int ROW_ID_INDEX = 1;
// Key under which the hash key of a record is looked up in a VBucket.
static constexpr const char *HASH_KEY = "HASH_KEY";
// SQL predicate that holds when the logic-delete bit (0x08) of FLAG is not set.
static constexpr const char *FLAG_NOT_LOGIC_DELETE = "FLAG & 0x08 = 0";

// Two string lists travelling together.
using PairStringVector = std::pair<std::vector<std::string>, std::vector<std::string>>;
34 
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)35 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
36     std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
37 {
38     if (assetFields.empty() && pkSet.empty()) {
39         return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
40     }
41     std::string gid;
42     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
43     if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
44         LOGE("Get cloud gid fail when query log table.");
45         return errCode;
46     }
47 
48     if (pkSet.empty() && gid.empty()) {
49         LOGE("query log table failed because of both primary key and gid are empty.");
50         return -E_CLOUD_ERROR;
51     }
52     std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
53         " a.cloud_gid, a.sharing_resource, a.status, a.version";
54     for (const auto &field : assetFields) {
55         sql += ", b." + field.colName;
56     }
57     for (const auto &pk : pkSet) {
58         sql += ", b." + pk;
59     }
60     sql += CloudStorageUtils::GetLeftJoinLogSql(tableName) + " WHERE ";
61     if (!gid.empty()) {
62         sql += " a.cloud_gid = ? or ";
63     }
64     sql += "a.hash_key = ?";
65     querySql = sql;
66     return E_OK;
67 }
68 
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)69 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
70     const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
71 {
72     std::string sql = "UPDATE " + tableName + " SET ";
73     for (const auto &field: fields) {
74         sql += field.colName + " = ?,";
75     }
76     sql.pop_back();
77     sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (";
78     sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
79     sqlite3_stmt *stmt = nullptr;
80     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
81     if (errCode != E_OK) {
82         LOGE("Get fill asset statement failed, %d.", errCode);
83         return errCode;
84     }
85     for (size_t i = 0; i < fields.size(); ++i) {
86         errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
87         if (errCode != E_OK) {
88             SQLiteUtils::ResetStatement(stmt, true, errCode);
89             return errCode;
90         }
91     }
92     statement = stmt;
93     return errCode;
94 }
95 
CleanDownloadingFlagByGid(const std::string & tableName,const std::string & gid,VBucket dbAssets)96 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadingFlagByGid(const std::string &tableName,
97     const std::string &gid, VBucket dbAssets)
98 {
99     if (CloudStorageUtils::IsAssetsContainDownloadRecord(dbAssets)) {
100         return E_OK;
101     }
102     std::string sql;
103     sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag&(~0x1000) where cloud_gid = ?;";
104     sqlite3_stmt *stmt = nullptr;
105     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
106     if (errCode != E_OK) {
107         LOGE("[RDBExecutor]Get stmt failed clean downloading flag:%d, tableName:%s, length:%zu",
108             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
109         return errCode;
110     }
111     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
112     if (errCode != E_OK) {
113         LOGE("[RDBExecutor]bind gid failed when clean downloading flag:%d, tableName:%s, length:%zu",
114             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
115         SQLiteUtils::ResetStatement(stmt, true, errCode);
116         return errCode;
117     }
118     errCode = SQLiteUtils::StepWithRetry(stmt);
119     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
120         errCode = E_OK;
121     } else {
122         LOGE("[RDBExecutor]clean downloading flag failed:%d, tableName:%s, length:%zu",
123             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
124     }
125     int ret = E_OK;
126     SQLiteUtils::ResetStatement(stmt, true, ret);
127     if (ret != E_OK) {
128         LOGE("[RDBExecutor]reset stmt when clean downloading flag:%d, tableName:%s, length:%zu",
129             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
130     }
131     return errCode != E_OK ? errCode : ret;
132 }
133 
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess,uint64_t & currCursor)134 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
135     VBucket &vBucket, bool isDownloadSuccess, uint64_t &currCursor)
136 {
137     std::string cloudGid;
138     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
139     if (errCode != E_OK) {
140         LOGE("Miss gid when fill Asset.");
141         return errCode;
142     }
143     std::vector<Field> assetsField;
144     errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
145     if (errCode != E_OK) {
146         LOGE("No assets need to be filled.");
147         return errCode;
148     }
149     CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
150 
151     Bytes hashKey;
152     (void)CloudStorageUtils::GetValueFromVBucket<Bytes>(HASH_KEY, vBucket, hashKey);
153     VBucket dbAssets;
154     std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, hashKey, dbAssets);
155     if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
156         LOGE("get assets by gid or hashkey failed %d.", errCode);
157         return errCode;
158     }
159     AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
160         AssetOperationUtils::CloudSyncAction::END_DOWNLOAD);
161 
162     if (isDownloadSuccess) {
163         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
164             CloudStorageUtils::FillAssetAfterDownload, CloudStorageUtils::FillAssetsAfterDownload);
165         errCode = IncreaseCursorOnAssetData(tableSchema.name, cloudGid, currCursor);
166         if (errCode != E_OK) {
167             return errCode;
168         }
169     } else {
170         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
171             CloudStorageUtils::FillAssetAfterDownloadFail, CloudStorageUtils::FillAssetsAfterDownloadFail);
172     }
173     sqlite3_stmt *stmt = nullptr;
174     errCode = GetFillDownloadAssetStatement(tableSchema.name, dbAssets, assetsField, stmt);
175     if (errCode != E_OK) {
176         return errCode;
177     }
178     errCode = ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
179     if (errCode != E_OK) {
180         return errCode;
181     }
182     errCode = CleanDownloadingFlagByGid(tableSchema.name, cloudGid, dbAssets);
183     int ret = CleanDownloadChangedAssets(vBucket, assetOpType);
184     return errCode == E_OK ? ret : errCode;
185 }
186 
IncreaseCursorOnAssetData(const std::string & tableName,const std::string & gid,uint64_t & currCursor)187 int SQLiteSingleVerRelationalStorageExecutor::IncreaseCursorOnAssetData(const std::string &tableName,
188     const std::string &gid, uint64_t &currCursor)
189 {
190     uint64_t cursor = DBConstant::INVALID_CURSOR;
191     int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, cursor);
192     if (errCode != E_OK) {
193         LOGE("Get cursor of table[%s length[%u]] failed when increase cursor: %d, gid[%s]",
194             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), errCode, gid.c_str());
195         return errCode;
196     }
197     cursor++;
198     std::string sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log";
199     sql += " SET cursor = ? where cloud_gid = ?;";
200     sqlite3_stmt *statement = nullptr;
201     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
202     if (errCode != E_OK) {
203         LOGE("get update asset data cursor stmt failed %d.", errCode);
204         return errCode;
205     }
206     ResFinalizer finalizer([statement]() {
207         sqlite3_stmt *statementInner = statement;
208         int ret = E_OK;
209         SQLiteUtils::ResetStatement(statementInner, true, ret);
210         if (ret != E_OK) {
211             LOGW("Reset  stmt failed %d when increase cursor on asset data", ret);
212         }
213     });
214     int index = 1;
215     errCode = SQLiteUtils::BindInt64ToStatement(statement, index++, cursor);
216     if (errCode != E_OK) {
217         LOGE("bind cursor data stmt failed %d.", errCode);
218         return errCode;
219     }
220     errCode = SQLiteUtils::BindTextToStatement(statement, index, gid);
221     if (errCode != E_OK) {
222         LOGE("bind cursor gid data stmt failed %d.", errCode);
223         return errCode;
224     }
225     errCode = SQLiteUtils::StepWithRetry(statement, false);
226     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
227         LOGE("Fill upload asset failed:%d.", errCode);
228         return errCode;
229     }
230     currCursor = cursor;
231     errCode = SetCursor(tableName, cursor);
232     if (errCode != E_OK) {
233         LOGE("Upgrade cursor failed after asset download success %d.", errCode);
234     }
235     return errCode;
236 }
237 
FillCloudAssetForUpload(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data)238 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema,
239     const CloudSyncBatch &data)
240 {
241     int errCode = E_OK;
242     if (CloudStorageUtils::ChkFillCloudAssetParam(data, errCode)) {
243         return errCode;
244     }
245     errCode = SetLogTriggerStatus(false);
246     if (errCode != E_OK) {
247         LOGE("Fail to set log trigger off, %d.", errCode);
248         return errCode;
249     }
250     sqlite3_stmt *stmt = nullptr;
251     for (size_t i = 0; i < data.assets.size(); ++i) {
252         if (data.assets.at(i).empty()) {
253             continue;
254         }
255         if (DBCommon::IsRecordIgnored(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i]) ||
256             DBCommon::IsCloudRecordNotFound(data.extend[i]) || DBCommon::IsCloudRecordAlreadyExisted(data.extend[i])) {
257             continue;
258         }
259         errCode = InitFillUploadAssetStatement(opType, tableSchema, data, i, stmt);
260         if (errCode != E_OK) {
261             if (errCode == -E_NOT_FOUND) {
262                 errCode = E_OK;
263                 continue;
264             }
265             break;
266         }
267         errCode = SQLiteUtils::StepWithRetry(stmt, false);
268         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
269             LOGE("Fill upload asset failed:%d.", errCode);
270             break;
271         }
272         errCode = E_OK;
273         SQLiteUtils::ResetStatement(stmt, true, errCode);
274         stmt = nullptr;
275         if (errCode != E_OK) {
276             break;
277         }
278     }
279     int ret = E_OK;
280     SQLiteUtils::ResetStatement(stmt, true, ret);
281     int endCode = SetLogTriggerStatus(true);
282     if (endCode != E_OK) {
283         LOGE("Fail to set log trigger off, %d.", endCode);
284         return endCode;
285     }
286     return errCode != E_OK ? errCode : ret;
287 }
288 
FillCloudVersionForUpload(const OpType opType,const CloudSyncData & data)289 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data)
290 {
291     switch (opType) {
292         case OpType::UPDATE_VERSION:
293             return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.updData);
294         case OpType::INSERT_VERSION:
295             return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.insData);
296         default:
297             LOGE("Fill version with unknown type %d", static_cast<int>(opType));
298             return -E_INVALID_ARGS;
299     }
300 }
301 
BindUpdateVersionStatement(const VBucket & vBucket,const Bytes & hashKey,sqlite3_stmt * & stmt)302 int SQLiteSingleVerRelationalStorageExecutor::BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey,
303     sqlite3_stmt *&stmt)
304 {
305     int errCode = E_OK;
306     std::string version;
307     if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
308         vBucket, version) != E_OK) {
309         LOGW("get version from vBucket failed.");
310     }
311     if (hashKey.empty()) {
312         LOGE("hash key is empty when update version.");
313         return -E_CLOUD_ERROR;
314     }
315     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, version);
316     if (errCode != E_OK) {
317         return errCode;
318     }
319     errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, hashKey); // 2 means the second bind args
320     if (errCode != E_OK) {
321         return errCode;
322     }
323     errCode = SQLiteUtils::StepWithRetry(stmt, false);
324     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
325         errCode = E_OK;
326         SQLiteUtils::ResetStatement(stmt, false, errCode);
327     } else {
328         LOGE("step version stmt failed: %d.", errCode);
329     }
330     return errCode;
331 }
332 
InitFillUploadAssetStatement(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)333 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpType opType,
334     const TableSchema &tableSchema, const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
335 {
336     VBucket vBucket = data.assets.at(index);
337     VBucket dbAssets;
338     std::string cloudGid;
339     int errCode;
340     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
341     std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, data.hashKey.at(index), dbAssets);
342     if (errCode != E_OK && errCode != -E_CLOUD_GID_MISMATCH) {
343         return errCode;
344     }
345     AssetOperationUtils::CloudSyncAction action = opType == OpType::SET_UPLOADING ?
346         AssetOperationUtils::CloudSyncAction::START_UPLOAD : AssetOperationUtils::CloudSyncAction::END_UPLOAD;
347     AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
348         action);
349     if (action == AssetOperationUtils::CloudSyncAction::START_UPLOAD) {
350         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
351             CloudStorageUtils::FillAssetBeforeUpload, CloudStorageUtils::FillAssetsBeforeUpload);
352     } else {
353         if (DBCommon::IsRecordError(data.extend.at(index)) || DBCommon::IsRecordAssetsMissing(data.extend.at(index))) {
354             CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
355                 CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed);
356         } else {
357             CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
358                 CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload);
359         }
360     }
361 
362     errCode = GetAndBindFillUploadAssetStatement(tableSchema.name, dbAssets, statement);
363     if (errCode != E_OK) {
364         LOGE("get and bind asset failed %d.", errCode);
365         return errCode;
366     }
367     int64_t rowid = data.rowid[index];
368     return SQLiteUtils::BindInt64ToStatement(statement, dbAssets.size() + ROW_ID_INDEX, rowid);
369 }
370 
AnalysisTrackerTable(const TrackerTable & trackerTable,TableInfo & tableInfo)371 int SQLiteSingleVerRelationalStorageExecutor::AnalysisTrackerTable(const TrackerTable &trackerTable,
372     TableInfo &tableInfo)
373 {
374     return SQLiteRelationalUtils::AnalysisTrackerTable(dbHandle_, trackerTable, tableInfo);
375 }
376 
CreateTrackerTable(const TrackerTable & trackerTable,const TableInfo & table,bool checkData)377 int SQLiteSingleVerRelationalStorageExecutor::CreateTrackerTable(const TrackerTable &trackerTable,
378     const TableInfo &table, bool checkData)
379 {
380     std::unique_ptr<SqliteLogTableManager> tableManager = std::make_unique<SimpleTrackerLogTableManager>();
381     if (trackerTable.IsEmpty()) {
382         // drop trigger
383         return tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
384     }
385 
386     // create log table
387     int errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
388     if (errCode != E_OK) {
389         return errCode;
390     }
391     // init cursor
392     errCode = InitCursorToMeta(table.GetTableName());
393     if (errCode != E_OK) {
394         return errCode;
395     }
396     errCode = GeneLogInfoForExistedData(dbHandle_, "", table, tableManager, true);
397     if (errCode != E_OK) {
398         LOGE("general tracker log info for existed data failed %d.", errCode);
399         return errCode;
400     }
401     errCode = SetLogTriggerStatus(true);
402     if (errCode != E_OK) {
403         return errCode;
404     }
405     errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
406     if (errCode != E_OK) {
407         return errCode;
408     }
409     if (checkData) {
410         return CheckInventoryData(DBCommon::GetLogTableName(table.GetTableName()));
411     }
412     return E_OK;
413 }
414 
GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject & schema)415 int SQLiteSingleVerRelationalStorageExecutor::GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema)
416 {
417     if (!schema.ToSchemaString().empty()) {
418         return E_OK;
419     }
420     const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.begin(),
421         DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.end());
422     Value schemaVal;
423     int errCode = GetKvData(schemaKey, schemaVal); // save schema to meta_data
424     if (errCode != E_OK) {
425         return errCode;
426     }
427     if (schemaVal.empty()) {
428         return -E_NOT_FOUND;
429     }
430     std::string schemaStr;
431     DBCommon::VectorToString(schemaVal, schemaStr);
432     errCode = schema.ParseFromTrackerSchemaString(schemaStr);
433     if (errCode != E_OK) {
434         LOGE("Parse from tracker schema string err.");
435     }
436     return errCode;
437 }
438 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)439 int SQLiteSingleVerRelationalStorageExecutor::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
440 {
441     sqlite3_stmt *statement = nullptr;
442     int errCode = SQLiteUtils::GetStatement(dbHandle_, condition.sql, statement);
443     if (errCode != E_OK) {
444         LOGE("Execute sql failed when prepare stmt.");
445         return errCode;
446     }
447     if (condition.readOnly && !SQLiteUtils::IsStmtReadOnly(statement)) {
448         LOGW("[ExecuteSql] The condition is read-only, but SQL is not read-only");
449     }
450     size_t bindCount = static_cast<size_t>(sqlite3_bind_parameter_count(statement));
451     if (bindCount > condition.bindArgs.size() || bindCount < condition.bindArgs.size()) {
452         LOGE("Sql bind args mismatch.");
453         SQLiteUtils::ResetStatement(statement, true, errCode);
454         return -E_INVALID_ARGS;
455     }
456     for (size_t i = 0; i < condition.bindArgs.size(); i++) {
457         Type type = condition.bindArgs[i];
458         errCode = SQLiteRelationalUtils::BindStatementByType(statement, i + 1, type);
459         if (errCode != E_OK) {
460             int ret = E_OK;
461             SQLiteUtils::ResetStatement(statement, true, ret);
462             return errCode;
463         }
464     }
465     while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
466         VBucket bucket;
467         errCode = SQLiteRelationalUtils::GetSelectVBucket(statement, bucket);
468         if (errCode != E_OK) {
469             int ret = E_OK;
470             SQLiteUtils::ResetStatement(statement, true, ret);
471             return errCode;
472         }
473         records.push_back(std::move(bucket));
474     }
475     int ret = E_OK;
476     SQLiteUtils::ResetStatement(statement, true, ret);
477     return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
478 }
479 
GetClearWaterMarkTables(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema,std::set<std::string> & clearWaterMarkTables)480 int SQLiteSingleVerRelationalStorageExecutor::GetClearWaterMarkTables(
481     const std::vector<TableReferenceProperty> &tableReferenceProperty, const RelationalSchemaObject &schema,
482     std::set<std::string> &clearWaterMarkTables)
483 {
484     std::set<std::string> changeTables = schema.CompareReferenceProperty(tableReferenceProperty);
485     for (const auto &table : changeTables) {
486         std::string logTableName = DBCommon::GetLogTableName(table);
487         bool isExists = false;
488         int errCode = SQLiteUtils::CheckTableExists(dbHandle_, logTableName, isExists);
489         if (errCode != E_OK) {
490             LOGE("[GetClearWaterMarkTables] check table exists failed, errCode = %d.", errCode);
491             return errCode;
492         }
493         if (!isExists) { // table maybe dropped after set reference
494             LOGI("[GetClearWaterMarkTables] log table not exists, skip this table.");
495             continue;
496         }
497 
498         bool isEmpty = true;
499         errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, logTableName, isEmpty);
500         if (errCode != E_OK) {
501             LOGE("[GetClearWaterMarkTables] check table empty failed, errCode = %d.", errCode);
502             clearWaterMarkTables.clear();
503             return errCode;
504         }
505         if (!isEmpty) {
506             clearWaterMarkTables.insert(table);
507         }
508     }
509     LOGI("[GetClearWaterMarkTables] clearWaterMarkTables size = %zu.", clearWaterMarkTables.size());
510     return E_OK;
511 }
512 
UpgradedLogForExistedData(TableInfo & tableInfo,bool schemaChanged)513 int SQLiteSingleVerRelationalStorageExecutor::UpgradedLogForExistedData(TableInfo &tableInfo, bool schemaChanged)
514 {
515     std::string logTable = DBCommon::GetLogTableName(tableInfo.GetTableName());
516     if (schemaChanged) {
517         std::string markAsInconsistent = "UPDATE " + logTable + " SET flag=" +
518             "(CASE WHEN (cloud_gid='' and data_key=-1 and flag&0x02=0x02) then flag else flag|0x20 END)";
519         int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, markAsInconsistent);
520         if (ret != E_OK) {
521             LOGE("Mark upgrade log info as inconsistent failed:%d", ret);
522             return ret;
523         }
524     }
525     if (tableInfo.GetTrackerTable().IsEmpty()) {
526         return E_OK;
527     }
528     LOGI("Upgrade tracker table log, schemaChanged:%d.", schemaChanged);
529     int errCode = SetLogTriggerStatus(false);
530     if (errCode != E_OK) {
531         return errCode;
532     }
533     std::string sql = "UPDATE " + tableInfo.GetTableName() + " SET _rowid_=_rowid_";
534     TrackerTable trackerTable = tableInfo.GetTrackerTable();
535     errCode = trackerTable.ReBuildTempTrigger(dbHandle_, TriggerMode::TriggerModeEnum::UPDATE,
536         [this, &sql]() {
537         int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
538         if (ret != E_OK) {
539             LOGE("Upgrade log for extend field failed.");
540         }
541         return ret;
542     });
543     return SetLogTriggerStatus(true);
544 }
545 
CreateTempSyncTrigger(const TrackerTable & trackerTable,bool flag)546 int SQLiteSingleVerRelationalStorageExecutor::CreateTempSyncTrigger(const TrackerTable &trackerTable, bool flag)
547 {
548     int errCode = E_OK;
549     std::vector<std::string> dropSql = trackerTable.GetDropTempTriggerSql();
550     for (const auto &sql: dropSql) {
551         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
552         if (errCode != E_OK) {
553             LOGE("[RDBExecutor] execute drop sql failed %d.", errCode);
554             return errCode;
555         }
556     }
557     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempInsertTriggerSql(flag));
558     if (errCode != E_OK) {
559         LOGE("[RDBExecutor] create temp insert trigger failed %d.", errCode);
560         return errCode;
561     }
562     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempUpdateTriggerSql(flag));
563     if (errCode != E_OK) {
564         LOGE("[RDBExecutor] create temp update trigger failed %d.", errCode);
565         return errCode;
566     }
567     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempDeleteTriggerSql(flag));
568     if (errCode != E_OK) {
569         LOGE("[RDBExecutor] create temp delete trigger failed %d.", errCode);
570     }
571     return errCode;
572 }
573 
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)574 int SQLiteSingleVerRelationalStorageExecutor::GetAndResetServerObserverData(const std::string &tableName,
575     ChangeProperties &changeProperties)
576 {
577     std::string fileName;
578     if (!SQLiteRelationalUtils::GetDbFileName(dbHandle_, fileName)) {
579         LOGE("get db file name failed.");
580         return -E_INVALID_DB;
581     }
582     SQLiteUtils::GetAndResetServerObserverData(fileName, tableName, changeProperties);
583     return E_OK;
584 }
585 
ClearAllTempSyncTrigger()586 int SQLiteSingleVerRelationalStorageExecutor::ClearAllTempSyncTrigger()
587 {
588     sqlite3_stmt *stmt = nullptr;
589     static const std::string sql = "SELECT name FROM sqlite_temp_master WHERE type = 'trigger';";
590     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
591     if (errCode != E_OK) {
592         LOGE("get clear all temp trigger stmt failed %d.", errCode);
593         return errCode;
594     }
595     int ret = E_OK;
596     while ((errCode = SQLiteUtils::StepNext(stmt, isMemDb_)) == E_OK) {
597         std::string str;
598         (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
599         if (errCode != E_OK) {
600             SQLiteUtils::ResetStatement(stmt, true, ret);
601             return errCode;
602         }
603         std::string dropSql = "DROP TRIGGER IF EXISTS '" + str + "';";
604         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, dropSql);
605         if (errCode != E_OK) {
606             LOGE("drop temp trigger failed %d.", errCode);
607             SQLiteUtils::ResetStatement(stmt, true, ret);
608             return errCode;
609         }
610     }
611     SQLiteUtils::ResetStatement(stmt, true, ret);
612     return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
613 }
614 
CleanTrackerData(const std::string & tableName,int64_t cursor,bool isOnlyTrackTable)615 int SQLiteSingleVerRelationalStorageExecutor::CleanTrackerData(const std::string &tableName, int64_t cursor,
616     bool isOnlyTrackTable)
617 {
618     std::string sql;
619     if (isOnlyTrackTable) {
620         sql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log";
621     } else {
622         sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log SET extend_field = NULL";
623     }
624     sql += " where data_key = -1 and cursor <= ?;";
625     sqlite3_stmt *statement = nullptr;
626     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
627     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
628         LOGE("get clean tracker data stmt failed %d.", errCode);
629         return errCode;
630     }
631     errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, cursor);
632     int ret = E_OK;
633     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
634         LOGE("bind clean tracker data stmt failed %d.", errCode);
635         SQLiteUtils::ResetStatement(statement, true, ret);
636         return errCode;
637     }
638     errCode = SQLiteUtils::StepWithRetry(statement);
639     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
640         errCode = E_OK;
641     } else {
642         LOGE("clean tracker step failed: %d.", errCode);
643     }
644     SQLiteUtils::ResetStatement(statement, true, ret);
645     return errCode == E_OK ? ret : errCode;
646 }
647 
CreateSharedTable(const TableSchema & tableSchema)648 int SQLiteSingleVerRelationalStorageExecutor::CreateSharedTable(const TableSchema &tableSchema)
649 {
650     LOGI("Create shared table[%s length[%u]]", DBCommon::StringMiddleMasking(tableSchema.name).c_str(),
651         tableSchema.name.length());
652     std::map<int32_t, std::string> cloudFieldTypeMap;
653     cloudFieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
654     cloudFieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
655     cloudFieldTypeMap[TYPE_INDEX<double>] = "REAL";
656     cloudFieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
657     cloudFieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
658     cloudFieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
659     cloudFieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
660     cloudFieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
661 
662     std::string createTableSql = "CREATE TABLE IF NOT EXISTS " + tableSchema.sharedTableName + "(";
663     std::string primaryKey = ", PRIMARY KEY (";
664     createTableSql += CloudDbConstant::CLOUD_OWNER;
665     createTableSql += " TEXT, ";
666     createTableSql += CloudDbConstant::CLOUD_PRIVILEGE;
667     createTableSql += " TEXT";
668     primaryKey += CloudDbConstant::CLOUD_OWNER;
669     bool hasPrimaryKey = false;
670     for (const auto &field : tableSchema.fields) {
671         createTableSql += ", " + field.colName + " ";
672         createTableSql += cloudFieldTypeMap[field.type];
673         createTableSql += field.nullable ? "" : " NOT NULL";
674         if (field.primary) {
675             primaryKey += ", " + field.colName;
676             hasPrimaryKey = true;
677         }
678     }
679     if (hasPrimaryKey) {
680         createTableSql += primaryKey + ")";
681     }
682     createTableSql += ");";
683     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, createTableSql);
684     if (errCode != E_OK) {
685         LOGE("Create shared table failed, %d.", errCode);
686     }
687     return errCode;
688 }
689 
DeleteTable(const std::vector<std::string> & tableNames)690 int SQLiteSingleVerRelationalStorageExecutor::DeleteTable(const std::vector<std::string> &tableNames)
691 {
692     for (const auto &tableName : tableNames) {
693         std::string deleteTableSql = "DROP TABLE IF EXISTS " + tableName + ";";
694         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteTableSql);
695         if (errCode != E_OK) {
696             LOGE("Delete table failed, %d.", errCode);
697             return errCode;
698         }
699     }
700     return E_OK;
701 }
702 
UpdateSharedTable(const std::map<std::string,std::vector<Field>> & updateTableNames)703 int SQLiteSingleVerRelationalStorageExecutor::UpdateSharedTable(
704     const std::map<std::string, std::vector<Field>> &updateTableNames)
705 {
706     int errCode = E_OK;
707     std::map<int32_t, std::string> fieldTypeMap;
708     fieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
709     fieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
710     fieldTypeMap[TYPE_INDEX<double>] = "REAL";
711     fieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
712     fieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
713     fieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
714     fieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
715     fieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
716     for (const auto &table : updateTableNames) {
717         if (table.second.empty()) {
718             continue;
719         }
720         std::string addColumnSql = "";
721         for (const auto &field : table.second) {
722             addColumnSql += "ALTER TABLE " + table.first + " ADD ";
723             addColumnSql += field.colName + " ";
724             addColumnSql += fieldTypeMap[field.type];
725             addColumnSql += field.primary ? " PRIMARY KEY" : "";
726             addColumnSql += field.nullable ? ";" : " NOT NULL;";
727         }
728         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, addColumnSql);
729         if (errCode != E_OK) {
730             LOGE("Shared table add column failed, %d.", errCode);
731             return errCode;
732         }
733     }
734     return errCode;
735 }
736 
AlterTableName(const std::map<std::string,std::string> & tableNames)737 int SQLiteSingleVerRelationalStorageExecutor::AlterTableName(const std::map<std::string, std::string> &tableNames)
738 {
739     for (const auto &tableName : tableNames) {
740         std::string alterTableSql = "ALTER TABLE " + tableName.first + " RENAME TO " + tableName.second + ";";
741         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, alterTableSql);
742         if (errCode != E_OK) {
743             LOGE("Alter table name failed, %d.", errCode);
744             return errCode;
745         }
746     }
747     return E_OK;
748 }
749 
AppendUpdateLogRecordWhereSqlCondition(const TableSchema & tableSchema,const VBucket & vBucket,std::string & sql)750 int SQLiteSingleVerRelationalStorageExecutor::AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema,
751     const VBucket &vBucket, std::string &sql)
752 {
753     std::string gidStr;
754     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
755     if (errCode != E_OK) {
756         LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d.", errCode);
757         return errCode;
758     }
759 
760     sql += " WHERE ";
761     if (!gidStr.empty()) {
762         sql += "cloud_gid = '" + gidStr + "'";
763     }
764     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
765     if (!pkMap.empty()) {
766         if (!gidStr.empty()) {
767             sql += " OR ";
768         }
769         sql += "(hash_key = ?);";
770     }
771     return E_OK;
772 }
773 
DoCleanShareTableDataAndLog(const std::vector<std::string> & tableNameList)774 int SQLiteSingleVerRelationalStorageExecutor::DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList)
775 {
776     int errCode = E_OK;
777     for (const auto &tableName: tableNameList) {
778         if (isLogicDelete_) {
779             std::string logTableName = DBCommon::GetLogTableName(tableName);
780             errCode = SetDataOnShareTableWithLogicDelete(tableName, logTableName);
781         } else {
782             errCode = CleanShareTable(tableName);
783         }
784         if (errCode != E_OK) {
785             LOGE("clean share table failed at table:%s, length:%d, deleteType:%d, errCode:%d.",
786                 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), isLogicDelete_ ? 1 : 0, errCode);
787             return errCode;
788         }
789     }
790     return errCode;
791 }
792 
CleanShareTable(const std::string & tableName)793 int SQLiteSingleVerRelationalStorageExecutor::CleanShareTable(const std::string &tableName)
794 {
795     int ret = E_OK;
796     int errCode = E_OK;
797     std::string delDataSql = "DELETE FROM '" + tableName + "';";
798     sqlite3_stmt *statement = nullptr;
799     errCode = SQLiteUtils::GetStatement(dbHandle_, delDataSql, statement);
800     if (errCode != E_OK) {
801         LOGE("get clean shared data stmt failed %d.", errCode);
802         return errCode;
803     }
804     errCode = SQLiteUtils::StepWithRetry(statement);
805     SQLiteUtils::ResetStatement(statement, true, ret);
806     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
807         errCode = E_OK;
808     } else {
809         LOGE("clean shared data failed: %d.", errCode);
810         return errCode;
811     }
812     statement = nullptr;
813     std::string delLogSql = "DELETE FROM '" + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log';";
814     errCode = SQLiteUtils::GetStatement(dbHandle_, delLogSql, statement);
815     if (errCode != E_OK) {
816         LOGE("get clean shared log stmt failed %d.", errCode);
817         return errCode;
818     }
819     errCode = SQLiteUtils::StepWithRetry(statement);
820     SQLiteUtils::ResetStatement(statement, true, ret);
821     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
822         errCode = E_OK;
823     } else {
824         LOGE("clean shared log failed: %d.", errCode);
825         return errCode;
826     }
827     // here errCode must be E_OK, just return ret
828     return ret;
829 }
830 
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,const std::map<std::string,std::vector<TableReferenceProperty>> & tableReference,std::map<int64_t,Entries> & referenceGid)831 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGid(const std::string &tableName,
832     const CloudSyncBatch &syncBatch, const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
833     std::map<int64_t, Entries> &referenceGid)
834 {
835     int errCode = E_OK;
836     for (const auto &[targetTable, targetReference] : tableReference) {
837         errCode = GetReferenceGidInner(tableName, targetTable, syncBatch, targetReference, referenceGid);
838         if (errCode != E_OK) {
839             LOGE("[RDBExecutor] get reference gid inner failed %d.", errCode);
840             return errCode;
841         }
842     }
843     return errCode;
844 }
845 
GetReferenceGidInner(const std::string & sourceTable,const std::string & targetTable,const CloudSyncBatch & syncBatch,const std::vector<TableReferenceProperty> & targetTableReference,std::map<int64_t,Entries> & referenceGid)846 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidInner(const std::string &sourceTable,
847     const std::string &targetTable, const CloudSyncBatch &syncBatch,
848     const std::vector<TableReferenceProperty> &targetTableReference, std::map<int64_t, Entries> &referenceGid)
849 {
850     auto [sourceFields, targetFields] = SplitReferenceByField(targetTableReference);
851     if (sourceFields.empty()) {
852         LOGD("[RDBExecutor] source field is empty.");
853         return E_OK;
854     }
855     if (sourceFields.size() != targetFields.size()) {
856         LOGE("[RDBExecutor] reference field size not equal.");
857         return -E_INTERNAL_ERROR;
858     }
859     std::string sql = GetReferenceGidSql(sourceTable, targetTable, sourceFields, targetFields);
860     sqlite3_stmt *statement = nullptr;
861     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
862     if (errCode != E_OK) {
863         LOGE("[RDBExecutor] get ref gid data stmt failed. %d", errCode);
864         return errCode;
865     }
866     errCode = GetReferenceGidByStmt(statement, syncBatch, targetTable, referenceGid);
867     int ret = E_OK;
868     SQLiteUtils::ResetStatement(statement, true, ret);
869     return errCode == E_OK ? ret : errCode;
870 }
871 
GetReferenceGidSql(const std::string & sourceTable,const std::string & targetTable,const std::vector<std::string> & sourceFields,const std::vector<std::string> & targetFields)872 std::string SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidSql(const std::string &sourceTable,
873     const std::string &targetTable, const std::vector<std::string> &sourceFields,
874     const std::vector<std::string> &targetFields)
875 {
876     // sql like this:
877     // SELECT naturalbase_rdb_aux_parent_log.cloud_gid FROM naturalbase_rdb_aux_parent_log,
878     //   (SELECT parent._rowid_ AS rowid_b FROM parent,
879     //     (SELECT child._rowid_, name FROM child, naturalbase_rdb_aux_child_log
880     //     WHERE child._rowid_ = ? AND naturalbase_rdb_aux_child_log.timestamp = ? ) source_a
881     //   WHERE parent.name = source_a.name ) temp_table
882     // WHERE naturalbase_rdb_aux_parent_log.data_key = temp_table.rowid_b
883     std::string logTargetTable = DBCommon::GetLogTableName(targetTable);
884     std::string logSourceTable = DBCommon::GetLogTableName(sourceTable);
885     std::string sql;
886     sql += "SELECT " + logTargetTable + ".cloud_gid" + " FROM " + logTargetTable + ", ";
887     sql += "(";
888     sql += "SELECT " + targetTable + "._rowid_ AS rowid_b FROM " + targetTable
889            + ", ";
890     sql += "(SELECT " + sourceTable + "._rowid_,";
891     std::set<std::string> sourceFieldSet;
892     for (const auto &item : sourceFields) {
893         sourceFieldSet.insert(item);
894     }
895     for (const auto &sourceField : sourceFieldSet) {
896         sql += sourceField + ",";
897     }
898     sql.pop_back();
899     sql += " FROM " + sourceTable + ", " + logSourceTable;
900     sql +=" WHERE " + sourceTable + "._rowid_ = ? AND " + logSourceTable + ".timestamp = ? ";
901     sql += " AND " + logSourceTable + ".flag&0x08=0x00) source_a";
902     sql += " WHERE ";
903     for (size_t i = 0u; i < sourceFields.size(); ++i) {
904         if (i != 0u) {
905             sql += " AND ";
906         }
907         sql += targetTable + "." + targetFields[i] + " = source_a." + sourceFields[i];
908     }
909     sql += ") temp_table ";
910     sql += "WHERE " + logTargetTable + ".data_key = temp_table.rowid_b";
911     sql += " AND " + logTargetTable + ".flag&0x08=0x00";
912     return sql;
913 }
914 
GetReferenceGidByStmt(sqlite3_stmt * statement,const CloudSyncBatch & syncBatch,const std::string & targetTable,std::map<int64_t,Entries> & referenceGid)915 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidByStmt(sqlite3_stmt *statement,
916     const CloudSyncBatch &syncBatch, const std::string &targetTable, std::map<int64_t, Entries> &referenceGid)
917 {
918     int errCode = E_OK;
919     if (syncBatch.rowid.size() != syncBatch.timestamp.size()) {
920         LOGE("[RDBExecutor] rowid size [%zu] not equal to timestamp size [%zu].", syncBatch.rowid.size(),
921             syncBatch.timestamp.size());
922         return -E_INVALID_ARGS;
923     }
924     int matchCount = 0;
925     for (size_t i = 0u; i < syncBatch.rowid.size(); i++) {
926         errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, syncBatch.rowid[i]); // 1 is rowid index
927         if (errCode != E_OK) {
928             LOGE("[RDBExecutor] bind rowid to stmt failed %d.", errCode);
929             break;
930         }
931         errCode = SQLiteUtils::BindInt64ToStatement(statement, 2, syncBatch.timestamp[i]); // 2 is timestamp index
932         if (errCode != E_OK) {
933             LOGE("[RDBExecutor] bind timestamp to stmt failed %d.", errCode);
934             break;
935         }
936         while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
937             std::string gid;
938             (void)SQLiteUtils::GetColumnTextValue(statement, 0, gid);
939             if (gid.empty()) {
940                 LOGE("[RDBExecutor] reference data don't contain gid.");
941                 errCode = -E_CLOUD_ERROR;
942                 break;
943             }
944             referenceGid[syncBatch.rowid[i]][targetTable] = gid;
945             matchCount++;
946         }
947         if (errCode == -E_FINISHED) {
948             errCode = E_OK;
949         }
950         if (errCode != E_OK) {
951             LOGE("[RDBExecutor] step stmt failed %d.", errCode);
952             break;
953         }
954         SQLiteUtils::ResetStatement(statement, false, errCode);
955         if (errCode != E_OK) {
956             LOGE("[RDBExecutor] reset stmt failed %d.", errCode);
957             break;
958         }
959     }
960     if (matchCount != 0) {
961         LOGD("[RDBExecutor] get reference gid match %d.", matchCount);
962     }
963     return errCode;
964 }
965 
SplitReferenceByField(const std::vector<TableReferenceProperty> & targetTableReference)966 PairStringVector SQLiteSingleVerRelationalStorageExecutor::SplitReferenceByField(
967     const std::vector<TableReferenceProperty> &targetTableReference)
968 {
969     PairStringVector sourceTargetFiled;
970     for (const auto &reference : targetTableReference) {
971         for (const auto &column : reference.columns) {
972             sourceTargetFiled.first.push_back(column.first);
973             sourceTargetFiled.second.push_back(column.second);
974         }
975     }
976     return sourceTargetFiled;
977 }
978 
BindStmtWithCloudGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid,sqlite3_stmt * & stmt)979 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSyncData &cloudDataResult,
980     bool ignoreEmptyGid, sqlite3_stmt *&stmt)
981 {
982     int fillGidCount = 0;
983     int errCode = E_OK;
984     for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
985         auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
986         if (gidEntry == cloudDataResult.insData.extend[i].end()) {
987             bool isSkipAssetsMissRecord = false;
988             if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) {
989                 LOGI("[RDBExecutor] Local assets missing and skip filling assets.");
990                 isSkipAssetsMissRecord = true;
991             }
992             if (ignoreEmptyGid || isSkipAssetsMissRecord) {
993                 continue;
994             }
995             errCode = -E_INVALID_ARGS;
996             LOGE("[RDBExecutor] Extend not contain gid.");
997             break;
998         }
999         bool containError = DBCommon::IsRecordError(cloudDataResult.insData.extend[i]);
1000         if (ignoreEmptyGid && containError) {
1001             continue;
1002         }
1003         std::string val;
1004         if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
1005             cloudDataResult.insData.extend[i], val) != E_OK) {
1006             errCode = -E_CLOUD_ERROR;
1007             LOGE("[RDBExecutor] Can't get string gid from extend.");
1008             break;
1009         }
1010         if (val.empty()) {
1011             errCode = -E_CLOUD_ERROR;
1012             LOGE("[RDBExecutor] Get empty gid from extend.");
1013             break;
1014         }
1015         errCode = BindStmtWithCloudGidInner(val, cloudDataResult.insData.rowid[i], stmt, fillGidCount);
1016         if (errCode != E_OK) {
1017             LOGE("[RDBExecutor] Bind stmt error %d.", errCode);
1018             break;
1019         }
1020     }
1021     LOGD("[RDBExecutor] Fill gid count %d.", fillGidCount);
1022     return errCode;
1023 }
1024 
CleanExtendAndCursorForDeleteData(const std::string & tableName)1025 int SQLiteSingleVerRelationalStorageExecutor::CleanExtendAndCursorForDeleteData(const std::string &tableName)
1026 {
1027     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1028     std::string sql = "DELETE FROM " + logTable + " where flag&0x01=0x01;";
1029     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1030     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1031         LOGE("update extend field and cursor failed %d.", errCode);
1032     }
1033     return errCode;
1034 }
1035 
CheckIfExistUserTable(const std::string & tableName)1036 int SQLiteSingleVerRelationalStorageExecutor::CheckIfExistUserTable(const std::string &tableName)
1037 {
1038     std::string sql = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?";
1039     sqlite3_stmt *statement = nullptr;
1040     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1041     if (errCode != E_OK) {
1042         LOGE("[RDBExecutor] Prepare the sql statement error: %d.", errCode);
1043         return errCode;
1044     }
1045     errCode = SQLiteUtils::BindTextToStatement(statement, 1, tableName);
1046     if (errCode != E_OK) {
1047         LOGE("[RDBExecutor] Bind table name failed: %d.", errCode);
1048         SQLiteUtils::ResetStatement(statement, true, errCode);
1049         return errCode;
1050     }
1051     if (SQLiteUtils::StepWithRetry(statement) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1052         LOGE("[RDBExecutor] local exists user table which shared table name is same as.");
1053         SQLiteUtils::ResetStatement(statement, true, errCode);
1054         return -E_INVALID_ARGS;
1055     }
1056     SQLiteUtils::ResetStatement(statement, true, errCode);
1057     return E_OK;
1058 }
1059 
GetCloudDeleteSql(const std::string & table,std::string & sql)1060 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDeleteSql(const std::string &table, std::string &sql)
1061 {
1062     uint64_t cursor = DBConstant::INVALID_CURSOR;
1063     int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, table, cursor);
1064     if (errCode != E_OK) {
1065         LOGE("[GetCloudDeleteSql] Get cursor of table[%s length[%u]] failed: %d",
1066             DBCommon::StringMiddleMasking(table).c_str(), table.length(), errCode);
1067         return errCode;
1068     }
1069     sql += " cloud_gid = '', version = '', ";
1070     if (isLogicDelete_) {
1071         // cursor already increased by DeleteCloudData, can be assigned directly here
1072         // 1001 which is logicDelete|cloudForcePush|local|delete
1073         sql += "flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1074             std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE) |
1075             static_cast<uint32_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) + ", cursor = " + std::to_string(cursor) + " ";
1076     } else {
1077         sql += "data_key = -1, flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1078             std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) + ", sharing_resource = ''";
1079         errCode = SetCursor(table, cursor + 1);
1080         if (errCode == E_OK) {
1081             sql += ", cursor = " + std::to_string(cursor + 1) + " ";
1082         } else {
1083             LOGE("[RDBExecutor] Increase cursor failed when delete log: %d.", errCode);
1084             return errCode;
1085         }
1086     }
1087     return E_OK;
1088 }
1089 
RemoveDataAndLog(const std::string & tableName,int64_t dataKey)1090 int SQLiteSingleVerRelationalStorageExecutor::RemoveDataAndLog(const std::string &tableName, int64_t dataKey)
1091 {
1092     int errCode = E_OK;
1093     std::string removeDataSql = "DELETE FROM " + tableName + " WHERE " + DBConstant::SQLITE_INNER_ROWID + " = " +
1094         std::to_string(dataKey);
1095     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeDataSql);
1096     if (errCode != E_OK) {
1097         LOGE("[RDBExecutor] remove data failed %d", errCode);
1098         return errCode;
1099     }
1100     std::string removeLogSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key = " +
1101         std::to_string(dataKey);
1102     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeLogSql);
1103     if (errCode != E_OK) {
1104         LOGE("[RDBExecutor] remove log failed %d", errCode);
1105     }
1106     return errCode;
1107 }
1108 
GetLocalDataKey(size_t index,const DownloadData & downloadData)1109 int64_t SQLiteSingleVerRelationalStorageExecutor::GetLocalDataKey(size_t index,
1110     const DownloadData &downloadData)
1111 {
1112     if (index >= downloadData.existDataKey.size()) {
1113         LOGW("[RDBExecutor] index out of range when get local data key."); // should not happen
1114         return -1; // -1 means not exist
1115     }
1116     return downloadData.existDataKey[index];
1117 }
1118 
BindStmtWithCloudGidInner(const std::string & gid,int64_t rowid,sqlite3_stmt * & stmt,int & fillGidCount)1119 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
1120     sqlite3_stmt *&stmt, int &fillGidCount)
1121 {
1122     int errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid); // 1 means the gid index
1123     if (errCode != E_OK) {
1124         return errCode;
1125     }
1126     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means rowid
1127     if (errCode != E_OK) {
1128         return errCode;
1129     }
1130     errCode = SQLiteUtils::StepWithRetry(stmt, false);
1131     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1132         errCode = E_OK;
1133         fillGidCount++;
1134         SQLiteUtils::ResetStatement(stmt, false, errCode);
1135     } else {
1136         LOGE("[RDBExecutor] Update cloud log failed: %d.", errCode);
1137     }
1138     return errCode;
1139 }
1140 
RenewTableTrigger(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType)1141 int SQLiteSingleVerRelationalStorageExecutor::RenewTableTrigger(DistributedTableMode mode,
1142     const TableInfo &tableInfo, TableSyncType syncType)
1143 {
1144     auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
1145     return tableManager->AddRelationalLogTableTrigger(dbHandle_, tableInfo, "");
1146 }
1147 
DoCleanAssetId(const std::string & tableName,const RelationalSchemaObject & localSchema)1148 int SQLiteSingleVerRelationalStorageExecutor::DoCleanAssetId(const std::string &tableName,
1149     const RelationalSchemaObject &localSchema)
1150 {
1151     std::vector<int64_t> dataKeys;
1152     std::string logTableName = DBCommon::GetLogTableName(tableName);
1153     int errCode = GetCleanCloudDataKeys(logTableName, dataKeys, false);
1154     if (errCode != E_OK) {
1155         LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
1156         return errCode;
1157     }
1158     std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
1159     errCode = CleanAssetId(tableName, fieldInfos, dataKeys);
1160     if (errCode != E_OK) {
1161         LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d.", errCode);
1162     }
1163     return errCode;
1164 }
1165 
CleanAssetId(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys)1166 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetId(const std::string &tableName,
1167     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys)
1168 {
1169     int errCode = E_OK;
1170     for (const auto &fieldInfo : fieldInfos) {
1171         if (fieldInfo.IsAssetType()) {
1172             Assets assets;
1173             errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1174             if (errCode != E_OK) {
1175                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
1176                 return errCode;
1177             }
1178             errCode = UpdateAssetIdOnUserTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1179             if (errCode != E_OK) {
1180                 LOGE("[Storage Executor] failed to save clean asset id on table, %d.", errCode);
1181                 return errCode;
1182             }
1183         } else if (fieldInfo.IsAssetsType()) {
1184             errCode = GetAssetsAndUpdateAssetsId(tableName, fieldInfo.GetFieldName(), dataKeys);
1185             if (errCode != E_OK) {
1186                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
1187                 return errCode;
1188             }
1189         }
1190     }
1191     return errCode;
1192 }
1193 
UpdateAssetIdOnUserTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)1194 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetIdOnUserTable(const std::string &tableName,
1195     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
1196 {
1197     if (assets.empty()) { // LCOV_EXCL_BR_LINE
1198         return E_OK;
1199     }
1200     int errCode = E_OK;
1201     int ret = E_OK;
1202     sqlite3_stmt *stmt = nullptr;
1203     size_t index = 0;
1204     for (const auto &rowId : dataKeys) {
1205         if (rowId == -1) { // -1 means data is deleted
1206             continue;
1207         }
1208         if (assets[index].name.empty()) { // LCOV_EXCL_BR_LINE
1209             index++;
1210             continue;
1211         }
1212         std::string cleanAssetIdSql = "UPDATE " + tableName  + " SET " + fieldName + " = ? WHERE " +
1213             std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1214         errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1215         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1216             LOGE("Get statement failed, %d", errCode);
1217             return errCode;
1218         }
1219         assets[index].assetId = "";
1220         assets[index].status &= ~AssetStatus::UPLOADING;
1221         errCode = BindAssetToBlobStatement(assets[index], 1, stmt); // 1 means sqlite statement index
1222         index++;
1223         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1224             LOGE("Bind asset to blob statement failed, %d", errCode);
1225             goto END;
1226         }
1227         errCode = SQLiteUtils::StepWithRetry(stmt);
1228         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1229             errCode = E_OK;
1230         } else {
1231             LOGE("Step statement failed, %d", errCode);
1232             goto END;
1233         }
1234         SQLiteUtils::ResetStatement(stmt, true, ret);
1235     }
1236     return errCode != E_OK ? errCode : ret;
1237 END:
1238     SQLiteUtils::ResetStatement(stmt, true, ret);
1239     return errCode != E_OK ? errCode : ret;
1240 }
1241 
GetAssetsAndUpdateAssetsId(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys)1242 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsAndUpdateAssetsId(const std::string &tableName,
1243     const std::string &fieldName, const std::vector<int64_t> &dataKeys)
1244 {
1245     int errCode = E_OK;
1246     int ret = E_OK;
1247     sqlite3_stmt *selectStmt = nullptr;
1248     for (const auto &rowId : dataKeys) {
1249         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1250             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1251         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1252         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1253             LOGE("Get select assets statement failed, %d.", errCode);
1254             goto END;
1255         }
1256         Assets assets;
1257         errCode = GetAssetsByRowId(selectStmt, assets);
1258         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1259             LOGE("Get assets by rowId failed, %d.", errCode);
1260             goto END;
1261         }
1262         SQLiteUtils::ResetStatement(selectStmt, true, ret);
1263         if (assets.empty()) { // LCOV_EXCL_BR_LINE
1264             continue;
1265         }
1266         for (auto &asset : assets) {
1267             asset.assetId = "";
1268             asset.status &= ~AssetStatus::UPLOADING;
1269         }
1270         std::vector<uint8_t> assetsValue;
1271         errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsValue);
1272         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1273             LOGE("[CleanAssetsIdOnUserTable] failed to transfer assets to blob, %d.", errCode);
1274             return errCode;
1275         }
1276         errCode = CleanAssetsIdOnUserTable(tableName, fieldName, rowId, assetsValue);
1277         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1278             LOGE("[CleanAssetsIdOnUserTable] clean assets id on user table failed, %d", errCode);
1279             return errCode;
1280         }
1281     }
1282     return errCode != E_OK ? errCode : ret;
1283 END:
1284     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1285     return errCode != E_OK ? errCode : ret;
1286 }
1287 
CleanAssetsIdOnUserTable(const std::string & tableName,const std::string & fieldName,const int64_t rowId,const std::vector<uint8_t> & assetsValue)1288 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetsIdOnUserTable(const std::string &tableName,
1289     const std::string &fieldName, const int64_t rowId, const std::vector<uint8_t> &assetsValue)
1290 {
1291     std::string cleanAssetIdSql = "UPDATE " + tableName  + " SET " + fieldName + " = ? WHERE " +
1292         std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1293     sqlite3_stmt *stmt = nullptr;
1294     int errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1295     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1296         LOGE("Get statement failed, %d", errCode);
1297         SQLiteUtils::ResetStatement(stmt, true, errCode);
1298         return errCode;
1299     }
1300     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, assetsValue, false);
1301     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1302         SQLiteUtils::ResetStatement(stmt, true, errCode);
1303         return errCode;
1304     }
1305     errCode = SQLiteUtils::StepWithRetry(stmt);
1306     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1307         errCode = E_OK;
1308     }
1309     SQLiteUtils::ResetStatement(stmt, true, errCode);
1310     return errCode;
1311 }
1312 
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1313 std::pair<int, uint32_t> SQLiteSingleVerRelationalStorageExecutor::GetAssetsByGidOrHashKey(
1314     const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
1315 {
1316     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
1317     auto &[errCode, status] = res;
1318     std::vector<Field> assetFields;
1319     std::string sql = "SELECT";
1320     for (const auto &field: tableSchema.fields) {
1321         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1322             assetFields.emplace_back(field);
1323             sql += " b." + field.colName + ",";
1324         }
1325     }
1326     if (assetFields.empty()) {
1327         return { -E_NOT_FOUND, status };
1328     }
1329     sql += "a.cloud_gid, a.status ";
1330     sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE (a." + FLAG_NOT_LOGIC_DELETE + ") AND (" +
1331         (gid.empty() ? "a.hash_key = ?);" : " a.cloud_gid = ? OR  a.hash_key = ?);");
1332     sqlite3_stmt *stmt = nullptr;
1333     errCode = InitGetAssetStmt(sql, gid, hashKey, stmt);
1334     if (errCode != E_OK) {
1335         return res;
1336     }
1337     errCode = SQLiteUtils::StepWithRetry(stmt);
1338     int index = 0;
1339     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1340         for (const auto &field: assetFields) {
1341             Type cloudValue;
1342             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1343             if (errCode != E_OK) {
1344                 break;
1345             }
1346             errCode = PutVBucketByType(assets, field, cloudValue);
1347             if (errCode != E_OK) {
1348                 break;
1349             }
1350         }
1351         std::string curGid;
1352         errCode = SQLiteUtils::GetColumnTextValue(stmt, index++, curGid);
1353         if (errCode == E_OK && CloudStorageUtils::IsCloudGidMismatch(gid, curGid)) {
1354             // Gid is different, there may be duplicate primary keys in the cloud
1355             errCode = -E_CLOUD_GID_MISMATCH;
1356         }
1357         status = static_cast<uint32_t>(sqlite3_column_int(stmt, index++));
1358     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1359         errCode = -E_NOT_FOUND;
1360     } else {
1361         LOGE("step get asset stmt failed %d.", errCode);
1362     }
1363     SQLiteUtils::ResetStatement(stmt, true, errCode);
1364     return res;
1365 }
1366 
InitGetAssetStmt(const std::string & sql,const std::string & gid,const Bytes & hashKey,sqlite3_stmt * & stmt)1367 int SQLiteSingleVerRelationalStorageExecutor::InitGetAssetStmt(const std::string &sql, const std::string &gid,
1368     const Bytes &hashKey, sqlite3_stmt *&stmt)
1369 {
1370     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1371     if (errCode != E_OK) {
1372         LOGE("Get asset statement failed, %d.", errCode);
1373         return errCode;
1374     }
1375     int index = 1;
1376     if (!gid.empty()) {
1377         errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
1378         if (errCode != E_OK) {
1379             LOGE("bind gid failed %d.", errCode);
1380             SQLiteUtils::ResetStatement(stmt, true, errCode);
1381             return errCode;
1382         }
1383     }
1384     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, hashKey);
1385     if (errCode != E_OK) {
1386         LOGE("bind hash failed %d.", errCode);
1387         SQLiteUtils::ResetStatement(stmt, true, errCode);
1388     }
1389     return errCode;
1390 }
1391 
FillHandleWithOpType(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid,const TableSchema & tableSchema)1392 int SQLiteSingleVerRelationalStorageExecutor::FillHandleWithOpType(const OpType opType, const CloudSyncData &data,
1393     bool fillAsset, bool ignoreEmptyGid, const TableSchema &tableSchema)
1394 {
1395     int errCode = E_OK;
1396     switch (opType) {
1397         case OpType::UPDATE_VERSION: // fallthrough
1398         case OpType::INSERT_VERSION: {
1399             errCode = FillCloudVersionForUpload(opType, data);
1400             break;
1401         }
1402         case OpType::SET_UPLOADING: {
1403             errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1404             if (errCode != E_OK) {
1405                 LOGE("Failed to set uploading for ins data, %d.", errCode);
1406                 return errCode;
1407             }
1408             errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1409             break;
1410         }
1411         case OpType::INSERT: {
1412             errCode = UpdateCloudLogGid(data, ignoreEmptyGid);
1413             if (errCode != E_OK) {
1414                 LOGE("Failed to fill cloud log gid, %d.", errCode);
1415                 return errCode;
1416             }
1417             if (fillAsset) {
1418                 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1419                 if (errCode != E_OK) {
1420                     LOGE("Failed to fill asset for ins, %d.", errCode);
1421                     return errCode;
1422                 }
1423             }
1424             errCode = FillCloudVersionForUpload(OpType::INSERT_VERSION, data);
1425             break;
1426         }
1427         case OpType::UPDATE: {
1428             if (fillAsset && !data.updData.assets.empty()) {
1429                 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1430                 if (errCode != E_OK) {
1431                     LOGE("Failed to fill asset for upd, %d.", errCode);
1432                     return errCode;
1433                 }
1434             }
1435             errCode = FillCloudVersionForUpload(OpType::UPDATE_VERSION, data);
1436             break;
1437         }
1438         default:
1439             break;
1440     }
1441     return errCode;
1442 }
1443 
GetAssetsByRowId(sqlite3_stmt * & selectStmt,Assets & assets)1444 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets)
1445 {
1446     int errCode = SQLiteUtils::StepWithRetry(selectStmt);
1447     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1448         std::vector<uint8_t> blobValue;
1449         errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
1450         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1451             LOGE("Get column blob value failed %d.", errCode);
1452             return errCode;
1453         }
1454         errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1455         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1456             LOGE("Transfer blob to assets failed %d", errCode);
1457         }
1458         return errCode;
1459     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1460         return E_OK;
1461     } else {
1462         LOGE("Step select statement failed %d.", errCode);
1463         return errCode;
1464     }
1465 }
1466 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1467 void SQLiteSingleVerRelationalStorageExecutor::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1468 {
1469     assetLoader_ = loader;
1470 }
1471 
ExecuteFillDownloadAssetStatement(sqlite3_stmt * & stmt,int beginIndex,const std::string & cloudGid)1472 int SQLiteSingleVerRelationalStorageExecutor::ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt,
1473     int beginIndex, const std::string &cloudGid)
1474 {
1475     int errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, cloudGid);
1476     if (errCode != E_OK) {
1477         LOGE("Bind cloud gid to statement failed %d.", errCode);
1478         int ret = E_OK;
1479         SQLiteUtils::ResetStatement(stmt, true, ret);
1480         return errCode;
1481     }
1482     errCode = SQLiteUtils::StepWithRetry(stmt);
1483     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1484         errCode = E_OK;
1485     } else {
1486         LOGE("Fill cloud asset failed: %d.", errCode);
1487     }
1488     int ret = E_OK;
1489     SQLiteUtils::ResetStatement(stmt, true, ret);
1490     return errCode != E_OK ? errCode : ret;
1491 }
1492 
CleanDownloadChangedAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType)1493 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadChangedAssets(
1494     const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType)
1495 {
1496     if (assetLoader_ == nullptr) {
1497         LOGE("assetLoader may be not set.");
1498         return -E_NOT_SET;
1499     }
1500     std::vector<Asset> toDeleteAssets;
1501     CloudStorageUtils::GetToBeRemoveAssets(vBucket, assetOpType, toDeleteAssets);
1502     if (toDeleteAssets.empty()) {
1503         return E_OK;
1504     }
1505     DBStatus ret = assetLoader_->RemoveLocalAssets(toDeleteAssets);
1506     if (ret != OK) {
1507         LOGE("remove local assets failed %d.", ret);
1508         return -E_REMOVE_ASSETS_FAILED;
1509     }
1510     return E_OK;
1511 }
1512 
GetAndBindFillUploadAssetStatement(const std::string & tableName,const VBucket & assets,sqlite3_stmt * & statement)1513 int SQLiteSingleVerRelationalStorageExecutor::GetAndBindFillUploadAssetStatement(const std::string &tableName,
1514     const VBucket &assets, sqlite3_stmt *&statement)
1515 {
1516     std::string sql = "UPDATE '" + tableName + "' SET ";
1517     for (const auto &item: assets) {
1518         sql += item.first + " = ?,";
1519     }
1520     sql.pop_back();
1521     sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = ?;";
1522     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1523     if (errCode != E_OK) {
1524         return errCode;
1525     }
1526     int bindIndex = 1;
1527     for (const auto &item: assets) {
1528         Field field = {
1529             .colName = item.first, .type = static_cast<int32_t>(item.second.index())
1530         };
1531         errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](bindIndex++, assets, field, statement);
1532         if (errCode != E_OK) {
1533             return errCode;
1534         }
1535     }
1536     return errCode;
1537 }
1538 
OnlyUpdateAssetId(const std::string & tableName,const TableSchema & tableSchema,const VBucket & vBucket,int64_t dataKey,OpType opType)1539 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateAssetId(const std::string &tableName,
1540     const TableSchema &tableSchema, const VBucket &vBucket, int64_t dataKey, OpType opType)
1541 {
1542     if (opType != OpType::ONLY_UPDATE_GID && opType != OpType::NOT_HANDLE &&
1543         opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1544         return E_OK;
1545     }
1546     if (CloudStorageUtils::IsSharedTable(tableSchema)) {
1547         // this is shared table, not need to update asset id.
1548         return E_OK;
1549     }
1550     bool isNotIncCursor = false; // if isNotIncCursor is false, will increase cursor
1551     if (!IsNeedUpdateAssetId(tableSchema, dataKey, vBucket, isNotIncCursor)) {
1552         return E_OK;
1553     }
1554     int error = SetCursorIncFlag(!isNotIncCursor);
1555     if (error != E_OK) {
1556         LOGE("[Storage Executor] failed to set cursor_inc_flag, %d.", error);
1557         return error;
1558     }
1559     int errCode = UpdateAssetId(tableSchema, dataKey, vBucket);
1560     if (errCode != E_OK) {
1561         LOGE("[Storage Executor] failed to update assetId on table, %d.", errCode);
1562     }
1563     if (isNotIncCursor) {
1564         error = SetCursorIncFlag(true);
1565         if (error != E_OK) {
1566             LOGE("[Storage Executor] failed to set cursor_inc_flag true, %d.", error);
1567             return error;
1568         }
1569     }
1570     return errCode;
1571 }
1572 
UpdateLocalAssetId(const VBucket & vBucket,const std::string & fieldName,Asset & asset)1573 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName,
1574     Asset &asset)
1575 {
1576     for (const auto &[col, value] : vBucket) {
1577         if (value.index() == TYPE_INDEX<Asset> && col == fieldName) {
1578             asset = std::get<Asset>(value);
1579         }
1580     }
1581 }
1582 
UpdateLocalAssetsId(const VBucket & vBucket,const std::string & fieldName,Assets & assets)1583 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName,
1584     Assets &assets)
1585 {
1586     for (const auto &[col, value] : vBucket) {
1587         if (value.index() == TYPE_INDEX<Assets> && col == fieldName) {
1588             assets = std::get<Assets>(value);
1589         }
1590     }
1591 }
1592 
BindAssetToBlobStatement(const Asset & asset,int index,sqlite3_stmt * & stmt)1593 int SQLiteSingleVerRelationalStorageExecutor::BindAssetToBlobStatement(const Asset &asset, int index,
1594     sqlite3_stmt *&stmt)
1595 {
1596     std::vector<uint8_t> blobValue;
1597     int errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, blobValue);
1598     if (errCode != E_OK) {
1599         LOGE("Transfer asset to blob failed, %d.", errCode);
1600         return errCode;
1601     }
1602     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1603     if (errCode != E_OK) {
1604         LOGE("Bind asset blob to statement failed, %d.", errCode);
1605     }
1606     return errCode;
1607 }
1608 
BindAssetsToBlobStatement(const Assets & assets,int index,sqlite3_stmt * & stmt)1609 int SQLiteSingleVerRelationalStorageExecutor::BindAssetsToBlobStatement(const Assets &assets, int index,
1610     sqlite3_stmt *&stmt)
1611 {
1612     std::vector<uint8_t> blobValue;
1613     int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blobValue);
1614     if (errCode != E_OK) {
1615         LOGE("Transfer asset to blob failed, %d.", errCode);
1616         return errCode;
1617     }
1618     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1619     if (errCode != E_OK) {
1620         LOGE("Bind asset blob to statement failed, %d.", errCode);
1621     }
1622     return errCode;
1623 }
1624 
BindAssetFiledToBlobStatement(const TableSchema & tableSchema,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord,sqlite3_stmt * & stmt)1625 int SQLiteSingleVerRelationalStorageExecutor::BindAssetFiledToBlobStatement(const TableSchema &tableSchema,
1626     const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt)
1627 {
1628     int assetIndex = 0;
1629     int assetsIndex = 0;
1630     for (const auto &field : tableSchema.fields) {
1631         if (field.type == TYPE_INDEX<Asset>) {
1632             if (assetOfOneRecord[assetIndex].name.empty()) {
1633                 continue;
1634             }
1635             int errCode = BindAssetToBlobStatement(assetOfOneRecord[assetIndex], assetIndex + assetsIndex + 1, stmt);
1636             if (errCode != E_OK) {
1637                 LOGE("Bind asset to blob statement failed, %d.", errCode);
1638                 return errCode;
1639             }
1640             assetIndex++;
1641         } else if (field.type == TYPE_INDEX<Assets>) {
1642             if (assetsOfOneRecord[assetsIndex].empty()) {
1643                 continue;
1644             }
1645             int errCode = BindAssetsToBlobStatement(assetsOfOneRecord[assetsIndex], assetIndex + assetsIndex + 1, stmt);
1646             if (errCode != E_OK) {
1647                 LOGE("Bind assets to blob statement failed, %d.", errCode);
1648                 return errCode;
1649             }
1650             assetsIndex++;
1651         }
1652     }
1653     return E_OK;
1654 }
1655 
UpdateAssetsIdForOneRecord(const TableSchema & tableSchema,const std::string & sql,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord)1656 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetsIdForOneRecord(const TableSchema &tableSchema,
1657     const std::string &sql, const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord)
1658 {
1659     int errCode = E_OK;
1660     int ret = E_OK;
1661     sqlite3_stmt *stmt = nullptr;
1662     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1663     if (errCode != E_OK) {
1664         LOGE("Get update asset statement failed, %d.", errCode);
1665         return errCode;
1666     }
1667     errCode = BindAssetFiledToBlobStatement(tableSchema, assetOfOneRecord, assetsOfOneRecord, stmt);
1668     if (errCode != E_OK) {
1669         LOGE("Asset field Bind asset to blob statement failed, %d.", errCode);
1670         SQLiteUtils::ResetStatement(stmt, true, ret);
1671         return errCode != E_OK ? errCode : ret;
1672     }
1673     errCode = SQLiteUtils::StepWithRetry(stmt);
1674     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1675         errCode = E_OK;
1676     } else {
1677         LOGE("Step statement failed, %d", errCode);
1678     }
1679     SQLiteUtils::ResetStatement(stmt, true, ret);
1680     return errCode != E_OK ? errCode : ret;
1681 }
1682 
UpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1683 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1684     const VBucket &vBucket)
1685 {
1686     int errCode = E_OK;
1687     std::vector<Asset> assetOfOneRecord;
1688     std::vector<Assets> assetsOfOneRecord;
1689     std::string updateAssetIdSql = "UPDATE " + tableSchema.name  + " SET";
1690     for (const auto &field : tableSchema.fields) {
1691         if (field.type == TYPE_INDEX<Asset>) {
1692             Asset asset;
1693             UpdateLocalAssetId(vBucket, field.colName, asset);
1694             assetOfOneRecord.push_back(asset);
1695             if (!asset.name.empty()) {
1696                 updateAssetIdSql += " " + field.colName + " = ?,";
1697             }
1698         }
1699         if (field.type == TYPE_INDEX<Assets>) {
1700             Assets assets;
1701             UpdateLocalAssetsId(vBucket, field.colName, assets);
1702             assetsOfOneRecord.push_back(assets);
1703             if (!assets.empty()) {
1704                 updateAssetIdSql += " " + field.colName + " = ?,";
1705             }
1706         }
1707     }
1708     if (updateAssetIdSql == "UPDATE " + tableSchema.name  + " SET") {
1709         return E_OK;
1710     }
1711     updateAssetIdSql.pop_back();
1712     updateAssetIdSql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1713     errCode = UpdateAssetsIdForOneRecord(tableSchema, updateAssetIdSql, assetOfOneRecord, assetsOfOneRecord);
1714     if (errCode != E_OK) {
1715         LOGE("[Storage Executor] failed to update asset id on table, %d.", errCode);
1716     }
1717     return errCode;
1718 }
1719 
SetPutDataMode(PutDataMode mode)1720 void SQLiteSingleVerRelationalStorageExecutor::SetPutDataMode(PutDataMode mode)
1721 {
1722     putDataMode_ = mode;
1723 }
1724 
SetMarkFlagOption(MarkFlagOption option)1725 void SQLiteSingleVerRelationalStorageExecutor::SetMarkFlagOption(MarkFlagOption option)
1726 {
1727     markFlagOption_ = option;
1728 }
1729 
GetDataFlag()1730 int64_t SQLiteSingleVerRelationalStorageExecutor::GetDataFlag()
1731 {
1732     if (putDataMode_ != PutDataMode::USER) {
1733         return static_cast<int64_t>(LogInfoFlag::FLAG_CLOUD) |
1734             static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1735     }
1736     uint32_t flag = static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
1737     if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1738         flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1739     }
1740     flag |= static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1741     return static_cast<int64_t>(flag);
1742 }
1743 
GetUpdateDataFlagSql(const VBucket & data)1744 std::string SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataFlagSql(const VBucket &data)
1745 {
1746     std::string retentionFlag = "flag = flag & " +
1747         std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY) |
1748         static_cast<uint32_t>(LogInfoFlag::FLAG_ASSET_DOWNLOADING_FOR_ASYNC));
1749     if (putDataMode_ == PutDataMode::SYNC) {
1750         if (CloudStorageUtils::IsAssetsContainDownloadRecord(data)) {
1751             return retentionFlag;
1752         }
1753         return UPDATE_FLAG_CLOUD;
1754     }
1755     if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1756         return UPDATE_FLAG_WAIT_COMPENSATED_SYNC;
1757     }
1758     return retentionFlag;
1759 }
1760 
GetDev()1761 std::string SQLiteSingleVerRelationalStorageExecutor::GetDev()
1762 {
1763     return putDataMode_ == PutDataMode::SYNC ? "cloud" : "";
1764 }
1765 
GetUpdateField(const VBucket & vBucket,const TableSchema & tableSchema)1766 std::vector<Field> SQLiteSingleVerRelationalStorageExecutor::GetUpdateField(const VBucket &vBucket,
1767     const TableSchema &tableSchema)
1768 {
1769     std::set<std::string> useFields;
1770     std::vector<Field> fields;
1771     if (putDataMode_ == PutDataMode::SYNC) {
1772         for (const auto &field : tableSchema.fields) {
1773             useFields.insert(field.colName);
1774         }
1775         fields = tableSchema.fields;
1776     } else {
1777         for (const auto &field : vBucket) {
1778             if (field.first.empty() || field.first[0] == '#') {
1779                 continue;
1780             }
1781             useFields.insert(field.first);
1782         }
1783         for (const auto &field : tableSchema.fields) {
1784             if (useFields.find(field.colName) == useFields.end()) {
1785                 continue;
1786             }
1787             fields.push_back(field);
1788         }
1789     }
1790     return fields;
1791 }
1792 
UpdateRecordFlag(const std::string & tableName,const std::string & sql,const LogInfo & logInfo)1793 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordFlag(const std::string &tableName, const std::string &sql,
1794     const LogInfo &logInfo)
1795 {
1796     bool useHashKey = false;
1797     if (logInfo.cloudGid.empty() && logInfo.dataKey == DBConstant::DEFAULT_ROW_ID) {
1798         if (logInfo.hashKey.empty()) {
1799             LOGE("[RDBExecutor] Update record flag failed with invalid args!");
1800             return -E_INVALID_ARGS;
1801         }
1802         useHashKey = true;
1803     }
1804     sqlite3_stmt *stmt = nullptr;
1805     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1806     if (errCode != E_OK) {
1807         LOGE("[Storage Executor] Get stmt failed when update record flag, %d", errCode);
1808         return errCode;
1809     }
1810     int ret = E_OK;
1811     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, logInfo.timestamp); // 1 is timestamp
1812     if (errCode != E_OK) {
1813         LOGE("[Storage Executor] Bind timestamp to update record flag stmt failed, %d", errCode);
1814         SQLiteUtils::ResetStatement(stmt, true, ret);
1815         return errCode;
1816     }
1817     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, logInfo.timestamp); // 2 is timestamp
1818     if (errCode != E_OK) {
1819         LOGE("[Storage Executor] Bind timestamp to update record status stmt failed, %d", errCode);
1820         SQLiteUtils::ResetStatement(stmt, true, ret);
1821         return errCode;
1822     }
1823     if (useHashKey) {
1824         errCode = SQLiteUtils::BindBlobToStatement(stmt, 3, logInfo.hashKey); // 3 is hash_key
1825         if (errCode != E_OK) {
1826             LOGE("[Storage Executor] Bind hashKey to update record flag stmt failed, %d", errCode);
1827             SQLiteUtils::ResetStatement(stmt, true, ret);
1828             return errCode;
1829         }
1830     }
1831     errCode = SQLiteUtils::StepWithRetry(stmt);
1832     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1833         errCode = E_OK;
1834     } else {
1835         LOGE("[Storage Executor]Step update record flag stmt failed, %d", errCode);
1836     }
1837     SQLiteUtils::ResetStatement(stmt, true, ret);
1838     return errCode == E_OK ? ret : errCode;
1839 }
1840 
MarkFlagAsUploadFinished(const std::string & tableName,const Key & hashKey,Timestamp timestamp,bool isExistAssetsDownload)1841 void SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsUploadFinished(const std::string &tableName,
1842     const Key &hashKey, Timestamp timestamp, bool isExistAssetsDownload)
1843 {
1844     sqlite3_stmt *stmt = nullptr;
1845     int errCode = SQLiteUtils::GetStatement(dbHandle_, CloudStorageUtils::GetUpdateUploadFinishedSql(tableName,
1846         isExistAssetsDownload), stmt);
1847     int index = 1;
1848     errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, timestamp);
1849     if (errCode != E_OK) {
1850         SQLiteUtils::ResetStatement(stmt, true, errCode);
1851         LOGW("[Storage Executor] Bind timestamp to update record flag for upload finished stmt failed, %d", errCode);
1852         return;
1853     }
1854     errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, hashKey);
1855     if (errCode != E_OK) {
1856         SQLiteUtils::ResetStatement(stmt, true, errCode);
1857         LOGW("[Storage Executor] Bind hashKey to update record flag for upload finished stmt failed, %d", errCode);
1858         return;
1859     }
1860     errCode = SQLiteUtils::StepWithRetry(stmt);
1861     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1862         errCode = E_OK;
1863     } else {
1864         LOGE("[Storage Executor]Step update record flag for upload finished stmt failed, %d", errCode);
1865     }
1866     SQLiteUtils::ResetStatement(stmt, true, errCode);
1867 }
1868 
GetWaitCompensatedSyncDataPk(const TableSchema & table,std::vector<VBucket> & data,bool isQueryDownloadRecords)1869 int SQLiteSingleVerRelationalStorageExecutor::GetWaitCompensatedSyncDataPk(const TableSchema &table,
1870     std::vector<VBucket> &data, bool isQueryDownloadRecords)
1871 {
1872     std::string sql = "SELECT ";
1873     std::vector<Field> pkFields;
1874     for (const auto &field : table.fields) {
1875         if (!field.primary) {
1876             continue;
1877         }
1878         sql += "b." + field.colName + ",";
1879         pkFields.push_back(field);
1880     }
1881     if (pkFields.empty()) {
1882         // ignore no pk table
1883         return E_OK;
1884     }
1885     sql.pop_back();
1886     if (isQueryDownloadRecords) {
1887         sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " +
1888             FLAG_IS_WAIT_COMPENSATED_CONTAIN_DOWNLOAD_SYNC;
1889     } else {
1890         sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " + FLAG_IS_WAIT_COMPENSATED_SYNC;
1891     }
1892     sqlite3_stmt *stmt = nullptr;
1893     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1894     if (errCode != E_OK) {
1895         LOGE("[RDBExecutor] Get stmt failed when get wait compensated sync pk! errCode = %d..", errCode);
1896         return errCode;
1897     }
1898     do {
1899         errCode = SQLiteUtils::StepWithRetry(stmt);
1900         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1901             VBucket pkData;
1902             errCode = GetRecordFromStmt(stmt, pkFields, 0, pkData);
1903             if (errCode != E_OK) {
1904                 LOGE("[RDBExecutor] Get record failed when get wait compensated sync pk! errCode = %d.", errCode);
1905                 break;
1906             }
1907             data.push_back(pkData);
1908         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1909             errCode = E_OK;
1910             break;
1911         } else {
1912             LOGE("[RDBExecutor] Step failed when get wait compensated sync pk! errCode = %d.", errCode);
1913             break;
1914         }
1915     } while (errCode == E_OK);
1916     int ret = E_OK;
1917     SQLiteUtils::ResetStatement(stmt, true, ret);
1918     return errCode == E_OK ? ret : errCode;
1919 }
1920 
ClearUnLockingStatus(const std::string & tableName)1921 int SQLiteSingleVerRelationalStorageExecutor::ClearUnLockingStatus(const std::string &tableName)
1922 {
1923     std::string sql;
1924     sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET status = (CASE WHEN status == 1 "+
1925         "AND (cloud_gid = '' AND flag & 0x01 != 0) THEN 0 ELSE status END);";
1926     sqlite3_stmt *stmt = nullptr;
1927     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1928     if (errCode != E_OK) {
1929         LOGE("[RDBExecutor] Get stmt failed when clear unlocking status errCode = %d.", errCode);
1930         return errCode;
1931     }
1932     int ret = E_OK;
1933     errCode = SQLiteUtils::StepWithRetry(stmt);
1934     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1935         errCode = E_OK;
1936     } else {
1937         LOGE("[Storage Executor] Step update record status stmt failed, %d.", errCode);
1938     }
1939     SQLiteUtils::ResetStatement(stmt, true, ret);
1940     return errCode == E_OK ? ret : errCode;
1941 }
1942 
GetRecordFromStmt(sqlite3_stmt * stmt,const std::vector<Field> & fields,int startIndex,VBucket & record)1943 int SQLiteSingleVerRelationalStorageExecutor::GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields,
1944     int startIndex, VBucket &record)
1945 {
1946     int errCode = E_OK;
1947     for (const auto &field : fields) {
1948         Type cloudValue;
1949         errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, startIndex, cloudValue);
1950         if (errCode != E_OK) {
1951             break;
1952         }
1953         errCode = PutVBucketByType(record, field, cloudValue);
1954         if (errCode != E_OK) {
1955             break;
1956         }
1957         startIndex++;
1958     }
1959     return errCode;
1960 }
1961 
BindShareValueToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt,int & index)1962 int SQLiteSingleVerRelationalStorageExecutor::BindShareValueToInsertLogStatement(const VBucket &vBucket,
1963     const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt, int &index)
1964 {
1965     int errCode = E_OK;
1966     std::string version;
1967     if (putDataMode_ == PutDataMode::SYNC) {
1968         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, vBucket, version);
1969         if ((errCode != E_OK && errCode != -E_NOT_FOUND)) {
1970             LOGE("get version for insert log statement failed, %d", errCode);
1971             return -E_CLOUD_ERROR;
1972         }
1973     }
1974     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, version); // next is version
1975     if (errCode != E_OK) {
1976         LOGE("Bind version to insert log statement failed, %d", errCode);
1977         return errCode;
1978     }
1979 
1980     std::string shareUri;
1981     if (putDataMode_ == PutDataMode::SYNC) {
1982         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
1983             vBucket, shareUri);
1984         if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1985             LOGE("get shareUri for insert log statement failed, %d", errCode);
1986             return -E_CLOUD_ERROR;
1987         }
1988     }
1989 
1990     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, shareUri); // next is sharing_resource
1991     if (errCode != E_OK) {
1992         LOGE("Bind shareUri to insert log statement failed, %d", errCode);
1993     }
1994     return errCode;
1995 }
1996 
CheckAndCreateTrigger(const TableInfo & table)1997 void SQLiteSingleVerRelationalStorageExecutor::CheckAndCreateTrigger(const TableInfo &table)
1998 {
1999     auto tableManager = std::make_unique<SimpleTrackerLogTableManager>();
2000     tableManager->CheckAndCreateTrigger(dbHandle_, table, "");
2001 }
2002 } // namespace DistributedDB
2003 #endif
2004