1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include "cloud/asset_operation_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "cloud/cloud_storage_utils.h"
21 #include "db_common.h"
22 #include "log_table_manager_factory.h"
23 #include "res_finalizer.h"
24 #include "runtime_context.h"
25 #include "simple_tracker_log_table_manager.h"
26 #include "sqlite_relational_utils.h"
27
28 namespace DistributedDB {
29 static constexpr const int ROW_ID_INDEX = 1;
30 static constexpr const char *HASH_KEY = "HASH_KEY";
31 static constexpr const char *FLAG_NOT_LOGIC_DELETE = "FLAG & 0x08 = 0"; // see if 3th bit of a flag is not logic delete
32
33 using PairStringVector = std::pair<std::vector<std::string>, std::vector<std::string>>;
34
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)35 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
36 std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
37 {
38 if (assetFields.empty() && pkSet.empty()) {
39 return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
40 }
41 std::string gid;
42 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
43 if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
44 LOGE("Get cloud gid fail when query log table.");
45 return errCode;
46 }
47
48 if (pkSet.empty() && gid.empty()) {
49 LOGE("query log table failed because of both primary key and gid are empty.");
50 return -E_CLOUD_ERROR;
51 }
52 std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
53 " a.cloud_gid, a.sharing_resource, a.status, a.version";
54 for (const auto &field : assetFields) {
55 sql += ", b." + field.colName;
56 }
57 for (const auto &pk : pkSet) {
58 sql += ", b." + pk;
59 }
60 sql += CloudStorageUtils::GetLeftJoinLogSql(tableName) + " WHERE ";
61 if (!gid.empty()) {
62 sql += " a.cloud_gid = ? or ";
63 }
64 sql += "a.hash_key = ?";
65 querySql = sql;
66 return E_OK;
67 }
68
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)69 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
70 const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
71 {
72 std::string sql = "UPDATE " + tableName + " SET ";
73 for (const auto &field: fields) {
74 sql += field.colName + " = ?,";
75 }
76 sql.pop_back();
77 sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (";
78 sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
79 sqlite3_stmt *stmt = nullptr;
80 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
81 if (errCode != E_OK) {
82 LOGE("Get fill asset statement failed, %d.", errCode);
83 return errCode;
84 }
85 for (size_t i = 0; i < fields.size(); ++i) {
86 errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
87 if (errCode != E_OK) {
88 SQLiteUtils::ResetStatement(stmt, true, errCode);
89 return errCode;
90 }
91 }
92 statement = stmt;
93 return errCode;
94 }
95
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess)96 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
97 VBucket &vBucket, bool isDownloadSuccess)
98 {
99 std::string cloudGid;
100 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
101 if (errCode != E_OK) {
102 LOGE("Miss gid when fill Asset.");
103 return errCode;
104 }
105 std::vector<Field> assetsField;
106 errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
107 if (errCode != E_OK) {
108 LOGE("No assets need to be filled.");
109 return errCode;
110 }
111 CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
112
113 Bytes hashKey;
114 (void)CloudStorageUtils::GetValueFromVBucket<Bytes>(HASH_KEY, vBucket, hashKey);
115 VBucket dbAssets;
116 std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, hashKey, dbAssets);
117 if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
118 LOGE("get assets by gid or hashkey failed %d.", errCode);
119 return errCode;
120 }
121 AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
122 AssetOperationUtils::CloudSyncAction::END_DOWNLOAD);
123
124 if (isDownloadSuccess) {
125 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
126 CloudStorageUtils::FillAssetAfterDownload, CloudStorageUtils::FillAssetsAfterDownload);
127 errCode = IncreaseCursorOnAssetData(tableSchema.name, cloudGid);
128 if (errCode != E_OK) {
129 return errCode;
130 }
131 } else {
132 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
133 CloudStorageUtils::FillAssetAfterDownloadFail, CloudStorageUtils::FillAssetsAfterDownloadFail);
134 }
135
136 sqlite3_stmt *stmt = nullptr;
137 errCode = GetFillDownloadAssetStatement(tableSchema.name, dbAssets, assetsField, stmt);
138 if (errCode != E_OK) {
139 return errCode;
140 }
141 errCode = ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
142 int ret = CleanDownloadChangedAssets(vBucket, assetOpType);
143 return errCode == E_OK ? ret : errCode;
144 }
145
IncreaseCursorOnAssetData(const std::string & tableName,const std::string & gid)146 int SQLiteSingleVerRelationalStorageExecutor::IncreaseCursorOnAssetData(const std::string &tableName,
147 const std::string &gid)
148 {
149 int cursor = GetCursor(tableName);
150 cursor++;
151 std::string sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + tableName + "_log";
152 sql += " SET cursor = ? where cloud_gid = ?;";
153 sqlite3_stmt *statement = nullptr;
154 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
155 if (errCode != E_OK) {
156 LOGE("get update asset data cursor stmt failed %d.", errCode);
157 return errCode;
158 }
159 ResFinalizer finalizer([statement]() {
160 sqlite3_stmt *statementInner = statement;
161 int ret = E_OK;
162 SQLiteUtils::ResetStatement(statementInner, true, ret);
163 if (ret != E_OK) {
164 LOGW("Reset stmt failed %d when increase cursor on asset data", ret);
165 }
166 });
167 int index = 1;
168 errCode = SQLiteUtils::BindInt64ToStatement(statement, index++, cursor);
169 if (errCode != E_OK) {
170 LOGE("bind cursor data stmt failed %d.", errCode);
171 return errCode;
172 }
173 errCode = SQLiteUtils::BindTextToStatement(statement, index, gid);
174 if (errCode != E_OK) {
175 LOGE("bind cursor gid data stmt failed %d.", errCode);
176 return errCode;
177 }
178 errCode = SQLiteUtils::StepWithRetry(statement, false);
179 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
180 LOGE("Fill upload asset failed:%d.", errCode);
181 return errCode;
182 }
183 LOGI("Upgrade cursor to %d after asset download success.", cursor);
184 errCode = SetCursor(tableName, cursor);
185 if (errCode != E_OK) {
186 LOGE("Upgrade cursor failed after asset download success %d.", errCode);
187 }
188 return errCode;
189 }
190
FillCloudAssetForUpload(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data)191 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema,
192 const CloudSyncBatch &data)
193 {
194 int errCode = E_OK;
195 if (CloudStorageUtils::ChkFillCloudAssetParam(data, errCode)) {
196 return errCode;
197 }
198 errCode = SetLogTriggerStatus(false);
199 if (errCode != E_OK) {
200 LOGE("Fail to set log trigger off, %d.", errCode);
201 return errCode;
202 }
203 sqlite3_stmt *stmt = nullptr;
204 for (size_t i = 0; i < data.assets.size(); ++i) {
205 if (data.assets.at(i).empty()) {
206 continue;
207 }
208 if (DBCommon::IsRecordIgnored(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i]) ||
209 DBCommon::IsCloudRecordNotFound(data.extend[i]) || DBCommon::IsCloudRecordAlreadyExisted(data.extend[i])) {
210 continue;
211 }
212 errCode = InitFillUploadAssetStatement(opType, tableSchema, data, i, stmt);
213 if (errCode != E_OK) {
214 if (errCode == -E_NOT_FOUND) {
215 errCode = E_OK;
216 continue;
217 }
218 break;
219 }
220 errCode = SQLiteUtils::StepWithRetry(stmt, false);
221 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
222 LOGE("Fill upload asset failed:%d.", errCode);
223 break;
224 }
225 errCode = E_OK;
226 SQLiteUtils::ResetStatement(stmt, true, errCode);
227 stmt = nullptr;
228 if (errCode != E_OK) {
229 break;
230 }
231 }
232 int ret = E_OK;
233 SQLiteUtils::ResetStatement(stmt, true, ret);
234 int endCode = SetLogTriggerStatus(true);
235 if (endCode != E_OK) {
236 LOGE("Fail to set log trigger off, %d.", endCode);
237 return endCode;
238 }
239 return errCode != E_OK ? errCode : ret;
240 }
241
FillCloudVersionForUpload(const OpType opType,const CloudSyncData & data)242 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data)
243 {
244 switch (opType) {
245 case OpType::UPDATE_VERSION:
246 return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.updData);
247 case OpType::INSERT_VERSION:
248 return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.insData);
249 default:
250 LOGE("Fill version with unknown type %d", static_cast<int>(opType));
251 return -E_INVALID_ARGS;
252 }
253 }
254
BindUpdateVersionStatement(const VBucket & vBucket,const Bytes & hashKey,sqlite3_stmt * & stmt)255 int SQLiteSingleVerRelationalStorageExecutor::BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey,
256 sqlite3_stmt *&stmt)
257 {
258 int errCode = E_OK;
259 std::string version;
260 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
261 vBucket, version) != E_OK) {
262 LOGW("get version from vBucket failed.");
263 }
264 if (hashKey.empty()) {
265 LOGE("hash key is empty when update version.");
266 return -E_CLOUD_ERROR;
267 }
268 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, version);
269 if (errCode != E_OK) {
270 return errCode;
271 }
272 errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, hashKey); // 2 means the second bind args
273 if (errCode != E_OK) {
274 return errCode;
275 }
276 errCode = SQLiteUtils::StepWithRetry(stmt, false);
277 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
278 errCode = E_OK;
279 SQLiteUtils::ResetStatement(stmt, false, errCode);
280 } else {
281 LOGE("step version stmt failed: %d.", errCode);
282 }
283 return errCode;
284 }
285
InitFillUploadAssetStatement(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)286 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpType opType,
287 const TableSchema &tableSchema, const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
288 {
289 VBucket vBucket = data.assets.at(index);
290 VBucket dbAssets;
291 std::string cloudGid;
292 int errCode;
293 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
294 std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, data.hashKey.at(index), dbAssets);
295 if (errCode != E_OK && errCode != -E_CLOUD_GID_MISMATCH) {
296 return errCode;
297 }
298 AssetOperationUtils::CloudSyncAction action = opType == OpType::SET_UPLOADING ?
299 AssetOperationUtils::CloudSyncAction::START_UPLOAD : AssetOperationUtils::CloudSyncAction::END_UPLOAD;
300 AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
301 action);
302 if (action == AssetOperationUtils::CloudSyncAction::START_UPLOAD) {
303 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
304 CloudStorageUtils::FillAssetBeforeUpload, CloudStorageUtils::FillAssetsBeforeUpload);
305 } else {
306 if (DBCommon::IsRecordError(data.extend.at(index))) {
307 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
308 CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed);
309 } else {
310 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
311 CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload);
312 }
313 }
314
315 errCode = GetAndBindFillUploadAssetStatement(tableSchema.name, dbAssets, statement);
316 if (errCode != E_OK) {
317 LOGE("get and bind asset failed %d.", errCode);
318 return errCode;
319 }
320 int64_t rowid = data.rowid[index];
321 return SQLiteUtils::BindInt64ToStatement(statement, dbAssets.size() + ROW_ID_INDEX, rowid);
322 }
323
AnalysisTrackerTable(const TrackerTable & trackerTable,TableInfo & tableInfo)324 int SQLiteSingleVerRelationalStorageExecutor::AnalysisTrackerTable(const TrackerTable &trackerTable,
325 TableInfo &tableInfo)
326 {
327 return SQLiteRelationalUtils::AnalysisTrackerTable(dbHandle_, trackerTable, tableInfo);
328 }
329
CreateTrackerTable(const TrackerTable & trackerTable,bool isUpgrade)330 int SQLiteSingleVerRelationalStorageExecutor::CreateTrackerTable(const TrackerTable &trackerTable, bool isUpgrade)
331 {
332 TableInfo table;
333 table.SetTableSyncType(TableSyncType::CLOUD_COOPERATION);
334 int errCode = AnalysisTrackerTable(trackerTable, table);
335 if (errCode != E_OK) {
336 return errCode;
337 }
338 auto tableManager = std::make_unique<SimpleTrackerLogTableManager>();
339 if (trackerTable.GetTrackerColNames().empty()) {
340 // drop trigger
341 return tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
342 }
343
344 // create log table
345 errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
346 if (errCode != E_OK) {
347 return errCode;
348 }
349 // init cursor
350 errCode = InitCursorToMeta(table.GetTableName());
351 if (errCode != E_OK) {
352 return errCode;
353 }
354 std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, "");
355 if (isUpgrade) {
356 errCode = CleanExtendAndCursorForDeleteData(table.GetTableName());
357 if (errCode != E_OK) {
358 LOGE("clean tracker log info for deleted data failed %d.", errCode);
359 return errCode;
360 }
361 }
362 errCode = GeneLogInfoForExistedData(dbHandle_, trackerTable.GetTableName(), calPrimaryKeyHash, table);
363 if (errCode != E_OK) {
364 LOGE("general tracker log info for existed data failed %d.", errCode);
365 return errCode;
366 }
367 errCode = SetLogTriggerStatus(true);
368 if (errCode != E_OK) {
369 return errCode;
370 }
371 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
372 if (errCode != E_OK) {
373 return errCode;
374 }
375 if (!isUpgrade) {
376 return CheckInventoryData(DBCommon::GetLogTableName(table.GetTableName()));
377 }
378 return E_OK;
379 }
380
GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject & schema)381 int SQLiteSingleVerRelationalStorageExecutor::GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema)
382 {
383 if (!schema.ToSchemaString().empty()) {
384 return E_OK;
385 }
386 const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.begin(),
387 DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.end());
388 Value schemaVal;
389 int errCode = GetKvData(schemaKey, schemaVal); // save schema to meta_data
390 if (errCode != E_OK) {
391 return errCode;
392 }
393 if (schemaVal.empty()) {
394 return -E_NOT_FOUND;
395 }
396 std::string schemaStr;
397 DBCommon::VectorToString(schemaVal, schemaStr);
398 errCode = schema.ParseFromTrackerSchemaString(schemaStr);
399 if (errCode != E_OK) {
400 LOGE("Parse from tracker schema string err.");
401 }
402 return errCode;
403 }
404
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)405 int SQLiteSingleVerRelationalStorageExecutor::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
406 {
407 sqlite3_stmt *statement = nullptr;
408 int errCode = SQLiteUtils::GetStatement(dbHandle_, condition.sql, statement);
409 if (errCode != E_OK) {
410 LOGE("Execute sql failed when prepare stmt.");
411 return errCode;
412 }
413 size_t bindCount = static_cast<size_t>(sqlite3_bind_parameter_count(statement));
414 if (bindCount > condition.bindArgs.size() || bindCount < condition.bindArgs.size()) {
415 LOGE("Sql bind args mismatch.");
416 SQLiteUtils::ResetStatement(statement, true, errCode);
417 return -E_INVALID_ARGS;
418 }
419 for (size_t i = 0; i < condition.bindArgs.size(); i++) {
420 Type type = condition.bindArgs[i];
421 errCode = SQLiteRelationalUtils::BindStatementByType(statement, i + 1, type);
422 if (errCode != E_OK) {
423 int ret = E_OK;
424 SQLiteUtils::ResetStatement(statement, true, ret);
425 return errCode;
426 }
427 }
428 while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
429 VBucket bucket;
430 errCode = SQLiteRelationalUtils::GetSelectVBucket(statement, bucket);
431 if (errCode != E_OK) {
432 int ret = E_OK;
433 SQLiteUtils::ResetStatement(statement, true, ret);
434 return errCode;
435 }
436 records.push_back(std::move(bucket));
437 }
438 int ret = E_OK;
439 SQLiteUtils::ResetStatement(statement, true, ret);
440 return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
441 }
442
GetClearWaterMarkTables(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema,std::set<std::string> & clearWaterMarkTables)443 int SQLiteSingleVerRelationalStorageExecutor::GetClearWaterMarkTables(
444 const std::vector<TableReferenceProperty> &tableReferenceProperty, const RelationalSchemaObject &schema,
445 std::set<std::string> &clearWaterMarkTables)
446 {
447 std::set<std::string> changeTables = schema.CompareReferenceProperty(tableReferenceProperty);
448 for (const auto &table : changeTables) {
449 std::string logTableName = DBCommon::GetLogTableName(table);
450 bool isExists = false;
451 int errCode = SQLiteUtils::CheckTableExists(dbHandle_, logTableName, isExists);
452 if (errCode != E_OK) {
453 LOGE("[GetClearWaterMarkTables] check table exists failed, errCode = %d.", errCode);
454 return errCode;
455 }
456 if (!isExists) { // table maybe dropped after set reference
457 LOGI("[GetClearWaterMarkTables] log table not exists, skip this table.");
458 continue;
459 }
460
461 bool isEmpty = true;
462 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, logTableName, isEmpty);
463 if (errCode != E_OK) {
464 LOGE("[GetClearWaterMarkTables] check table empty failed, errCode = %d.", errCode);
465 clearWaterMarkTables.clear();
466 return errCode;
467 }
468 if (!isEmpty) {
469 clearWaterMarkTables.insert(table);
470 }
471 }
472 LOGI("[GetClearWaterMarkTables] clearWaterMarkTables size = %zu.", clearWaterMarkTables.size());
473 return E_OK;
474 }
475
UpgradedLogForExistedData(TableInfo & tableInfo,bool schemaChanged)476 int SQLiteSingleVerRelationalStorageExecutor::UpgradedLogForExistedData(TableInfo &tableInfo, bool schemaChanged)
477 {
478 if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
479 return E_OK;
480 }
481 std::string logTable = DBCommon::GetLogTableName(tableInfo.GetTableName());
482 if (schemaChanged) {
483 std::string markAsInconsistent = "UPDATE " + logTable + " SET flag=" +
484 "(CASE WHEN (cloud_gid='' and data_key=-1 and flag&0x02=0x02) then flag else flag|0x20 END)";
485 int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, markAsInconsistent);
486 if (ret != E_OK) {
487 LOGE("Mark upgrade log info as inconsistent failed:%d", ret);
488 return ret;
489 }
490 }
491 if (tableInfo.GetTrackerTable().IsEmpty()) {
492 return E_OK;
493 }
494 LOGI("Upgrade tracker table log, schemaChanged:%d.", schemaChanged);
495 int errCode = SetLogTriggerStatus(false);
496 if (errCode != E_OK) {
497 return errCode;
498 }
499 std::string sql = "UPDATE " + tableInfo.GetTableName() + " SET _rowid_=_rowid_";
500 TrackerTable trackerTable = tableInfo.GetTrackerTable();
501 errCode = trackerTable.ReBuildTempTrigger(dbHandle_, TriggerMode::TriggerModeEnum::UPDATE,
502 [this, &sql]() {
503 int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
504 if (ret != E_OK) {
505 LOGE("Upgrade log for extend field failed.");
506 }
507 return ret;
508 });
509 return SetLogTriggerStatus(true);
510 }
511
CreateTempSyncTrigger(const TrackerTable & trackerTable)512 int SQLiteSingleVerRelationalStorageExecutor::CreateTempSyncTrigger(const TrackerTable &trackerTable)
513 {
514 int errCode = E_OK;
515 std::vector<std::string> dropSql = trackerTable.GetDropTempTriggerSql();
516 for (const auto &sql: dropSql) {
517 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
518 if (errCode != E_OK) {
519 LOGE("[RDBExecutor] execute drop sql failed %d.", errCode);
520 return errCode;
521 }
522 }
523 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempInsertTriggerSql());
524 if (errCode != E_OK) {
525 LOGE("[RDBExecutor] create temp insert trigger failed %d.", errCode);
526 return errCode;
527 }
528 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempUpdateTriggerSql());
529 if (errCode != E_OK) {
530 LOGE("[RDBExecutor] create temp update trigger failed %d.", errCode);
531 return errCode;
532 }
533 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempDeleteTriggerSql());
534 if (errCode != E_OK) {
535 LOGE("[RDBExecutor] create temp delete trigger failed %d.", errCode);
536 }
537 return errCode;
538 }
539
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)540 int SQLiteSingleVerRelationalStorageExecutor::GetAndResetServerObserverData(const std::string &tableName,
541 ChangeProperties &changeProperties)
542 {
543 std::string fileName;
544 if (!SQLiteRelationalUtils::GetDbFileName(dbHandle_, fileName)) {
545 LOGE("get db file name failed.");
546 return -E_INVALID_DB;
547 }
548 SQLiteUtils::GetAndResetServerObserverData(fileName, tableName, changeProperties);
549 return E_OK;
550 }
551
ClearAllTempSyncTrigger()552 int SQLiteSingleVerRelationalStorageExecutor::ClearAllTempSyncTrigger()
553 {
554 sqlite3_stmt *stmt = nullptr;
555 static const std::string sql = "SELECT name FROM sqlite_temp_master WHERE type = 'trigger';";
556 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
557 if (errCode != E_OK) {
558 LOGE("get clear all temp trigger stmt failed %d.", errCode);
559 return errCode;
560 }
561 int ret = E_OK;
562 while ((errCode = SQLiteUtils::StepNext(stmt, isMemDb_)) == E_OK) {
563 std::string str;
564 (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
565 if (errCode != E_OK) {
566 SQLiteUtils::ResetStatement(stmt, true, ret);
567 return errCode;
568 }
569 std::string dropSql = "DROP TRIGGER IF EXISTS '" + str + "';";
570 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, dropSql);
571 if (errCode != E_OK) {
572 LOGE("drop temp trigger failed %d.", errCode);
573 SQLiteUtils::ResetStatement(stmt, true, ret);
574 return errCode;
575 }
576 }
577 SQLiteUtils::ResetStatement(stmt, true, ret);
578 return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
579 }
580
CleanTrackerData(const std::string & tableName,int64_t cursor,bool isOnlyTrackTable)581 int SQLiteSingleVerRelationalStorageExecutor::CleanTrackerData(const std::string &tableName, int64_t cursor,
582 bool isOnlyTrackTable)
583 {
584 std::string sql;
585 if (isOnlyTrackTable) {
586 sql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log";
587 } else {
588 sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + tableName + "_log SET extend_field = NULL";
589 }
590 sql += " where data_key = -1 and cursor <= ?;";
591 sqlite3_stmt *statement = nullptr;
592 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
593 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
594 LOGE("get clean tracker data stmt failed %d.", errCode);
595 return errCode;
596 }
597 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, cursor);
598 int ret = E_OK;
599 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
600 LOGE("bind clean tracker data stmt failed %d.", errCode);
601 SQLiteUtils::ResetStatement(statement, true, ret);
602 return errCode;
603 }
604 errCode = SQLiteUtils::StepWithRetry(statement);
605 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
606 errCode = E_OK;
607 } else {
608 LOGE("clean tracker step failed: %d.", errCode);
609 }
610 SQLiteUtils::ResetStatement(statement, true, ret);
611 return errCode == E_OK ? ret : errCode;
612 }
613
CreateSharedTable(const TableSchema & tableSchema)614 int SQLiteSingleVerRelationalStorageExecutor::CreateSharedTable(const TableSchema &tableSchema)
615 {
616 std::map<int32_t, std::string> cloudFieldTypeMap;
617 cloudFieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
618 cloudFieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
619 cloudFieldTypeMap[TYPE_INDEX<double>] = "REAL";
620 cloudFieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
621 cloudFieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
622 cloudFieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
623 cloudFieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
624 cloudFieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
625
626 std::string createTableSql = "CREATE TABLE IF NOT EXISTS " + tableSchema.sharedTableName + "(";
627 std::string primaryKey = ", PRIMARY KEY (";
628 createTableSql += CloudDbConstant::CLOUD_OWNER;
629 createTableSql += " TEXT, ";
630 createTableSql += CloudDbConstant::CLOUD_PRIVILEGE;
631 createTableSql += " TEXT";
632 primaryKey += CloudDbConstant::CLOUD_OWNER;
633 bool hasPrimaryKey = false;
634 for (const auto &field : tableSchema.fields) {
635 createTableSql += ", " + field.colName + " ";
636 createTableSql += cloudFieldTypeMap[field.type];
637 createTableSql += field.nullable ? "" : " NOT NULL";
638 if (field.primary) {
639 primaryKey += ", " + field.colName;
640 hasPrimaryKey = true;
641 }
642 }
643 if (hasPrimaryKey) {
644 createTableSql += primaryKey + ")";
645 }
646 createTableSql += ");";
647 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, createTableSql);
648 if (errCode != E_OK) {
649 LOGE("Create shared table failed, %d.", errCode);
650 }
651 return errCode;
652 }
653
DeleteTable(const std::vector<std::string> & tableNames)654 int SQLiteSingleVerRelationalStorageExecutor::DeleteTable(const std::vector<std::string> &tableNames)
655 {
656 for (const auto &tableName : tableNames) {
657 std::string deleteTableSql = "DROP TABLE IF EXISTS " + tableName + ";";
658 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteTableSql);
659 if (errCode != E_OK) {
660 LOGE("Delete table failed, %d.", errCode);
661 return errCode;
662 }
663 }
664 return E_OK;
665 }
666
UpdateSharedTable(const std::map<std::string,std::vector<Field>> & updateTableNames)667 int SQLiteSingleVerRelationalStorageExecutor::UpdateSharedTable(
668 const std::map<std::string, std::vector<Field>> &updateTableNames)
669 {
670 int errCode = E_OK;
671 std::map<int32_t, std::string> fieldTypeMap;
672 fieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
673 fieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
674 fieldTypeMap[TYPE_INDEX<double>] = "REAL";
675 fieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
676 fieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
677 fieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
678 fieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
679 fieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
680 for (const auto &table : updateTableNames) {
681 if (table.second.empty()) {
682 continue;
683 }
684 std::string addColumnSql = "";
685 for (const auto &field : table.second) {
686 addColumnSql += "ALTER TABLE " + table.first + " ADD ";
687 addColumnSql += field.colName + " ";
688 addColumnSql += fieldTypeMap[field.type];
689 addColumnSql += field.primary ? " PRIMARY KEY" : "";
690 addColumnSql += field.nullable ? ";" : " NOT NULL;";
691 }
692 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, addColumnSql);
693 if (errCode != E_OK) {
694 LOGE("Shared table add column failed, %d.", errCode);
695 return errCode;
696 }
697 }
698 return errCode;
699 }
700
AlterTableName(const std::map<std::string,std::string> & tableNames)701 int SQLiteSingleVerRelationalStorageExecutor::AlterTableName(const std::map<std::string, std::string> &tableNames)
702 {
703 for (const auto &tableName : tableNames) {
704 std::string alterTableSql = "ALTER TABLE " + tableName.first + " RENAME TO " + tableName.second + ";";
705 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, alterTableSql);
706 if (errCode != E_OK) {
707 LOGE("Alter table name failed, %d.", errCode);
708 return errCode;
709 }
710 }
711 return E_OK;
712 }
713
AppendUpdateLogRecordWhereSqlCondition(const TableSchema & tableSchema,const VBucket & vBucket,std::string & sql)714 int SQLiteSingleVerRelationalStorageExecutor::AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema,
715 const VBucket &vBucket, std::string &sql)
716 {
717 std::string gidStr;
718 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
719 if (errCode != E_OK) {
720 LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d.", errCode);
721 return errCode;
722 }
723
724 sql += " WHERE ";
725 if (!gidStr.empty()) {
726 sql += "cloud_gid = '" + gidStr + "'";
727 }
728 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
729 if (!pkMap.empty()) {
730 if (!gidStr.empty()) {
731 sql += " OR ";
732 }
733 sql += "(hash_key = ?);";
734 }
735 return E_OK;
736 }
737
DoCleanShareTableDataAndLog(const std::vector<std::string> & tableNameList)738 int SQLiteSingleVerRelationalStorageExecutor::DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList)
739 {
740 int ret = E_OK;
741 int errCode = E_OK;
742 for (const auto &tableName: tableNameList) {
743 std::string delDataSql = "DELETE FROM '" + tableName + "';";
744 sqlite3_stmt *statement = nullptr;
745 errCode = SQLiteUtils::GetStatement(dbHandle_, delDataSql, statement);
746 if (errCode != E_OK) {
747 LOGE("get clean shared data stmt failed %d.", errCode);
748 return errCode;
749 }
750 errCode = SQLiteUtils::StepWithRetry(statement);
751 SQLiteUtils::ResetStatement(statement, true, ret);
752 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
753 errCode = E_OK;
754 } else {
755 LOGE("clean shared data failed: %d.", errCode);
756 break;
757 }
758 statement = nullptr;
759 std::string delLogSql = "DELETE FROM '" + DBConstant::RELATIONAL_PREFIX + tableName + "_log';";
760 errCode = SQLiteUtils::GetStatement(dbHandle_, delLogSql, statement);
761 if (errCode != E_OK) {
762 LOGE("get clean shared log stmt failed %d.", errCode);
763 return errCode;
764 }
765 errCode = SQLiteUtils::StepWithRetry(statement);
766 SQLiteUtils::ResetStatement(statement, true, ret);
767 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
768 errCode = E_OK;
769 } else {
770 LOGE("clean shared log failed: %d.", errCode);
771 break;
772 }
773 }
774 return errCode == E_OK ? ret : errCode;
775 }
776
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,const std::map<std::string,std::vector<TableReferenceProperty>> & tableReference,std::map<int64_t,Entries> & referenceGid)777 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGid(const std::string &tableName,
778 const CloudSyncBatch &syncBatch, const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
779 std::map<int64_t, Entries> &referenceGid)
780 {
781 int errCode = E_OK;
782 for (const auto &[targetTable, targetReference] : tableReference) {
783 errCode = GetReferenceGidInner(tableName, targetTable, syncBatch, targetReference, referenceGid);
784 if (errCode != E_OK) {
785 LOGE("[RDBExecutor] get reference gid inner failed %d.", errCode);
786 return errCode;
787 }
788 }
789 return errCode;
790 }
791
GetReferenceGidInner(const std::string & sourceTable,const std::string & targetTable,const CloudSyncBatch & syncBatch,const std::vector<TableReferenceProperty> & targetTableReference,std::map<int64_t,Entries> & referenceGid)792 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidInner(const std::string &sourceTable,
793 const std::string &targetTable, const CloudSyncBatch &syncBatch,
794 const std::vector<TableReferenceProperty> &targetTableReference, std::map<int64_t, Entries> &referenceGid)
795 {
796 auto [sourceFields, targetFields] = SplitReferenceByField(targetTableReference);
797 if (sourceFields.empty()) {
798 LOGD("[RDBExecutor] source field is empty.");
799 return E_OK;
800 }
801 if (sourceFields.size() != targetFields.size()) {
802 LOGE("[RDBExecutor] reference field size not equal.");
803 return -E_INTERNAL_ERROR;
804 }
805 std::string sql = GetReferenceGidSql(sourceTable, targetTable, sourceFields, targetFields);
806 sqlite3_stmt *statement = nullptr;
807 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
808 if (errCode != E_OK) {
809 LOGE("[RDBExecutor] get ref gid data stmt failed. %d", errCode);
810 return errCode;
811 }
812 errCode = GetReferenceGidByStmt(statement, syncBatch, targetTable, referenceGid);
813 int ret = E_OK;
814 SQLiteUtils::ResetStatement(statement, true, ret);
815 return errCode == E_OK ? ret : errCode;
816 }
817
GetReferenceGidSql(const std::string & sourceTable,const std::string & targetTable,const std::vector<std::string> & sourceFields,const std::vector<std::string> & targetFields)818 std::string SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidSql(const std::string &sourceTable,
819 const std::string &targetTable, const std::vector<std::string> &sourceFields,
820 const std::vector<std::string> &targetFields)
821 {
822 // sql like this:
823 // SELECT naturalbase_rdb_aux_parent_log.cloud_gid FROM naturalbase_rdb_aux_parent_log,
824 // (SELECT parent._rowid_ AS rowid_b FROM parent,
825 // (SELECT child._rowid_, name FROM child, naturalbase_rdb_aux_child_log
826 // WHERE child._rowid_ = ? AND naturalbase_rdb_aux_child_log.timestamp = ? ) source_a
827 // WHERE parent.name = source_a.name ) temp_table
828 // WHERE naturalbase_rdb_aux_parent_log.data_key = temp_table.rowid_b
829 std::string logTargetTable = DBCommon::GetLogTableName(targetTable);
830 std::string logSourceTable = DBCommon::GetLogTableName(sourceTable);
831 std::string sql;
832 sql += "SELECT " + logTargetTable + ".cloud_gid" + " FROM " + logTargetTable + ", ";
833 sql += "(";
834 sql += "SELECT " + targetTable + "._rowid_ AS rowid_b FROM " + targetTable
835 + ", ";
836 sql += "(SELECT " + sourceTable + "._rowid_,";
837 std::set<std::string> sourceFieldSet;
838 for (const auto &item : sourceFields) {
839 sourceFieldSet.insert(item);
840 }
841 for (const auto &sourceField : sourceFieldSet) {
842 sql += sourceField + ",";
843 }
844 sql.pop_back();
845 sql += " FROM " + sourceTable + ", " + logSourceTable;
846 sql +=" WHERE " + sourceTable + "._rowid_ = ? AND " + logSourceTable + ".timestamp = ? ";
847 sql += " AND " + logSourceTable + ".flag&0x08=0x00) source_a";
848 sql += " WHERE ";
849 for (size_t i = 0u; i < sourceFields.size(); ++i) {
850 if (i != 0u) {
851 sql += " AND ";
852 }
853 sql += targetTable + "." + targetFields[i] + " = source_a." + sourceFields[i];
854 }
855 sql += ") temp_table ";
856 sql += "WHERE " + logTargetTable + ".data_key = temp_table.rowid_b";
857 sql += " AND " + logTargetTable + ".flag&0x08=0x00";
858 return sql;
859 }
860
GetReferenceGidByStmt(sqlite3_stmt * statement,const CloudSyncBatch & syncBatch,const std::string & targetTable,std::map<int64_t,Entries> & referenceGid)861 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidByStmt(sqlite3_stmt *statement,
862 const CloudSyncBatch &syncBatch, const std::string &targetTable, std::map<int64_t, Entries> &referenceGid)
863 {
864 int errCode = E_OK;
865 if (syncBatch.rowid.size() != syncBatch.timestamp.size()) {
866 LOGE("[RDBExecutor] rowid size [%zu] not equal to timestamp size [%zu].", syncBatch.rowid.size(),
867 syncBatch.timestamp.size());
868 return -E_INVALID_ARGS;
869 }
870 int matchCount = 0;
871 for (size_t i = 0u; i < syncBatch.rowid.size(); i++) {
872 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, syncBatch.rowid[i]); // 1 is rowid index
873 if (errCode != E_OK) {
874 LOGE("[RDBExecutor] bind rowid to stmt failed %d.", errCode);
875 break;
876 }
877 errCode = SQLiteUtils::BindInt64ToStatement(statement, 2, syncBatch.timestamp[i]); // 2 is timestamp index
878 if (errCode != E_OK) {
879 LOGE("[RDBExecutor] bind timestamp to stmt failed %d.", errCode);
880 break;
881 }
882 while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
883 std::string gid;
884 (void)SQLiteUtils::GetColumnTextValue(statement, 0, gid);
885 if (gid.empty()) {
886 LOGE("[RDBExecutor] reference data don't contain gid.");
887 errCode = -E_CLOUD_ERROR;
888 break;
889 }
890 referenceGid[syncBatch.rowid[i]][targetTable] = gid;
891 matchCount++;
892 }
893 if (errCode == -E_FINISHED) {
894 errCode = E_OK;
895 }
896 if (errCode != E_OK) {
897 LOGE("[RDBExecutor] step stmt failed %d.", errCode);
898 break;
899 }
900 SQLiteUtils::ResetStatement(statement, false, errCode);
901 if (errCode != E_OK) {
902 LOGE("[RDBExecutor] reset stmt failed %d.", errCode);
903 break;
904 }
905 }
906 if (matchCount != 0) {
907 LOGD("[RDBExecutor] get reference gid match %d.", matchCount);
908 }
909 return errCode;
910 }
911
SplitReferenceByField(const std::vector<TableReferenceProperty> & targetTableReference)912 PairStringVector SQLiteSingleVerRelationalStorageExecutor::SplitReferenceByField(
913 const std::vector<TableReferenceProperty> &targetTableReference)
914 {
915 PairStringVector sourceTargetFiled;
916 for (const auto &reference : targetTableReference) {
917 for (const auto &column : reference.columns) {
918 sourceTargetFiled.first.push_back(column.first);
919 sourceTargetFiled.second.push_back(column.second);
920 }
921 }
922 return sourceTargetFiled;
923 }
924
BindStmtWithCloudGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid,sqlite3_stmt * & stmt)925 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSyncData &cloudDataResult,
926 bool ignoreEmptyGid, sqlite3_stmt *&stmt)
927 {
928 int fillGidCount = 0;
929 int errCode = E_OK;
930 for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
931 auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
932 if (gidEntry == cloudDataResult.insData.extend[i].end()) {
933 bool isSkipAssetsMissRecord = false;
934 if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) {
935 LOGI("[RDBExecutor] Local assets missing and skip filling assets.");
936 isSkipAssetsMissRecord = true;
937 }
938 if (ignoreEmptyGid || isSkipAssetsMissRecord) {
939 continue;
940 }
941 errCode = -E_INVALID_ARGS;
942 LOGE("[RDBExecutor] Extend not contain gid.");
943 break;
944 }
945 bool containError = DBCommon::IsRecordError(cloudDataResult.insData.extend[i]);
946 if (ignoreEmptyGid && containError) {
947 continue;
948 }
949 std::string val;
950 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
951 cloudDataResult.insData.extend[i], val) != E_OK) {
952 errCode = -E_INVALID_DATA;
953 LOGE("[RDBExecutor] Can't get string gid from extend.");
954 break;
955 }
956 if (val.empty()) {
957 errCode = -E_CLOUD_ERROR;
958 LOGE("[RDBExecutor] Get empty gid from extend.");
959 break;
960 }
961 errCode = BindStmtWithCloudGidInner(val, cloudDataResult.insData.rowid[i], stmt, fillGidCount);
962 if (errCode != E_OK) {
963 LOGE("[RDBExecutor] Bind stmt error %d.", errCode);
964 break;
965 }
966 }
967 LOGD("[RDBExecutor] Fill gid count %d.", fillGidCount);
968 return errCode;
969 }
970
CleanExtendAndCursorForDeleteData(const std::string & tableName)971 int SQLiteSingleVerRelationalStorageExecutor::CleanExtendAndCursorForDeleteData(const std::string &tableName)
972 {
973 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
974 std::string sql = "DELETE FROM " + logTable + " where flag&0x01=0x01;";
975 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
976 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
977 LOGE("update extend field and cursor failed %d.", errCode);
978 }
979 return errCode;
980 }
981
CheckIfExistUserTable(const std::string & tableName)982 int SQLiteSingleVerRelationalStorageExecutor::CheckIfExistUserTable(const std::string &tableName)
983 {
984 std::string sql = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?";
985 sqlite3_stmt *statement = nullptr;
986 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
987 if (errCode != E_OK) {
988 LOGE("[RDBExecutor] Prepare the sql statement error: %d.", errCode);
989 return errCode;
990 }
991 errCode = SQLiteUtils::BindTextToStatement(statement, 1, tableName);
992 if (errCode != E_OK) {
993 LOGE("[RDBExecutor] Bind table name failed: %d.", errCode);
994 SQLiteUtils::ResetStatement(statement, true, errCode);
995 return errCode;
996 }
997 if (SQLiteUtils::StepWithRetry(statement) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
998 LOGE("[RDBExecutor] local exists user table which shared table name is same as.");
999 SQLiteUtils::ResetStatement(statement, true, errCode);
1000 return -E_INVALID_ARGS;
1001 }
1002 SQLiteUtils::ResetStatement(statement, true, errCode);
1003 return E_OK;
1004 }
1005
GetCloudDeleteSql(const std::string & logTable)1006 std::string SQLiteSingleVerRelationalStorageExecutor::GetCloudDeleteSql(const std::string &logTable)
1007 {
1008 std::string sql;
1009 sql += " cloud_gid = '', version = '', ";
1010 if (isLogicDelete_) {
1011 // 1001 which is logicDelete|cloudForcePush|local|delete
1012 sql += "flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1013 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE) |
1014 static_cast<uint32_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) + ", cursor = (SELECT CASE WHEN (MAX(cursor) is "
1015 "null) THEN 1 ELSE MAX(cursor) + 1 END FROM " + logTable + ")";
1016 } else {
1017 sql += "data_key = -1, flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1018 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) + ", sharing_resource = ''";
1019 }
1020 return sql;
1021 }
1022
RemoveDataAndLog(const std::string & tableName,int64_t dataKey)1023 int SQLiteSingleVerRelationalStorageExecutor::RemoveDataAndLog(const std::string &tableName, int64_t dataKey)
1024 {
1025 int errCode = E_OK;
1026 std::string removeDataSql = "DELETE FROM " + tableName + " WHERE " + DBConstant::SQLITE_INNER_ROWID + " = " +
1027 std::to_string(dataKey);
1028 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeDataSql);
1029 if (errCode != E_OK) {
1030 LOGE("[RDBExecutor] remove data failed %d", errCode);
1031 return errCode;
1032 }
1033 std::string removeLogSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key = " +
1034 std::to_string(dataKey);
1035 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeLogSql);
1036 if (errCode != E_OK) {
1037 LOGE("[RDBExecutor] remove log failed %d", errCode);
1038 }
1039 return errCode;
1040 }
1041
GetLocalDataKey(size_t index,const DownloadData & downloadData)1042 int64_t SQLiteSingleVerRelationalStorageExecutor::GetLocalDataKey(size_t index,
1043 const DownloadData &downloadData)
1044 {
1045 if (index >= downloadData.existDataKey.size()) {
1046 LOGW("[RDBExecutor] index out of range when get local data key."); // should not happen
1047 return -1; // -1 means not exist
1048 }
1049 return downloadData.existDataKey[index];
1050 }
1051
BindStmtWithCloudGidInner(const std::string & gid,int64_t rowid,sqlite3_stmt * & stmt,int & fillGidCount)1052 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
1053 sqlite3_stmt *&stmt, int &fillGidCount)
1054 {
1055 int errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid); // 1 means the gid index
1056 if (errCode != E_OK) {
1057 return errCode;
1058 }
1059 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means rowid
1060 if (errCode != E_OK) {
1061 return errCode;
1062 }
1063 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1064 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1065 errCode = E_OK;
1066 fillGidCount++;
1067 SQLiteUtils::ResetStatement(stmt, false, errCode);
1068 } else {
1069 LOGE("[RDBExecutor] Update cloud log failed: %d.", errCode);
1070 }
1071 return errCode;
1072 }
1073
RenewTableTrigger(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType)1074 int SQLiteSingleVerRelationalStorageExecutor::RenewTableTrigger(DistributedTableMode mode,
1075 const TableInfo &tableInfo, TableSyncType syncType)
1076 {
1077 auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
1078 return tableManager->AddRelationalLogTableTrigger(dbHandle_, tableInfo, "");
1079 }
1080
DoCleanAssetId(const std::string & tableName,const RelationalSchemaObject & localSchema)1081 int SQLiteSingleVerRelationalStorageExecutor::DoCleanAssetId(const std::string &tableName,
1082 const RelationalSchemaObject &localSchema)
1083 {
1084 std::vector<int64_t> dataKeys;
1085 std::string logTableName = DBCommon::GetLogTableName(tableName);
1086 int errCode = GetCleanCloudDataKeys(logTableName, dataKeys, false);
1087 if (errCode != E_OK) {
1088 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
1089 return errCode;
1090 }
1091 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
1092 errCode = CleanAssetId(tableName, fieldInfos, dataKeys);
1093 if (errCode != E_OK) {
1094 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d.", errCode);
1095 }
1096 return errCode;
1097 }
1098
CleanAssetId(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys)1099 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetId(const std::string &tableName,
1100 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys)
1101 {
1102 int errCode = E_OK;
1103 for (const auto &fieldInfo : fieldInfos) {
1104 if (fieldInfo.IsAssetType()) {
1105 Assets assets;
1106 errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1107 if (errCode != E_OK) {
1108 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
1109 return errCode;
1110 }
1111 errCode = UpdateAssetIdOnUserTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1112 if (errCode != E_OK) {
1113 LOGE("[Storage Executor] failed to save clean asset id on table, %d.", errCode);
1114 return errCode;
1115 }
1116 } else if (fieldInfo.IsAssetsType()) {
1117 errCode = GetAssetsAndUpdateAssetsId(tableName, fieldInfo.GetFieldName(), dataKeys);
1118 if (errCode != E_OK) {
1119 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
1120 return errCode;
1121 }
1122 }
1123 }
1124 return errCode;
1125 }
1126
UpdateAssetIdOnUserTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)1127 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetIdOnUserTable(const std::string &tableName,
1128 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
1129 {
1130 if (assets.empty()) { // LCOV_EXCL_BR_LINE
1131 return E_OK;
1132 }
1133 int errCode = E_OK;
1134 int ret = E_OK;
1135 sqlite3_stmt *stmt = nullptr;
1136 size_t index = 0;
1137 for (const auto &rowId : dataKeys) {
1138 if (rowId == -1) { // -1 means data is deleted
1139 continue;
1140 }
1141 if (assets[index].name.empty()) { // LCOV_EXCL_BR_LINE
1142 index++;
1143 continue;
1144 }
1145 std::string cleanAssetIdSql = "UPDATE " + tableName + " SET " + fieldName + " = ? WHERE " +
1146 std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1147 errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1148 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1149 LOGE("Get statement failed, %d", errCode);
1150 return errCode;
1151 }
1152 assets[index].assetId = "";
1153 assets[index].status &= ~AssetStatus::UPLOADING;
1154 errCode = BindAssetToBlobStatement(assets[index], 1, stmt); // 1 means sqlite statement index
1155 index++;
1156 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1157 LOGE("Bind asset to blob statement failed, %d", errCode);
1158 goto END;
1159 }
1160 errCode = SQLiteUtils::StepWithRetry(stmt);
1161 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1162 errCode = E_OK;
1163 } else {
1164 LOGE("Step statement failed, %d", errCode);
1165 goto END;
1166 }
1167 SQLiteUtils::ResetStatement(stmt, true, ret);
1168 }
1169 return errCode != E_OK ? errCode : ret;
1170 END:
1171 SQLiteUtils::ResetStatement(stmt, true, ret);
1172 return errCode != E_OK ? errCode : ret;
1173 }
1174
GetAssetsAndUpdateAssetsId(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys)1175 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsAndUpdateAssetsId(const std::string &tableName,
1176 const std::string &fieldName, const std::vector<int64_t> &dataKeys)
1177 {
1178 int errCode = E_OK;
1179 int ret = E_OK;
1180 sqlite3_stmt *selectStmt = nullptr;
1181 for (const auto &rowId : dataKeys) {
1182 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1183 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1184 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1185 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1186 LOGE("Get select assets statement failed, %d.", errCode);
1187 goto END;
1188 }
1189 Assets assets;
1190 errCode = GetAssetsByRowId(selectStmt, assets);
1191 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1192 LOGE("Get assets by rowId failed, %d.", errCode);
1193 goto END;
1194 }
1195 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1196 if (assets.empty()) { // LCOV_EXCL_BR_LINE
1197 continue;
1198 }
1199 for (auto &asset : assets) {
1200 asset.assetId = "";
1201 asset.status &= ~AssetStatus::UPLOADING;
1202 }
1203 std::vector<uint8_t> assetsValue;
1204 errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsValue);
1205 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1206 LOGE("[CleanAssetsIdOnUserTable] failed to transfer assets to blob, %d.", errCode);
1207 return errCode;
1208 }
1209 errCode = CleanAssetsIdOnUserTable(tableName, fieldName, rowId, assetsValue);
1210 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1211 LOGE("[CleanAssetsIdOnUserTable] clean assets id on user table failed, %d", errCode);
1212 return errCode;
1213 }
1214 }
1215 return errCode != E_OK ? errCode : ret;
1216 END:
1217 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1218 return errCode != E_OK ? errCode : ret;
1219 }
1220
CleanAssetsIdOnUserTable(const std::string & tableName,const std::string & fieldName,const int64_t rowId,const std::vector<uint8_t> & assetsValue)1221 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetsIdOnUserTable(const std::string &tableName,
1222 const std::string &fieldName, const int64_t rowId, const std::vector<uint8_t> &assetsValue)
1223 {
1224 std::string cleanAssetIdSql = "UPDATE " + tableName + " SET " + fieldName + " = ? WHERE " +
1225 std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1226 sqlite3_stmt *stmt = nullptr;
1227 int errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1228 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1229 LOGE("Get statement failed, %d", errCode);
1230 SQLiteUtils::ResetStatement(stmt, true, errCode);
1231 return errCode;
1232 }
1233 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, assetsValue, false);
1234 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1235 SQLiteUtils::ResetStatement(stmt, true, errCode);
1236 return errCode;
1237 }
1238 errCode = SQLiteUtils::StepWithRetry(stmt);
1239 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1240 errCode = E_OK;
1241 }
1242 SQLiteUtils::ResetStatement(stmt, true, errCode);
1243 return errCode;
1244 }
1245
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1246 std::pair<int, uint32_t> SQLiteSingleVerRelationalStorageExecutor::GetAssetsByGidOrHashKey(
1247 const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
1248 {
1249 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
1250 auto &[errCode, status] = res;
1251 std::vector<Field> assetFields;
1252 std::string sql = "SELECT";
1253 for (const auto &field: tableSchema.fields) {
1254 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1255 assetFields.emplace_back(field);
1256 sql += " b." + field.colName + ",";
1257 }
1258 }
1259 if (assetFields.empty()) {
1260 return { -E_NOT_FOUND, status };
1261 }
1262 sql += "a.cloud_gid, a.status ";
1263 sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE (a." + FLAG_NOT_LOGIC_DELETE + ") AND (" +
1264 (gid.empty() ? "a.hash_key = ?);" : " a.cloud_gid = ? OR a.hash_key = ?);");
1265 sqlite3_stmt *stmt = nullptr;
1266 errCode = InitGetAssetStmt(sql, gid, hashKey, stmt);
1267 if (errCode != E_OK) {
1268 return res;
1269 }
1270 errCode = SQLiteUtils::StepWithRetry(stmt);
1271 int index = 0;
1272 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1273 for (const auto &field: assetFields) {
1274 Type cloudValue;
1275 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1276 if (errCode != E_OK) {
1277 break;
1278 }
1279 errCode = PutVBucketByType(assets, field, cloudValue);
1280 if (errCode != E_OK) {
1281 break;
1282 }
1283 }
1284 std::string curGid;
1285 errCode = SQLiteUtils::GetColumnTextValue(stmt, index++, curGid);
1286 if (errCode == E_OK && CloudStorageUtils::IsCloudGidMismatch(gid, curGid)) {
1287 // Gid is different, there may be duplicate primary keys in the cloud
1288 errCode = -E_CLOUD_GID_MISMATCH;
1289 }
1290 status = static_cast<uint32_t>(sqlite3_column_int(stmt, index++));
1291 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1292 errCode = -E_NOT_FOUND;
1293 } else {
1294 LOGE("step get asset stmt failed %d.", errCode);
1295 }
1296 SQLiteUtils::ResetStatement(stmt, true, errCode);
1297 return res;
1298 }
1299
InitGetAssetStmt(const std::string & sql,const std::string & gid,const Bytes & hashKey,sqlite3_stmt * & stmt)1300 int SQLiteSingleVerRelationalStorageExecutor::InitGetAssetStmt(const std::string &sql, const std::string &gid,
1301 const Bytes &hashKey, sqlite3_stmt *&stmt)
1302 {
1303 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1304 if (errCode != E_OK) {
1305 LOGE("Get asset statement failed, %d.", errCode);
1306 return errCode;
1307 }
1308 int index = 1;
1309 if (!gid.empty()) {
1310 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
1311 if (errCode != E_OK) {
1312 LOGE("bind gid failed %d.", errCode);
1313 SQLiteUtils::ResetStatement(stmt, true, errCode);
1314 return errCode;
1315 }
1316 }
1317 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, hashKey);
1318 if (errCode != E_OK) {
1319 LOGE("bind hash failed %d.", errCode);
1320 SQLiteUtils::ResetStatement(stmt, true, errCode);
1321 }
1322 return errCode;
1323 }
1324
FillHandleWithOpType(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid,const TableSchema & tableSchema)1325 int SQLiteSingleVerRelationalStorageExecutor::FillHandleWithOpType(const OpType opType, const CloudSyncData &data,
1326 bool fillAsset, bool ignoreEmptyGid, const TableSchema &tableSchema)
1327 {
1328 int errCode = E_OK;
1329 switch (opType) {
1330 case OpType::UPDATE_VERSION: // fallthrough
1331 case OpType::INSERT_VERSION: {
1332 errCode = FillCloudVersionForUpload(opType, data);
1333 break;
1334 }
1335 case OpType::SET_UPLOADING: {
1336 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1337 if (errCode != E_OK) {
1338 LOGE("Failed to set uploading for ins data, %d.", errCode);
1339 return errCode;
1340 }
1341 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1342 break;
1343 }
1344 case OpType::INSERT: {
1345 errCode = UpdateCloudLogGid(data, ignoreEmptyGid);
1346 if (errCode != E_OK) {
1347 LOGE("Failed to fill cloud log gid, %d.", errCode);
1348 return errCode;
1349 }
1350 if (fillAsset) {
1351 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1352 if (errCode != E_OK) {
1353 LOGE("Failed to fill asset for ins, %d.", errCode);
1354 return errCode;
1355 }
1356 }
1357 errCode = FillCloudVersionForUpload(OpType::INSERT_VERSION, data);
1358 break;
1359 }
1360 case OpType::UPDATE: {
1361 if (fillAsset && !data.updData.assets.empty()) {
1362 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1363 if (errCode != E_OK) {
1364 LOGE("Failed to fill asset for upd, %d.", errCode);
1365 return errCode;
1366 }
1367 }
1368 errCode = FillCloudVersionForUpload(OpType::UPDATE_VERSION, data);
1369 break;
1370 }
1371 default:
1372 break;
1373 }
1374 return errCode;
1375 }
1376
GetAssetsByRowId(sqlite3_stmt * & selectStmt,Assets & assets)1377 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets)
1378 {
1379 int errCode = SQLiteUtils::StepWithRetry(selectStmt);
1380 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1381 std::vector<uint8_t> blobValue;
1382 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
1383 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1384 LOGE("Get column blob value failed %d.", errCode);
1385 return errCode;
1386 }
1387 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1388 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1389 LOGE("Transfer blob to assets failed %d", errCode);
1390 }
1391 return errCode;
1392 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1393 return E_OK;
1394 } else {
1395 LOGE("Step select statement failed %d.", errCode);
1396 return errCode;
1397 }
1398 }
1399
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1400 void SQLiteSingleVerRelationalStorageExecutor::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1401 {
1402 assetLoader_ = loader;
1403 }
1404
ExecuteFillDownloadAssetStatement(sqlite3_stmt * & stmt,int beginIndex,const std::string & cloudGid)1405 int SQLiteSingleVerRelationalStorageExecutor::ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt,
1406 int beginIndex, const std::string &cloudGid)
1407 {
1408 int errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, cloudGid);
1409 if (errCode != E_OK) {
1410 LOGE("Bind cloud gid to statement failed %d.", errCode);
1411 int ret = E_OK;
1412 SQLiteUtils::ResetStatement(stmt, true, ret);
1413 return errCode;
1414 }
1415 errCode = SQLiteUtils::StepWithRetry(stmt);
1416 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1417 errCode = E_OK;
1418 } else {
1419 LOGE("Fill cloud asset failed: %d.", errCode);
1420 }
1421 int ret = E_OK;
1422 SQLiteUtils::ResetStatement(stmt, true, ret);
1423 return errCode != E_OK ? errCode : ret;
1424 }
1425
CleanDownloadChangedAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType)1426 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadChangedAssets(
1427 const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType)
1428 {
1429 if (assetLoader_ == nullptr) {
1430 LOGE("assetLoader may be not set.");
1431 return -E_NOT_SET;
1432 }
1433 std::vector<Asset> toDeleteAssets;
1434 CloudStorageUtils::GetToBeRemoveAssets(vBucket, assetOpType, toDeleteAssets);
1435 if (toDeleteAssets.empty()) {
1436 return E_OK;
1437 }
1438 DBStatus ret = assetLoader_->RemoveLocalAssets(toDeleteAssets);
1439 if (ret != OK) {
1440 LOGE("remove local assets failed %d.", ret);
1441 return -E_REMOVE_ASSETS_FAILED;
1442 }
1443 return E_OK;
1444 }
1445
GetAndBindFillUploadAssetStatement(const std::string & tableName,const VBucket & assets,sqlite3_stmt * & statement)1446 int SQLiteSingleVerRelationalStorageExecutor::GetAndBindFillUploadAssetStatement(const std::string &tableName,
1447 const VBucket &assets, sqlite3_stmt *&statement)
1448 {
1449 std::string sql = "UPDATE '" + tableName + "' SET ";
1450 for (const auto &item: assets) {
1451 sql += item.first + " = ?,";
1452 }
1453 sql.pop_back();
1454 sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = ?;";
1455 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1456 if (errCode != E_OK) {
1457 return errCode;
1458 }
1459 int bindIndex = 1;
1460 for (const auto &item: assets) {
1461 Field field = {
1462 .colName = item.first, .type = static_cast<int32_t>(item.second.index())
1463 };
1464 errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](bindIndex++, assets, field, statement);
1465 if (errCode != E_OK) {
1466 return errCode;
1467 }
1468 }
1469 return errCode;
1470 }
1471
OnlyUpdateAssetId(const std::string & tableName,const TableSchema & tableSchema,const VBucket & vBucket,int64_t dataKey,OpType opType)1472 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateAssetId(const std::string &tableName,
1473 const TableSchema &tableSchema, const VBucket &vBucket, int64_t dataKey, OpType opType)
1474 {
1475 if (opType != OpType::ONLY_UPDATE_GID && opType != OpType::NOT_HANDLE &&
1476 opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1477 return E_OK;
1478 }
1479 if (CloudStorageUtils::IsSharedTable(tableSchema)) {
1480 // this is shared table, not need to update asset id.
1481 return E_OK;
1482 }
1483 int errCode = UpdateAssetId(tableSchema, dataKey, vBucket);
1484 if (errCode != E_OK) {
1485 LOGE("[Storage Executor] failed to update assetId on table, %d.", errCode);
1486 }
1487 return errCode;
1488 }
1489
UpdateLocalAssetId(const VBucket & vBucket,const std::string & fieldName,Asset & asset)1490 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName,
1491 Asset &asset)
1492 {
1493 for (const auto &[col, value] : vBucket) {
1494 if (value.index() == TYPE_INDEX<Asset> && col == fieldName) {
1495 asset = std::get<Asset>(value);
1496 }
1497 }
1498 }
1499
UpdateLocalAssetsId(const VBucket & vBucket,const std::string & fieldName,Assets & assets)1500 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName,
1501 Assets &assets)
1502 {
1503 for (const auto &[col, value] : vBucket) {
1504 if (value.index() == TYPE_INDEX<Assets> && col == fieldName) {
1505 assets = std::get<Assets>(value);
1506 }
1507 }
1508 }
1509
UpdateLocalAssetsIdInner(const Assets & cloudAssets,Assets & assets)1510 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsIdInner(const Assets &cloudAssets, Assets &assets)
1511 {
1512 for (const auto &cloudAsset : cloudAssets) {
1513 for (auto &asset : assets) {
1514 if (asset.name == cloudAsset.name) {
1515 asset.assetId = cloudAsset.assetId;
1516 }
1517 }
1518 }
1519 }
1520
BindAssetToBlobStatement(const Asset & asset,int index,sqlite3_stmt * & stmt)1521 int SQLiteSingleVerRelationalStorageExecutor::BindAssetToBlobStatement(const Asset &asset, int index,
1522 sqlite3_stmt *&stmt)
1523 {
1524 std::vector<uint8_t> blobValue;
1525 int errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, blobValue);
1526 if (errCode != E_OK) {
1527 LOGE("Transfer asset to blob failed, %d.", errCode);
1528 return errCode;
1529 }
1530 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1531 if (errCode != E_OK) {
1532 LOGE("Bind asset blob to statement failed, %d.", errCode);
1533 }
1534 return errCode;
1535 }
1536
BindAssetsToBlobStatement(const Assets & assets,int index,sqlite3_stmt * & stmt)1537 int SQLiteSingleVerRelationalStorageExecutor::BindAssetsToBlobStatement(const Assets &assets, int index,
1538 sqlite3_stmt *&stmt)
1539 {
1540 std::vector<uint8_t> blobValue;
1541 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blobValue);
1542 if (errCode != E_OK) {
1543 LOGE("Transfer asset to blob failed, %d.", errCode);
1544 return errCode;
1545 }
1546 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1547 if (errCode != E_OK) {
1548 LOGE("Bind asset blob to statement failed, %d.", errCode);
1549 }
1550 return errCode;
1551 }
1552
GetAssetOnTableInner(sqlite3_stmt * & stmt,Asset & asset)1553 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTableInner(sqlite3_stmt *&stmt, Asset &asset)
1554 {
1555 int errCode = SQLiteUtils::StepWithRetry(stmt);
1556 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1557 std::vector<uint8_t> blobValue;
1558 errCode = SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue);
1559 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1560 LOGE("[RDBExecutor][GetAssetOnTableInner] Get column blob value failed, %d.", errCode);
1561 return errCode;
1562 }
1563 errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
1564 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1565 LOGE("[RDBExecutor] Transfer blob to asset failed, %d.", errCode);
1566 }
1567 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1568 errCode = E_OK;
1569 } else {
1570 LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1571 }
1572 return errCode;
1573 }
1574
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const int64_t dataKey,Asset & asset)1575 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
1576 const std::string &fieldName, const int64_t dataKey, Asset &asset)
1577 {
1578 sqlite3_stmt *selectStmt = nullptr;
1579 std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
1580 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1581 int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
1582 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1583 LOGE("Get select asset statement failed, %d.", errCode);
1584 return errCode;
1585 }
1586 errCode = GetAssetOnTableInner(selectStmt, asset);
1587 int ret = E_OK;
1588 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1589 return errCode != E_OK ? errCode : ret;
1590 }
1591
GetAssetsOnTableInner(sqlite3_stmt * & stmt,Assets & assets)1592 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsOnTableInner(sqlite3_stmt *&stmt, Assets &assets)
1593 {
1594 int errCode = SQLiteUtils::StepWithRetry(stmt);
1595 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1596 std::vector<uint8_t> blobValue;
1597 errCode = SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue);
1598 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1599 LOGE("[RDBExecutor][GetAssetsOnTableInner] Get column blob value failed, %d.", errCode);
1600 return errCode;
1601 }
1602 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1603 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1604 LOGE("[RDBExecutor] Transfer blob to assets failed, %d.", errCode);
1605 }
1606 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1607 errCode = E_OK;
1608 } else {
1609 LOGE("[RDBExecutor] Step failed when get assets from table, errCode = %d.", errCode);
1610 }
1611 return errCode;
1612 }
1613
GetAssetsOnTable(const std::string & tableName,const std::string & fieldName,const int64_t dataKey,Assets & assets)1614 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsOnTable(const std::string &tableName,
1615 const std::string &fieldName, const int64_t dataKey, Assets &assets)
1616 {
1617 sqlite3_stmt *selectStmt = nullptr;
1618 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1619 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1620 int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1621 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1622 LOGE("Get select assets statement failed, %d.", errCode);
1623 return errCode;
1624 }
1625 errCode = GetAssetsOnTableInner(selectStmt, assets);
1626 int ret = E_OK;
1627 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1628 return errCode != E_OK ? errCode : ret;
1629 }
1630
BindAssetFiledToBlobStatement(const TableSchema & tableSchema,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord,sqlite3_stmt * & stmt)1631 int SQLiteSingleVerRelationalStorageExecutor::BindAssetFiledToBlobStatement(const TableSchema &tableSchema,
1632 const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt)
1633 {
1634 int assetIndex = 0;
1635 int assetsIndex = 0;
1636 for (const auto &field : tableSchema.fields) {
1637 if (field.type == TYPE_INDEX<Asset>) {
1638 if (assetOfOneRecord[assetIndex].name.empty()) {
1639 continue;
1640 }
1641 int errCode = BindAssetToBlobStatement(assetOfOneRecord[assetIndex], assetIndex + assetsIndex + 1, stmt);
1642 if (errCode != E_OK) {
1643 LOGE("Bind asset to blob statement failed, %d.", errCode);
1644 return errCode;
1645 }
1646 assetIndex++;
1647 } else if (field.type == TYPE_INDEX<Assets>) {
1648 if (assetsOfOneRecord[assetsIndex].empty()) {
1649 continue;
1650 }
1651 int errCode = BindAssetsToBlobStatement(assetsOfOneRecord[assetsIndex], assetIndex + assetsIndex + 1, stmt);
1652 if (errCode != E_OK) {
1653 LOGE("Bind assets to blob statement failed, %d.", errCode);
1654 return errCode;
1655 }
1656 assetsIndex++;
1657 }
1658 }
1659 return E_OK;
1660 }
1661
UpdateAssetsIdForOneRecord(const TableSchema & tableSchema,const std::string & sql,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord)1662 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetsIdForOneRecord(const TableSchema &tableSchema,
1663 const std::string &sql, const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord)
1664 {
1665 int errCode = E_OK;
1666 int ret = E_OK;
1667 sqlite3_stmt *stmt = nullptr;
1668 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1669 if (errCode != E_OK) {
1670 LOGE("Get update asset statement failed, %d.", errCode);
1671 return errCode;
1672 }
1673 errCode = BindAssetFiledToBlobStatement(tableSchema, assetOfOneRecord, assetsOfOneRecord, stmt);
1674 if (errCode != E_OK) {
1675 LOGE("Asset field Bind asset to blob statement failed, %d.", errCode);
1676 SQLiteUtils::ResetStatement(stmt, true, ret);
1677 return errCode != E_OK ? errCode : ret;
1678 }
1679 errCode = SQLiteUtils::StepWithRetry(stmt);
1680 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1681 errCode = E_OK;
1682 } else {
1683 LOGE("Step statement failed, %d", errCode);
1684 }
1685 SQLiteUtils::ResetStatement(stmt, true, ret);
1686 return errCode != E_OK ? errCode : ret;
1687 }
1688
UpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1689 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1690 const VBucket &vBucket)
1691 {
1692 int errCode = E_OK;
1693 std::vector<Asset> assetOfOneRecord;
1694 std::vector<Assets> assetsOfOneRecord;
1695 std::string updateAssetIdSql = "UPDATE " + tableSchema.name + " SET";
1696 for (const auto &field : tableSchema.fields) {
1697 if (field.type == TYPE_INDEX<Asset>) {
1698 Asset asset;
1699 UpdateLocalAssetId(vBucket, field.colName, asset);
1700 assetOfOneRecord.push_back(asset);
1701 if (!asset.name.empty()) {
1702 updateAssetIdSql += " " + field.colName + " = ?,";
1703 }
1704 }
1705 if (field.type == TYPE_INDEX<Assets>) {
1706 Assets assets;
1707 UpdateLocalAssetsId(vBucket, field.colName, assets);
1708 assetsOfOneRecord.push_back(assets);
1709 if (!assets.empty()) {
1710 updateAssetIdSql += " " + field.colName + " = ?,";
1711 }
1712 }
1713 }
1714 if (updateAssetIdSql == "UPDATE " + tableSchema.name + " SET") {
1715 return E_OK;
1716 }
1717 updateAssetIdSql.pop_back();
1718 updateAssetIdSql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1719 errCode = UpdateAssetsIdForOneRecord(tableSchema, updateAssetIdSql, assetOfOneRecord, assetsOfOneRecord);
1720 if (errCode != E_OK) {
1721 LOGE("[Storage Executor] failed to update asset id on table, %d.", errCode);
1722 }
1723 return errCode;
1724 }
1725
SetPutDataMode(PutDataMode mode)1726 void SQLiteSingleVerRelationalStorageExecutor::SetPutDataMode(PutDataMode mode)
1727 {
1728 putDataMode_ = mode;
1729 }
1730
SetMarkFlagOption(MarkFlagOption option)1731 void SQLiteSingleVerRelationalStorageExecutor::SetMarkFlagOption(MarkFlagOption option)
1732 {
1733 markFlagOption_ = option;
1734 }
1735
GetDataFlag()1736 int64_t SQLiteSingleVerRelationalStorageExecutor::GetDataFlag()
1737 {
1738 if (putDataMode_ != PutDataMode::USER) {
1739 return static_cast<int64_t>(LogInfoFlag::FLAG_CLOUD) |
1740 static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_CONSISTENCY);
1741 }
1742 uint32_t flag = static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
1743 if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1744 flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1745 }
1746 flag |= static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_CONSISTENCY);
1747 return static_cast<int64_t>(flag);
1748 }
1749
GetUpdateDataFlagSql()1750 std::string SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataFlagSql()
1751 {
1752 if (putDataMode_ == PutDataMode::SYNC) {
1753 return UPDATE_FLAG_CLOUD;
1754 }
1755 if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1756 return UPDATE_FLAG_WAIT_COMPENSATED_SYNC;
1757 }
1758 return UPDATE_FLAG_CLOUD;
1759 }
1760
GetDev()1761 std::string SQLiteSingleVerRelationalStorageExecutor::GetDev()
1762 {
1763 return putDataMode_ == PutDataMode::SYNC ? "cloud" : "";
1764 }
1765
GetUpdateField(const VBucket & vBucket,const TableSchema & tableSchema)1766 std::vector<Field> SQLiteSingleVerRelationalStorageExecutor::GetUpdateField(const VBucket &vBucket,
1767 const TableSchema &tableSchema)
1768 {
1769 std::set<std::string> useFields;
1770 std::vector<Field> fields;
1771 if (putDataMode_ == PutDataMode::SYNC) {
1772 for (const auto &field : tableSchema.fields) {
1773 useFields.insert(field.colName);
1774 }
1775 fields = tableSchema.fields;
1776 } else {
1777 for (const auto &field : vBucket) {
1778 if (field.first.empty() || field.first[0] == '#') {
1779 continue;
1780 }
1781 useFields.insert(field.first);
1782 }
1783 for (const auto &field : tableSchema.fields) {
1784 if (useFields.find(field.colName) == useFields.end()) {
1785 continue;
1786 }
1787 fields.push_back(field);
1788 }
1789 }
1790 return fields;
1791 }
1792
UpdateRecordFlag(const std::string & tableName,const std::string & sql,const LogInfo & logInfo)1793 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordFlag(const std::string &tableName, const std::string &sql,
1794 const LogInfo &logInfo)
1795 {
1796 bool useHashKey = false;
1797 if (logInfo.cloudGid.empty() && logInfo.dataKey == DBConstant::DEFAULT_ROW_ID) {
1798 if (logInfo.hashKey.empty()) {
1799 LOGE("[RDBExecutor] Update record flag failed with invalid args!");
1800 return -E_INVALID_ARGS;
1801 }
1802 useHashKey = true;
1803 }
1804 sqlite3_stmt *stmt = nullptr;
1805 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1806 if (errCode != E_OK) {
1807 LOGE("[Storage Executor] Get stmt failed when update record flag, %d", errCode);
1808 return errCode;
1809 }
1810 int ret = E_OK;
1811 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, logInfo.timestamp); // 1 is timestamp
1812 if (errCode != E_OK) {
1813 LOGE("[Storage Executor] Bind timestamp to update record flag stmt failed, %d", errCode);
1814 SQLiteUtils::ResetStatement(stmt, true, ret);
1815 return errCode;
1816 }
1817 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, logInfo.timestamp); // 2 is timestamp
1818 if (errCode != E_OK) {
1819 LOGE("[Storage Executor] Bind timestamp to update record status stmt failed, %d", errCode);
1820 SQLiteUtils::ResetStatement(stmt, true, ret);
1821 return errCode;
1822 }
1823 if (useHashKey) {
1824 errCode = SQLiteUtils::BindBlobToStatement(stmt, 3, logInfo.hashKey); // 3 is hash_key
1825 if (errCode != E_OK) {
1826 LOGE("[Storage Executor] Bind hashKey to update record flag stmt failed, %d", errCode);
1827 SQLiteUtils::ResetStatement(stmt, true, ret);
1828 return errCode;
1829 }
1830 }
1831 errCode = SQLiteUtils::StepWithRetry(stmt);
1832 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1833 errCode = E_OK;
1834 } else {
1835 LOGE("[Storage Executor]Step update record flag stmt failed, %d", errCode);
1836 }
1837 SQLiteUtils::ResetStatement(stmt, true, ret);
1838 return errCode == E_OK ? ret : errCode;
1839 }
1840
MarkFlagAsUploadFinished(const std::string & tableName,const Key & hashKey,Timestamp timestamp)1841 void SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsUploadFinished(const std::string &tableName,
1842 const Key &hashKey, Timestamp timestamp)
1843 {
1844 sqlite3_stmt *stmt = nullptr;
1845 int errCode = SQLiteUtils::GetStatement(dbHandle_, CloudStorageUtils::GetUpdateUploadFinishedSql(tableName),
1846 stmt);
1847 int index = 1;
1848 errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, timestamp);
1849 if (errCode != E_OK) {
1850 SQLiteUtils::ResetStatement(stmt, true, errCode);
1851 LOGW("[Storage Executor] Bind timestamp to update record flag for upload finished stmt failed, %d", errCode);
1852 return;
1853 }
1854 errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, hashKey);
1855 if (errCode != E_OK) {
1856 SQLiteUtils::ResetStatement(stmt, true, errCode);
1857 LOGW("[Storage Executor] Bind hashKey to update record flag for upload finished stmt failed, %d", errCode);
1858 return;
1859 }
1860 errCode = SQLiteUtils::StepWithRetry(stmt);
1861 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1862 errCode = E_OK;
1863 } else {
1864 LOGE("[Storage Executor]Step update record flag for upload finished stmt failed, %d", errCode);
1865 }
1866 SQLiteUtils::ResetStatement(stmt, true, errCode);
1867 }
1868
GetWaitCompensatedSyncDataPk(const TableSchema & table,std::vector<VBucket> & data)1869 int SQLiteSingleVerRelationalStorageExecutor::GetWaitCompensatedSyncDataPk(const TableSchema &table,
1870 std::vector<VBucket> &data)
1871 {
1872 std::string sql = "SELECT ";
1873 std::vector<Field> pkFields;
1874 for (const auto &field : table.fields) {
1875 if (!field.primary) {
1876 continue;
1877 }
1878 sql += "b." + field.colName + ",";
1879 pkFields.push_back(field);
1880 }
1881 if (pkFields.empty()) {
1882 // ignore no pk table
1883 return E_OK;
1884 }
1885 sql.pop_back();
1886 sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " + FLAG_IS_WAIT_COMPENSATED_SYNC;
1887 sqlite3_stmt *stmt = nullptr;
1888 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1889 if (errCode != E_OK) {
1890 LOGE("[RDBExecutor] Get stmt failed when get wait compensated sync pk! errCode = %d..", errCode);
1891 return errCode;
1892 }
1893 do {
1894 errCode = SQLiteUtils::StepWithRetry(stmt);
1895 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1896 VBucket pkData;
1897 errCode = GetRecordFromStmt(stmt, pkFields, 0, pkData);
1898 if (errCode != E_OK) {
1899 LOGE("[RDBExecutor] Get record failed when get wait compensated sync pk! errCode = %d.", errCode);
1900 break;
1901 }
1902 data.push_back(pkData);
1903 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1904 errCode = E_OK;
1905 break;
1906 } else {
1907 LOGE("[RDBExecutor] Step failed when get wait compensated sync pk! errCode = %d.", errCode);
1908 break;
1909 }
1910 } while (errCode == E_OK);
1911 int ret = E_OK;
1912 SQLiteUtils::ResetStatement(stmt, true, ret);
1913 return errCode == E_OK ? ret : errCode;
1914 }
1915
GetRecordFromStmt(sqlite3_stmt * stmt,const std::vector<Field> & fields,int startIndex,VBucket & record)1916 int SQLiteSingleVerRelationalStorageExecutor::GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields,
1917 int startIndex, VBucket &record)
1918 {
1919 int errCode = E_OK;
1920 for (const auto &field : fields) {
1921 Type cloudValue;
1922 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, startIndex, cloudValue);
1923 if (errCode != E_OK) {
1924 break;
1925 }
1926 errCode = PutVBucketByType(record, field, cloudValue);
1927 if (errCode != E_OK) {
1928 break;
1929 }
1930 startIndex++;
1931 }
1932 return errCode;
1933 }
1934
BindShareValueToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt)1935 int SQLiteSingleVerRelationalStorageExecutor::BindShareValueToInsertLogStatement(const VBucket &vBucket,
1936 const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt)
1937 {
1938 int errCode = E_OK;
1939 std::string version;
1940 if (putDataMode_ == PutDataMode::SYNC) {
1941 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, vBucket, version);
1942 if ((errCode != E_OK && errCode != -E_NOT_FOUND)) {
1943 LOGE("get version for insert log statement failed, %d", errCode);
1944 return -E_CLOUD_ERROR;
1945 }
1946 }
1947 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 10, version); // 10 is version
1948 if (errCode != E_OK) {
1949 LOGE("Bind version to insert log statement failed, %d", errCode);
1950 return errCode;
1951 }
1952
1953 std::string shareUri;
1954 if (putDataMode_ == PutDataMode::SYNC) {
1955 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
1956 vBucket, shareUri);
1957 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1958 LOGE("get shareUri for insert log statement failed, %d", errCode);
1959 return -E_CLOUD_ERROR;
1960 }
1961 }
1962
1963 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 11, shareUri); // 11 is sharing_resource
1964 if (errCode != E_OK) {
1965 LOGE("Bind shareUri to insert log statement failed, %d", errCode);
1966 }
1967 return errCode;
1968 }
1969
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1970 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsConsistent(const std::string &tableName,
1971 const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1972 {
1973 if (downloadData.data.size() != downloadData.opType.size()) {
1974 LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1975 return -E_CLOUD_ERROR;
1976 }
1977 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1978 " SET flag=flag&(~0x20), " + CloudDbConstant::UNLOCKING_TO_UNLOCK + " WHERE cloud_gid=? and timestamp=?;";
1979 sqlite3_stmt *stmt = nullptr;
1980 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1981 if (errCode != E_OK) {
1982 LOGE("Get mark flag as consistent stmt failed, %d.", errCode);
1983 return errCode;
1984 }
1985 int ret = E_OK;
1986 int index = 0;
1987 for (const auto &data: downloadData.data) {
1988 SQLiteUtils::ResetStatement(stmt, false, ret);
1989 OpType opType = downloadData.opType[index++];
1990 if (opType == OpType::NOT_HANDLE || opType == OpType::LOCKED_NOT_HANDLE) {
1991 continue;
1992 }
1993 errCode = CloudStorageUtils::BindStepConsistentFlagStmt(stmt, data, gidFilters);
1994 if (errCode != E_OK) {
1995 break;
1996 }
1997 }
1998 SQLiteUtils::ResetStatement(stmt, true, ret);
1999 return errCode == E_OK ? ret : errCode;
2000 }
2001
FillCloudVersionForUpload(const std::string & tableName,const CloudSyncBatch & batchData)2002 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const std::string &tableName,
2003 const CloudSyncBatch &batchData)
2004 {
2005 if (batchData.extend.empty()) {
2006 return E_OK;
2007 }
2008 if (batchData.hashKey.empty() || batchData.extend.size() != batchData.hashKey.size()) {
2009 LOGE("invalid sync data for filling version.");
2010 return -E_INVALID_ARGS;
2011 }
2012 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(tableName) +
2013 "' SET version = ? WHERE hash_key = ? ";
2014 sqlite3_stmt *stmt = nullptr;
2015 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
2016 if (errCode != E_OK) {
2017 return errCode;
2018 }
2019 int ret = E_OK;
2020 for (size_t i = 0; i < batchData.extend.size(); ++i) {
2021 errCode = BindUpdateVersionStatement(batchData.extend[i], batchData.hashKey[i], stmt);
2022 if (errCode != E_OK) {
2023 LOGE("bind update version stmt failed.");
2024 SQLiteUtils::ResetStatement(stmt, true, ret);
2025 return errCode;
2026 }
2027 }
2028 SQLiteUtils::ResetStatement(stmt, true, ret);
2029 return ret;
2030 }
2031
QueryCount(const std::string & tableName,int64_t & count)2032 int SQLiteSingleVerRelationalStorageExecutor::QueryCount(const std::string &tableName, int64_t &count)
2033 {
2034 return SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, count);
2035 }
2036
CheckInventoryData(const std::string & tableName)2037 int SQLiteSingleVerRelationalStorageExecutor::CheckInventoryData(const std::string &tableName)
2038 {
2039 int64_t dataCount = 0;
2040 int errCode = SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, dataCount);
2041 if (errCode != E_OK) {
2042 LOGE("Query count failed.", errCode);
2043 return errCode;
2044 }
2045 return dataCount > 0 ? -E_WITH_INVENTORY_DATA : E_OK;
2046 }
2047 } // namespace DistributedDB
2048 #endif