1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include "cloud/asset_operation_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "cloud/cloud_storage_utils.h"
21 #include "db_common.h"
22 #include "log_table_manager_factory.h"
23 #include "res_finalizer.h"
24 #include "runtime_context.h"
25 #include "simple_tracker_log_table_manager.h"
26 #include "sqlite_relational_utils.h"
27
28 namespace DistributedDB {
// Offset added to the number of bound asset columns to get the bind index of the rowid argument.
static constexpr const int ROW_ID_INDEX = 1;
// Key used to read the hash key value out of a VBucket.
static constexpr const char *HASH_KEY = "HASH_KEY";
// SQL condition that holds when bit 0x08 of the flag is clear, i.e. the record is not marked as logic delete.
static constexpr const char *FLAG_NOT_LOGIC_DELETE = "FLAG & 0x08 = 0";

// A pair of string lists.
using PairStringVector = std::pair<std::vector<std::string>, std::vector<std::string>>;
34
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)35 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
36 std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
37 {
38 if (assetFields.empty() && pkSet.empty()) {
39 return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
40 }
41 std::string gid;
42 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
43 if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
44 LOGE("Get cloud gid fail when query log table.");
45 return errCode;
46 }
47
48 if (pkSet.empty() && gid.empty()) {
49 LOGE("query log table failed because of both primary key and gid are empty.");
50 return -E_CLOUD_ERROR;
51 }
52 std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
53 " a.cloud_gid, a.sharing_resource, a.status, a.version";
54 for (const auto &field : assetFields) {
55 sql += ", b." + field.colName;
56 }
57 for (const auto &pk : pkSet) {
58 sql += ", b." + pk;
59 }
60 sql += CloudStorageUtils::GetLeftJoinLogSql(tableName) + " WHERE ";
61 if (!gid.empty()) {
62 sql += " a.cloud_gid = ? or ";
63 }
64 sql += "a.hash_key = ?";
65 querySql = sql;
66 return E_OK;
67 }
68
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)69 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
70 const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
71 {
72 std::string sql = "UPDATE " + tableName + " SET ";
73 for (const auto &field: fields) {
74 sql += field.colName + " = ?,";
75 }
76 sql.pop_back();
77 sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (";
78 sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
79 sqlite3_stmt *stmt = nullptr;
80 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
81 if (errCode != E_OK) {
82 LOGE("Get fill asset statement failed, %d.", errCode);
83 return errCode;
84 }
85 int ret = E_OK;
86 for (size_t i = 0; i < fields.size(); ++i) {
87 errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
88 if (errCode != E_OK) {
89 SQLiteUtils::ResetStatement(stmt, true, ret);
90 return errCode;
91 }
92 }
93 statement = stmt;
94 return errCode;
95 }
96
CleanDownloadingFlagByGid(const std::string & tableName,const std::string & gid,VBucket dbAssets)97 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadingFlagByGid(const std::string &tableName,
98 const std::string &gid, VBucket dbAssets)
99 {
100 if (CloudStorageUtils::IsAssetsContainDownloadRecord(dbAssets)) {
101 return E_OK;
102 }
103 std::string sql;
104 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag&(~0x1000) where cloud_gid = ?;";
105 sqlite3_stmt *stmt = nullptr;
106 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
107 if (errCode != E_OK) {
108 LOGE("[RDBExecutor]Get stmt failed clean downloading flag:%d, tableName:%s, length:%zu",
109 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
110 return errCode;
111 }
112 int ret = E_OK;
113 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
114 if (errCode != E_OK) {
115 LOGE("[RDBExecutor]bind gid failed when clean downloading flag:%d, tableName:%s, length:%zu",
116 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
117 SQLiteUtils::ResetStatement(stmt, true, ret);
118 return errCode;
119 }
120 errCode = SQLiteUtils::StepWithRetry(stmt);
121 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
122 errCode = E_OK;
123 } else {
124 LOGE("[RDBExecutor]clean downloading flag failed:%d, tableName:%s, length:%zu",
125 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
126 }
127 SQLiteUtils::ResetStatement(stmt, true, ret);
128 if (ret != E_OK) {
129 LOGE("[RDBExecutor]reset stmt when clean downloading flag:%d, tableName:%s, length:%zu",
130 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
131 }
132 return errCode != E_OK ? errCode : ret;
133 }
134
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess,uint64_t & currCursor)135 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
136 VBucket &vBucket, bool isDownloadSuccess, uint64_t &currCursor)
137 {
138 std::string cloudGid;
139 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
140 if (errCode != E_OK) {
141 LOGE("Miss gid when fill Asset.");
142 return errCode;
143 }
144 std::vector<Field> assetsField;
145 errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
146 if (errCode != E_OK) {
147 LOGE("No assets need to be filled.");
148 return errCode;
149 }
150 CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
151
152 Bytes hashKey;
153 (void)CloudStorageUtils::GetValueFromVBucket<Bytes>(HASH_KEY, vBucket, hashKey);
154 VBucket dbAssets;
155 std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, hashKey, dbAssets);
156 if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
157 LOGE("get assets by gid or hashkey failed %d.", errCode);
158 return errCode;
159 }
160 AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
161 AssetOperationUtils::CloudSyncAction::END_DOWNLOAD);
162
163 if (isDownloadSuccess) {
164 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
165 CloudStorageUtils::FillAssetAfterDownload, CloudStorageUtils::FillAssetsAfterDownload);
166 errCode = IncreaseCursorOnAssetData(tableSchema.name, cloudGid, currCursor);
167 if (errCode != E_OK) {
168 return errCode;
169 }
170 } else {
171 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
172 CloudStorageUtils::FillAssetAfterDownloadFail, CloudStorageUtils::FillAssetsAfterDownloadFail);
173 }
174 sqlite3_stmt *stmt = nullptr;
175 errCode = GetFillDownloadAssetStatement(tableSchema.name, dbAssets, assetsField, stmt);
176 if (errCode != E_OK) {
177 return errCode;
178 }
179 errCode = ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
180 if (errCode != E_OK) {
181 return errCode;
182 }
183 errCode = CleanDownloadingFlagByGid(tableSchema.name, cloudGid, dbAssets);
184 int ret = CleanDownloadChangedAssets(vBucket, assetOpType);
185 return errCode == E_OK ? ret : errCode;
186 }
187
IncreaseCursorOnAssetData(const std::string & tableName,const std::string & gid,uint64_t & currCursor)188 int SQLiteSingleVerRelationalStorageExecutor::IncreaseCursorOnAssetData(const std::string &tableName,
189 const std::string &gid, uint64_t &currCursor)
190 {
191 uint64_t cursor = DBConstant::INVALID_CURSOR;
192 int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, cursor);
193 if (errCode != E_OK) {
194 LOGE("Get cursor of table[%s length[%u]] failed when increase cursor: %d, gid[%s]",
195 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), errCode, gid.c_str());
196 return errCode;
197 }
198 cursor++;
199 std::string sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log";
200 sql += " SET cursor = ? where cloud_gid = ?;";
201 sqlite3_stmt *statement = nullptr;
202 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
203 if (errCode != E_OK) {
204 LOGE("get update asset data cursor stmt failed %d.", errCode);
205 return errCode;
206 }
207 ResFinalizer finalizer([statement]() {
208 sqlite3_stmt *statementInner = statement;
209 int ret = E_OK;
210 SQLiteUtils::ResetStatement(statementInner, true, ret);
211 if (ret != E_OK) {
212 LOGW("Reset stmt failed %d when increase cursor on asset data", ret);
213 }
214 });
215 int index = 1;
216 errCode = SQLiteUtils::BindInt64ToStatement(statement, index++, cursor);
217 if (errCode != E_OK) {
218 LOGE("bind cursor data stmt failed %d.", errCode);
219 return errCode;
220 }
221 errCode = SQLiteUtils::BindTextToStatement(statement, index, gid);
222 if (errCode != E_OK) {
223 LOGE("bind cursor gid data stmt failed %d.", errCode);
224 return errCode;
225 }
226 errCode = SQLiteUtils::StepWithRetry(statement, false);
227 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
228 LOGE("Fill upload asset failed:%d.", errCode);
229 return errCode;
230 }
231 currCursor = cursor;
232 errCode = SetCursor(tableName, cursor);
233 if (errCode != E_OK) {
234 LOGE("Upgrade cursor failed after asset download success %d.", errCode);
235 }
236 return errCode;
237 }
238
FillCloudAssetForUpload(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data)239 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema,
240 const CloudSyncBatch &data)
241 {
242 int errCode = E_OK;
243 if (CloudStorageUtils::ChkFillCloudAssetParam(data, errCode)) {
244 return errCode;
245 }
246 errCode = SetLogTriggerStatus(false);
247 if (errCode != E_OK) {
248 LOGE("Fail to set log trigger off, %d.", errCode);
249 return errCode;
250 }
251 sqlite3_stmt *stmt = nullptr;
252 for (size_t i = 0; i < data.assets.size(); ++i) {
253 if (data.assets.at(i).empty()) {
254 continue;
255 }
256 if (DBCommon::IsRecordIgnored(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i]) ||
257 DBCommon::IsCloudRecordNotFound(data.extend[i]) || DBCommon::IsCloudRecordAlreadyExisted(data.extend[i])) {
258 continue;
259 }
260 errCode = InitFillUploadAssetStatement(opType, tableSchema, data, i, stmt);
261 if (errCode != E_OK) {
262 if (errCode == -E_NOT_FOUND) {
263 errCode = E_OK;
264 continue;
265 }
266 break;
267 }
268 errCode = SQLiteUtils::StepWithRetry(stmt, false);
269 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
270 LOGE("Fill upload asset failed:%d.", errCode);
271 break;
272 }
273 errCode = E_OK;
274 SQLiteUtils::ResetStatement(stmt, true, errCode);
275 stmt = nullptr;
276 if (errCode != E_OK) {
277 break;
278 }
279 }
280 int ret = E_OK;
281 SQLiteUtils::ResetStatement(stmt, true, ret);
282 int endCode = SetLogTriggerStatus(true);
283 if (endCode != E_OK) {
284 LOGE("Fail to set log trigger off, %d.", endCode);
285 return endCode;
286 }
287 return errCode != E_OK ? errCode : ret;
288 }
289
FillCloudVersionForUpload(const OpType opType,const CloudSyncData & data)290 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data)
291 {
292 switch (opType) {
293 case OpType::UPDATE_VERSION:
294 return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.updData);
295 case OpType::INSERT_VERSION:
296 return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.insData);
297 default:
298 LOGE("Fill version with unknown type %d", static_cast<int>(opType));
299 return -E_INVALID_ARGS;
300 }
301 }
302
BindUpdateVersionStatement(const VBucket & vBucket,const Bytes & hashKey,sqlite3_stmt * & stmt)303 int SQLiteSingleVerRelationalStorageExecutor::BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey,
304 sqlite3_stmt *&stmt)
305 {
306 int errCode = E_OK;
307 std::string version;
308 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
309 vBucket, version) != E_OK) {
310 LOGW("get version from vBucket failed.");
311 }
312 if (hashKey.empty()) {
313 LOGE("hash key is empty when update version.");
314 return -E_CLOUD_ERROR;
315 }
316 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, version);
317 if (errCode != E_OK) {
318 return errCode;
319 }
320 errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, hashKey); // 2 means the second bind args
321 if (errCode != E_OK) {
322 return errCode;
323 }
324 errCode = SQLiteUtils::StepWithRetry(stmt, false);
325 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
326 errCode = E_OK;
327 SQLiteUtils::ResetStatement(stmt, false, errCode);
328 } else {
329 LOGE("step version stmt failed: %d.", errCode);
330 }
331 return errCode;
332 }
333
InitFillUploadAssetStatement(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)334 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpType opType,
335 const TableSchema &tableSchema, const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
336 {
337 VBucket vBucket = data.assets.at(index);
338 VBucket dbAssets;
339 std::string cloudGid;
340 int errCode;
341 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
342 std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, data.hashKey.at(index), dbAssets);
343 if (errCode != E_OK && errCode != -E_CLOUD_GID_MISMATCH) {
344 return errCode;
345 }
346 AssetOperationUtils::CloudSyncAction action = opType == OpType::SET_UPLOADING ?
347 AssetOperationUtils::CloudSyncAction::START_UPLOAD : AssetOperationUtils::CloudSyncAction::END_UPLOAD;
348 AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
349 action);
350 if (action == AssetOperationUtils::CloudSyncAction::START_UPLOAD) {
351 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
352 CloudStorageUtils::FillAssetBeforeUpload, CloudStorageUtils::FillAssetsBeforeUpload);
353 } else {
354 if (DBCommon::IsRecordError(data.extend.at(index)) || DBCommon::IsRecordAssetsMissing(data.extend.at(index))) {
355 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
356 CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed);
357 } else {
358 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
359 CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload);
360 }
361 }
362
363 errCode = GetAndBindFillUploadAssetStatement(tableSchema.name, dbAssets, statement);
364 if (errCode != E_OK) {
365 LOGE("get and bind asset failed %d.", errCode);
366 return errCode;
367 }
368 int64_t rowid = data.rowid[index];
369 return SQLiteUtils::BindInt64ToStatement(statement, dbAssets.size() + ROW_ID_INDEX, rowid);
370 }
371
AnalysisTrackerTable(const TrackerTable & trackerTable,TableInfo & tableInfo)372 int SQLiteSingleVerRelationalStorageExecutor::AnalysisTrackerTable(const TrackerTable &trackerTable,
373 TableInfo &tableInfo)
374 {
375 return SQLiteRelationalUtils::AnalysisTrackerTable(dbHandle_, trackerTable, tableInfo);
376 }
377
CreateTrackerTable(const TrackerTable & trackerTable,const TableInfo & table,bool checkData)378 int SQLiteSingleVerRelationalStorageExecutor::CreateTrackerTable(const TrackerTable &trackerTable,
379 const TableInfo &table, bool checkData)
380 {
381 std::unique_ptr<SqliteLogTableManager> tableManager = std::make_unique<SimpleTrackerLogTableManager>();
382 if (trackerTable.IsEmpty()) {
383 // drop trigger
384 return tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
385 }
386
387 // create log table
388 int errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
389 if (errCode != E_OK) {
390 return errCode;
391 }
392 // init cursor
393 errCode = InitCursorToMeta(table.GetTableName());
394 if (errCode != E_OK) {
395 return errCode;
396 }
397 errCode = GeneLogInfoForExistedData(dbHandle_, "", table, tableManager, true);
398 if (errCode != E_OK) {
399 LOGE("general tracker log info for existed data failed %d.", errCode);
400 return errCode;
401 }
402 errCode = SetLogTriggerStatus(true);
403 if (errCode != E_OK) {
404 return errCode;
405 }
406 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
407 if (errCode != E_OK) {
408 return errCode;
409 }
410 if (checkData) {
411 return CheckInventoryData(DBCommon::GetLogTableName(table.GetTableName()));
412 }
413 return E_OK;
414 }
415
GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject & schema)416 int SQLiteSingleVerRelationalStorageExecutor::GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema)
417 {
418 if (!schema.ToSchemaString().empty()) {
419 return E_OK;
420 }
421 const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY,
422 DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY + strlen(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY));
423 Value schemaVal;
424 int errCode = GetKvData(schemaKey, schemaVal); // save schema to meta_data
425 if (errCode != E_OK) {
426 return errCode;
427 }
428 if (schemaVal.empty()) {
429 return -E_NOT_FOUND;
430 }
431 std::string schemaStr;
432 DBCommon::VectorToString(schemaVal, schemaStr);
433 errCode = schema.ParseFromTrackerSchemaString(schemaStr);
434 if (errCode != E_OK) {
435 LOGE("Parse from tracker schema string err.");
436 }
437 return errCode;
438 }
439
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)440 int SQLiteSingleVerRelationalStorageExecutor::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
441 {
442 sqlite3_stmt *statement = nullptr;
443 int errCode = SQLiteUtils::GetStatement(dbHandle_, condition.sql, statement);
444 if (errCode != E_OK) {
445 LOGE("Execute sql failed when prepare stmt.");
446 return errCode;
447 }
448 if (condition.readOnly && !SQLiteUtils::IsStmtReadOnly(statement)) {
449 LOGW("[ExecuteSql] The condition is read-only, but SQL is not read-only");
450 }
451 size_t bindCount = static_cast<size_t>(sqlite3_bind_parameter_count(statement));
452 if (bindCount > condition.bindArgs.size() || bindCount < condition.bindArgs.size()) {
453 LOGE("Sql bind args mismatch.");
454 SQLiteUtils::ResetStatement(statement, true, errCode);
455 return -E_INVALID_ARGS;
456 }
457 int ret = E_OK;
458 for (size_t i = 0; i < condition.bindArgs.size(); i++) {
459 Type type = condition.bindArgs[i];
460 errCode = SQLiteRelationalUtils::BindStatementByType(statement, i + 1, type);
461 if (errCode != E_OK) {
462 SQLiteUtils::ResetStatement(statement, true, ret);
463 return errCode;
464 }
465 }
466 while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
467 VBucket bucket;
468 errCode = SQLiteRelationalUtils::GetSelectVBucket(statement, bucket);
469 if (errCode != E_OK) {
470 SQLiteUtils::ResetStatement(statement, true, ret);
471 return errCode;
472 }
473 records.push_back(std::move(bucket));
474 }
475 SQLiteUtils::ResetStatement(statement, true, ret);
476 return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
477 }
478
GetClearWaterMarkTables(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema,std::set<std::string> & clearWaterMarkTables)479 int SQLiteSingleVerRelationalStorageExecutor::GetClearWaterMarkTables(
480 const std::vector<TableReferenceProperty> &tableReferenceProperty, const RelationalSchemaObject &schema,
481 std::set<std::string> &clearWaterMarkTables)
482 {
483 bool isRefNotSet = false;
484 std::set<std::string> changeTables = schema.CompareReferenceProperty(tableReferenceProperty, isRefNotSet);
485 for (const auto &table : changeTables) {
486 std::string logTableName = DBCommon::GetLogTableName(table);
487 bool isExists = false;
488 int errCode = SQLiteUtils::CheckTableExists(dbHandle_, logTableName, isExists);
489 if (errCode != E_OK) {
490 LOGE("[GetClearWaterMarkTables] check table exists failed, errCode = %d.", errCode);
491 return errCode;
492 }
493 if (!isExists) { // table maybe dropped after set reference
494 LOGI("[GetClearWaterMarkTables] log table not exists, skip this table.");
495 continue;
496 }
497
498 bool isEmpty = true;
499 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, logTableName, isEmpty);
500 if (errCode != E_OK) {
501 LOGE("[GetClearWaterMarkTables] check table empty failed, errCode = %d.", errCode);
502 clearWaterMarkTables.clear();
503 return errCode;
504 }
505 if (!isEmpty) {
506 clearWaterMarkTables.insert(table);
507 }
508 }
509 LOGI("[GetClearWaterMarkTables] clearWaterMarkTables size = %zu, changeTables size = %zu, isFirst:%d.",
510 clearWaterMarkTables.size(), changeTables.size(), isRefNotSet);
511 // If reference set for the first time and has no data, or the reference has not changed, return E_OK.
512 return ((isRefNotSet && clearWaterMarkTables.empty()) || changeTables.empty()) ? E_OK : -E_TABLE_REFERENCE_CHANGED;
513 }
514
UpgradedLogForExistedData(const TableInfo & tableInfo,bool schemaChanged)515 int SQLiteSingleVerRelationalStorageExecutor::UpgradedLogForExistedData(const TableInfo &tableInfo, bool schemaChanged)
516 {
517 std::string logTable = DBCommon::GetLogTableName(tableInfo.GetTableName());
518 if (schemaChanged) {
519 std::string markAsInconsistent = "UPDATE " + logTable + " SET flag=" +
520 "(CASE WHEN (cloud_gid='' and data_key=-1 and flag&0x02=0x02) then flag else flag|0x20 END)";
521 int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, markAsInconsistent);
522 if (ret != E_OK) {
523 LOGE("Mark upgrade log info as inconsistent failed:%d", ret);
524 return ret;
525 }
526 }
527 if (tableInfo.GetTrackerTable().IsEmpty()) {
528 return E_OK;
529 }
530 LOGI("Upgrade tracker table log, schemaChanged:%d.", schemaChanged);
531 int errCode = SetLogTriggerStatus(false);
532 if (errCode != E_OK) {
533 return errCode;
534 }
535 std::string sql = "UPDATE " + tableInfo.GetTableName() + " SET _rowid_=_rowid_";
536 TrackerTable trackerTable = tableInfo.GetTrackerTable();
537 errCode = trackerTable.ReBuildTempTrigger(dbHandle_, TriggerMode::TriggerModeEnum::UPDATE,
538 [this, &sql]() {
539 int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
540 if (ret != E_OK) {
541 LOGE("Upgrade log for extend field failed.");
542 }
543 return ret;
544 });
545 return SetLogTriggerStatus(true);
546 }
547
CreateTempSyncTrigger(const TrackerTable & trackerTable,bool flag)548 int SQLiteSingleVerRelationalStorageExecutor::CreateTempSyncTrigger(const TrackerTable &trackerTable, bool flag)
549 {
550 int errCode = E_OK;
551 std::vector<std::string> dropSql = trackerTable.GetDropTempTriggerSql();
552 for (const auto &sql: dropSql) {
553 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
554 if (errCode != E_OK) {
555 LOGE("[RDBExecutor] execute drop sql failed %d.", errCode);
556 return errCode;
557 }
558 }
559 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempInsertTriggerSql(flag));
560 if (errCode != E_OK) {
561 LOGE("[RDBExecutor] create temp insert trigger failed %d.", errCode);
562 return errCode;
563 }
564 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempUpdateTriggerSql(flag));
565 if (errCode != E_OK) {
566 LOGE("[RDBExecutor] create temp update trigger failed %d.", errCode);
567 return errCode;
568 }
569 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempDeleteTriggerSql(flag));
570 if (errCode != E_OK) {
571 LOGE("[RDBExecutor] create temp delete trigger failed %d.", errCode);
572 }
573 return errCode;
574 }
575
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)576 int SQLiteSingleVerRelationalStorageExecutor::GetAndResetServerObserverData(const std::string &tableName,
577 ChangeProperties &changeProperties)
578 {
579 std::string fileName;
580 if (!SQLiteRelationalUtils::GetDbFileName(dbHandle_, fileName)) {
581 LOGE("get db file name failed.");
582 return -E_INVALID_DB;
583 }
584 SQLiteUtils::GetAndResetServerObserverData(fileName, tableName, changeProperties);
585 return E_OK;
586 }
587
ClearAllTempSyncTrigger()588 int SQLiteSingleVerRelationalStorageExecutor::ClearAllTempSyncTrigger()
589 {
590 sqlite3_stmt *stmt = nullptr;
591 static const std::string sql = "SELECT name FROM sqlite_temp_master WHERE type = 'trigger';";
592 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
593 if (errCode != E_OK) {
594 LOGE("get clear all temp trigger stmt failed %d.", errCode);
595 return errCode;
596 }
597 int ret = E_OK;
598 while ((errCode = SQLiteUtils::StepNext(stmt, isMemDb_)) == E_OK) {
599 std::string str;
600 (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
601 if (errCode != E_OK) {
602 SQLiteUtils::ResetStatement(stmt, true, ret);
603 return errCode;
604 }
605 std::string dropSql = "DROP TRIGGER IF EXISTS '" + str + "';";
606 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, dropSql);
607 if (errCode != E_OK) {
608 LOGE("drop temp trigger failed %d.", errCode);
609 SQLiteUtils::ResetStatement(stmt, true, ret);
610 return errCode;
611 }
612 }
613 SQLiteUtils::ResetStatement(stmt, true, ret);
614 return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
615 }
616
CleanTrackerData(const std::string & tableName,int64_t cursor,bool isOnlyTrackTable)617 int SQLiteSingleVerRelationalStorageExecutor::CleanTrackerData(const std::string &tableName, int64_t cursor,
618 bool isOnlyTrackTable)
619 {
620 return SQLiteRelationalUtils::CleanTrackerData(dbHandle_, tableName, cursor, isOnlyTrackTable);
621 }
622
CreateSharedTable(const TableSchema & tableSchema)623 int SQLiteSingleVerRelationalStorageExecutor::CreateSharedTable(const TableSchema &tableSchema)
624 {
625 LOGI("Create shared table[%s length[%u]]", DBCommon::StringMiddleMasking(tableSchema.name).c_str(),
626 tableSchema.name.length());
627 std::map<int32_t, std::string> cloudFieldTypeMap;
628 cloudFieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
629 cloudFieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
630 cloudFieldTypeMap[TYPE_INDEX<double>] = "REAL";
631 cloudFieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
632 cloudFieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
633 cloudFieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
634 cloudFieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
635 cloudFieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
636
637 std::string createTableSql = "CREATE TABLE IF NOT EXISTS " + tableSchema.sharedTableName + "(";
638 std::string primaryKey = ", PRIMARY KEY (";
639 createTableSql += CloudDbConstant::CLOUD_OWNER;
640 createTableSql += " TEXT, ";
641 createTableSql += CloudDbConstant::CLOUD_PRIVILEGE;
642 createTableSql += " TEXT";
643 primaryKey += CloudDbConstant::CLOUD_OWNER;
644 bool hasPrimaryKey = false;
645 for (const auto &field : tableSchema.fields) {
646 createTableSql += ", " + field.colName + " ";
647 createTableSql += cloudFieldTypeMap[field.type];
648 createTableSql += field.nullable ? "" : " NOT NULL";
649 if (field.primary) {
650 primaryKey += ", " + field.colName;
651 hasPrimaryKey = true;
652 }
653 }
654 if (hasPrimaryKey) {
655 createTableSql += primaryKey + ")";
656 }
657 createTableSql += ");";
658 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, createTableSql);
659 if (errCode != E_OK) {
660 LOGE("Create shared table failed, %d.", errCode);
661 }
662 return errCode;
663 }
664
DeleteTable(const std::vector<std::string> & tableNames)665 int SQLiteSingleVerRelationalStorageExecutor::DeleteTable(const std::vector<std::string> &tableNames)
666 {
667 for (const auto &tableName : tableNames) {
668 std::string deleteTableSql = "DROP TABLE IF EXISTS " + tableName + ";";
669 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteTableSql);
670 if (errCode != E_OK) {
671 LOGE("Delete table failed, %d.", errCode);
672 return errCode;
673 }
674 }
675 return E_OK;
676 }
677
UpdateSharedTable(const std::map<std::string,std::vector<Field>> & updateTableNames)678 int SQLiteSingleVerRelationalStorageExecutor::UpdateSharedTable(
679 const std::map<std::string, std::vector<Field>> &updateTableNames)
680 {
681 int errCode = E_OK;
682 std::map<int32_t, std::string> fieldTypeMap;
683 fieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
684 fieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
685 fieldTypeMap[TYPE_INDEX<double>] = "REAL";
686 fieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
687 fieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
688 fieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
689 fieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
690 fieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
691 for (const auto &table : updateTableNames) {
692 if (table.second.empty()) {
693 continue;
694 }
695 std::string addColumnSql = "";
696 for (const auto &field : table.second) {
697 addColumnSql += "ALTER TABLE " + table.first + " ADD ";
698 addColumnSql += field.colName + " ";
699 addColumnSql += fieldTypeMap[field.type];
700 addColumnSql += field.primary ? " PRIMARY KEY" : "";
701 addColumnSql += field.nullable ? ";" : " NOT NULL;";
702 }
703 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, addColumnSql);
704 if (errCode != E_OK) {
705 LOGE("Shared table add column failed, %d.", errCode);
706 return errCode;
707 }
708 }
709 return errCode;
710 }
711
AlterTableName(const std::map<std::string,std::string> & tableNames)712 int SQLiteSingleVerRelationalStorageExecutor::AlterTableName(const std::map<std::string, std::string> &tableNames)
713 {
714 for (const auto &tableName : tableNames) {
715 std::string alterTableSql = "ALTER TABLE " + tableName.first + " RENAME TO " + tableName.second + ";";
716 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, alterTableSql);
717 if (errCode != E_OK) {
718 LOGE("Alter table name failed, %d.", errCode);
719 return errCode;
720 }
721 }
722 return E_OK;
723 }
724
AppendUpdateLogRecordWhereSqlCondition(const TableSchema & tableSchema,const VBucket & vBucket,std::string & sql)725 int SQLiteSingleVerRelationalStorageExecutor::AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema,
726 const VBucket &vBucket, std::string &sql)
727 {
728 std::string gidStr;
729 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
730 if (errCode != E_OK) {
731 LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d.", errCode);
732 return errCode;
733 }
734
735 sql += " WHERE ";
736 if (!gidStr.empty()) {
737 sql += "cloud_gid = '" + gidStr + "'";
738 }
739 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
740 if (!pkMap.empty()) {
741 if (!gidStr.empty()) {
742 sql += " OR ";
743 }
744 sql += "(hash_key = ?);";
745 }
746 return E_OK;
747 }
748
DoCleanShareTableDataAndLog(const std::vector<std::string> & tableNameList)749 int SQLiteSingleVerRelationalStorageExecutor::DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList)
750 {
751 int errCode = E_OK;
752 for (const auto &tableName: tableNameList) {
753 int32_t count = 0;
754 std::string logTableName = DBCommon::GetLogTableName(tableName);
755 (void)GetFlagIsLocalCount(logTableName, count);
756 LOGI("[DoCleanShareTableDataAndLog]flag is local in table:%s, len:%zu, count:%d, before remove device data.",
757 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
758 if (isLogicDelete_) {
759 errCode = SetDataOnShareTableWithLogicDelete(tableName, logTableName);
760 } else {
761 errCode = CleanShareTable(tableName);
762 }
763 if (errCode != E_OK) {
764 LOGE("clean share table failed at table:%s, length:%d, deleteType:%d, errCode:%d.",
765 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), isLogicDelete_ ? 1 : 0, errCode);
766 return errCode;
767 }
768 (void)GetFlagIsLocalCount(logTableName, count);
769 LOGI("[DoCleanShareTableDataAndLog]flag is local in table:%s, len:%zu, count:%d, after remove device data.",
770 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
771 }
772 return errCode;
773 }
774
CleanShareTable(const std::string & tableName)775 int SQLiteSingleVerRelationalStorageExecutor::CleanShareTable(const std::string &tableName)
776 {
777 int ret = E_OK;
778 int errCode = E_OK;
779 std::string delDataSql = "DELETE FROM '" + tableName + "';";
780 sqlite3_stmt *statement = nullptr;
781 errCode = SQLiteUtils::GetStatement(dbHandle_, delDataSql, statement);
782 if (errCode != E_OK) {
783 LOGE("get clean shared data stmt failed %d.", errCode);
784 return errCode;
785 }
786 errCode = SQLiteUtils::StepWithRetry(statement);
787 SQLiteUtils::ResetStatement(statement, true, ret);
788 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
789 errCode = E_OK;
790 } else {
791 LOGE("clean shared data failed: %d.", errCode);
792 return errCode;
793 }
794 statement = nullptr;
795 std::string delLogSql = "DELETE FROM '" + std::string(DBConstant::RELATIONAL_PREFIX) + tableName + "_log';";
796 errCode = SQLiteUtils::GetStatement(dbHandle_, delLogSql, statement);
797 if (errCode != E_OK) {
798 LOGE("get clean shared log stmt failed %d.", errCode);
799 return errCode;
800 }
801 errCode = SQLiteUtils::StepWithRetry(statement);
802 SQLiteUtils::ResetStatement(statement, true, ret);
803 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
804 errCode = E_OK;
805 } else {
806 LOGE("clean shared log failed: %d.", errCode);
807 return errCode;
808 }
809 // here errCode must be E_OK, just return ret
810 return ret;
811 }
812
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,const std::map<std::string,std::vector<TableReferenceProperty>> & tableReference,std::map<int64_t,Entries> & referenceGid)813 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGid(const std::string &tableName,
814 const CloudSyncBatch &syncBatch, const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
815 std::map<int64_t, Entries> &referenceGid)
816 {
817 int errCode = E_OK;
818 for (const auto &[targetTable, targetReference] : tableReference) {
819 errCode = GetReferenceGidInner(tableName, targetTable, syncBatch, targetReference, referenceGid);
820 if (errCode != E_OK) {
821 LOGE("[RDBExecutor] get reference gid inner failed %d.", errCode);
822 return errCode;
823 }
824 }
825 return errCode;
826 }
827
GetReferenceGidInner(const std::string & sourceTable,const std::string & targetTable,const CloudSyncBatch & syncBatch,const std::vector<TableReferenceProperty> & targetTableReference,std::map<int64_t,Entries> & referenceGid)828 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidInner(const std::string &sourceTable,
829 const std::string &targetTable, const CloudSyncBatch &syncBatch,
830 const std::vector<TableReferenceProperty> &targetTableReference, std::map<int64_t, Entries> &referenceGid)
831 {
832 auto [sourceFields, targetFields] = SplitReferenceByField(targetTableReference);
833 if (sourceFields.empty()) {
834 LOGD("[RDBExecutor] source field is empty.");
835 return E_OK;
836 }
837 if (sourceFields.size() != targetFields.size()) {
838 LOGE("[RDBExecutor] reference field size not equal.");
839 return -E_INTERNAL_ERROR;
840 }
841 std::string sql = GetReferenceGidSql(sourceTable, targetTable, sourceFields, targetFields);
842 sqlite3_stmt *statement = nullptr;
843 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
844 if (errCode != E_OK) {
845 LOGE("[RDBExecutor] get ref gid data stmt failed. %d", errCode);
846 return errCode;
847 }
848 errCode = GetReferenceGidByStmt(statement, syncBatch, targetTable, referenceGid);
849 int ret = E_OK;
850 SQLiteUtils::ResetStatement(statement, true, ret);
851 return errCode == E_OK ? ret : errCode;
852 }
853
GetReferenceGidSql(const std::string & sourceTable,const std::string & targetTable,const std::vector<std::string> & sourceFields,const std::vector<std::string> & targetFields)854 std::string SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidSql(const std::string &sourceTable,
855 const std::string &targetTable, const std::vector<std::string> &sourceFields,
856 const std::vector<std::string> &targetFields)
857 {
858 // sql like this:
859 // SELECT naturalbase_rdb_aux_parent_log.cloud_gid FROM naturalbase_rdb_aux_parent_log,
860 // (SELECT parent._rowid_ AS rowid_b FROM parent,
861 // (SELECT child._rowid_, name FROM child, naturalbase_rdb_aux_child_log
862 // WHERE child._rowid_ = ? AND naturalbase_rdb_aux_child_log.timestamp = ? ) source_a
863 // WHERE parent.name = source_a.name ) temp_table
864 // WHERE naturalbase_rdb_aux_parent_log.data_key = temp_table.rowid_b
865 std::string logTargetTable = DBCommon::GetLogTableName(targetTable);
866 std::string logSourceTable = DBCommon::GetLogTableName(sourceTable);
867 std::string sql;
868 sql += "SELECT " + logTargetTable + ".cloud_gid" + " FROM " + logTargetTable + ", ";
869 sql += "(";
870 sql += "SELECT " + targetTable + "._rowid_ AS rowid_b FROM " + targetTable
871 + ", ";
872 sql += "(SELECT " + sourceTable + "._rowid_,";
873 std::set<std::string> sourceFieldSet;
874 for (const auto &item : sourceFields) {
875 sourceFieldSet.insert(item);
876 }
877 for (const auto &sourceField : sourceFieldSet) {
878 sql += sourceField + ",";
879 }
880 sql.pop_back();
881 sql += " FROM " + sourceTable + ", " + logSourceTable;
882 sql +=" WHERE " + sourceTable + "._rowid_ = ? AND " + logSourceTable + ".timestamp = ? ";
883 sql += " AND " + logSourceTable + ".flag&0x08=0x00) source_a";
884 sql += " WHERE ";
885 for (size_t i = 0u; i < sourceFields.size(); ++i) {
886 if (i != 0u) {
887 sql += " AND ";
888 }
889 sql += targetTable + "." + targetFields[i] + " = source_a." + sourceFields[i];
890 }
891 sql += ") temp_table ";
892 sql += "WHERE " + logTargetTable + ".data_key = temp_table.rowid_b";
893 sql += " AND " + logTargetTable + ".flag&0x08=0x00";
894 return sql;
895 }
896
GetReferenceGidByStmt(sqlite3_stmt * statement,const CloudSyncBatch & syncBatch,const std::string & targetTable,std::map<int64_t,Entries> & referenceGid)897 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidByStmt(sqlite3_stmt *statement,
898 const CloudSyncBatch &syncBatch, const std::string &targetTable, std::map<int64_t, Entries> &referenceGid)
899 {
900 int errCode = E_OK;
901 if (syncBatch.rowid.size() != syncBatch.timestamp.size()) {
902 LOGE("[RDBExecutor] rowid size [%zu] not equal to timestamp size [%zu].", syncBatch.rowid.size(),
903 syncBatch.timestamp.size());
904 return -E_INVALID_ARGS;
905 }
906 int matchCount = 0;
907 for (size_t i = 0u; i < syncBatch.rowid.size(); i++) {
908 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, syncBatch.rowid[i]); // 1 is rowid index
909 if (errCode != E_OK) {
910 LOGE("[RDBExecutor] bind rowid to stmt failed %d.", errCode);
911 break;
912 }
913 errCode = SQLiteUtils::BindInt64ToStatement(statement, 2, syncBatch.timestamp[i]); // 2 is timestamp index
914 if (errCode != E_OK) {
915 LOGE("[RDBExecutor] bind timestamp to stmt failed %d.", errCode);
916 break;
917 }
918 while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
919 std::string gid;
920 (void)SQLiteUtils::GetColumnTextValue(statement, 0, gid);
921 if (gid.empty()) {
922 LOGE("[RDBExecutor] reference data don't contain gid.");
923 errCode = -E_CLOUD_ERROR;
924 break;
925 }
926 referenceGid[syncBatch.rowid[i]][targetTable] = gid;
927 matchCount++;
928 }
929 if (errCode == -E_FINISHED) {
930 errCode = E_OK;
931 }
932 if (errCode != E_OK) {
933 LOGE("[RDBExecutor] step stmt failed %d.", errCode);
934 break;
935 }
936 SQLiteUtils::ResetStatement(statement, false, errCode);
937 if (errCode != E_OK) {
938 LOGE("[RDBExecutor] reset stmt failed %d.", errCode);
939 break;
940 }
941 }
942 if (matchCount != 0) {
943 LOGD("[RDBExecutor] get reference gid match %d.", matchCount);
944 }
945 return errCode;
946 }
947
SplitReferenceByField(const std::vector<TableReferenceProperty> & targetTableReference)948 PairStringVector SQLiteSingleVerRelationalStorageExecutor::SplitReferenceByField(
949 const std::vector<TableReferenceProperty> &targetTableReference)
950 {
951 PairStringVector sourceTargetFiled;
952 for (const auto &reference : targetTableReference) {
953 for (const auto &column : reference.columns) {
954 sourceTargetFiled.first.push_back(column.first);
955 sourceTargetFiled.second.push_back(column.second);
956 }
957 }
958 return sourceTargetFiled;
959 }
960
BindStmtWithCloudGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid,sqlite3_stmt * & stmt)961 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSyncData &cloudDataResult,
962 bool ignoreEmptyGid, sqlite3_stmt *&stmt)
963 {
964 int fillGidCount = 0;
965 int errCode = E_OK;
966 for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
967 auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
968 if (gidEntry == cloudDataResult.insData.extend[i].end()) {
969 bool isSkipAssetsMissRecord = false;
970 if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) {
971 LOGI("[RDBExecutor] Local assets missing and skip filling assets.");
972 isSkipAssetsMissRecord = true;
973 }
974 if (ignoreEmptyGid || isSkipAssetsMissRecord) {
975 continue;
976 }
977 errCode = -E_INVALID_ARGS;
978 LOGE("[RDBExecutor] Extend not contain gid.");
979 break;
980 }
981 bool containError = DBCommon::IsRecordError(cloudDataResult.insData.extend[i]);
982 if (ignoreEmptyGid && containError) {
983 continue;
984 }
985 std::string val;
986 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
987 cloudDataResult.insData.extend[i], val) != E_OK) {
988 errCode = -E_CLOUD_ERROR;
989 LOGE("[RDBExecutor] Can't get string gid from extend.");
990 break;
991 }
992 if (val.empty()) {
993 errCode = -E_CLOUD_ERROR;
994 LOGE("[RDBExecutor] Get empty gid from extend.");
995 break;
996 }
997 errCode = BindStmtWithCloudGidInner(val, cloudDataResult.insData.rowid[i], stmt, fillGidCount);
998 if (errCode != E_OK) {
999 LOGE("[RDBExecutor] Bind stmt error %d.", errCode);
1000 break;
1001 }
1002 }
1003 LOGD("[RDBExecutor] Fill gid count %d.", fillGidCount);
1004 return errCode;
1005 }
1006
CleanExtendAndCursorForDeleteData(const std::string & tableName)1007 int SQLiteSingleVerRelationalStorageExecutor::CleanExtendAndCursorForDeleteData(const std::string &tableName)
1008 {
1009 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1010 std::string sql = "DELETE FROM " + logTable + " where flag&0x01=0x01;";
1011 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1012 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1013 LOGE("update extend field and cursor failed %d.", errCode);
1014 }
1015 return errCode;
1016 }
1017
CheckIfExistUserTable(const std::string & tableName)1018 int SQLiteSingleVerRelationalStorageExecutor::CheckIfExistUserTable(const std::string &tableName)
1019 {
1020 std::string sql = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?";
1021 sqlite3_stmt *statement = nullptr;
1022 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1023 if (errCode != E_OK) {
1024 LOGE("[RDBExecutor] Prepare the sql statement error: %d.", errCode);
1025 return errCode;
1026 }
1027 errCode = SQLiteUtils::BindTextToStatement(statement, 1, tableName);
1028 if (errCode != E_OK) {
1029 LOGE("[RDBExecutor] Bind table name failed: %d.", errCode);
1030 int ret = E_OK;
1031 SQLiteUtils::ResetStatement(statement, true, ret);
1032 return errCode;
1033 }
1034 if (SQLiteUtils::StepWithRetry(statement) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1035 LOGE("[RDBExecutor] local exists user table which shared table name is same as.");
1036 SQLiteUtils::ResetStatement(statement, true, errCode);
1037 return -E_INVALID_ARGS;
1038 }
1039 SQLiteUtils::ResetStatement(statement, true, errCode);
1040 return E_OK;
1041 }
1042
GetCloudDeleteSql(const std::string & table,std::string & sql)1043 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDeleteSql(const std::string &table, std::string &sql)
1044 {
1045 uint64_t cursor = DBConstant::INVALID_CURSOR;
1046 int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, table, cursor);
1047 if (errCode != E_OK) {
1048 LOGE("[GetCloudDeleteSql] Get cursor of table[%s length[%u]] failed: %d",
1049 DBCommon::StringMiddleMasking(table).c_str(), table.length(), errCode);
1050 return errCode;
1051 }
1052 sql += " cloud_gid = '', version = '', ";
1053 if (isLogicDelete_) {
1054 // cursor already increased by DeleteCloudData, can be assigned directly here
1055 // 1001 which is logicDelete|cloudForcePush|local|delete
1056 sql += "flag = (flag&" + std::string(CONSISTENT_FLAG) + "|" +
1057 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE) |
1058 static_cast<uint32_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) +
1059 ")&" + std::to_string(~static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL)) +
1060 ", cursor = " + std::to_string(cursor) + " ";
1061 } else {
1062 sql += "data_key = -1, flag = (flag&" + std::string(CONSISTENT_FLAG) + "|" +
1063 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) + ")&" +
1064 std::to_string(~static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL)) + ", sharing_resource = ''";
1065 errCode = SetCursor(table, cursor + 1);
1066 if (errCode == E_OK) {
1067 sql += ", cursor = " + std::to_string(cursor + 1) + " ";
1068 } else {
1069 LOGE("[RDBExecutor] Increase cursor failed when delete log: %d.", errCode);
1070 return errCode;
1071 }
1072 }
1073 return E_OK;
1074 }
1075
RemoveDataAndLog(const std::string & tableName,int64_t dataKey)1076 int SQLiteSingleVerRelationalStorageExecutor::RemoveDataAndLog(const std::string &tableName, int64_t dataKey)
1077 {
1078 int errCode = E_OK;
1079 std::string removeDataSql = "DELETE FROM " + tableName + " WHERE " + DBConstant::SQLITE_INNER_ROWID + " = " +
1080 std::to_string(dataKey);
1081 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeDataSql);
1082 if (errCode != E_OK) {
1083 LOGE("[RDBExecutor] remove data failed %d", errCode);
1084 return errCode;
1085 }
1086 std::string removeLogSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key = " +
1087 std::to_string(dataKey);
1088 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeLogSql);
1089 if (errCode != E_OK) {
1090 LOGE("[RDBExecutor] remove log failed %d", errCode);
1091 }
1092 return errCode;
1093 }
1094
GetLocalDataKey(size_t index,const DownloadData & downloadData)1095 int64_t SQLiteSingleVerRelationalStorageExecutor::GetLocalDataKey(size_t index,
1096 const DownloadData &downloadData)
1097 {
1098 if (index >= downloadData.existDataKey.size()) {
1099 LOGW("[RDBExecutor] index out of range when get local data key."); // should not happen
1100 return -1; // -1 means not exist
1101 }
1102 return downloadData.existDataKey[index];
1103 }
1104
BindStmtWithCloudGidInner(const std::string & gid,int64_t rowid,sqlite3_stmt * & stmt,int & fillGidCount)1105 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
1106 sqlite3_stmt *&stmt, int &fillGidCount)
1107 {
1108 int errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid); // 1 means the gid index
1109 if (errCode != E_OK) {
1110 return errCode;
1111 }
1112 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means rowid
1113 if (errCode != E_OK) {
1114 return errCode;
1115 }
1116 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1117 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1118 errCode = E_OK;
1119 fillGidCount++;
1120 SQLiteUtils::ResetStatement(stmt, false, errCode);
1121 } else {
1122 LOGE("[RDBExecutor] Update cloud log failed: %d.", errCode);
1123 }
1124 return errCode;
1125 }
1126
RenewTableTrigger(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType,const std::string & localIdentity)1127 int SQLiteSingleVerRelationalStorageExecutor::RenewTableTrigger(DistributedTableMode mode,
1128 const TableInfo &tableInfo, TableSyncType syncType, const std::string &localIdentity)
1129 {
1130 auto tableManager = LogTableManagerFactory::GetTableManager(tableInfo, mode, syncType);
1131 return tableManager->AddRelationalLogTableTrigger(dbHandle_, tableInfo, localIdentity);
1132 }
1133
DoCleanAssetId(const std::string & tableName,const RelationalSchemaObject & localSchema)1134 int SQLiteSingleVerRelationalStorageExecutor::DoCleanAssetId(const std::string &tableName,
1135 const RelationalSchemaObject &localSchema)
1136 {
1137 std::vector<int64_t> dataKeys;
1138 std::string logTableName = DBCommon::GetLogTableName(tableName);
1139 int errCode = GetCleanCloudDataKeys(logTableName, dataKeys, false);
1140 if (errCode != E_OK) {
1141 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
1142 return errCode;
1143 }
1144 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
1145 errCode = CleanAssetId(tableName, fieldInfos, dataKeys);
1146 if (errCode != E_OK) {
1147 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d.", errCode);
1148 }
1149 return errCode;
1150 }
1151
CleanAssetId(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys)1152 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetId(const std::string &tableName,
1153 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys)
1154 {
1155 int errCode = E_OK;
1156 for (const auto &fieldInfo : fieldInfos) {
1157 if (fieldInfo.IsAssetType()) {
1158 Assets assets;
1159 errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1160 if (errCode != E_OK) {
1161 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
1162 return errCode;
1163 }
1164 errCode = UpdateAssetIdOnUserTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1165 if (errCode != E_OK) {
1166 LOGE("[Storage Executor] failed to save clean asset id on table, %d.", errCode);
1167 return errCode;
1168 }
1169 } else if (fieldInfo.IsAssetsType()) {
1170 errCode = GetAssetsAndUpdateAssetsId(tableName, fieldInfo.GetFieldName(), dataKeys);
1171 if (errCode != E_OK) {
1172 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
1173 return errCode;
1174 }
1175 }
1176 }
1177 return errCode;
1178 }
1179
UpdateAssetIdOnUserTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)1180 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetIdOnUserTable(const std::string &tableName,
1181 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
1182 {
1183 if (assets.empty()) { // LCOV_EXCL_BR_LINE
1184 return E_OK;
1185 }
1186 int errCode = E_OK;
1187 int ret = E_OK;
1188 sqlite3_stmt *stmt = nullptr;
1189 size_t index = 0;
1190 for (const auto &rowId : dataKeys) {
1191 if (rowId == -1) { // -1 means data is deleted
1192 continue;
1193 }
1194 if (assets[index].name.empty()) { // LCOV_EXCL_BR_LINE
1195 index++;
1196 continue;
1197 }
1198 std::string cleanAssetIdSql = "UPDATE " + tableName + " SET " + fieldName + " = ? WHERE " +
1199 std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1200 errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1201 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1202 LOGE("Get statement failed, %d", errCode);
1203 return errCode;
1204 }
1205 assets[index].assetId = "";
1206 assets[index].status &= ~AssetStatus::UPLOADING;
1207 errCode = BindAssetToBlobStatement(assets[index], 1, stmt); // 1 means sqlite statement index
1208 index++;
1209 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1210 LOGE("Bind asset to blob statement failed, %d", errCode);
1211 goto END;
1212 }
1213 errCode = SQLiteUtils::StepWithRetry(stmt);
1214 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1215 errCode = E_OK;
1216 } else {
1217 LOGE("Step statement failed, %d", errCode);
1218 goto END;
1219 }
1220 SQLiteUtils::ResetStatement(stmt, true, ret);
1221 }
1222 return errCode != E_OK ? errCode : ret;
1223 END:
1224 SQLiteUtils::ResetStatement(stmt, true, ret);
1225 return errCode != E_OK ? errCode : ret;
1226 }
1227
GetAssetsAndUpdateAssetsId(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys)1228 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsAndUpdateAssetsId(const std::string &tableName,
1229 const std::string &fieldName, const std::vector<int64_t> &dataKeys)
1230 {
1231 int errCode = E_OK;
1232 int ret = E_OK;
1233 sqlite3_stmt *selectStmt = nullptr;
1234 for (const auto &rowId : dataKeys) {
1235 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1236 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1237 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1238 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1239 LOGE("Get select assets statement failed, %d.", errCode);
1240 goto END;
1241 }
1242 Assets assets;
1243 errCode = GetAssetsByRowId(selectStmt, assets);
1244 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1245 LOGE("Get assets by rowId failed, %d.", errCode);
1246 goto END;
1247 }
1248 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1249 if (assets.empty()) { // LCOV_EXCL_BR_LINE
1250 continue;
1251 }
1252 for (auto &asset : assets) {
1253 asset.assetId = "";
1254 asset.status &= ~AssetStatus::UPLOADING;
1255 }
1256 std::vector<uint8_t> assetsValue;
1257 errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsValue);
1258 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1259 LOGE("[CleanAssetsIdOnUserTable] failed to transfer assets to blob, %d.", errCode);
1260 return errCode;
1261 }
1262 errCode = CleanAssetsIdOnUserTable(tableName, fieldName, rowId, assetsValue);
1263 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1264 LOGE("[CleanAssetsIdOnUserTable] clean assets id on user table failed, %d", errCode);
1265 return errCode;
1266 }
1267 }
1268 return errCode != E_OK ? errCode : ret;
1269 END:
1270 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1271 return errCode != E_OK ? errCode : ret;
1272 }
1273
CleanAssetsIdOnUserTable(const std::string & tableName,const std::string & fieldName,const int64_t rowId,const std::vector<uint8_t> & assetsValue)1274 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetsIdOnUserTable(const std::string &tableName,
1275 const std::string &fieldName, const int64_t rowId, const std::vector<uint8_t> &assetsValue)
1276 {
1277 std::string cleanAssetIdSql = "UPDATE " + tableName + " SET " + fieldName + " = ? WHERE " +
1278 std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1279 sqlite3_stmt *stmt = nullptr;
1280 int ret = E_OK;
1281 int errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1282 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1283 LOGE("Get statement failed, %d", errCode);
1284 SQLiteUtils::ResetStatement(stmt, true, ret);
1285 return errCode;
1286 }
1287 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, assetsValue, false);
1288 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1289 SQLiteUtils::ResetStatement(stmt, true, ret);
1290 return errCode;
1291 }
1292 errCode = SQLiteUtils::StepWithRetry(stmt);
1293 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1294 errCode = E_OK;
1295 }
1296 SQLiteUtils::ResetStatement(stmt, true, ret);
1297 return errCode != E_OK ? errCode : ret;
1298 }
1299
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1300 std::pair<int, uint32_t> SQLiteSingleVerRelationalStorageExecutor::GetAssetsByGidOrHashKey(
1301 const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
1302 {
1303 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
1304 auto &[errCode, status] = res;
1305 std::vector<Field> assetFields;
1306 std::string sql = "SELECT";
1307 for (const auto &field: tableSchema.fields) {
1308 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1309 assetFields.emplace_back(field);
1310 sql += " b." + field.colName + ",";
1311 }
1312 }
1313 if (assetFields.empty()) {
1314 return { -E_NOT_FOUND, status };
1315 }
1316 sql += "a.cloud_gid, a.status ";
1317 sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE (a." + FLAG_NOT_LOGIC_DELETE + ") AND (" +
1318 (gid.empty() ? "a.hash_key = ?);" : " a.cloud_gid = ? OR a.hash_key = ?);");
1319 sqlite3_stmt *stmt = nullptr;
1320 errCode = InitGetAssetStmt(sql, gid, hashKey, stmt);
1321 if (errCode != E_OK) {
1322 return res;
1323 }
1324 errCode = SQLiteUtils::StepWithRetry(stmt);
1325 int index = 0;
1326 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1327 for (const auto &field: assetFields) {
1328 Type cloudValue;
1329 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1330 if (errCode != E_OK) {
1331 break;
1332 }
1333 errCode = PutVBucketByType(assets, field, cloudValue);
1334 if (errCode != E_OK) {
1335 break;
1336 }
1337 }
1338 std::string curGid;
1339 errCode = SQLiteUtils::GetColumnTextValue(stmt, index++, curGid);
1340 if (errCode == E_OK && CloudStorageUtils::IsCloudGidMismatch(gid, curGid)) {
1341 // Gid is different, there may be duplicate primary keys in the cloud
1342 errCode = -E_CLOUD_GID_MISMATCH;
1343 }
1344 status = static_cast<uint32_t>(sqlite3_column_int(stmt, index++));
1345 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1346 errCode = -E_NOT_FOUND;
1347 } else {
1348 LOGE("step get asset stmt failed %d.", errCode);
1349 }
1350 SQLiteUtils::ResetStatement(stmt, true, errCode);
1351 return res;
1352 }
1353
InitGetAssetStmt(const std::string & sql,const std::string & gid,const Bytes & hashKey,sqlite3_stmt * & stmt)1354 int SQLiteSingleVerRelationalStorageExecutor::InitGetAssetStmt(const std::string &sql, const std::string &gid,
1355 const Bytes &hashKey, sqlite3_stmt *&stmt)
1356 {
1357 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1358 if (errCode != E_OK) {
1359 LOGE("Get asset statement failed, %d.", errCode);
1360 return errCode;
1361 }
1362 int index = 1;
1363 int ret = E_OK;
1364 if (!gid.empty()) {
1365 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
1366 if (errCode != E_OK) {
1367 LOGE("bind gid failed %d.", errCode);
1368 SQLiteUtils::ResetStatement(stmt, true, ret);
1369 return errCode;
1370 }
1371 }
1372 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, hashKey);
1373 if (errCode != E_OK) {
1374 LOGE("bind hash failed %d.", errCode);
1375 SQLiteUtils::ResetStatement(stmt, true, ret);
1376 }
1377 return errCode;
1378 }
1379
FillHandleWithOpType(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid,const TableSchema & tableSchema)1380 int SQLiteSingleVerRelationalStorageExecutor::FillHandleWithOpType(const OpType opType, const CloudSyncData &data,
1381 bool fillAsset, bool ignoreEmptyGid, const TableSchema &tableSchema)
1382 {
1383 int errCode = E_OK;
1384 switch (opType) {
1385 case OpType::UPDATE_VERSION: // fallthrough
1386 case OpType::INSERT_VERSION: {
1387 errCode = FillCloudVersionForUpload(opType, data);
1388 break;
1389 }
1390 case OpType::SET_UPLOADING: {
1391 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1392 if (errCode != E_OK) {
1393 LOGE("Failed to set uploading for ins data, %d.", errCode);
1394 return errCode;
1395 }
1396 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1397 break;
1398 }
1399 case OpType::INSERT: {
1400 errCode = UpdateCloudLogGid(data, ignoreEmptyGid);
1401 if (errCode != E_OK) {
1402 LOGE("Failed to fill cloud log gid, %d.", errCode);
1403 return errCode;
1404 }
1405 if (fillAsset) {
1406 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1407 if (errCode != E_OK) {
1408 LOGE("Failed to fill asset for ins, %d.", errCode);
1409 return errCode;
1410 }
1411 }
1412 errCode = FillCloudVersionForUpload(OpType::INSERT_VERSION, data);
1413 break;
1414 }
1415 case OpType::UPDATE: {
1416 if (fillAsset && !data.updData.assets.empty()) {
1417 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1418 if (errCode != E_OK) {
1419 LOGE("Failed to fill asset for upd, %d.", errCode);
1420 return errCode;
1421 }
1422 }
1423 errCode = FillCloudVersionForUpload(OpType::UPDATE_VERSION, data);
1424 break;
1425 }
1426 default:
1427 break;
1428 }
1429 return errCode;
1430 }
1431
GetAssetsByRowId(sqlite3_stmt * & selectStmt,Assets & assets)1432 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets)
1433 {
1434 int errCode = SQLiteUtils::StepWithRetry(selectStmt);
1435 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1436 std::vector<uint8_t> blobValue;
1437 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
1438 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1439 LOGE("Get column blob value failed %d.", errCode);
1440 return errCode;
1441 }
1442 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1443 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1444 LOGE("Transfer blob to assets failed %d", errCode);
1445 }
1446 return errCode;
1447 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1448 return E_OK;
1449 } else {
1450 LOGE("Step select statement failed %d.", errCode);
1451 return errCode;
1452 }
1453 }
1454
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1455 void SQLiteSingleVerRelationalStorageExecutor::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1456 {
1457 assetLoader_ = loader;
1458 }
1459
ExecuteFillDownloadAssetStatement(sqlite3_stmt * & stmt,int beginIndex,const std::string & cloudGid)1460 int SQLiteSingleVerRelationalStorageExecutor::ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt,
1461 int beginIndex, const std::string &cloudGid)
1462 {
1463 int errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, cloudGid);
1464 if (errCode != E_OK) {
1465 LOGE("Bind cloud gid to statement failed %d.", errCode);
1466 int ret = E_OK;
1467 SQLiteUtils::ResetStatement(stmt, true, ret);
1468 return errCode;
1469 }
1470 errCode = SQLiteUtils::StepWithRetry(stmt);
1471 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1472 errCode = E_OK;
1473 } else {
1474 LOGE("Fill cloud asset failed: %d.", errCode);
1475 }
1476 int ret = E_OK;
1477 SQLiteUtils::ResetStatement(stmt, true, ret);
1478 return errCode != E_OK ? errCode : ret;
1479 }
1480
CleanDownloadChangedAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType)1481 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadChangedAssets(
1482 const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType)
1483 {
1484 if (assetLoader_ == nullptr) {
1485 LOGE("assetLoader may be not set.");
1486 return -E_NOT_SET;
1487 }
1488 std::vector<Asset> toDeleteAssets;
1489 CloudStorageUtils::GetToBeRemoveAssets(vBucket, assetOpType, toDeleteAssets);
1490 if (toDeleteAssets.empty()) {
1491 return E_OK;
1492 }
1493 DBStatus ret = assetLoader_->RemoveLocalAssets(toDeleteAssets);
1494 if (ret != OK) {
1495 LOGE("remove local assets failed %d.", ret);
1496 return -E_REMOVE_ASSETS_FAILED;
1497 }
1498 return E_OK;
1499 }
1500
GetAndBindFillUploadAssetStatement(const std::string & tableName,const VBucket & assets,sqlite3_stmt * & statement)1501 int SQLiteSingleVerRelationalStorageExecutor::GetAndBindFillUploadAssetStatement(const std::string &tableName,
1502 const VBucket &assets, sqlite3_stmt *&statement)
1503 {
1504 std::string sql = "UPDATE '" + tableName + "' SET ";
1505 for (const auto &item: assets) {
1506 sql += item.first + " = ?,";
1507 }
1508 sql.pop_back();
1509 sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = ?;";
1510 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1511 if (errCode != E_OK) {
1512 return errCode;
1513 }
1514 int bindIndex = 1;
1515 for (const auto &item: assets) {
1516 Field field = {
1517 .colName = item.first, .type = static_cast<int32_t>(item.second.index())
1518 };
1519 errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](bindIndex++, assets, field, statement);
1520 if (errCode != E_OK) {
1521 return errCode;
1522 }
1523 }
1524 return errCode;
1525 }
1526
OnlyUpdateAssetId(const std::string & tableName,const TableSchema & tableSchema,const VBucket & vBucket,int64_t dataKey,OpType opType)1527 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateAssetId(const std::string &tableName,
1528 const TableSchema &tableSchema, const VBucket &vBucket, int64_t dataKey, OpType opType)
1529 {
1530 if (opType != OpType::ONLY_UPDATE_GID && opType != OpType::NOT_HANDLE &&
1531 opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1532 return E_OK;
1533 }
1534 if (CloudStorageUtils::IsSharedTable(tableSchema)) {
1535 // this is shared table, not need to update asset id.
1536 return E_OK;
1537 }
1538 bool isNotIncCursor = false; // if isNotIncCursor is false, will increase cursor
1539 if (!IsNeedUpdateAssetId(tableSchema, dataKey, vBucket, isNotIncCursor)) {
1540 return E_OK;
1541 }
1542 int error = SetCursorIncFlag(!isNotIncCursor);
1543 if (error != E_OK) {
1544 LOGE("[Storage Executor] failed to set cursor_inc_flag, %d.", error);
1545 return error;
1546 }
1547 int errCode = UpdateAssetId(tableSchema, dataKey, vBucket);
1548 if (errCode != E_OK) {
1549 LOGE("[Storage Executor] failed to update assetId on table, %d.", errCode);
1550 }
1551 if (isNotIncCursor) {
1552 error = SetCursorIncFlag(true);
1553 if (error != E_OK) {
1554 LOGE("[Storage Executor] failed to set cursor_inc_flag true, %d.", error);
1555 return error;
1556 }
1557 }
1558 return errCode;
1559 }
1560
UpdateLocalAssetId(const VBucket & vBucket,const std::string & fieldName,Asset & asset)1561 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName,
1562 Asset &asset)
1563 {
1564 for (const auto &[col, value] : vBucket) {
1565 if (value.index() == TYPE_INDEX<Asset> && col == fieldName) {
1566 asset = std::get<Asset>(value);
1567 }
1568 }
1569 }
1570
UpdateLocalAssetsId(const VBucket & vBucket,const std::string & fieldName,Assets & assets)1571 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName,
1572 Assets &assets)
1573 {
1574 for (const auto &[col, value] : vBucket) {
1575 if (value.index() == TYPE_INDEX<Assets> && col == fieldName) {
1576 assets = std::get<Assets>(value);
1577 }
1578 }
1579 }
1580
BindAssetToBlobStatement(const Asset & asset,int index,sqlite3_stmt * & stmt)1581 int SQLiteSingleVerRelationalStorageExecutor::BindAssetToBlobStatement(const Asset &asset, int index,
1582 sqlite3_stmt *&stmt)
1583 {
1584 std::vector<uint8_t> blobValue;
1585 int errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, blobValue);
1586 if (errCode != E_OK) {
1587 LOGE("Transfer asset to blob failed, %d.", errCode);
1588 return errCode;
1589 }
1590 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1591 if (errCode != E_OK) {
1592 LOGE("Bind asset blob to statement failed, %d.", errCode);
1593 }
1594 return errCode;
1595 }
1596
BindAssetsToBlobStatement(const Assets & assets,int index,sqlite3_stmt * & stmt)1597 int SQLiteSingleVerRelationalStorageExecutor::BindAssetsToBlobStatement(const Assets &assets, int index,
1598 sqlite3_stmt *&stmt)
1599 {
1600 std::vector<uint8_t> blobValue;
1601 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blobValue);
1602 if (errCode != E_OK) {
1603 LOGE("Transfer asset to blob failed, %d.", errCode);
1604 return errCode;
1605 }
1606 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1607 if (errCode != E_OK) {
1608 LOGE("Bind asset blob to statement failed, %d.", errCode);
1609 }
1610 return errCode;
1611 }
1612
BindAssetFiledToBlobStatement(const TableSchema & tableSchema,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord,sqlite3_stmt * & stmt)1613 int SQLiteSingleVerRelationalStorageExecutor::BindAssetFiledToBlobStatement(const TableSchema &tableSchema,
1614 const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt)
1615 {
1616 int assetIndex = 0;
1617 int assetsIndex = 0;
1618 for (const auto &field : tableSchema.fields) {
1619 if (field.type == TYPE_INDEX<Asset>) {
1620 if (assetOfOneRecord[assetIndex].name.empty()) {
1621 continue;
1622 }
1623 int errCode = BindAssetToBlobStatement(assetOfOneRecord[assetIndex], assetIndex + assetsIndex + 1, stmt);
1624 if (errCode != E_OK) {
1625 LOGE("Bind asset to blob statement failed, %d.", errCode);
1626 return errCode;
1627 }
1628 assetIndex++;
1629 } else if (field.type == TYPE_INDEX<Assets>) {
1630 if (assetsOfOneRecord[assetsIndex].empty()) {
1631 continue;
1632 }
1633 int errCode = BindAssetsToBlobStatement(assetsOfOneRecord[assetsIndex], assetIndex + assetsIndex + 1, stmt);
1634 if (errCode != E_OK) {
1635 LOGE("Bind assets to blob statement failed, %d.", errCode);
1636 return errCode;
1637 }
1638 assetsIndex++;
1639 }
1640 }
1641 return E_OK;
1642 }
1643
UpdateAssetsIdForOneRecord(const TableSchema & tableSchema,const std::string & sql,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord)1644 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetsIdForOneRecord(const TableSchema &tableSchema,
1645 const std::string &sql, const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord)
1646 {
1647 int errCode = E_OK;
1648 int ret = E_OK;
1649 sqlite3_stmt *stmt = nullptr;
1650 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1651 if (errCode != E_OK) {
1652 LOGE("Get update asset statement failed, %d.", errCode);
1653 return errCode;
1654 }
1655 errCode = BindAssetFiledToBlobStatement(tableSchema, assetOfOneRecord, assetsOfOneRecord, stmt);
1656 if (errCode != E_OK) {
1657 LOGE("Asset field Bind asset to blob statement failed, %d.", errCode);
1658 SQLiteUtils::ResetStatement(stmt, true, ret);
1659 return errCode != E_OK ? errCode : ret;
1660 }
1661 errCode = SQLiteUtils::StepWithRetry(stmt);
1662 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1663 errCode = E_OK;
1664 } else {
1665 LOGE("Step statement failed, %d", errCode);
1666 }
1667 SQLiteUtils::ResetStatement(stmt, true, ret);
1668 return errCode != E_OK ? errCode : ret;
1669 }
1670
UpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1671 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1672 const VBucket &vBucket)
1673 {
1674 int errCode = E_OK;
1675 std::vector<Asset> assetOfOneRecord;
1676 std::vector<Assets> assetsOfOneRecord;
1677 std::string updateAssetIdSql = "UPDATE " + tableSchema.name + " SET";
1678 for (const auto &field : tableSchema.fields) {
1679 if (field.type == TYPE_INDEX<Asset>) {
1680 Asset asset;
1681 UpdateLocalAssetId(vBucket, field.colName, asset);
1682 assetOfOneRecord.push_back(asset);
1683 if (!asset.name.empty()) {
1684 updateAssetIdSql += " " + field.colName + " = ?,";
1685 }
1686 }
1687 if (field.type == TYPE_INDEX<Assets>) {
1688 Assets assets;
1689 UpdateLocalAssetsId(vBucket, field.colName, assets);
1690 assetsOfOneRecord.push_back(assets);
1691 if (!assets.empty()) {
1692 updateAssetIdSql += " " + field.colName + " = ?,";
1693 }
1694 }
1695 }
1696 if (updateAssetIdSql == "UPDATE " + tableSchema.name + " SET") {
1697 return E_OK;
1698 }
1699 updateAssetIdSql.pop_back();
1700 updateAssetIdSql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1701 errCode = UpdateAssetsIdForOneRecord(tableSchema, updateAssetIdSql, assetOfOneRecord, assetsOfOneRecord);
1702 if (errCode != E_OK) {
1703 LOGE("[Storage Executor] failed to update asset id on table, %d.", errCode);
1704 }
1705 return errCode;
1706 }
1707
SetPutDataMode(PutDataMode mode)1708 void SQLiteSingleVerRelationalStorageExecutor::SetPutDataMode(PutDataMode mode)
1709 {
1710 putDataMode_ = mode;
1711 }
1712
SetMarkFlagOption(MarkFlagOption option)1713 void SQLiteSingleVerRelationalStorageExecutor::SetMarkFlagOption(MarkFlagOption option)
1714 {
1715 markFlagOption_ = option;
1716 }
1717
GetDataFlag()1718 int64_t SQLiteSingleVerRelationalStorageExecutor::GetDataFlag()
1719 {
1720 if (putDataMode_ != PutDataMode::USER) {
1721 return static_cast<int64_t>(LogInfoFlag::FLAG_CLOUD) |
1722 static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1723 }
1724 uint32_t flag = static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
1725 if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1726 flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1727 }
1728 flag |= static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1729 return static_cast<int64_t>(flag);
1730 }
1731
GetUpdateDataFlagSql(const VBucket & data)1732 std::string SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataFlagSql(const VBucket &data)
1733 {
1734 std::string retentionFlag = "flag = (flag & " +
1735 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY) |
1736 static_cast<uint32_t>(LogInfoFlag::FLAG_ASSET_DOWNLOADING_FOR_ASYNC)) +
1737 ") | " + std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_CLOUD_UPDATE_LOCAL));
1738 if (putDataMode_ == PutDataMode::SYNC) {
1739 if (CloudStorageUtils::IsAssetsContainDownloadRecord(data)) {
1740 return retentionFlag;
1741 }
1742 return UPDATE_FLAG_CLOUD;
1743 }
1744 if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1745 return UPDATE_FLAG_WAIT_COMPENSATED_SYNC;
1746 }
1747 return retentionFlag;
1748 }
1749
GetDev()1750 std::string SQLiteSingleVerRelationalStorageExecutor::GetDev()
1751 {
1752 return putDataMode_ == PutDataMode::SYNC ? "cloud" : "";
1753 }
1754
GetUpdateField(const VBucket & vBucket,const TableSchema & tableSchema)1755 std::vector<Field> SQLiteSingleVerRelationalStorageExecutor::GetUpdateField(const VBucket &vBucket,
1756 const TableSchema &tableSchema)
1757 {
1758 std::set<std::string> useFields;
1759 std::vector<Field> fields;
1760 if (putDataMode_ == PutDataMode::SYNC) {
1761 for (const auto &field : tableSchema.fields) {
1762 useFields.insert(field.colName);
1763 }
1764 fields = tableSchema.fields;
1765 } else {
1766 for (const auto &field : vBucket) {
1767 if (field.first.empty() || field.first[0] == '#') {
1768 continue;
1769 }
1770 useFields.insert(field.first);
1771 }
1772 for (const auto &field : tableSchema.fields) {
1773 if (useFields.find(field.colName) == useFields.end()) {
1774 continue;
1775 }
1776 fields.push_back(field);
1777 }
1778 }
1779 return fields;
1780 }
1781
UpdateRecordFlag(const std::string & tableName,const std::string & sql,const LogInfo & logInfo)1782 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordFlag(const std::string &tableName, const std::string &sql,
1783 const LogInfo &logInfo)
1784 {
1785 bool useHashKey = false;
1786 if (logInfo.cloudGid.empty() && logInfo.dataKey == DBConstant::DEFAULT_ROW_ID) {
1787 if (logInfo.hashKey.empty()) {
1788 LOGE("[RDBExecutor] Update record flag failed with invalid args!");
1789 return -E_INVALID_ARGS;
1790 }
1791 useHashKey = true;
1792 }
1793 sqlite3_stmt *stmt = nullptr;
1794 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1795 if (errCode != E_OK) {
1796 LOGE("[Storage Executor] Get stmt failed when update record flag, %d", errCode);
1797 return errCode;
1798 }
1799 int ret = E_OK;
1800 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, logInfo.timestamp); // 1 is timestamp
1801 if (errCode != E_OK) {
1802 LOGE("[Storage Executor] Bind timestamp to update record flag stmt failed, %d", errCode);
1803 SQLiteUtils::ResetStatement(stmt, true, ret);
1804 return errCode;
1805 }
1806 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, logInfo.timestamp); // 2 is timestamp
1807 if (errCode != E_OK) {
1808 LOGE("[Storage Executor] Bind timestamp to update record status stmt failed, %d", errCode);
1809 SQLiteUtils::ResetStatement(stmt, true, ret);
1810 return errCode;
1811 }
1812 if (useHashKey) {
1813 errCode = SQLiteUtils::BindBlobToStatement(stmt, 3, logInfo.hashKey); // 3 is hash_key
1814 if (errCode != E_OK) {
1815 LOGE("[Storage Executor] Bind hashKey to update record flag stmt failed, %d", errCode);
1816 SQLiteUtils::ResetStatement(stmt, true, ret);
1817 return errCode;
1818 }
1819 }
1820 errCode = SQLiteUtils::StepWithRetry(stmt);
1821 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1822 errCode = E_OK;
1823 } else {
1824 LOGE("[Storage Executor]Step update record flag stmt failed, %d", errCode);
1825 }
1826 SQLiteUtils::ResetStatement(stmt, true, ret);
1827 return errCode == E_OK ? ret : errCode;
1828 }
1829
MarkFlagAsUploadFinished(const std::string & tableName,const Key & hashKey,Timestamp timestamp,bool isExistAssetsDownload)1830 void SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsUploadFinished(const std::string &tableName,
1831 const Key &hashKey, Timestamp timestamp, bool isExistAssetsDownload)
1832 {
1833 sqlite3_stmt *stmt = nullptr;
1834 int errCode = SQLiteUtils::GetStatement(dbHandle_, CloudStorageUtils::GetUpdateUploadFinishedSql(tableName,
1835 isExistAssetsDownload), stmt);
1836 if (errCode != E_OK) {
1837 LOGW("[Storage Executor] Get statement fail! errCode:%d", errCode);
1838 return;
1839 }
1840 int index = 1;
1841 errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, timestamp);
1842 if (errCode != E_OK) {
1843 SQLiteUtils::ResetStatement(stmt, true, errCode);
1844 LOGW("[Storage Executor] Bind timestamp to update record flag for upload finished stmt failed, %d", errCode);
1845 return;
1846 }
1847 errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, hashKey);
1848 if (errCode != E_OK) {
1849 SQLiteUtils::ResetStatement(stmt, true, errCode);
1850 LOGW("[Storage Executor] Bind hashKey to update record flag for upload finished stmt failed, %d", errCode);
1851 return;
1852 }
1853 errCode = SQLiteUtils::StepWithRetry(stmt);
1854 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1855 errCode = E_OK;
1856 } else {
1857 LOGE("[Storage Executor]Step update record flag for upload finished stmt failed, %d", errCode);
1858 }
1859 SQLiteUtils::ResetStatement(stmt, true, errCode);
1860 }
1861
GetWaitCompensatedSyncDataPk(const TableSchema & table,std::vector<VBucket> & data,bool isQueryDownloadRecords)1862 int SQLiteSingleVerRelationalStorageExecutor::GetWaitCompensatedSyncDataPk(const TableSchema &table,
1863 std::vector<VBucket> &data, bool isQueryDownloadRecords)
1864 {
1865 std::string sql = "SELECT ";
1866 std::vector<Field> pkFields;
1867 for (const auto &field : table.fields) {
1868 if (!field.primary) {
1869 continue;
1870 }
1871 sql += "b." + field.colName + ",";
1872 pkFields.push_back(field);
1873 }
1874 if (pkFields.empty()) {
1875 // ignore no pk table
1876 return E_OK;
1877 }
1878 sql.pop_back();
1879 if (isQueryDownloadRecords) {
1880 sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " +
1881 FLAG_IS_WAIT_COMPENSATED_CONTAIN_DOWNLOAD_SYNC;
1882 } else {
1883 sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " + FLAG_IS_WAIT_COMPENSATED_SYNC;
1884 }
1885 sqlite3_stmt *stmt = nullptr;
1886 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1887 if (errCode != E_OK) {
1888 LOGE("[RDBExecutor] Get stmt failed when get wait compensated sync pk! errCode = %d..", errCode);
1889 return errCode;
1890 }
1891 do {
1892 errCode = SQLiteUtils::StepWithRetry(stmt);
1893 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1894 VBucket pkData;
1895 errCode = GetRecordFromStmt(stmt, pkFields, 0, pkData);
1896 if (errCode != E_OK) {
1897 LOGE("[RDBExecutor] Get record failed when get wait compensated sync pk! errCode = %d.", errCode);
1898 break;
1899 }
1900 data.push_back(pkData);
1901 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1902 errCode = E_OK;
1903 break;
1904 } else {
1905 LOGE("[RDBExecutor] Step failed when get wait compensated sync pk! errCode = %d.", errCode);
1906 break;
1907 }
1908 } while (errCode == E_OK);
1909 int ret = E_OK;
1910 SQLiteUtils::ResetStatement(stmt, true, ret);
1911 return errCode == E_OK ? ret : errCode;
1912 }
1913
ClearUnLockingStatus(const std::string & tableName)1914 int SQLiteSingleVerRelationalStorageExecutor::ClearUnLockingStatus(const std::string &tableName)
1915 {
1916 std::string sql;
1917 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET status = (CASE WHEN status == 1 "+
1918 "AND (cloud_gid = '' AND flag & 0x01 != 0) THEN 0 ELSE status END);";
1919 sqlite3_stmt *stmt = nullptr;
1920 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1921 if (errCode != E_OK) {
1922 LOGE("[RDBExecutor] Get stmt failed when clear unlocking status errCode = %d.", errCode);
1923 return errCode;
1924 }
1925 int ret = E_OK;
1926 errCode = SQLiteUtils::StepWithRetry(stmt);
1927 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1928 errCode = E_OK;
1929 } else {
1930 LOGE("[Storage Executor] Step update record status stmt failed, %d.", errCode);
1931 }
1932 SQLiteUtils::ResetStatement(stmt, true, ret);
1933 return errCode == E_OK ? ret : errCode;
1934 }
1935
GetRecordFromStmt(sqlite3_stmt * stmt,const std::vector<Field> & fields,int startIndex,VBucket & record)1936 int SQLiteSingleVerRelationalStorageExecutor::GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields,
1937 int startIndex, VBucket &record)
1938 {
1939 int errCode = E_OK;
1940 for (const auto &field : fields) {
1941 Type cloudValue;
1942 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, startIndex, cloudValue);
1943 if (errCode != E_OK) {
1944 break;
1945 }
1946 errCode = PutVBucketByType(record, field, cloudValue);
1947 if (errCode != E_OK) {
1948 break;
1949 }
1950 startIndex++;
1951 }
1952 return errCode;
1953 }
1954
BindShareValueToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt,int & index)1955 int SQLiteSingleVerRelationalStorageExecutor::BindShareValueToInsertLogStatement(const VBucket &vBucket,
1956 const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt, int &index)
1957 {
1958 int errCode = E_OK;
1959 std::string version;
1960 if (putDataMode_ == PutDataMode::SYNC) {
1961 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, vBucket, version);
1962 if ((errCode != E_OK && errCode != -E_NOT_FOUND)) {
1963 LOGE("get version for insert log statement failed, %d", errCode);
1964 return -E_CLOUD_ERROR;
1965 }
1966 }
1967 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, version); // next is version
1968 if (errCode != E_OK) {
1969 LOGE("Bind version to insert log statement failed, %d", errCode);
1970 return errCode;
1971 }
1972
1973 std::string shareUri;
1974 if (putDataMode_ == PutDataMode::SYNC) {
1975 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
1976 vBucket, shareUri);
1977 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1978 LOGE("get shareUri for insert log statement failed, %d", errCode);
1979 return -E_CLOUD_ERROR;
1980 }
1981 }
1982
1983 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, shareUri); // next is sharing_resource
1984 if (errCode != E_OK) {
1985 LOGE("Bind shareUri to insert log statement failed, %d", errCode);
1986 }
1987 return errCode;
1988 }
1989
CheckAndCreateTrigger(const TableInfo & table)1990 void SQLiteSingleVerRelationalStorageExecutor::CheckAndCreateTrigger(const TableInfo &table)
1991 {
1992 auto tableManager = std::make_unique<SimpleTrackerLogTableManager>();
1993 tableManager->CheckAndCreateTrigger(dbHandle_, table, "");
1994 }
1995
ClearLogOfMismatchedData(const std::string & tableName)1996 void SQLiteSingleVerRelationalStorageExecutor::ClearLogOfMismatchedData(const std::string &tableName)
1997 {
1998 bool isLogTableExist = false;
1999 int errCode = SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist);
2000 if (!isLogTableExist) {
2001 return;
2002 }
2003 std::string sql = "SELECT data_key from " + DBCommon::GetLogTableName(tableName) + " WHERE data_key NOT IN " +
2004 "(SELECT _rowid_ FROM " + tableName + ") AND data_key != -1";
2005 sqlite3_stmt *stmt = nullptr;
2006 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
2007 if (errCode != E_OK) {
2008 LOGW("[RDBExecutor][ClearMisLog] Get stmt failed, %d", errCode);
2009 return;
2010 }
2011 std::vector<int64_t> dataKeys;
2012 std::string misDataKeys = "(";
2013 errCode = SQLiteUtils::StepWithRetry(stmt, false);
2014 while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2015 int dataKey = sqlite3_column_int64(stmt, 0);
2016 dataKeys.push_back(dataKey);
2017 misDataKeys += std::to_string(dataKey) + ",";
2018 errCode = SQLiteUtils::StepWithRetry(stmt, false);
2019 }
2020 SQLiteUtils::ResetStatement(stmt, true, errCode);
2021 stmt = nullptr;
2022 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2023 LOGW("[RDBExecutor][ClearMisLog] Step failed. %d", errCode);
2024 return;
2025 }
2026 if (dataKeys.empty()) {
2027 return;
2028 }
2029 misDataKeys.pop_back();
2030 misDataKeys += ")";
2031 LOGW("[RDBExecutor][ClearMisLog] Mismatched:%s", misDataKeys.c_str());
2032 std::string delSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key IN " + misDataKeys;
2033 errCode = SQLiteUtils::GetStatement(dbHandle_, delSql, stmt);
2034 if (errCode != E_OK) {
2035 LOGW("[RDBExecutor][ClearMisLog] Get del stmt failed, %d", errCode);
2036 return;
2037 }
2038 errCode = SQLiteUtils::StepWithRetry(stmt, false);
2039 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2040 LOGW("[RDBExecutor][ClearMisLog] Step del failed, %d", errCode);
2041 }
2042 SQLiteUtils::ResetStatement(stmt, true, errCode);
2043 }
2044
RecoverNullExtendLog(const TrackerSchema & trackerSchema,const TrackerTable & table)2045 void SQLiteSingleVerRelationalStorageExecutor::RecoverNullExtendLog(const TrackerSchema &trackerSchema,
2046 const TrackerTable &table)
2047 {
2048 if (trackerSchema.extendColNames.empty()) {
2049 return;
2050 }
2051 std::string sql = "SELECT COUNT(1) FROM " + DBCommon::GetLogTableName(trackerSchema.tableName) +
2052 " WHERE (json_valid(extend_field) = 0 OR json_type(extend_field) IS NOT 'object') AND data_key != -1";
2053 if (!SQLiteRelationalUtils::ExecuteCheckSql(dbHandle_, sql).second) {
2054 return;
2055 }
2056 auto errCode = SQLiteUtils::BeginTransaction(dbHandle_, TransactType::IMMEDIATE);
2057 if (errCode != E_OK) {
2058 LOGE("[RDBExecutor][RecoverTracker] Begin transaction fail %d", errCode);
2059 return;
2060 }
2061 errCode = RecoverNullExtendLogInner(trackerSchema, table);
2062 int changeRow = 0;
2063 if (errCode == E_OK) {
2064 changeRow = sqlite3_changes(dbHandle_);
2065 errCode = SQLiteUtils::CommitTransaction(dbHandle_);
2066 } else {
2067 (void)SQLiteUtils::RollbackTransaction(dbHandle_);
2068 }
2069 LOGI("[RDBExecutor][RecoverTracker] Recover tracker[%s][%zu] finished[%d] changeRow[%d]",
2070 DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(),
2071 errCode, changeRow);
2072 }
2073
RecoverNullExtendLogInner(const TrackerSchema & trackerSchema,const TrackerTable & table)2074 int SQLiteSingleVerRelationalStorageExecutor::RecoverNullExtendLogInner(const TrackerSchema &trackerSchema,
2075 const TrackerTable &table)
2076 {
2077 std::vector<std::function<int()>> actions;
2078 actions.emplace_back([this]() {
2079 return SetLogTriggerStatus(false);
2080 });
2081 actions.emplace_back([this, &table]() {
2082 return SQLiteUtils::ExecuteRawSQL(dbHandle_, table.GetTempUpdateLogCursorTriggerSql());
2083 });
2084 actions.emplace_back([this, &trackerSchema]() {
2085 return UpdateExtendField(trackerSchema.tableName, trackerSchema.extendColNames,
2086 " AND (json_valid(log.extend_field) = 0 OR json_type(log.extend_field) IS NOT 'object')");
2087 });
2088 actions.emplace_back([this]() {
2089 return ClearAllTempSyncTrigger();
2090 });
2091 actions.emplace_back([this]() {
2092 return SetLogTriggerStatus(true);
2093 });
2094 return SQLiteRelationalUtils::ExecuteListAction(actions);
2095 }
2096 } // namespace DistributedDB
2097 #endif
2098