1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include "cloud/asset_operation_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "cloud/cloud_storage_utils.h"
21 #include "db_common.h"
22 #include "log_table_manager_factory.h"
23 #include "res_finalizer.h"
24 #include "runtime_context.h"
25 #include "simple_tracker_log_table_manager.h"
26 #include "sqlite_relational_utils.h"
27
28 namespace DistributedDB {
// Offset added to the number of asset columns to get the bind position of the row id argument.
static constexpr int ROW_ID_INDEX = 1;
// Key under which the hash key is looked up in a VBucket.
static constexpr const char *HASH_KEY = "HASH_KEY";
// SQL condition that holds when bit 0x08 of the flag is clear, i.e. the row is not marked as logic delete.
static constexpr const char *FLAG_NOT_LOGIC_DELETE = "FLAG & 0x08 = 0";

// A pair of string lists.
using PairStringVector = std::pair<std::vector<std::string>, std::vector<std::string>>;
34
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)35 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
36 std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
37 {
38 if (assetFields.empty() && pkSet.empty()) {
39 return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
40 }
41 std::string gid;
42 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
43 if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
44 LOGE("Get cloud gid fail when query log table.");
45 return errCode;
46 }
47
48 if (pkSet.empty() && gid.empty()) {
49 LOGE("query log table failed because of both primary key and gid are empty.");
50 return -E_CLOUD_ERROR;
51 }
52 std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
53 " a.cloud_gid, a.sharing_resource, a.status, a.version";
54 for (const auto &field : assetFields) {
55 sql += ", b." + field.colName;
56 }
57 for (const auto &pk : pkSet) {
58 sql += ", b." + pk;
59 }
60 sql += CloudStorageUtils::GetLeftJoinLogSql(tableName) + " WHERE ";
61 if (!gid.empty()) {
62 sql += " a.cloud_gid = ? or ";
63 }
64 sql += "a.hash_key = ?";
65 querySql = sql;
66 return E_OK;
67 }
68
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)69 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
70 const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
71 {
72 std::string sql = "UPDATE " + tableName + " SET ";
73 for (const auto &field: fields) {
74 sql += field.colName + " = ?,";
75 }
76 sql.pop_back();
77 sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (";
78 sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
79 sqlite3_stmt *stmt = nullptr;
80 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
81 if (errCode != E_OK) {
82 LOGE("Get fill asset statement failed, %d.", errCode);
83 return errCode;
84 }
85 for (size_t i = 0; i < fields.size(); ++i) {
86 errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
87 if (errCode != E_OK) {
88 SQLiteUtils::ResetStatement(stmt, true, errCode);
89 return errCode;
90 }
91 }
92 statement = stmt;
93 return errCode;
94 }
95
CleanDownloadingFlagByGid(const std::string & tableName,const std::string & gid,VBucket dbAssets)96 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadingFlagByGid(const std::string &tableName,
97 const std::string &gid, VBucket dbAssets)
98 {
99 if (CloudStorageUtils::IsAssetsContainDownloadRecord(dbAssets)) {
100 return E_OK;
101 }
102 std::string sql;
103 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag&(~0x1000) where cloud_gid = ?;";
104 sqlite3_stmt *stmt = nullptr;
105 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
106 if (errCode != E_OK) {
107 LOGE("[RDBExecutor]Get stmt failed clean downloading flag:%d, tableName:%s, length:%zu",
108 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
109 return errCode;
110 }
111 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
112 if (errCode != E_OK) {
113 LOGE("[RDBExecutor]bind gid failed when clean downloading flag:%d, tableName:%s, length:%zu",
114 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
115 SQLiteUtils::ResetStatement(stmt, true, errCode);
116 return errCode;
117 }
118 errCode = SQLiteUtils::StepWithRetry(stmt);
119 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
120 errCode = E_OK;
121 } else {
122 LOGE("[RDBExecutor]clean downloading flag failed:%d, tableName:%s, length:%zu",
123 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
124 }
125 int ret = E_OK;
126 SQLiteUtils::ResetStatement(stmt, true, ret);
127 if (ret != E_OK) {
128 LOGE("[RDBExecutor]reset stmt when clean downloading flag:%d, tableName:%s, length:%zu",
129 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
130 }
131 return errCode != E_OK ? errCode : ret;
132 }
133
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess,uint64_t & currCursor)134 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
135 VBucket &vBucket, bool isDownloadSuccess, uint64_t &currCursor)
136 {
137 std::string cloudGid;
138 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
139 if (errCode != E_OK) {
140 LOGE("Miss gid when fill Asset.");
141 return errCode;
142 }
143 std::vector<Field> assetsField;
144 errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
145 if (errCode != E_OK) {
146 LOGE("No assets need to be filled.");
147 return errCode;
148 }
149 CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
150
151 Bytes hashKey;
152 (void)CloudStorageUtils::GetValueFromVBucket<Bytes>(HASH_KEY, vBucket, hashKey);
153 VBucket dbAssets;
154 std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, hashKey, dbAssets);
155 if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
156 LOGE("get assets by gid or hashkey failed %d.", errCode);
157 return errCode;
158 }
159 AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
160 AssetOperationUtils::CloudSyncAction::END_DOWNLOAD);
161
162 if (isDownloadSuccess) {
163 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
164 CloudStorageUtils::FillAssetAfterDownload, CloudStorageUtils::FillAssetsAfterDownload);
165 errCode = IncreaseCursorOnAssetData(tableSchema.name, cloudGid, currCursor);
166 if (errCode != E_OK) {
167 return errCode;
168 }
169 } else {
170 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
171 CloudStorageUtils::FillAssetAfterDownloadFail, CloudStorageUtils::FillAssetsAfterDownloadFail);
172 }
173 sqlite3_stmt *stmt = nullptr;
174 errCode = GetFillDownloadAssetStatement(tableSchema.name, dbAssets, assetsField, stmt);
175 if (errCode != E_OK) {
176 return errCode;
177 }
178 errCode = ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
179 if (errCode != E_OK) {
180 return errCode;
181 }
182 errCode = CleanDownloadingFlagByGid(tableSchema.name, cloudGid, dbAssets);
183 int ret = CleanDownloadChangedAssets(vBucket, assetOpType);
184 return errCode == E_OK ? ret : errCode;
185 }
186
IncreaseCursorOnAssetData(const std::string & tableName,const std::string & gid,uint64_t & currCursor)187 int SQLiteSingleVerRelationalStorageExecutor::IncreaseCursorOnAssetData(const std::string &tableName,
188 const std::string &gid, uint64_t &currCursor)
189 {
190 uint64_t cursor = DBConstant::INVALID_CURSOR;
191 int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, cursor);
192 if (errCode != E_OK) {
193 LOGE("Get cursor of table[%s length[%u]] failed when increase cursor: %d, gid[%s]",
194 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), errCode, gid.c_str());
195 return errCode;
196 }
197 cursor++;
198 std::string sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log";
199 sql += " SET cursor = ? where cloud_gid = ?;";
200 sqlite3_stmt *statement = nullptr;
201 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
202 if (errCode != E_OK) {
203 LOGE("get update asset data cursor stmt failed %d.", errCode);
204 return errCode;
205 }
206 ResFinalizer finalizer([statement]() {
207 sqlite3_stmt *statementInner = statement;
208 int ret = E_OK;
209 SQLiteUtils::ResetStatement(statementInner, true, ret);
210 if (ret != E_OK) {
211 LOGW("Reset stmt failed %d when increase cursor on asset data", ret);
212 }
213 });
214 int index = 1;
215 errCode = SQLiteUtils::BindInt64ToStatement(statement, index++, cursor);
216 if (errCode != E_OK) {
217 LOGE("bind cursor data stmt failed %d.", errCode);
218 return errCode;
219 }
220 errCode = SQLiteUtils::BindTextToStatement(statement, index, gid);
221 if (errCode != E_OK) {
222 LOGE("bind cursor gid data stmt failed %d.", errCode);
223 return errCode;
224 }
225 errCode = SQLiteUtils::StepWithRetry(statement, false);
226 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
227 LOGE("Fill upload asset failed:%d.", errCode);
228 return errCode;
229 }
230 currCursor = cursor;
231 errCode = SetCursor(tableName, cursor);
232 if (errCode != E_OK) {
233 LOGE("Upgrade cursor failed after asset download success %d.", errCode);
234 }
235 return errCode;
236 }
237
FillCloudAssetForUpload(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data)238 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema,
239 const CloudSyncBatch &data)
240 {
241 int errCode = E_OK;
242 if (CloudStorageUtils::ChkFillCloudAssetParam(data, errCode)) {
243 return errCode;
244 }
245 errCode = SetLogTriggerStatus(false);
246 if (errCode != E_OK) {
247 LOGE("Fail to set log trigger off, %d.", errCode);
248 return errCode;
249 }
250 sqlite3_stmt *stmt = nullptr;
251 for (size_t i = 0; i < data.assets.size(); ++i) {
252 if (data.assets.at(i).empty()) {
253 continue;
254 }
255 if (DBCommon::IsRecordIgnored(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i]) ||
256 DBCommon::IsCloudRecordNotFound(data.extend[i]) || DBCommon::IsCloudRecordAlreadyExisted(data.extend[i])) {
257 continue;
258 }
259 errCode = InitFillUploadAssetStatement(opType, tableSchema, data, i, stmt);
260 if (errCode != E_OK) {
261 if (errCode == -E_NOT_FOUND) {
262 errCode = E_OK;
263 continue;
264 }
265 break;
266 }
267 errCode = SQLiteUtils::StepWithRetry(stmt, false);
268 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
269 LOGE("Fill upload asset failed:%d.", errCode);
270 break;
271 }
272 errCode = E_OK;
273 SQLiteUtils::ResetStatement(stmt, true, errCode);
274 stmt = nullptr;
275 if (errCode != E_OK) {
276 break;
277 }
278 }
279 int ret = E_OK;
280 SQLiteUtils::ResetStatement(stmt, true, ret);
281 int endCode = SetLogTriggerStatus(true);
282 if (endCode != E_OK) {
283 LOGE("Fail to set log trigger off, %d.", endCode);
284 return endCode;
285 }
286 return errCode != E_OK ? errCode : ret;
287 }
288
FillCloudVersionForUpload(const OpType opType,const CloudSyncData & data)289 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data)
290 {
291 switch (opType) {
292 case OpType::UPDATE_VERSION:
293 return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.updData);
294 case OpType::INSERT_VERSION:
295 return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.insData);
296 default:
297 LOGE("Fill version with unknown type %d", static_cast<int>(opType));
298 return -E_INVALID_ARGS;
299 }
300 }
301
BindUpdateVersionStatement(const VBucket & vBucket,const Bytes & hashKey,sqlite3_stmt * & stmt)302 int SQLiteSingleVerRelationalStorageExecutor::BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey,
303 sqlite3_stmt *&stmt)
304 {
305 int errCode = E_OK;
306 std::string version;
307 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
308 vBucket, version) != E_OK) {
309 LOGW("get version from vBucket failed.");
310 }
311 if (hashKey.empty()) {
312 LOGE("hash key is empty when update version.");
313 return -E_CLOUD_ERROR;
314 }
315 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, version);
316 if (errCode != E_OK) {
317 return errCode;
318 }
319 errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, hashKey); // 2 means the second bind args
320 if (errCode != E_OK) {
321 return errCode;
322 }
323 errCode = SQLiteUtils::StepWithRetry(stmt, false);
324 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
325 errCode = E_OK;
326 SQLiteUtils::ResetStatement(stmt, false, errCode);
327 } else {
328 LOGE("step version stmt failed: %d.", errCode);
329 }
330 return errCode;
331 }
332
InitFillUploadAssetStatement(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)333 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpType opType,
334 const TableSchema &tableSchema, const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
335 {
336 VBucket vBucket = data.assets.at(index);
337 VBucket dbAssets;
338 std::string cloudGid;
339 int errCode;
340 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
341 std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, data.hashKey.at(index), dbAssets);
342 if (errCode != E_OK && errCode != -E_CLOUD_GID_MISMATCH) {
343 return errCode;
344 }
345 AssetOperationUtils::CloudSyncAction action = opType == OpType::SET_UPLOADING ?
346 AssetOperationUtils::CloudSyncAction::START_UPLOAD : AssetOperationUtils::CloudSyncAction::END_UPLOAD;
347 AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
348 action);
349 if (action == AssetOperationUtils::CloudSyncAction::START_UPLOAD) {
350 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
351 CloudStorageUtils::FillAssetBeforeUpload, CloudStorageUtils::FillAssetsBeforeUpload);
352 } else {
353 if (DBCommon::IsRecordError(data.extend.at(index)) || DBCommon::IsRecordAssetsMissing(data.extend.at(index))) {
354 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
355 CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed);
356 } else {
357 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
358 CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload);
359 }
360 }
361
362 errCode = GetAndBindFillUploadAssetStatement(tableSchema.name, dbAssets, statement);
363 if (errCode != E_OK) {
364 LOGE("get and bind asset failed %d.", errCode);
365 return errCode;
366 }
367 int64_t rowid = data.rowid[index];
368 return SQLiteUtils::BindInt64ToStatement(statement, dbAssets.size() + ROW_ID_INDEX, rowid);
369 }
370
AnalysisTrackerTable(const TrackerTable & trackerTable,TableInfo & tableInfo)371 int SQLiteSingleVerRelationalStorageExecutor::AnalysisTrackerTable(const TrackerTable &trackerTable,
372 TableInfo &tableInfo)
373 {
374 return SQLiteRelationalUtils::AnalysisTrackerTable(dbHandle_, trackerTable, tableInfo);
375 }
376
CreateTrackerTable(const TrackerTable & trackerTable,const TableInfo & table,bool checkData)377 int SQLiteSingleVerRelationalStorageExecutor::CreateTrackerTable(const TrackerTable &trackerTable,
378 const TableInfo &table, bool checkData)
379 {
380 std::unique_ptr<SqliteLogTableManager> tableManager = std::make_unique<SimpleTrackerLogTableManager>();
381 if (trackerTable.IsEmpty()) {
382 // drop trigger
383 return tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
384 }
385
386 // create log table
387 int errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
388 if (errCode != E_OK) {
389 return errCode;
390 }
391 // init cursor
392 errCode = InitCursorToMeta(table.GetTableName());
393 if (errCode != E_OK) {
394 return errCode;
395 }
396 errCode = GeneLogInfoForExistedData(dbHandle_, "", table, tableManager, true);
397 if (errCode != E_OK) {
398 LOGE("general tracker log info for existed data failed %d.", errCode);
399 return errCode;
400 }
401 errCode = SetLogTriggerStatus(true);
402 if (errCode != E_OK) {
403 return errCode;
404 }
405 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
406 if (errCode != E_OK) {
407 return errCode;
408 }
409 if (checkData) {
410 return CheckInventoryData(DBCommon::GetLogTableName(table.GetTableName()));
411 }
412 return E_OK;
413 }
414
GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject & schema)415 int SQLiteSingleVerRelationalStorageExecutor::GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema)
416 {
417 if (!schema.ToSchemaString().empty()) {
418 return E_OK;
419 }
420 const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.begin(),
421 DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.end());
422 Value schemaVal;
423 int errCode = GetKvData(schemaKey, schemaVal); // save schema to meta_data
424 if (errCode != E_OK) {
425 return errCode;
426 }
427 if (schemaVal.empty()) {
428 return -E_NOT_FOUND;
429 }
430 std::string schemaStr;
431 DBCommon::VectorToString(schemaVal, schemaStr);
432 errCode = schema.ParseFromTrackerSchemaString(schemaStr);
433 if (errCode != E_OK) {
434 LOGE("Parse from tracker schema string err.");
435 }
436 return errCode;
437 }
438
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)439 int SQLiteSingleVerRelationalStorageExecutor::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
440 {
441 sqlite3_stmt *statement = nullptr;
442 int errCode = SQLiteUtils::GetStatement(dbHandle_, condition.sql, statement);
443 if (errCode != E_OK) {
444 LOGE("Execute sql failed when prepare stmt.");
445 return errCode;
446 }
447 if (condition.readOnly && !SQLiteUtils::IsStmtReadOnly(statement)) {
448 LOGW("[ExecuteSql] The condition is read-only, but SQL is not read-only");
449 }
450 size_t bindCount = static_cast<size_t>(sqlite3_bind_parameter_count(statement));
451 if (bindCount > condition.bindArgs.size() || bindCount < condition.bindArgs.size()) {
452 LOGE("Sql bind args mismatch.");
453 SQLiteUtils::ResetStatement(statement, true, errCode);
454 return -E_INVALID_ARGS;
455 }
456 for (size_t i = 0; i < condition.bindArgs.size(); i++) {
457 Type type = condition.bindArgs[i];
458 errCode = SQLiteRelationalUtils::BindStatementByType(statement, i + 1, type);
459 if (errCode != E_OK) {
460 int ret = E_OK;
461 SQLiteUtils::ResetStatement(statement, true, ret);
462 return errCode;
463 }
464 }
465 while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
466 VBucket bucket;
467 errCode = SQLiteRelationalUtils::GetSelectVBucket(statement, bucket);
468 if (errCode != E_OK) {
469 int ret = E_OK;
470 SQLiteUtils::ResetStatement(statement, true, ret);
471 return errCode;
472 }
473 records.push_back(std::move(bucket));
474 }
475 int ret = E_OK;
476 SQLiteUtils::ResetStatement(statement, true, ret);
477 return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
478 }
479
GetClearWaterMarkTables(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema,std::set<std::string> & clearWaterMarkTables)480 int SQLiteSingleVerRelationalStorageExecutor::GetClearWaterMarkTables(
481 const std::vector<TableReferenceProperty> &tableReferenceProperty, const RelationalSchemaObject &schema,
482 std::set<std::string> &clearWaterMarkTables)
483 {
484 std::set<std::string> changeTables = schema.CompareReferenceProperty(tableReferenceProperty);
485 for (const auto &table : changeTables) {
486 std::string logTableName = DBCommon::GetLogTableName(table);
487 bool isExists = false;
488 int errCode = SQLiteUtils::CheckTableExists(dbHandle_, logTableName, isExists);
489 if (errCode != E_OK) {
490 LOGE("[GetClearWaterMarkTables] check table exists failed, errCode = %d.", errCode);
491 return errCode;
492 }
493 if (!isExists) { // table maybe dropped after set reference
494 LOGI("[GetClearWaterMarkTables] log table not exists, skip this table.");
495 continue;
496 }
497
498 bool isEmpty = true;
499 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, logTableName, isEmpty);
500 if (errCode != E_OK) {
501 LOGE("[GetClearWaterMarkTables] check table empty failed, errCode = %d.", errCode);
502 clearWaterMarkTables.clear();
503 return errCode;
504 }
505 if (!isEmpty) {
506 clearWaterMarkTables.insert(table);
507 }
508 }
509 LOGI("[GetClearWaterMarkTables] clearWaterMarkTables size = %zu.", clearWaterMarkTables.size());
510 return E_OK;
511 }
512
UpgradedLogForExistedData(TableInfo & tableInfo,bool schemaChanged)513 int SQLiteSingleVerRelationalStorageExecutor::UpgradedLogForExistedData(TableInfo &tableInfo, bool schemaChanged)
514 {
515 std::string logTable = DBCommon::GetLogTableName(tableInfo.GetTableName());
516 if (schemaChanged) {
517 std::string markAsInconsistent = "UPDATE " + logTable + " SET flag=" +
518 "(CASE WHEN (cloud_gid='' and data_key=-1 and flag&0x02=0x02) then flag else flag|0x20 END)";
519 int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, markAsInconsistent);
520 if (ret != E_OK) {
521 LOGE("Mark upgrade log info as inconsistent failed:%d", ret);
522 return ret;
523 }
524 }
525 if (tableInfo.GetTrackerTable().IsEmpty()) {
526 return E_OK;
527 }
528 LOGI("Upgrade tracker table log, schemaChanged:%d.", schemaChanged);
529 int errCode = SetLogTriggerStatus(false);
530 if (errCode != E_OK) {
531 return errCode;
532 }
533 std::string sql = "UPDATE " + tableInfo.GetTableName() + " SET _rowid_=_rowid_";
534 TrackerTable trackerTable = tableInfo.GetTrackerTable();
535 errCode = trackerTable.ReBuildTempTrigger(dbHandle_, TriggerMode::TriggerModeEnum::UPDATE,
536 [this, &sql]() {
537 int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
538 if (ret != E_OK) {
539 LOGE("Upgrade log for extend field failed.");
540 }
541 return ret;
542 });
543 return SetLogTriggerStatus(true);
544 }
545
CreateTempSyncTrigger(const TrackerTable & trackerTable,bool flag)546 int SQLiteSingleVerRelationalStorageExecutor::CreateTempSyncTrigger(const TrackerTable &trackerTable, bool flag)
547 {
548 int errCode = E_OK;
549 std::vector<std::string> dropSql = trackerTable.GetDropTempTriggerSql();
550 for (const auto &sql: dropSql) {
551 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
552 if (errCode != E_OK) {
553 LOGE("[RDBExecutor] execute drop sql failed %d.", errCode);
554 return errCode;
555 }
556 }
557 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempInsertTriggerSql(flag));
558 if (errCode != E_OK) {
559 LOGE("[RDBExecutor] create temp insert trigger failed %d.", errCode);
560 return errCode;
561 }
562 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempUpdateTriggerSql(flag));
563 if (errCode != E_OK) {
564 LOGE("[RDBExecutor] create temp update trigger failed %d.", errCode);
565 return errCode;
566 }
567 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempDeleteTriggerSql(flag));
568 if (errCode != E_OK) {
569 LOGE("[RDBExecutor] create temp delete trigger failed %d.", errCode);
570 }
571 return errCode;
572 }
573
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)574 int SQLiteSingleVerRelationalStorageExecutor::GetAndResetServerObserverData(const std::string &tableName,
575 ChangeProperties &changeProperties)
576 {
577 std::string fileName;
578 if (!SQLiteRelationalUtils::GetDbFileName(dbHandle_, fileName)) {
579 LOGE("get db file name failed.");
580 return -E_INVALID_DB;
581 }
582 SQLiteUtils::GetAndResetServerObserverData(fileName, tableName, changeProperties);
583 return E_OK;
584 }
585
ClearAllTempSyncTrigger()586 int SQLiteSingleVerRelationalStorageExecutor::ClearAllTempSyncTrigger()
587 {
588 sqlite3_stmt *stmt = nullptr;
589 static const std::string sql = "SELECT name FROM sqlite_temp_master WHERE type = 'trigger';";
590 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
591 if (errCode != E_OK) {
592 LOGE("get clear all temp trigger stmt failed %d.", errCode);
593 return errCode;
594 }
595 int ret = E_OK;
596 while ((errCode = SQLiteUtils::StepNext(stmt, isMemDb_)) == E_OK) {
597 std::string str;
598 (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
599 if (errCode != E_OK) {
600 SQLiteUtils::ResetStatement(stmt, true, ret);
601 return errCode;
602 }
603 std::string dropSql = "DROP TRIGGER IF EXISTS '" + str + "';";
604 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, dropSql);
605 if (errCode != E_OK) {
606 LOGE("drop temp trigger failed %d.", errCode);
607 SQLiteUtils::ResetStatement(stmt, true, ret);
608 return errCode;
609 }
610 }
611 SQLiteUtils::ResetStatement(stmt, true, ret);
612 return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
613 }
614
CleanTrackerData(const std::string & tableName,int64_t cursor,bool isOnlyTrackTable)615 int SQLiteSingleVerRelationalStorageExecutor::CleanTrackerData(const std::string &tableName, int64_t cursor,
616 bool isOnlyTrackTable)
617 {
618 std::string sql;
619 if (isOnlyTrackTable) {
620 sql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log";
621 } else {
622 sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log SET extend_field = NULL";
623 }
624 sql += " where data_key = -1 and cursor <= ?;";
625 sqlite3_stmt *statement = nullptr;
626 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
627 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
628 LOGE("get clean tracker data stmt failed %d.", errCode);
629 return errCode;
630 }
631 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, cursor);
632 int ret = E_OK;
633 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
634 LOGE("bind clean tracker data stmt failed %d.", errCode);
635 SQLiteUtils::ResetStatement(statement, true, ret);
636 return errCode;
637 }
638 errCode = SQLiteUtils::StepWithRetry(statement);
639 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
640 errCode = E_OK;
641 } else {
642 LOGE("clean tracker step failed: %d.", errCode);
643 }
644 SQLiteUtils::ResetStatement(statement, true, ret);
645 return errCode == E_OK ? ret : errCode;
646 }
647
CreateSharedTable(const TableSchema & tableSchema)648 int SQLiteSingleVerRelationalStorageExecutor::CreateSharedTable(const TableSchema &tableSchema)
649 {
650 LOGI("Create shared table[%s length[%u]]", DBCommon::StringMiddleMasking(tableSchema.name).c_str(),
651 tableSchema.name.length());
652 std::map<int32_t, std::string> cloudFieldTypeMap;
653 cloudFieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
654 cloudFieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
655 cloudFieldTypeMap[TYPE_INDEX<double>] = "REAL";
656 cloudFieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
657 cloudFieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
658 cloudFieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
659 cloudFieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
660 cloudFieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
661
662 std::string createTableSql = "CREATE TABLE IF NOT EXISTS " + tableSchema.sharedTableName + "(";
663 std::string primaryKey = ", PRIMARY KEY (";
664 createTableSql += CloudDbConstant::CLOUD_OWNER;
665 createTableSql += " TEXT, ";
666 createTableSql += CloudDbConstant::CLOUD_PRIVILEGE;
667 createTableSql += " TEXT";
668 primaryKey += CloudDbConstant::CLOUD_OWNER;
669 bool hasPrimaryKey = false;
670 for (const auto &field : tableSchema.fields) {
671 createTableSql += ", " + field.colName + " ";
672 createTableSql += cloudFieldTypeMap[field.type];
673 createTableSql += field.nullable ? "" : " NOT NULL";
674 if (field.primary) {
675 primaryKey += ", " + field.colName;
676 hasPrimaryKey = true;
677 }
678 }
679 if (hasPrimaryKey) {
680 createTableSql += primaryKey + ")";
681 }
682 createTableSql += ");";
683 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, createTableSql);
684 if (errCode != E_OK) {
685 LOGE("Create shared table failed, %d.", errCode);
686 }
687 return errCode;
688 }
689
DeleteTable(const std::vector<std::string> & tableNames)690 int SQLiteSingleVerRelationalStorageExecutor::DeleteTable(const std::vector<std::string> &tableNames)
691 {
692 for (const auto &tableName : tableNames) {
693 std::string deleteTableSql = "DROP TABLE IF EXISTS " + tableName + ";";
694 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteTableSql);
695 if (errCode != E_OK) {
696 LOGE("Delete table failed, %d.", errCode);
697 return errCode;
698 }
699 }
700 return E_OK;
701 }
702
UpdateSharedTable(const std::map<std::string,std::vector<Field>> & updateTableNames)703 int SQLiteSingleVerRelationalStorageExecutor::UpdateSharedTable(
704 const std::map<std::string, std::vector<Field>> &updateTableNames)
705 {
706 int errCode = E_OK;
707 std::map<int32_t, std::string> fieldTypeMap;
708 fieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
709 fieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
710 fieldTypeMap[TYPE_INDEX<double>] = "REAL";
711 fieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
712 fieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
713 fieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
714 fieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
715 fieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
716 for (const auto &table : updateTableNames) {
717 if (table.second.empty()) {
718 continue;
719 }
720 std::string addColumnSql = "";
721 for (const auto &field : table.second) {
722 addColumnSql += "ALTER TABLE " + table.first + " ADD ";
723 addColumnSql += field.colName + " ";
724 addColumnSql += fieldTypeMap[field.type];
725 addColumnSql += field.primary ? " PRIMARY KEY" : "";
726 addColumnSql += field.nullable ? ";" : " NOT NULL;";
727 }
728 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, addColumnSql);
729 if (errCode != E_OK) {
730 LOGE("Shared table add column failed, %d.", errCode);
731 return errCode;
732 }
733 }
734 return errCode;
735 }
736
AlterTableName(const std::map<std::string,std::string> & tableNames)737 int SQLiteSingleVerRelationalStorageExecutor::AlterTableName(const std::map<std::string, std::string> &tableNames)
738 {
739 for (const auto &tableName : tableNames) {
740 std::string alterTableSql = "ALTER TABLE " + tableName.first + " RENAME TO " + tableName.second + ";";
741 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, alterTableSql);
742 if (errCode != E_OK) {
743 LOGE("Alter table name failed, %d.", errCode);
744 return errCode;
745 }
746 }
747 return E_OK;
748 }
749
AppendUpdateLogRecordWhereSqlCondition(const TableSchema & tableSchema,const VBucket & vBucket,std::string & sql)750 int SQLiteSingleVerRelationalStorageExecutor::AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema,
751 const VBucket &vBucket, std::string &sql)
752 {
753 std::string gidStr;
754 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
755 if (errCode != E_OK) {
756 LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d.", errCode);
757 return errCode;
758 }
759
760 sql += " WHERE ";
761 if (!gidStr.empty()) {
762 sql += "cloud_gid = '" + gidStr + "'";
763 }
764 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
765 if (!pkMap.empty()) {
766 if (!gidStr.empty()) {
767 sql += " OR ";
768 }
769 sql += "(hash_key = ?);";
770 }
771 return E_OK;
772 }
773
DoCleanShareTableDataAndLog(const std::vector<std::string> & tableNameList)774 int SQLiteSingleVerRelationalStorageExecutor::DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList)
775 {
776 int errCode = E_OK;
777 for (const auto &tableName: tableNameList) {
778 if (isLogicDelete_) {
779 std::string logTableName = DBCommon::GetLogTableName(tableName);
780 errCode = SetDataOnShareTableWithLogicDelete(tableName, logTableName);
781 } else {
782 errCode = CleanShareTable(tableName);
783 }
784 if (errCode != E_OK) {
785 LOGE("clean share table failed at table:%s, length:%d, deleteType:%d, errCode:%d.",
786 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), isLogicDelete_ ? 1 : 0, errCode);
787 return errCode;
788 }
789 }
790 return errCode;
791 }
792
CleanShareTable(const std::string & tableName)793 int SQLiteSingleVerRelationalStorageExecutor::CleanShareTable(const std::string &tableName)
794 {
795 int ret = E_OK;
796 int errCode = E_OK;
797 std::string delDataSql = "DELETE FROM '" + tableName + "';";
798 sqlite3_stmt *statement = nullptr;
799 errCode = SQLiteUtils::GetStatement(dbHandle_, delDataSql, statement);
800 if (errCode != E_OK) {
801 LOGE("get clean shared data stmt failed %d.", errCode);
802 return errCode;
803 }
804 errCode = SQLiteUtils::StepWithRetry(statement);
805 SQLiteUtils::ResetStatement(statement, true, ret);
806 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
807 errCode = E_OK;
808 } else {
809 LOGE("clean shared data failed: %d.", errCode);
810 return errCode;
811 }
812 statement = nullptr;
813 std::string delLogSql = "DELETE FROM '" + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log';";
814 errCode = SQLiteUtils::GetStatement(dbHandle_, delLogSql, statement);
815 if (errCode != E_OK) {
816 LOGE("get clean shared log stmt failed %d.", errCode);
817 return errCode;
818 }
819 errCode = SQLiteUtils::StepWithRetry(statement);
820 SQLiteUtils::ResetStatement(statement, true, ret);
821 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
822 errCode = E_OK;
823 } else {
824 LOGE("clean shared log failed: %d.", errCode);
825 return errCode;
826 }
827 // here errCode must be E_OK, just return ret
828 return ret;
829 }
830
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,const std::map<std::string,std::vector<TableReferenceProperty>> & tableReference,std::map<int64_t,Entries> & referenceGid)831 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGid(const std::string &tableName,
832 const CloudSyncBatch &syncBatch, const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
833 std::map<int64_t, Entries> &referenceGid)
834 {
835 int errCode = E_OK;
836 for (const auto &[targetTable, targetReference] : tableReference) {
837 errCode = GetReferenceGidInner(tableName, targetTable, syncBatch, targetReference, referenceGid);
838 if (errCode != E_OK) {
839 LOGE("[RDBExecutor] get reference gid inner failed %d.", errCode);
840 return errCode;
841 }
842 }
843 return errCode;
844 }
845
GetReferenceGidInner(const std::string & sourceTable,const std::string & targetTable,const CloudSyncBatch & syncBatch,const std::vector<TableReferenceProperty> & targetTableReference,std::map<int64_t,Entries> & referenceGid)846 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidInner(const std::string &sourceTable,
847 const std::string &targetTable, const CloudSyncBatch &syncBatch,
848 const std::vector<TableReferenceProperty> &targetTableReference, std::map<int64_t, Entries> &referenceGid)
849 {
850 auto [sourceFields, targetFields] = SplitReferenceByField(targetTableReference);
851 if (sourceFields.empty()) {
852 LOGD("[RDBExecutor] source field is empty.");
853 return E_OK;
854 }
855 if (sourceFields.size() != targetFields.size()) {
856 LOGE("[RDBExecutor] reference field size not equal.");
857 return -E_INTERNAL_ERROR;
858 }
859 std::string sql = GetReferenceGidSql(sourceTable, targetTable, sourceFields, targetFields);
860 sqlite3_stmt *statement = nullptr;
861 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
862 if (errCode != E_OK) {
863 LOGE("[RDBExecutor] get ref gid data stmt failed. %d", errCode);
864 return errCode;
865 }
866 errCode = GetReferenceGidByStmt(statement, syncBatch, targetTable, referenceGid);
867 int ret = E_OK;
868 SQLiteUtils::ResetStatement(statement, true, ret);
869 return errCode == E_OK ? ret : errCode;
870 }
871
GetReferenceGidSql(const std::string & sourceTable,const std::string & targetTable,const std::vector<std::string> & sourceFields,const std::vector<std::string> & targetFields)872 std::string SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidSql(const std::string &sourceTable,
873 const std::string &targetTable, const std::vector<std::string> &sourceFields,
874 const std::vector<std::string> &targetFields)
875 {
876 // sql like this:
877 // SELECT naturalbase_rdb_aux_parent_log.cloud_gid FROM naturalbase_rdb_aux_parent_log,
878 // (SELECT parent._rowid_ AS rowid_b FROM parent,
879 // (SELECT child._rowid_, name FROM child, naturalbase_rdb_aux_child_log
880 // WHERE child._rowid_ = ? AND naturalbase_rdb_aux_child_log.timestamp = ? ) source_a
881 // WHERE parent.name = source_a.name ) temp_table
882 // WHERE naturalbase_rdb_aux_parent_log.data_key = temp_table.rowid_b
883 std::string logTargetTable = DBCommon::GetLogTableName(targetTable);
884 std::string logSourceTable = DBCommon::GetLogTableName(sourceTable);
885 std::string sql;
886 sql += "SELECT " + logTargetTable + ".cloud_gid" + " FROM " + logTargetTable + ", ";
887 sql += "(";
888 sql += "SELECT " + targetTable + "._rowid_ AS rowid_b FROM " + targetTable
889 + ", ";
890 sql += "(SELECT " + sourceTable + "._rowid_,";
891 std::set<std::string> sourceFieldSet;
892 for (const auto &item : sourceFields) {
893 sourceFieldSet.insert(item);
894 }
895 for (const auto &sourceField : sourceFieldSet) {
896 sql += sourceField + ",";
897 }
898 sql.pop_back();
899 sql += " FROM " + sourceTable + ", " + logSourceTable;
900 sql +=" WHERE " + sourceTable + "._rowid_ = ? AND " + logSourceTable + ".timestamp = ? ";
901 sql += " AND " + logSourceTable + ".flag&0x08=0x00) source_a";
902 sql += " WHERE ";
903 for (size_t i = 0u; i < sourceFields.size(); ++i) {
904 if (i != 0u) {
905 sql += " AND ";
906 }
907 sql += targetTable + "." + targetFields[i] + " = source_a." + sourceFields[i];
908 }
909 sql += ") temp_table ";
910 sql += "WHERE " + logTargetTable + ".data_key = temp_table.rowid_b";
911 sql += " AND " + logTargetTable + ".flag&0x08=0x00";
912 return sql;
913 }
914
GetReferenceGidByStmt(sqlite3_stmt * statement,const CloudSyncBatch & syncBatch,const std::string & targetTable,std::map<int64_t,Entries> & referenceGid)915 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidByStmt(sqlite3_stmt *statement,
916 const CloudSyncBatch &syncBatch, const std::string &targetTable, std::map<int64_t, Entries> &referenceGid)
917 {
918 int errCode = E_OK;
919 if (syncBatch.rowid.size() != syncBatch.timestamp.size()) {
920 LOGE("[RDBExecutor] rowid size [%zu] not equal to timestamp size [%zu].", syncBatch.rowid.size(),
921 syncBatch.timestamp.size());
922 return -E_INVALID_ARGS;
923 }
924 int matchCount = 0;
925 for (size_t i = 0u; i < syncBatch.rowid.size(); i++) {
926 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, syncBatch.rowid[i]); // 1 is rowid index
927 if (errCode != E_OK) {
928 LOGE("[RDBExecutor] bind rowid to stmt failed %d.", errCode);
929 break;
930 }
931 errCode = SQLiteUtils::BindInt64ToStatement(statement, 2, syncBatch.timestamp[i]); // 2 is timestamp index
932 if (errCode != E_OK) {
933 LOGE("[RDBExecutor] bind timestamp to stmt failed %d.", errCode);
934 break;
935 }
936 while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
937 std::string gid;
938 (void)SQLiteUtils::GetColumnTextValue(statement, 0, gid);
939 if (gid.empty()) {
940 LOGE("[RDBExecutor] reference data don't contain gid.");
941 errCode = -E_CLOUD_ERROR;
942 break;
943 }
944 referenceGid[syncBatch.rowid[i]][targetTable] = gid;
945 matchCount++;
946 }
947 if (errCode == -E_FINISHED) {
948 errCode = E_OK;
949 }
950 if (errCode != E_OK) {
951 LOGE("[RDBExecutor] step stmt failed %d.", errCode);
952 break;
953 }
954 SQLiteUtils::ResetStatement(statement, false, errCode);
955 if (errCode != E_OK) {
956 LOGE("[RDBExecutor] reset stmt failed %d.", errCode);
957 break;
958 }
959 }
960 if (matchCount != 0) {
961 LOGD("[RDBExecutor] get reference gid match %d.", matchCount);
962 }
963 return errCode;
964 }
965
SplitReferenceByField(const std::vector<TableReferenceProperty> & targetTableReference)966 PairStringVector SQLiteSingleVerRelationalStorageExecutor::SplitReferenceByField(
967 const std::vector<TableReferenceProperty> &targetTableReference)
968 {
969 PairStringVector sourceTargetFiled;
970 for (const auto &reference : targetTableReference) {
971 for (const auto &column : reference.columns) {
972 sourceTargetFiled.first.push_back(column.first);
973 sourceTargetFiled.second.push_back(column.second);
974 }
975 }
976 return sourceTargetFiled;
977 }
978
BindStmtWithCloudGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid,sqlite3_stmt * & stmt)979 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSyncData &cloudDataResult,
980 bool ignoreEmptyGid, sqlite3_stmt *&stmt)
981 {
982 int fillGidCount = 0;
983 int errCode = E_OK;
984 for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
985 auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
986 if (gidEntry == cloudDataResult.insData.extend[i].end()) {
987 bool isSkipAssetsMissRecord = false;
988 if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) {
989 LOGI("[RDBExecutor] Local assets missing and skip filling assets.");
990 isSkipAssetsMissRecord = true;
991 }
992 if (ignoreEmptyGid || isSkipAssetsMissRecord) {
993 continue;
994 }
995 errCode = -E_INVALID_ARGS;
996 LOGE("[RDBExecutor] Extend not contain gid.");
997 break;
998 }
999 bool containError = DBCommon::IsRecordError(cloudDataResult.insData.extend[i]);
1000 if (ignoreEmptyGid && containError) {
1001 continue;
1002 }
1003 std::string val;
1004 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
1005 cloudDataResult.insData.extend[i], val) != E_OK) {
1006 errCode = -E_CLOUD_ERROR;
1007 LOGE("[RDBExecutor] Can't get string gid from extend.");
1008 break;
1009 }
1010 if (val.empty()) {
1011 errCode = -E_CLOUD_ERROR;
1012 LOGE("[RDBExecutor] Get empty gid from extend.");
1013 break;
1014 }
1015 errCode = BindStmtWithCloudGidInner(val, cloudDataResult.insData.rowid[i], stmt, fillGidCount);
1016 if (errCode != E_OK) {
1017 LOGE("[RDBExecutor] Bind stmt error %d.", errCode);
1018 break;
1019 }
1020 }
1021 LOGD("[RDBExecutor] Fill gid count %d.", fillGidCount);
1022 return errCode;
1023 }
1024
CleanExtendAndCursorForDeleteData(const std::string & tableName)1025 int SQLiteSingleVerRelationalStorageExecutor::CleanExtendAndCursorForDeleteData(const std::string &tableName)
1026 {
1027 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1028 std::string sql = "DELETE FROM " + logTable + " where flag&0x01=0x01;";
1029 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1030 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1031 LOGE("update extend field and cursor failed %d.", errCode);
1032 }
1033 return errCode;
1034 }
1035
CheckIfExistUserTable(const std::string & tableName)1036 int SQLiteSingleVerRelationalStorageExecutor::CheckIfExistUserTable(const std::string &tableName)
1037 {
1038 std::string sql = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?";
1039 sqlite3_stmt *statement = nullptr;
1040 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1041 if (errCode != E_OK) {
1042 LOGE("[RDBExecutor] Prepare the sql statement error: %d.", errCode);
1043 return errCode;
1044 }
1045 errCode = SQLiteUtils::BindTextToStatement(statement, 1, tableName);
1046 if (errCode != E_OK) {
1047 LOGE("[RDBExecutor] Bind table name failed: %d.", errCode);
1048 SQLiteUtils::ResetStatement(statement, true, errCode);
1049 return errCode;
1050 }
1051 if (SQLiteUtils::StepWithRetry(statement) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1052 LOGE("[RDBExecutor] local exists user table which shared table name is same as.");
1053 SQLiteUtils::ResetStatement(statement, true, errCode);
1054 return -E_INVALID_ARGS;
1055 }
1056 SQLiteUtils::ResetStatement(statement, true, errCode);
1057 return E_OK;
1058 }
1059
GetCloudDeleteSql(const std::string & table,std::string & sql)1060 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDeleteSql(const std::string &table, std::string &sql)
1061 {
1062 uint64_t cursor = DBConstant::INVALID_CURSOR;
1063 int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, table, cursor);
1064 if (errCode != E_OK) {
1065 LOGE("[GetCloudDeleteSql] Get cursor of table[%s length[%u]] failed: %d",
1066 DBCommon::StringMiddleMasking(table).c_str(), table.length(), errCode);
1067 return errCode;
1068 }
1069 sql += " cloud_gid = '', version = '', ";
1070 if (isLogicDelete_) {
1071 // cursor already increased by DeleteCloudData, can be assigned directly here
1072 // 1001 which is logicDelete|cloudForcePush|local|delete
1073 sql += "flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1074 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE) |
1075 static_cast<uint32_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) + ", cursor = " + std::to_string(cursor) + " ";
1076 } else {
1077 sql += "data_key = -1, flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1078 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) + ", sharing_resource = ''";
1079 errCode = SetCursor(table, cursor + 1);
1080 if (errCode == E_OK) {
1081 sql += ", cursor = " + std::to_string(cursor + 1) + " ";
1082 } else {
1083 LOGE("[RDBExecutor] Increase cursor failed when delete log: %d.", errCode);
1084 return errCode;
1085 }
1086 }
1087 return E_OK;
1088 }
1089
RemoveDataAndLog(const std::string & tableName,int64_t dataKey)1090 int SQLiteSingleVerRelationalStorageExecutor::RemoveDataAndLog(const std::string &tableName, int64_t dataKey)
1091 {
1092 int errCode = E_OK;
1093 std::string removeDataSql = "DELETE FROM " + tableName + " WHERE " + DBConstant::SQLITE_INNER_ROWID + " = " +
1094 std::to_string(dataKey);
1095 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeDataSql);
1096 if (errCode != E_OK) {
1097 LOGE("[RDBExecutor] remove data failed %d", errCode);
1098 return errCode;
1099 }
1100 std::string removeLogSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key = " +
1101 std::to_string(dataKey);
1102 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeLogSql);
1103 if (errCode != E_OK) {
1104 LOGE("[RDBExecutor] remove log failed %d", errCode);
1105 }
1106 return errCode;
1107 }
1108
GetLocalDataKey(size_t index,const DownloadData & downloadData)1109 int64_t SQLiteSingleVerRelationalStorageExecutor::GetLocalDataKey(size_t index,
1110 const DownloadData &downloadData)
1111 {
1112 if (index >= downloadData.existDataKey.size()) {
1113 LOGW("[RDBExecutor] index out of range when get local data key."); // should not happen
1114 return -1; // -1 means not exist
1115 }
1116 return downloadData.existDataKey[index];
1117 }
1118
BindStmtWithCloudGidInner(const std::string & gid,int64_t rowid,sqlite3_stmt * & stmt,int & fillGidCount)1119 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
1120 sqlite3_stmt *&stmt, int &fillGidCount)
1121 {
1122 int errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid); // 1 means the gid index
1123 if (errCode != E_OK) {
1124 return errCode;
1125 }
1126 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means rowid
1127 if (errCode != E_OK) {
1128 return errCode;
1129 }
1130 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1131 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1132 errCode = E_OK;
1133 fillGidCount++;
1134 SQLiteUtils::ResetStatement(stmt, false, errCode);
1135 } else {
1136 LOGE("[RDBExecutor] Update cloud log failed: %d.", errCode);
1137 }
1138 return errCode;
1139 }
1140
RenewTableTrigger(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType)1141 int SQLiteSingleVerRelationalStorageExecutor::RenewTableTrigger(DistributedTableMode mode,
1142 const TableInfo &tableInfo, TableSyncType syncType)
1143 {
1144 auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
1145 return tableManager->AddRelationalLogTableTrigger(dbHandle_, tableInfo, "");
1146 }
1147
DoCleanAssetId(const std::string & tableName,const RelationalSchemaObject & localSchema)1148 int SQLiteSingleVerRelationalStorageExecutor::DoCleanAssetId(const std::string &tableName,
1149 const RelationalSchemaObject &localSchema)
1150 {
1151 std::vector<int64_t> dataKeys;
1152 std::string logTableName = DBCommon::GetLogTableName(tableName);
1153 int errCode = GetCleanCloudDataKeys(logTableName, dataKeys, false);
1154 if (errCode != E_OK) {
1155 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
1156 return errCode;
1157 }
1158 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
1159 errCode = CleanAssetId(tableName, fieldInfos, dataKeys);
1160 if (errCode != E_OK) {
1161 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d.", errCode);
1162 }
1163 return errCode;
1164 }
1165
CleanAssetId(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys)1166 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetId(const std::string &tableName,
1167 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys)
1168 {
1169 int errCode = E_OK;
1170 for (const auto &fieldInfo : fieldInfos) {
1171 if (fieldInfo.IsAssetType()) {
1172 Assets assets;
1173 errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1174 if (errCode != E_OK) {
1175 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
1176 return errCode;
1177 }
1178 errCode = UpdateAssetIdOnUserTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1179 if (errCode != E_OK) {
1180 LOGE("[Storage Executor] failed to save clean asset id on table, %d.", errCode);
1181 return errCode;
1182 }
1183 } else if (fieldInfo.IsAssetsType()) {
1184 errCode = GetAssetsAndUpdateAssetsId(tableName, fieldInfo.GetFieldName(), dataKeys);
1185 if (errCode != E_OK) {
1186 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
1187 return errCode;
1188 }
1189 }
1190 }
1191 return errCode;
1192 }
1193
UpdateAssetIdOnUserTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)1194 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetIdOnUserTable(const std::string &tableName,
1195 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
1196 {
1197 if (assets.empty()) { // LCOV_EXCL_BR_LINE
1198 return E_OK;
1199 }
1200 int errCode = E_OK;
1201 int ret = E_OK;
1202 sqlite3_stmt *stmt = nullptr;
1203 size_t index = 0;
1204 for (const auto &rowId : dataKeys) {
1205 if (rowId == -1) { // -1 means data is deleted
1206 continue;
1207 }
1208 if (assets[index].name.empty()) { // LCOV_EXCL_BR_LINE
1209 index++;
1210 continue;
1211 }
1212 std::string cleanAssetIdSql = "UPDATE " + tableName + " SET " + fieldName + " = ? WHERE " +
1213 std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1214 errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1215 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1216 LOGE("Get statement failed, %d", errCode);
1217 return errCode;
1218 }
1219 assets[index].assetId = "";
1220 assets[index].status &= ~AssetStatus::UPLOADING;
1221 errCode = BindAssetToBlobStatement(assets[index], 1, stmt); // 1 means sqlite statement index
1222 index++;
1223 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1224 LOGE("Bind asset to blob statement failed, %d", errCode);
1225 goto END;
1226 }
1227 errCode = SQLiteUtils::StepWithRetry(stmt);
1228 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1229 errCode = E_OK;
1230 } else {
1231 LOGE("Step statement failed, %d", errCode);
1232 goto END;
1233 }
1234 SQLiteUtils::ResetStatement(stmt, true, ret);
1235 }
1236 return errCode != E_OK ? errCode : ret;
1237 END:
1238 SQLiteUtils::ResetStatement(stmt, true, ret);
1239 return errCode != E_OK ? errCode : ret;
1240 }
1241
GetAssetsAndUpdateAssetsId(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys)1242 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsAndUpdateAssetsId(const std::string &tableName,
1243 const std::string &fieldName, const std::vector<int64_t> &dataKeys)
1244 {
1245 int errCode = E_OK;
1246 int ret = E_OK;
1247 sqlite3_stmt *selectStmt = nullptr;
1248 for (const auto &rowId : dataKeys) {
1249 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1250 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1251 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1252 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1253 LOGE("Get select assets statement failed, %d.", errCode);
1254 goto END;
1255 }
1256 Assets assets;
1257 errCode = GetAssetsByRowId(selectStmt, assets);
1258 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1259 LOGE("Get assets by rowId failed, %d.", errCode);
1260 goto END;
1261 }
1262 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1263 if (assets.empty()) { // LCOV_EXCL_BR_LINE
1264 continue;
1265 }
1266 for (auto &asset : assets) {
1267 asset.assetId = "";
1268 asset.status &= ~AssetStatus::UPLOADING;
1269 }
1270 std::vector<uint8_t> assetsValue;
1271 errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsValue);
1272 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1273 LOGE("[CleanAssetsIdOnUserTable] failed to transfer assets to blob, %d.", errCode);
1274 return errCode;
1275 }
1276 errCode = CleanAssetsIdOnUserTable(tableName, fieldName, rowId, assetsValue);
1277 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1278 LOGE("[CleanAssetsIdOnUserTable] clean assets id on user table failed, %d", errCode);
1279 return errCode;
1280 }
1281 }
1282 return errCode != E_OK ? errCode : ret;
1283 END:
1284 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1285 return errCode != E_OK ? errCode : ret;
1286 }
1287
CleanAssetsIdOnUserTable(const std::string & tableName,const std::string & fieldName,const int64_t rowId,const std::vector<uint8_t> & assetsValue)1288 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetsIdOnUserTable(const std::string &tableName,
1289 const std::string &fieldName, const int64_t rowId, const std::vector<uint8_t> &assetsValue)
1290 {
1291 std::string cleanAssetIdSql = "UPDATE " + tableName + " SET " + fieldName + " = ? WHERE " +
1292 std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1293 sqlite3_stmt *stmt = nullptr;
1294 int errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1295 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1296 LOGE("Get statement failed, %d", errCode);
1297 SQLiteUtils::ResetStatement(stmt, true, errCode);
1298 return errCode;
1299 }
1300 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, assetsValue, false);
1301 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1302 SQLiteUtils::ResetStatement(stmt, true, errCode);
1303 return errCode;
1304 }
1305 errCode = SQLiteUtils::StepWithRetry(stmt);
1306 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1307 errCode = E_OK;
1308 }
1309 SQLiteUtils::ResetStatement(stmt, true, errCode);
1310 return errCode;
1311 }
1312
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1313 std::pair<int, uint32_t> SQLiteSingleVerRelationalStorageExecutor::GetAssetsByGidOrHashKey(
1314 const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
1315 {
1316 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
1317 auto &[errCode, status] = res;
1318 std::vector<Field> assetFields;
1319 std::string sql = "SELECT";
1320 for (const auto &field: tableSchema.fields) {
1321 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1322 assetFields.emplace_back(field);
1323 sql += " b." + field.colName + ",";
1324 }
1325 }
1326 if (assetFields.empty()) {
1327 return { -E_NOT_FOUND, status };
1328 }
1329 sql += "a.cloud_gid, a.status ";
1330 sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE (a." + FLAG_NOT_LOGIC_DELETE + ") AND (" +
1331 (gid.empty() ? "a.hash_key = ?);" : " a.cloud_gid = ? OR a.hash_key = ?);");
1332 sqlite3_stmt *stmt = nullptr;
1333 errCode = InitGetAssetStmt(sql, gid, hashKey, stmt);
1334 if (errCode != E_OK) {
1335 return res;
1336 }
1337 errCode = SQLiteUtils::StepWithRetry(stmt);
1338 int index = 0;
1339 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1340 for (const auto &field: assetFields) {
1341 Type cloudValue;
1342 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1343 if (errCode != E_OK) {
1344 break;
1345 }
1346 errCode = PutVBucketByType(assets, field, cloudValue);
1347 if (errCode != E_OK) {
1348 break;
1349 }
1350 }
1351 std::string curGid;
1352 errCode = SQLiteUtils::GetColumnTextValue(stmt, index++, curGid);
1353 if (errCode == E_OK && CloudStorageUtils::IsCloudGidMismatch(gid, curGid)) {
1354 // Gid is different, there may be duplicate primary keys in the cloud
1355 errCode = -E_CLOUD_GID_MISMATCH;
1356 }
1357 status = static_cast<uint32_t>(sqlite3_column_int(stmt, index++));
1358 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1359 errCode = -E_NOT_FOUND;
1360 } else {
1361 LOGE("step get asset stmt failed %d.", errCode);
1362 }
1363 SQLiteUtils::ResetStatement(stmt, true, errCode);
1364 return res;
1365 }
1366
InitGetAssetStmt(const std::string & sql,const std::string & gid,const Bytes & hashKey,sqlite3_stmt * & stmt)1367 int SQLiteSingleVerRelationalStorageExecutor::InitGetAssetStmt(const std::string &sql, const std::string &gid,
1368 const Bytes &hashKey, sqlite3_stmt *&stmt)
1369 {
1370 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1371 if (errCode != E_OK) {
1372 LOGE("Get asset statement failed, %d.", errCode);
1373 return errCode;
1374 }
1375 int index = 1;
1376 if (!gid.empty()) {
1377 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
1378 if (errCode != E_OK) {
1379 LOGE("bind gid failed %d.", errCode);
1380 SQLiteUtils::ResetStatement(stmt, true, errCode);
1381 return errCode;
1382 }
1383 }
1384 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, hashKey);
1385 if (errCode != E_OK) {
1386 LOGE("bind hash failed %d.", errCode);
1387 SQLiteUtils::ResetStatement(stmt, true, errCode);
1388 }
1389 return errCode;
1390 }
1391
FillHandleWithOpType(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid,const TableSchema & tableSchema)1392 int SQLiteSingleVerRelationalStorageExecutor::FillHandleWithOpType(const OpType opType, const CloudSyncData &data,
1393 bool fillAsset, bool ignoreEmptyGid, const TableSchema &tableSchema)
1394 {
1395 int errCode = E_OK;
1396 switch (opType) {
1397 case OpType::UPDATE_VERSION: // fallthrough
1398 case OpType::INSERT_VERSION: {
1399 errCode = FillCloudVersionForUpload(opType, data);
1400 break;
1401 }
1402 case OpType::SET_UPLOADING: {
1403 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1404 if (errCode != E_OK) {
1405 LOGE("Failed to set uploading for ins data, %d.", errCode);
1406 return errCode;
1407 }
1408 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1409 break;
1410 }
1411 case OpType::INSERT: {
1412 errCode = UpdateCloudLogGid(data, ignoreEmptyGid);
1413 if (errCode != E_OK) {
1414 LOGE("Failed to fill cloud log gid, %d.", errCode);
1415 return errCode;
1416 }
1417 if (fillAsset) {
1418 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1419 if (errCode != E_OK) {
1420 LOGE("Failed to fill asset for ins, %d.", errCode);
1421 return errCode;
1422 }
1423 }
1424 errCode = FillCloudVersionForUpload(OpType::INSERT_VERSION, data);
1425 break;
1426 }
1427 case OpType::UPDATE: {
1428 if (fillAsset && !data.updData.assets.empty()) {
1429 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1430 if (errCode != E_OK) {
1431 LOGE("Failed to fill asset for upd, %d.", errCode);
1432 return errCode;
1433 }
1434 }
1435 errCode = FillCloudVersionForUpload(OpType::UPDATE_VERSION, data);
1436 break;
1437 }
1438 default:
1439 break;
1440 }
1441 return errCode;
1442 }
1443
GetAssetsByRowId(sqlite3_stmt * & selectStmt,Assets & assets)1444 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets)
1445 {
1446 int errCode = SQLiteUtils::StepWithRetry(selectStmt);
1447 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1448 std::vector<uint8_t> blobValue;
1449 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
1450 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1451 LOGE("Get column blob value failed %d.", errCode);
1452 return errCode;
1453 }
1454 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1455 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1456 LOGE("Transfer blob to assets failed %d", errCode);
1457 }
1458 return errCode;
1459 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1460 return E_OK;
1461 } else {
1462 LOGE("Step select statement failed %d.", errCode);
1463 return errCode;
1464 }
1465 }
1466
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1467 void SQLiteSingleVerRelationalStorageExecutor::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1468 {
1469 assetLoader_ = loader;
1470 }
1471
ExecuteFillDownloadAssetStatement(sqlite3_stmt * & stmt,int beginIndex,const std::string & cloudGid)1472 int SQLiteSingleVerRelationalStorageExecutor::ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt,
1473 int beginIndex, const std::string &cloudGid)
1474 {
1475 int errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, cloudGid);
1476 if (errCode != E_OK) {
1477 LOGE("Bind cloud gid to statement failed %d.", errCode);
1478 int ret = E_OK;
1479 SQLiteUtils::ResetStatement(stmt, true, ret);
1480 return errCode;
1481 }
1482 errCode = SQLiteUtils::StepWithRetry(stmt);
1483 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1484 errCode = E_OK;
1485 } else {
1486 LOGE("Fill cloud asset failed: %d.", errCode);
1487 }
1488 int ret = E_OK;
1489 SQLiteUtils::ResetStatement(stmt, true, ret);
1490 return errCode != E_OK ? errCode : ret;
1491 }
1492
CleanDownloadChangedAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType)1493 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadChangedAssets(
1494 const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType)
1495 {
1496 if (assetLoader_ == nullptr) {
1497 LOGE("assetLoader may be not set.");
1498 return -E_NOT_SET;
1499 }
1500 std::vector<Asset> toDeleteAssets;
1501 CloudStorageUtils::GetToBeRemoveAssets(vBucket, assetOpType, toDeleteAssets);
1502 if (toDeleteAssets.empty()) {
1503 return E_OK;
1504 }
1505 DBStatus ret = assetLoader_->RemoveLocalAssets(toDeleteAssets);
1506 if (ret != OK) {
1507 LOGE("remove local assets failed %d.", ret);
1508 return -E_REMOVE_ASSETS_FAILED;
1509 }
1510 return E_OK;
1511 }
1512
GetAndBindFillUploadAssetStatement(const std::string & tableName,const VBucket & assets,sqlite3_stmt * & statement)1513 int SQLiteSingleVerRelationalStorageExecutor::GetAndBindFillUploadAssetStatement(const std::string &tableName,
1514 const VBucket &assets, sqlite3_stmt *&statement)
1515 {
1516 std::string sql = "UPDATE '" + tableName + "' SET ";
1517 for (const auto &item: assets) {
1518 sql += item.first + " = ?,";
1519 }
1520 sql.pop_back();
1521 sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = ?;";
1522 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1523 if (errCode != E_OK) {
1524 return errCode;
1525 }
1526 int bindIndex = 1;
1527 for (const auto &item: assets) {
1528 Field field = {
1529 .colName = item.first, .type = static_cast<int32_t>(item.second.index())
1530 };
1531 errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](bindIndex++, assets, field, statement);
1532 if (errCode != E_OK) {
1533 return errCode;
1534 }
1535 }
1536 return errCode;
1537 }
1538
OnlyUpdateAssetId(const std::string & tableName,const TableSchema & tableSchema,const VBucket & vBucket,int64_t dataKey,OpType opType)1539 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateAssetId(const std::string &tableName,
1540 const TableSchema &tableSchema, const VBucket &vBucket, int64_t dataKey, OpType opType)
1541 {
1542 if (opType != OpType::ONLY_UPDATE_GID && opType != OpType::NOT_HANDLE &&
1543 opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1544 return E_OK;
1545 }
1546 if (CloudStorageUtils::IsSharedTable(tableSchema)) {
1547 // this is shared table, not need to update asset id.
1548 return E_OK;
1549 }
1550 bool isNotIncCursor = false; // if isNotIncCursor is false, will increase cursor
1551 if (!IsNeedUpdateAssetId(tableSchema, dataKey, vBucket, isNotIncCursor)) {
1552 return E_OK;
1553 }
1554 int error = SetCursorIncFlag(!isNotIncCursor);
1555 if (error != E_OK) {
1556 LOGE("[Storage Executor] failed to set cursor_inc_flag, %d.", error);
1557 return error;
1558 }
1559 int errCode = UpdateAssetId(tableSchema, dataKey, vBucket);
1560 if (errCode != E_OK) {
1561 LOGE("[Storage Executor] failed to update assetId on table, %d.", errCode);
1562 }
1563 if (isNotIncCursor) {
1564 error = SetCursorIncFlag(true);
1565 if (error != E_OK) {
1566 LOGE("[Storage Executor] failed to set cursor_inc_flag true, %d.", error);
1567 return error;
1568 }
1569 }
1570 return errCode;
1571 }
1572
UpdateLocalAssetId(const VBucket & vBucket,const std::string & fieldName,Asset & asset)1573 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName,
1574 Asset &asset)
1575 {
1576 for (const auto &[col, value] : vBucket) {
1577 if (value.index() == TYPE_INDEX<Asset> && col == fieldName) {
1578 asset = std::get<Asset>(value);
1579 }
1580 }
1581 }
1582
UpdateLocalAssetsId(const VBucket & vBucket,const std::string & fieldName,Assets & assets)1583 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName,
1584 Assets &assets)
1585 {
1586 for (const auto &[col, value] : vBucket) {
1587 if (value.index() == TYPE_INDEX<Assets> && col == fieldName) {
1588 assets = std::get<Assets>(value);
1589 }
1590 }
1591 }
1592
BindAssetToBlobStatement(const Asset & asset,int index,sqlite3_stmt * & stmt)1593 int SQLiteSingleVerRelationalStorageExecutor::BindAssetToBlobStatement(const Asset &asset, int index,
1594 sqlite3_stmt *&stmt)
1595 {
1596 std::vector<uint8_t> blobValue;
1597 int errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, blobValue);
1598 if (errCode != E_OK) {
1599 LOGE("Transfer asset to blob failed, %d.", errCode);
1600 return errCode;
1601 }
1602 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1603 if (errCode != E_OK) {
1604 LOGE("Bind asset blob to statement failed, %d.", errCode);
1605 }
1606 return errCode;
1607 }
1608
BindAssetsToBlobStatement(const Assets & assets,int index,sqlite3_stmt * & stmt)1609 int SQLiteSingleVerRelationalStorageExecutor::BindAssetsToBlobStatement(const Assets &assets, int index,
1610 sqlite3_stmt *&stmt)
1611 {
1612 std::vector<uint8_t> blobValue;
1613 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blobValue);
1614 if (errCode != E_OK) {
1615 LOGE("Transfer asset to blob failed, %d.", errCode);
1616 return errCode;
1617 }
1618 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1619 if (errCode != E_OK) {
1620 LOGE("Bind asset blob to statement failed, %d.", errCode);
1621 }
1622 return errCode;
1623 }
1624
BindAssetFiledToBlobStatement(const TableSchema & tableSchema,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord,sqlite3_stmt * & stmt)1625 int SQLiteSingleVerRelationalStorageExecutor::BindAssetFiledToBlobStatement(const TableSchema &tableSchema,
1626 const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt)
1627 {
1628 int assetIndex = 0;
1629 int assetsIndex = 0;
1630 for (const auto &field : tableSchema.fields) {
1631 if (field.type == TYPE_INDEX<Asset>) {
1632 if (assetOfOneRecord[assetIndex].name.empty()) {
1633 continue;
1634 }
1635 int errCode = BindAssetToBlobStatement(assetOfOneRecord[assetIndex], assetIndex + assetsIndex + 1, stmt);
1636 if (errCode != E_OK) {
1637 LOGE("Bind asset to blob statement failed, %d.", errCode);
1638 return errCode;
1639 }
1640 assetIndex++;
1641 } else if (field.type == TYPE_INDEX<Assets>) {
1642 if (assetsOfOneRecord[assetsIndex].empty()) {
1643 continue;
1644 }
1645 int errCode = BindAssetsToBlobStatement(assetsOfOneRecord[assetsIndex], assetIndex + assetsIndex + 1, stmt);
1646 if (errCode != E_OK) {
1647 LOGE("Bind assets to blob statement failed, %d.", errCode);
1648 return errCode;
1649 }
1650 assetsIndex++;
1651 }
1652 }
1653 return E_OK;
1654 }
1655
UpdateAssetsIdForOneRecord(const TableSchema & tableSchema,const std::string & sql,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord)1656 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetsIdForOneRecord(const TableSchema &tableSchema,
1657 const std::string &sql, const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord)
1658 {
1659 int errCode = E_OK;
1660 int ret = E_OK;
1661 sqlite3_stmt *stmt = nullptr;
1662 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1663 if (errCode != E_OK) {
1664 LOGE("Get update asset statement failed, %d.", errCode);
1665 return errCode;
1666 }
1667 errCode = BindAssetFiledToBlobStatement(tableSchema, assetOfOneRecord, assetsOfOneRecord, stmt);
1668 if (errCode != E_OK) {
1669 LOGE("Asset field Bind asset to blob statement failed, %d.", errCode);
1670 SQLiteUtils::ResetStatement(stmt, true, ret);
1671 return errCode != E_OK ? errCode : ret;
1672 }
1673 errCode = SQLiteUtils::StepWithRetry(stmt);
1674 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1675 errCode = E_OK;
1676 } else {
1677 LOGE("Step statement failed, %d", errCode);
1678 }
1679 SQLiteUtils::ResetStatement(stmt, true, ret);
1680 return errCode != E_OK ? errCode : ret;
1681 }
1682
UpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1683 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1684 const VBucket &vBucket)
1685 {
1686 int errCode = E_OK;
1687 std::vector<Asset> assetOfOneRecord;
1688 std::vector<Assets> assetsOfOneRecord;
1689 std::string updateAssetIdSql = "UPDATE " + tableSchema.name + " SET";
1690 for (const auto &field : tableSchema.fields) {
1691 if (field.type == TYPE_INDEX<Asset>) {
1692 Asset asset;
1693 UpdateLocalAssetId(vBucket, field.colName, asset);
1694 assetOfOneRecord.push_back(asset);
1695 if (!asset.name.empty()) {
1696 updateAssetIdSql += " " + field.colName + " = ?,";
1697 }
1698 }
1699 if (field.type == TYPE_INDEX<Assets>) {
1700 Assets assets;
1701 UpdateLocalAssetsId(vBucket, field.colName, assets);
1702 assetsOfOneRecord.push_back(assets);
1703 if (!assets.empty()) {
1704 updateAssetIdSql += " " + field.colName + " = ?,";
1705 }
1706 }
1707 }
1708 if (updateAssetIdSql == "UPDATE " + tableSchema.name + " SET") {
1709 return E_OK;
1710 }
1711 updateAssetIdSql.pop_back();
1712 updateAssetIdSql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1713 errCode = UpdateAssetsIdForOneRecord(tableSchema, updateAssetIdSql, assetOfOneRecord, assetsOfOneRecord);
1714 if (errCode != E_OK) {
1715 LOGE("[Storage Executor] failed to update asset id on table, %d.", errCode);
1716 }
1717 return errCode;
1718 }
1719
SetPutDataMode(PutDataMode mode)1720 void SQLiteSingleVerRelationalStorageExecutor::SetPutDataMode(PutDataMode mode)
1721 {
1722 putDataMode_ = mode;
1723 }
1724
SetMarkFlagOption(MarkFlagOption option)1725 void SQLiteSingleVerRelationalStorageExecutor::SetMarkFlagOption(MarkFlagOption option)
1726 {
1727 markFlagOption_ = option;
1728 }
1729
GetDataFlag()1730 int64_t SQLiteSingleVerRelationalStorageExecutor::GetDataFlag()
1731 {
1732 if (putDataMode_ != PutDataMode::USER) {
1733 return static_cast<int64_t>(LogInfoFlag::FLAG_CLOUD) |
1734 static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1735 }
1736 uint32_t flag = static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
1737 if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1738 flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1739 }
1740 flag |= static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1741 return static_cast<int64_t>(flag);
1742 }
1743
GetUpdateDataFlagSql(const VBucket & data)1744 std::string SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataFlagSql(const VBucket &data)
1745 {
1746 std::string retentionFlag = "flag = flag & " +
1747 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY) |
1748 static_cast<uint32_t>(LogInfoFlag::FLAG_ASSET_DOWNLOADING_FOR_ASYNC));
1749 if (putDataMode_ == PutDataMode::SYNC) {
1750 if (CloudStorageUtils::IsAssetsContainDownloadRecord(data)) {
1751 return retentionFlag;
1752 }
1753 return UPDATE_FLAG_CLOUD;
1754 }
1755 if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1756 return UPDATE_FLAG_WAIT_COMPENSATED_SYNC;
1757 }
1758 return retentionFlag;
1759 }
1760
GetDev()1761 std::string SQLiteSingleVerRelationalStorageExecutor::GetDev()
1762 {
1763 return putDataMode_ == PutDataMode::SYNC ? "cloud" : "";
1764 }
1765
GetUpdateField(const VBucket & vBucket,const TableSchema & tableSchema)1766 std::vector<Field> SQLiteSingleVerRelationalStorageExecutor::GetUpdateField(const VBucket &vBucket,
1767 const TableSchema &tableSchema)
1768 {
1769 std::set<std::string> useFields;
1770 std::vector<Field> fields;
1771 if (putDataMode_ == PutDataMode::SYNC) {
1772 for (const auto &field : tableSchema.fields) {
1773 useFields.insert(field.colName);
1774 }
1775 fields = tableSchema.fields;
1776 } else {
1777 for (const auto &field : vBucket) {
1778 if (field.first.empty() || field.first[0] == '#') {
1779 continue;
1780 }
1781 useFields.insert(field.first);
1782 }
1783 for (const auto &field : tableSchema.fields) {
1784 if (useFields.find(field.colName) == useFields.end()) {
1785 continue;
1786 }
1787 fields.push_back(field);
1788 }
1789 }
1790 return fields;
1791 }
1792
UpdateRecordFlag(const std::string & tableName,const std::string & sql,const LogInfo & logInfo)1793 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordFlag(const std::string &tableName, const std::string &sql,
1794 const LogInfo &logInfo)
1795 {
1796 bool useHashKey = false;
1797 if (logInfo.cloudGid.empty() && logInfo.dataKey == DBConstant::DEFAULT_ROW_ID) {
1798 if (logInfo.hashKey.empty()) {
1799 LOGE("[RDBExecutor] Update record flag failed with invalid args!");
1800 return -E_INVALID_ARGS;
1801 }
1802 useHashKey = true;
1803 }
1804 sqlite3_stmt *stmt = nullptr;
1805 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1806 if (errCode != E_OK) {
1807 LOGE("[Storage Executor] Get stmt failed when update record flag, %d", errCode);
1808 return errCode;
1809 }
1810 int ret = E_OK;
1811 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, logInfo.timestamp); // 1 is timestamp
1812 if (errCode != E_OK) {
1813 LOGE("[Storage Executor] Bind timestamp to update record flag stmt failed, %d", errCode);
1814 SQLiteUtils::ResetStatement(stmt, true, ret);
1815 return errCode;
1816 }
1817 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, logInfo.timestamp); // 2 is timestamp
1818 if (errCode != E_OK) {
1819 LOGE("[Storage Executor] Bind timestamp to update record status stmt failed, %d", errCode);
1820 SQLiteUtils::ResetStatement(stmt, true, ret);
1821 return errCode;
1822 }
1823 if (useHashKey) {
1824 errCode = SQLiteUtils::BindBlobToStatement(stmt, 3, logInfo.hashKey); // 3 is hash_key
1825 if (errCode != E_OK) {
1826 LOGE("[Storage Executor] Bind hashKey to update record flag stmt failed, %d", errCode);
1827 SQLiteUtils::ResetStatement(stmt, true, ret);
1828 return errCode;
1829 }
1830 }
1831 errCode = SQLiteUtils::StepWithRetry(stmt);
1832 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1833 errCode = E_OK;
1834 } else {
1835 LOGE("[Storage Executor]Step update record flag stmt failed, %d", errCode);
1836 }
1837 SQLiteUtils::ResetStatement(stmt, true, ret);
1838 return errCode == E_OK ? ret : errCode;
1839 }
1840
MarkFlagAsUploadFinished(const std::string & tableName,const Key & hashKey,Timestamp timestamp,bool isExistAssetsDownload)1841 void SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsUploadFinished(const std::string &tableName,
1842 const Key &hashKey, Timestamp timestamp, bool isExistAssetsDownload)
1843 {
1844 sqlite3_stmt *stmt = nullptr;
1845 int errCode = SQLiteUtils::GetStatement(dbHandle_, CloudStorageUtils::GetUpdateUploadFinishedSql(tableName,
1846 isExistAssetsDownload), stmt);
1847 int index = 1;
1848 errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, timestamp);
1849 if (errCode != E_OK) {
1850 SQLiteUtils::ResetStatement(stmt, true, errCode);
1851 LOGW("[Storage Executor] Bind timestamp to update record flag for upload finished stmt failed, %d", errCode);
1852 return;
1853 }
1854 errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, hashKey);
1855 if (errCode != E_OK) {
1856 SQLiteUtils::ResetStatement(stmt, true, errCode);
1857 LOGW("[Storage Executor] Bind hashKey to update record flag for upload finished stmt failed, %d", errCode);
1858 return;
1859 }
1860 errCode = SQLiteUtils::StepWithRetry(stmt);
1861 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1862 errCode = E_OK;
1863 } else {
1864 LOGE("[Storage Executor]Step update record flag for upload finished stmt failed, %d", errCode);
1865 }
1866 SQLiteUtils::ResetStatement(stmt, true, errCode);
1867 }
1868
GetWaitCompensatedSyncDataPk(const TableSchema & table,std::vector<VBucket> & data,bool isQueryDownloadRecords)1869 int SQLiteSingleVerRelationalStorageExecutor::GetWaitCompensatedSyncDataPk(const TableSchema &table,
1870 std::vector<VBucket> &data, bool isQueryDownloadRecords)
1871 {
1872 std::string sql = "SELECT ";
1873 std::vector<Field> pkFields;
1874 for (const auto &field : table.fields) {
1875 if (!field.primary) {
1876 continue;
1877 }
1878 sql += "b." + field.colName + ",";
1879 pkFields.push_back(field);
1880 }
1881 if (pkFields.empty()) {
1882 // ignore no pk table
1883 return E_OK;
1884 }
1885 sql.pop_back();
1886 if (isQueryDownloadRecords) {
1887 sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " +
1888 FLAG_IS_WAIT_COMPENSATED_CONTAIN_DOWNLOAD_SYNC;
1889 } else {
1890 sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " + FLAG_IS_WAIT_COMPENSATED_SYNC;
1891 }
1892 sqlite3_stmt *stmt = nullptr;
1893 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1894 if (errCode != E_OK) {
1895 LOGE("[RDBExecutor] Get stmt failed when get wait compensated sync pk! errCode = %d..", errCode);
1896 return errCode;
1897 }
1898 do {
1899 errCode = SQLiteUtils::StepWithRetry(stmt);
1900 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1901 VBucket pkData;
1902 errCode = GetRecordFromStmt(stmt, pkFields, 0, pkData);
1903 if (errCode != E_OK) {
1904 LOGE("[RDBExecutor] Get record failed when get wait compensated sync pk! errCode = %d.", errCode);
1905 break;
1906 }
1907 data.push_back(pkData);
1908 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1909 errCode = E_OK;
1910 break;
1911 } else {
1912 LOGE("[RDBExecutor] Step failed when get wait compensated sync pk! errCode = %d.", errCode);
1913 break;
1914 }
1915 } while (errCode == E_OK);
1916 int ret = E_OK;
1917 SQLiteUtils::ResetStatement(stmt, true, ret);
1918 return errCode == E_OK ? ret : errCode;
1919 }
1920
ClearUnLockingStatus(const std::string & tableName)1921 int SQLiteSingleVerRelationalStorageExecutor::ClearUnLockingStatus(const std::string &tableName)
1922 {
1923 std::string sql;
1924 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET status = (CASE WHEN status == 1 "+
1925 "AND (cloud_gid = '' AND flag & 0x01 != 0) THEN 0 ELSE status END);";
1926 sqlite3_stmt *stmt = nullptr;
1927 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1928 if (errCode != E_OK) {
1929 LOGE("[RDBExecutor] Get stmt failed when clear unlocking status errCode = %d.", errCode);
1930 return errCode;
1931 }
1932 int ret = E_OK;
1933 errCode = SQLiteUtils::StepWithRetry(stmt);
1934 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1935 errCode = E_OK;
1936 } else {
1937 LOGE("[Storage Executor] Step update record status stmt failed, %d.", errCode);
1938 }
1939 SQLiteUtils::ResetStatement(stmt, true, ret);
1940 return errCode == E_OK ? ret : errCode;
1941 }
1942
GetRecordFromStmt(sqlite3_stmt * stmt,const std::vector<Field> & fields,int startIndex,VBucket & record)1943 int SQLiteSingleVerRelationalStorageExecutor::GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields,
1944 int startIndex, VBucket &record)
1945 {
1946 int errCode = E_OK;
1947 for (const auto &field : fields) {
1948 Type cloudValue;
1949 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, startIndex, cloudValue);
1950 if (errCode != E_OK) {
1951 break;
1952 }
1953 errCode = PutVBucketByType(record, field, cloudValue);
1954 if (errCode != E_OK) {
1955 break;
1956 }
1957 startIndex++;
1958 }
1959 return errCode;
1960 }
1961
BindShareValueToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt,int & index)1962 int SQLiteSingleVerRelationalStorageExecutor::BindShareValueToInsertLogStatement(const VBucket &vBucket,
1963 const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt, int &index)
1964 {
1965 int errCode = E_OK;
1966 std::string version;
1967 if (putDataMode_ == PutDataMode::SYNC) {
1968 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, vBucket, version);
1969 if ((errCode != E_OK && errCode != -E_NOT_FOUND)) {
1970 LOGE("get version for insert log statement failed, %d", errCode);
1971 return -E_CLOUD_ERROR;
1972 }
1973 }
1974 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, version); // next is version
1975 if (errCode != E_OK) {
1976 LOGE("Bind version to insert log statement failed, %d", errCode);
1977 return errCode;
1978 }
1979
1980 std::string shareUri;
1981 if (putDataMode_ == PutDataMode::SYNC) {
1982 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
1983 vBucket, shareUri);
1984 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1985 LOGE("get shareUri for insert log statement failed, %d", errCode);
1986 return -E_CLOUD_ERROR;
1987 }
1988 }
1989
1990 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, shareUri); // next is sharing_resource
1991 if (errCode != E_OK) {
1992 LOGE("Bind shareUri to insert log statement failed, %d", errCode);
1993 }
1994 return errCode;
1995 }
1996
CheckAndCreateTrigger(const TableInfo & table)1997 void SQLiteSingleVerRelationalStorageExecutor::CheckAndCreateTrigger(const TableInfo &table)
1998 {
1999 auto tableManager = std::make_unique<SimpleTrackerLogTableManager>();
2000 tableManager->CheckAndCreateTrigger(dbHandle_, table, "");
2001 }
2002 } // namespace DistributedDB
2003 #endif
2004