1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32
33 namespace DistributedDB {
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)34 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
35 const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
36 {
37 std::string querySql;
38 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
39 std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
40 int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
41 if (errCode != E_OK) {
42 LOGE("Get query log sql fail, %d", errCode);
43 return errCode;
44 }
45 if (!pkSet.empty()) {
46 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, dataInfoWithLog.logInfo.hashKey, true);
47 if (errCode != E_OK) {
48 LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
49 return errCode;
50 }
51 }
52 sqlite3_stmt *selectStmt = nullptr;
53 errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, dataInfoWithLog.logInfo.hashKey, selectStmt);
54 if (errCode != E_OK) {
55 LOGE("Get query log statement fail, %d", errCode);
56 return errCode;
57 }
58
59 bool alreadyFound = false;
60 do {
61 errCode = SQLiteUtils::StepWithRetry(selectStmt);
62 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
63 if (alreadyFound) {
64 LOGE("found more than one records in log table for one primary key or gid.");
65 errCode = -E_CLOUD_ERROR;
66 break;
67 }
68 alreadyFound = true;
69 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
70 errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
71 if (errCode != E_OK) {
72 LOGE("Get info by statement fail, %d", errCode);
73 break;
74 }
75 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
76 errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
77 break;
78 } else {
79 LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
80 break;
81 }
82 } while (errCode == E_OK);
83
84 int ret = E_OK;
85 SQLiteUtils::ResetStatement(selectStmt, true, ret);
86 return errCode != E_OK ? errCode : ret;
87 }
88
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)89 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
90 {
91 int index = 0;
92 logInfo.dataKey = sqlite3_column_int64(statement, index++);
93 std::vector<uint8_t> device;
94 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, device); // 1 is device
95 DBCommon::VectorToString(device, logInfo.device);
96 std::vector<uint8_t> originDev;
97 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, originDev); // 2 is originDev
98 DBCommon::VectorToString(originDev, logInfo.originDev);
99 logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 3 is timestamp
100 logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 4 is wtimestamp
101 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, index++)); // 5 is flag
102 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, logInfo.hashKey); // 6 is hash_key
103 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.cloudGid); // 7 is cloud_gid
104 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.sharingResource); // 8 is sharing_resource
105 logInfo.status = static_cast<uint64_t>(sqlite3_column_int64(statement, index++)); // 9 is status
106 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.version); // 10 is version
107 return index;
108 }
109
GetInfoByStatement(sqlite3_stmt * statement,const std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)110 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
111 const std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
112 VBucket &assetInfo)
113 {
114 int index = GetLogInfoByStatement(statement, dataInfoWithLog.logInfo); // start index of assetInfo or primary key
115 int errCode = E_OK;
116 for (const auto &field: assetFields) {
117 Type cloudValue;
118 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
119 if (errCode != E_OK) {
120 break;
121 }
122 errCode = PutVBucketByType(assetInfo, field, cloudValue);
123 if (errCode != E_OK) {
124 break;
125 }
126 }
127 if (errCode != E_OK) {
128 LOGE("set asset field failed, errCode = %d", errCode);
129 return errCode;
130 }
131
132 // fill primary key
133 for (const auto &item : pkMap) {
134 Type cloudValue;
135 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
136 if (errCode != E_OK) {
137 break;
138 }
139 errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
140 if (errCode != E_OK) {
141 break;
142 }
143 }
144 return errCode;
145 }
146
GetInsertSqlForCloudSync(const TableSchema & tableSchema)147 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
148 {
149 std::string sql = "insert into " + tableSchema.name + "(";
150 for (const auto &field : tableSchema.fields) {
151 sql += field.colName + ",";
152 }
153 sql.pop_back();
154 sql += ") values(";
155 for (size_t i = 0; i < tableSchema.fields.size(); i++) {
156 sql += "?,";
157 }
158 sql.pop_back();
159 sql += ");";
160 return sql;
161 }
162
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)163 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
164 const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
165 {
166 int errCode = E_OK;
167 TableInfo localTable = localSchema_.GetTable(tableSchema.name);
168 // table name in cloud schema is in lower case
169 if (!DBCommon::CaseInsensitiveCompare(localTable.GetTableName(), tableSchema.name)) {
170 LOGE("localSchema doesn't contain table from cloud");
171 return -E_INTERNAL_ERROR;
172 }
173
174 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
175 if (pkMap.size() == 0) {
176 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
177 std::vector<uint8_t> value;
178 DBCommon::StringToVector(std::to_string(rowid), value);
179 errCode = DBCommon::CalcValueHash(value, hashValue);
180 } else {
181 std::tie(errCode, hashValue) = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(vBucket,
182 tableSchema, localTable, pkMap, allowEmpty);
183 }
184 return errCode;
185 }
186
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,const Key & hashKey,sqlite3_stmt * & selectStmt)187 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
188 const VBucket &vBucket, const std::string &querySql, const Key &hashKey, sqlite3_stmt *&selectStmt)
189 {
190 int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
191 if (errCode != E_OK) {
192 LOGE("Get select log statement failed, %d", errCode);
193 return errCode;
194 }
195
196 std::string cloudGid;
197 int ret = E_OK;
198 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
199 if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
200 SQLiteUtils::ResetStatement(selectStmt, true, ret);
201 LOGE("Get cloud gid fail when bind query log statement.");
202 return errCode;
203 }
204
205 int index = 0;
206 if (!cloudGid.empty()) {
207 index++;
208 errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
209 if (errCode != E_OK) {
210 LOGE("Bind cloud gid to query log statement failed. %d", errCode);
211 SQLiteUtils::ResetStatement(selectStmt, true, ret);
212 return errCode;
213 }
214 }
215
216 index++;
217 errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashKey, true);
218 if (errCode != E_OK) {
219 LOGE("Bind hash key to query log statement failed. %d", errCode);
220 SQLiteUtils::ResetStatement(selectStmt, true, ret);
221 }
222 return errCode != E_OK ? errCode : ret;
223 }
224
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,const std::set<std::string> & pkSet,std::string & querySql)225 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
226 const std::set<std::string> &pkSet, std::string &querySql)
227 {
228 std::string cloudGid;
229 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
230 if (errCode != E_OK) {
231 LOGE("Get cloud gid fail when query log table.");
232 return errCode;
233 }
234
235 if (pkSet.empty() && cloudGid.empty()) {
236 LOGE("query log table failed because of both primary key and gid are empty.");
237 return -E_CLOUD_ERROR;
238 }
239 std::string sql = "SELECT data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid,"
240 " sharing_resource, status, version FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
241 "_log WHERE ";
242 if (!cloudGid.empty()) {
243 sql += "cloud_gid = ? OR ";
244 }
245 sql += "hash_key = ?";
246
247 querySql = sql;
248 return E_OK;
249 }
250
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData,std::map<int,int> & statisticMap)251 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
252 const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData,
253 std::map<int, int> &statisticMap)
254 {
255 int index = 0;
256 int errCode = E_OK;
257 for (OpType op : downloadData.opType) {
258 VBucket &vBucket = downloadData.data[index];
259 switch (op) {
260 case OpType::INSERT:
261 errCode = InsertCloudData(vBucket, tableSchema, trackerTable, GetLocalDataKey(index, downloadData));
262 break;
263 case OpType::UPDATE:
264 errCode = UpdateCloudData(vBucket, tableSchema);
265 break;
266 case OpType::DELETE:
267 errCode = DeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
268 break;
269 case OpType::ONLY_UPDATE_GID:
270 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
271 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
272 case OpType::UPDATE_TIMESTAMP:
273 case OpType::CLEAR_GID:
274 case OpType::LOCKED_NOT_HANDLE:
275 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
276 [[fallthrough]];
277 case OpType::NOT_HANDLE:
278 errCode = errCode == E_OK ? OnlyUpdateAssetId(tableName, tableSchema, vBucket,
279 GetLocalDataKey(index, downloadData), op) : errCode;
280 break;
281 default:
282 errCode = -E_CLOUD_ERROR;
283 break;
284 }
285 if (errCode != E_OK) {
286 LOGE("put cloud sync data fail: %d", errCode);
287 return errCode;
288 }
289 statisticMap[static_cast<int>(op)]++;
290 index++;
291 }
292 return errCode;
293 }
294
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets,std::vector<std::string> & notifyTableList)295 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
296 const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
297 std::vector<Asset> &assets, std::vector<std::string> ¬ifyTableList)
298 {
299 int errCode = SetLogTriggerStatus(false);
300 if (errCode != E_OK) {
301 LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
302 return errCode;
303 }
304 if (mode == FLAG_ONLY) {
305 errCode = DoCleanLogs(tableNameList, localSchema);
306 if (errCode != E_OK) {
307 LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
308 return errCode;
309 }
310 notifyTableList = tableNameList;
311 } else if (mode == FLAG_AND_DATA) {
312 errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
313 if (errCode != E_OK) {
314 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
315 return errCode;
316 }
317 notifyTableList = tableNameList;
318 } else if (mode == CLEAR_SHARED_TABLE) {
319 errCode = DoCleanShareTableDataAndLog(tableNameList);
320 if (errCode != E_OK) {
321 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
322 return errCode;
323 }
324 notifyTableList = tableNameList;
325 }
326 for (const auto &tableName: tableNameList) {
327 errCode = CleanDownloadingFlag(tableName);
328 if (errCode != E_OK) {
329 LOGE("Fail to clean downloading flag, %d, tableName:%s, length:%zu",
330 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
331 return errCode;
332 }
333 }
334 errCode = SetLogTriggerStatus(true);
335 if (errCode != E_OK) {
336 LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
337 }
338
339 return errCode;
340 }
341
DoClearCloudLogVersion(const std::vector<std::string> & tableNameList)342 int SQLiteSingleVerRelationalStorageExecutor::DoClearCloudLogVersion(const std::vector<std::string> &tableNameList)
343 {
344 int errCode = E_OK;
345 for (const auto &tableName: tableNameList) {
346 std::string logTableName = DBCommon::GetLogTableName(tableName);
347 LOGD("[Storage Executor] Start clear version on log table.");
348 errCode = ClearVersionOnLogTable(logTableName);
349 if (errCode != E_OK) {
350 std::string maskedName = DBCommon::StringMiddleMasking(tableName);
351 LOGE("[Storage Executor] failed to clear version of cloud data on log table %s (name length is %zu), %d",
352 maskedName.c_str(), maskedName.length(), errCode);
353 return errCode;
354 }
355 }
356 return errCode;
357 }
358
DoCleanLogs(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)359 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList,
360 const RelationalSchemaObject &localSchema)
361 {
362 int errCode = E_OK;
363 int i = 1;
364 for (const auto &tableName: tableNameList) {
365 std::string logTableName = DBCommon::GetLogTableName(tableName);
366 LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
367 errCode = DoCleanAssetId(tableName, localSchema);
368 if (errCode != E_OK) {
369 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
370 return errCode;
371 }
372 int32_t count = 0;
373 (void)GetFlagIsLocalCount(logTableName, count);
374 LOGI("[DoCleanLogs]flag is local in table:%s, len:%zu, count:%d, before remove device data.",
375 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
376 errCode = CleanCloudDataOnLogTable(logTableName, FLAG_ONLY);
377 if (errCode != E_OK) {
378 LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
379 return errCode;
380 }
381 (void)GetFlagIsLocalCount(logTableName, count);
382 LOGI("[DoCleanLogs]flag is local in table:%s, len:%zu, count:%d, after remove device data.",
383 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
384 i++;
385 }
386
387 return errCode;
388 }
389
UpdateCursor(sqlite3_context * ctx,int argc,sqlite3_value ** argv)390 void SQLiteSingleVerRelationalStorageExecutor::UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv)
391 {
392 if (ctx == nullptr || argc != 0 || argv == nullptr) {
393 LOGW("[SqlSinRDBExe][UpdateCursor] invalid param=%d", argc);
394 return;
395 }
396 auto context = static_cast<UpdateCursorContext *>(sqlite3_user_data(ctx));
397 if (context == nullptr) {
398 LOGW("[SqlSinRDBExe][UpdateCursor] invalid context");
399 return;
400 }
401 context->cursor++;
402 sqlite3_result_int64(ctx, static_cast<sqlite3_int64>(context->cursor));
403 }
404
CreateFuncUpdateCursor(UpdateCursorContext & context,void (* updateCursor)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const405 int SQLiteSingleVerRelationalStorageExecutor::CreateFuncUpdateCursor(UpdateCursorContext &context,
406 void(*updateCursor)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
407 {
408 std::string sql = "update_cursor";
409 int errCode = sqlite3_create_function_v2(dbHandle_, sql.c_str(), 0, SQLITE_UTF8 | SQLITE_DIRECTONLY,
410 &context, updateCursor, nullptr, nullptr, nullptr);
411 if (errCode != SQLITE_OK) {
412 LOGE("[Storage Executor][UpdateCursor] Create func=updateCursor failed=%d", errCode);
413 return SQLiteUtils::MapSQLiteErrno(errCode);
414 }
415 return E_OK;
416 }
417
GetCursor(const std::string & tableName,uint64_t & cursor)418 int SQLiteSingleVerRelationalStorageExecutor::GetCursor(const std::string &tableName, uint64_t &cursor)
419 {
420 return SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, cursor);
421 }
422
SetCursor(const std::string & tableName,uint64_t cursor)423 int SQLiteSingleVerRelationalStorageExecutor::SetCursor(const std::string &tableName, uint64_t cursor)
424 {
425 std::string sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata SET VALUE = ? where KEY = ?;";
426 sqlite3_stmt *stmt = nullptr;
427 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
428 if (errCode != E_OK) {
429 LOGE("Set cursor sql failed=%d", errCode);
430 return errCode;
431 }
432 ResFinalizer finalizer([stmt]() {
433 sqlite3_stmt *statement = stmt;
434 int ret = E_OK;
435 SQLiteUtils::ResetStatement(statement, true, ret);
436 if (ret != E_OK) {
437 LOGW("Reset stmt failed %d when set cursor", ret);
438 }
439 });
440 int index = 1;
441 errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, cursor);
442 if (errCode != E_OK) {
443 LOGE("Bind saved cursor failed:%d", errCode);
444 return errCode;
445 }
446 Key key;
447 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
448 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key, false);
449 if (errCode != E_OK) {
450 return errCode;
451 }
452 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
453 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
454 errCode = E_OK;
455 }
456 return errCode;
457 }
458
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)459 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
460 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
461 {
462 int errCode = E_OK;
463 for (size_t i = 0; i < tableNameList.size(); i++) {
464 std::string tableName = tableNameList[i];
465 std::string logTableName = DBCommon::GetLogTableName(tableName);
466 std::vector<int64_t> dataKeys;
467 errCode = GetCleanCloudDataKeys(logTableName, dataKeys, true);
468 if (errCode != E_OK) {
469 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
470 return errCode;
471 }
472
473 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
474 errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
475 if (errCode != E_OK) {
476 LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
477 return errCode;
478 }
479 int32_t count = 0;
480 (void)GetFlagIsLocalCount(logTableName, count);
481 LOGI("[DoCleanLogAndData]flag is local in table:%s, len:%zu, count:%d, before remove device data.",
482 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
483 if (isLogicDelete_) {
484 errCode = SetDataOnUserTableWithLogicDelete(tableName, logTableName);
485 } else {
486 errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName, localSchema);
487 }
488 if (errCode != E_OK) {
489 LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
490 return errCode;
491 }
492 (void)GetFlagIsLocalCount(logTableName, count);
493 LOGI("[DoCleanLogAndData]flag is local in table:%s, len:%zu, count:%d, after remove device data.",
494 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), count);
495 }
496 return errCode;
497 }
498
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)499 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
500 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
501 {
502 int errCode = E_OK;
503 int ret = E_OK;
504 sqlite3_stmt *selectStmt = nullptr;
505 for (const auto &rowId : dataKeys) {
506 std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
507 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
508 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
509 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
510 LOGE("Get select asset statement failed, %d", errCode);
511 return errCode;
512 }
513 errCode = SQLiteUtils::StepWithRetry(selectStmt);
514 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
515 std::vector<uint8_t> blobValue;
516 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
517 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
518 LOGE("Get column blob value failed, %d", errCode);
519 goto END;
520 }
521 if (blobValue.empty()) {
522 SQLiteUtils::ResetStatement(selectStmt, true, ret);
523 continue;
524 }
525 Asset asset;
526 errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
527 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
528 LOGE("Transfer blob to asset failed, %d", errCode);
529 goto END;
530 }
531 assets.push_back(asset);
532 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
533 errCode = E_OK;
534 Asset asset;
535 assets.push_back(asset);
536 }
537 SQLiteUtils::ResetStatement(selectStmt, true, ret);
538 }
539 return errCode != E_OK ? errCode : ret;
540 END:
541 SQLiteUtils::ResetStatement(selectStmt, true, ret);
542 return errCode != E_OK ? errCode : ret;
543 }
544
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)545 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
546 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
547 {
548 int errCode = E_OK;
549 int ret = E_OK;
550 sqlite3_stmt *selectStmt = nullptr;
551 for (const auto &rowId : dataKeys) {
552 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
553 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
554 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
555 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
556 LOGE("Get select assets statement failed, %d", errCode);
557 goto END;
558 }
559 errCode = SQLiteUtils::StepWithRetry(selectStmt);
560 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
561 std::vector<uint8_t> blobValue;
562 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
563 if (errCode != E_OK) {
564 goto END;
565 }
566 Assets tmpAssets;
567 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
568 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
569 goto END;
570 }
571 for (const auto &asset: tmpAssets) {
572 assets.push_back(asset);
573 }
574 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
575 errCode = E_OK;
576 }
577 SQLiteUtils::ResetStatement(selectStmt, true, ret);
578 }
579 return errCode != E_OK ? errCode : ret;
580 END:
581 SQLiteUtils::ResetStatement(selectStmt, true, ret);
582 return errCode != E_OK ? errCode : ret;
583 }
584
GetCloudDataCount(const std::string & tableName,DownloadData & downloadData,int64_t & count)585 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataCount(const std::string &tableName,
586 DownloadData &downloadData, int64_t &count)
587 {
588 if (downloadData.data.empty()) {
589 return E_OK;
590 }
591 int errCode = E_OK;
592 sqlite3_stmt *queryStmt = nullptr;
593 std::string querySql = "SELECT COUNT(*) FROM '" + DBCommon::GetLogTableName(tableName) +
594 "' WHERE FLAG & 0x02 = 0 AND CLOUD_GID IN (";
595 for (const VBucket &vBucket : downloadData.data) {
596 std::string cloudGid;
597 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
598 if (errCode != E_OK) {
599 LOGE("get gid for query log statement failed, %d", errCode);
600 return -E_CLOUD_ERROR;
601 }
602 querySql += "'" + cloudGid + "',";
603 }
604 querySql.pop_back();
605 querySql += ");";
606 errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, queryStmt);
607 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
608 LOGE("Get query count statement failed, %d", errCode);
609 return errCode;
610 }
611 errCode = SQLiteUtils::StepWithRetry(queryStmt);
612 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
613 count = static_cast<int64_t>(sqlite3_column_int64(queryStmt, 0));
614 errCode = E_OK;
615 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
616 errCode = E_OK;
617 }
618 int ret = E_OK;
619 SQLiteUtils::ResetStatement(queryStmt, true, ret);
620 return errCode != E_OK ? errCode : ret;
621 }
622
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)623 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
624 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
625 {
626 int errCode = E_OK;
627 for (const auto &fieldInfo: fieldInfos) {
628 if (fieldInfo.IsAssetType()) {
629 errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
630 if (errCode != E_OK) {
631 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
632 return errCode;
633 }
634 } else if (fieldInfo.IsAssetsType()) {
635 errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
636 if (errCode != E_OK) {
637 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
638 return errCode;
639 }
640 }
641 }
642 return errCode;
643 }
644
SetCursorIncFlag(bool flag)645 int SQLiteSingleVerRelationalStorageExecutor::SetCursorIncFlag(bool flag)
646 {
647 std::string sql = "INSERT OR REPLACE INTO " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata" +
648 " VALUES ('cursor_inc_flag', ";
649 if (flag) {
650 sql += "'true'";
651 } else {
652 sql += "'false'";
653 }
654 sql += ");";
655 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
656 if (errCode != E_OK) {
657 LOGE("set cursor inc flag fail, errCode = %d", errCode);
658 }
659 return errCode;
660 }
661
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData)662 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
663 const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData)
664 {
665 if (downloadData.data.size() != downloadData.opType.size()) {
666 LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
667 downloadData.opType.size());
668 return -E_CLOUD_ERROR;
669 }
670
671 int errCode = SetLogTriggerStatus(false);
672 if (errCode != E_OK) {
673 LOGE("Fail to set log trigger off, %d", errCode);
674 return errCode;
675 }
676
677 std::map<int, int> statisticMap = {};
678 errCode = ExecutePutCloudData(tableName, tableSchema, trackerTable, downloadData, statisticMap);
679 if (errCode != E_OK) {
680 LOGE("ExecutePutCloudData failed, %d", errCode);
681 }
682 int ret = SetLogTriggerStatus(true);
683 if (ret != E_OK) {
684 LOGE("Fail to set log trigger on, %d", ret);
685 }
686 int64_t count = 0;
687 int errCodeCount = GetCloudDataCount(tableName, downloadData, count);
688 if (errCodeCount != E_OK) {
689 LOGW("get cloud data count failed, %d", errCodeCount);
690 }
691 LOGI("save cloud data of table %s [length %zu]:%d, cloud data count:%lld, ins:%d, upd:%d, del:%d, only gid:%d,"
692 "flag zero:%d, flag one:%d, upd timestamp:%d, clear gid:%d, not handle:%d, lock:%d",
693 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode, count,
694 statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
695 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
696 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
697 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
698 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
699 statisticMap[static_cast<int>(OpType::NOT_HANDLE)], statisticMap[static_cast<int>(OpType::LOCKED_NOT_HANDLE)]);
700 return errCode == E_OK ? ret : errCode;
701 }
702
UpdateAssetStatusForAssetOnly(const TableSchema & tableSchema,VBucket & vBucket)703 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetStatusForAssetOnly(
704 const TableSchema &tableSchema, VBucket &vBucket)
705 {
706 std::string cloudGid;
707 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
708 if (errCode != E_OK) {
709 LOGE("Miss gid when fill Asset %d.", errCode);
710 return errCode;
711 }
712 std::vector<Field> assetsField;
713 errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
714 if (errCode != E_OK) {
715 LOGE("No assets need to be filled when download assets only, err:%d.", errCode);
716 return errCode;
717 }
718
719 sqlite3_stmt *stmt = nullptr;
720 errCode = GetFillDownloadAssetStatement(tableSchema.name, vBucket, assetsField, stmt);
721 if (errCode != E_OK) {
722 LOGE("can not get assetsField from tableSchema err:%d when download assets only.", errCode);
723 return errCode;
724 }
725 return ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
726 }
727
InsertCloudData(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,int64_t dataKey)728 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema,
729 const TrackerTable &trackerTable, int64_t dataKey)
730 {
731 int errCode = E_OK;
732 if (dataKey > 0) {
733 errCode = RemoveDataAndLog(tableSchema.name, dataKey);
734 if (errCode != E_OK) {
735 return errCode;
736 }
737 }
738 std::string sql = GetInsertSqlForCloudSync(tableSchema);
739 sqlite3_stmt *insertStmt = nullptr;
740 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
741 if (errCode != E_OK) {
742 LOGE("Get insert statement failed when save cloud data, %d", errCode);
743 return errCode;
744 }
745 if (putDataMode_ == PutDataMode::SYNC) {
746 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
747 }
748 int ret = E_OK;
749 errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
750 if (errCode != E_OK) {
751 SQLiteUtils::ResetStatement(insertStmt, true, ret);
752 return errCode;
753 }
754 // insert data
755 errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
756 SQLiteUtils::ResetStatement(insertStmt, true, ret);
757 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
758 LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
759 return errCode;
760 }
761
762 // insert log
763 return InsertLogRecord(tableSchema, trackerTable, vBucket);
764 }
765
GetInsertLogSql(const std::string & logTableName,const std::set<std::string> & extendColNames)766 std::string GetInsertLogSql(const std::string &logTableName, const std::set<std::string> &extendColNames)
767 {
768 if (extendColNames.empty()) {
769 return "INSERT OR REPLACE INTO " + logTableName + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, " +
770 "CASE WHEN (SELECT status FROM " + logTableName + " WHERE hash_key=?) IS NULL THEN 0 ELSE " +
771 "(SELECT status FROM " + logTableName + " WHERE hash_key=?) " + "END)";
772 }
773 std::string sql = "INSERT OR REPLACE INTO " + logTableName + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, json_object(";
774 for (const auto &extendColName : extendColNames) {
775 sql += "'" + extendColName + "',?,";
776 }
777 sql.pop_back();
778 sql += "), 0, ?, ?, CASE WHEN (SELECT status FROM " + logTableName +
779 " WHERE hash_key=?) IS NULL THEN 0 ELSE " + "(SELECT status FROM " + logTableName +
780 " WHERE hash_key=?) " + "END)";
781 return sql;
782 }
783
InsertLogRecord(const TableSchema & tableSchema,const TrackerTable & trackerTable,VBucket & vBucket)784 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema,
785 const TrackerTable &trackerTable, VBucket &vBucket)
786 {
787 if (putDataMode_ == PutDataMode::SYNC && !CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
788 // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
789 // so we need to delete the old log record according to the gid first
790 std::string gidStr;
791 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
792 if (errCode != E_OK || gidStr.empty()) {
793 LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
794 return errCode;
795 }
796 std::string sql = "DELETE FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE cloud_gid = '"
797 + gidStr + "';";
798 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
799 if (errCode != E_OK) {
800 LOGE("delete log record according gid fail, errCode = %d", errCode);
801 return errCode;
802 }
803 }
804
805 std::string sql = GetInsertLogSql(DBCommon::GetLogTableName(tableSchema.name), trackerTable.GetExtendNames());
806 sqlite3_stmt *insertLogStmt = nullptr;
807 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
808 if (errCode != E_OK) {
809 LOGE("Get insert log statement failed when save cloud data, %d", errCode);
810 return errCode;
811 }
812
813 int ret = E_OK;
814 errCode = BindValueToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
815 if (errCode != E_OK) {
816 SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
817 return errCode;
818 }
819
820 errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
821 SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
822 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
823 return ret;
824 } else {
825 LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
826 return errCode;
827 }
828 }
829
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)830 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
831 sqlite3_stmt *updateStmt)
832 {
833 auto it = bindCloudFieldFuncMap_.find(field.type);
834 if (it == bindCloudFieldFuncMap_.end()) {
835 LOGE("unknown cloud type when bind one field.");
836 return -E_CLOUD_ERROR;
837 }
838 return it->second(index, vBucket, field, updateStmt);
839 }
840
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)841 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
842 const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
843 {
844 int errCode = E_OK;
845 int index = 0;
846 for (const auto &field : fields) {
847 index++;
848 errCode = BindOneField(index, vBucket, field, upsertStmt);
849 if (errCode != E_OK) {
850 return errCode;
851 }
852 }
853 return errCode;
854 }
855
BindStatusSubQueryHashKeyStatement(sqlite3_stmt * insertLogStmt,std::vector<uint8_t> & hashKey,int & index)856 int SQLiteSingleVerRelationalStorageExecutor::BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt,
857 std::vector<uint8_t> &hashKey, int &index)
858 {
859 int errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
860 if (errCode != E_OK) {
861 LOGE("Bind hash_key to status subQuery statement failed, %d", errCode);
862 return errCode;
863 }
864
865 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
866 if (errCode != E_OK) {
867 LOGE("Bind hash_key to status subQuery2 statement failed, %d", errCode);
868 return errCode;
869 }
870 return errCode;
871 }
872
BindExtendValue(const VBucket & vBucket,const TrackerTable & trackerTable,sqlite3_stmt * stmt,int & index)873 int BindExtendValue(const VBucket &vBucket, const TrackerTable &trackerTable, sqlite3_stmt *stmt, int &index)
874 {
875 const std::set<std::string> &extendColNames = trackerTable.GetExtendNames();
876 int errCode = E_OK;
877 int extendValueIndex = index;
878 if (extendColNames.empty()) {
879 return SQLiteUtils::BindTextToStatement(stmt, index++, "");
880 }
881 for (const auto &extendColName : extendColNames) {
882 if (vBucket.find(extendColName) == vBucket.end()) {
883 errCode = SQLiteUtils::BindTextToStatement(stmt, extendValueIndex++, "");
884 } else {
885 Type extendValue = vBucket.at(extendColName);
886 errCode = SQLiteRelationalUtils::BindStatementByType(stmt, extendValueIndex++, extendValue);
887 }
888 if (errCode != E_OK) {
889 const std::string tableName = DBCommon::StringMiddleMasking(trackerTable.GetTableName());
890 size_t nameLength = trackerTable.GetTableName().size();
891 LOGE("[%s [%zu]] Bind extend field failed: %d", tableName.c_str(), nameLength, errCode);
892 return errCode;
893 }
894 }
895 index = extendValueIndex;
896 return E_OK;
897 }
898
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt,int & index)899 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
900 const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt, int &index)
901 {
902 std::vector<uint8_t> hashKey;
903 int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
904 if (errCode != E_OK) {
905 return errCode;
906 }
907 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
908 if (errCode != E_OK) {
909 LOGE("Bind hash_key to insert log statement failed, %d", errCode);
910 return errCode;
911 }
912
913 std::string cloudGid;
914 if (putDataMode_ == PutDataMode::SYNC) {
915 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
916 if (errCode != E_OK) {
917 LOGE("get gid for insert log statement failed, %d", errCode);
918 return -E_CLOUD_ERROR;
919 }
920 }
921
922 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, cloudGid); // next is cloud_gid
923 if (errCode != E_OK) {
924 LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
925 return errCode;
926 }
927
928 errCode = BindExtendValue(vBucket, trackerTable, insertLogStmt, index); // next is extend_field
929 if (errCode != E_OK) {
930 LOGE("Bind extend_field to insert log statement failed, %d", errCode);
931 return errCode;
932 }
933
934 errCode = BindShareValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt, index);
935 if (errCode != E_OK) {
936 return errCode;
937 }
938 return BindStatusSubQueryHashKeyStatement(insertLogStmt, hashKey, index);
939 }
940
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)941 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
942 const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
943 {
944 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
945 int bindIndex = 1; // 1 is rowid
946 int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, rowid);
947 if (errCode != E_OK) {
948 LOGE("Bind rowid to insert log statement failed, %d", errCode);
949 return errCode;
950 }
951
952 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, bindIndex++, GetDev()); // next is device
953 if (errCode != E_OK) {
954 LOGE("Bind device to insert log statement failed, %d", errCode);
955 return errCode;
956 }
957
958 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, bindIndex++, GetDev()); // next is ori_device
959 if (errCode != E_OK) {
960 LOGE("Bind ori_device to insert log statement failed, %d", errCode);
961 return errCode;
962 }
963
964 int64_t val = 0;
965 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
966 if (errCode != E_OK) {
967 LOGE("get modify time for insert log statement failed, %d", errCode);
968 return -E_CLOUD_ERROR;
969 }
970
971 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, val); // next is timestamp
972 if (errCode != E_OK) {
973 LOGE("Bind timestamp to insert log statement failed, %d", errCode);
974 return errCode;
975 }
976
977 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
978 if (errCode != E_OK) {
979 LOGE("get create time for insert log statement failed, %d", errCode);
980 return -E_CLOUD_ERROR;
981 }
982
983 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, val); // next is wtimestamp
984 if (errCode != E_OK) {
985 LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
986 return errCode;
987 }
988
989 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, bindIndex++, GetDataFlag())); // next is flag
990 if (errCode != E_OK) {
991 LOGE("Bind flag to insert log statement failed, %d", errCode);
992 return errCode;
993 }
994
995 vBucket[DBConstant::ROWID] = rowid; // fill rowid to cloud data to notify user
996 return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt, bindIndex);
997 }
998
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)999 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
1000 const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
1001 {
1002 std::string where = " WHERE";
1003 if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
1004 where += " " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (SELECT data_key FROM " +
1005 DBCommon::GetLogTableName(tableName) + " WHERE cloud_gid = '" + gidStr + "')";
1006 }
1007 if (!pkSet.empty() && queryByPk) {
1008 if (!gidStr.empty()) {
1009 where += " OR";
1010 }
1011 where += " (1 = 1";
1012 for (const auto &pk : pkSet) {
1013 where += (" AND " + pk + " = ?");
1014 }
1015 where += ");";
1016 }
1017 return where;
1018 }
1019
GetUpdateSqlForCloudSync(const std::vector<Field> & updateFields,const TableSchema & tableSchema,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)1020 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields,
1021 const TableSchema &tableSchema, const std::string &gidStr, const std::set<std::string> &pkSet,
1022 std::string &updateSql)
1023 {
1024 if (pkSet.empty() && gidStr.empty()) {
1025 LOGE("update data fail because both primary key and gid is empty.");
1026 return -E_CLOUD_ERROR;
1027 }
1028 std::string sql = "UPDATE " + tableSchema.name + " SET";
1029 for (const auto &field : updateFields) {
1030 sql += " " + field.colName + " = ?,";
1031 }
1032 sql.pop_back();
1033 sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
1034 updateSql = sql;
1035 return E_OK;
1036 }
1037
IsGidValid(const std::string & gidStr)1038 static inline bool IsGidValid(const std::string &gidStr)
1039 {
1040 if (!gidStr.empty()) {
1041 return gidStr.find("'") == std::string::npos;
1042 }
1043 return true;
1044 }
1045
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)1046 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
1047 const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
1048 {
1049 std::string gidStr;
1050 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1051 if (errCode != E_OK) {
1052 LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
1053 return errCode;
1054 }
1055 if (!IsGidValid(gidStr)) {
1056 LOGE("invalid char in cloud gid");
1057 return -E_CLOUD_ERROR;
1058 }
1059
1060 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1061 auto updateFields = GetUpdateField(vBucket, tableSchema);
1062 std::string updateSql;
1063 errCode = GetUpdateSqlForCloudSync(updateFields, tableSchema, gidStr, pkSet, updateSql);
1064 if (errCode != E_OK) {
1065 return errCode;
1066 }
1067
1068 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
1069 if (errCode != E_OK) {
1070 LOGE("Get update statement failed when update cloud data, %d", errCode);
1071 return errCode;
1072 }
1073
1074 // bind value
1075 if (!pkSet.empty()) {
1076 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1077 updateFields.insert(updateFields.end(), pkFields.begin(), pkFields.end());
1078 }
1079 errCode = BindValueToUpsertStatement(vBucket, updateFields, updateStmt);
1080 if (errCode != E_OK) {
1081 LOGE("bind value to update statement failed when update cloud data, %d", errCode);
1082 int ret = E_OK;
1083 SQLiteUtils::ResetStatement(updateStmt, true, ret);
1084 }
1085 return errCode;
1086 }
1087
UpdateCloudData(VBucket & vBucket,const TableSchema & tableSchema)1088 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema)
1089 {
1090 if (putDataMode_ == PutDataMode::SYNC) {
1091 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
1092 }
1093 sqlite3_stmt *updateStmt = nullptr;
1094 int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
1095 if (errCode != E_OK) {
1096 LOGE("Get update data table statement fail, %d", errCode);
1097 return errCode;
1098 }
1099
1100 // update data
1101 int ret = E_OK;
1102 errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
1103 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1104 errCode = E_OK;
1105 } else {
1106 LOGE("update data failed when save cloud data:%d", errCode);
1107 SQLiteUtils::ResetStatement(updateStmt, true, ret);
1108 return errCode;
1109 }
1110 SQLiteUtils::ResetStatement(updateStmt, true, ret);
1111
1112 // update log
1113 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
1114 if (errCode != E_OK) {
1115 LOGE("update log record failed when update cloud data, errCode = %d", errCode);
1116 }
1117 return errCode;
1118 }
1119
IsAllowWithPrimaryKey(OpType opType)1120 static inline bool IsAllowWithPrimaryKey(OpType opType)
1121 {
1122 return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
1123 opType == OpType::ONLY_UPDATE_GID || opType == OpType::LOCKED_NOT_HANDLE);
1124 }
1125
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1126 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
1127 OpType opType)
1128 {
1129 sqlite3_stmt *updateLogStmt = nullptr;
1130 std::vector<std::string> updateColName;
1131 int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
1132 if (errCode != E_OK) {
1133 LOGE("Get update log statement failed, errCode = %d", errCode);
1134 return errCode;
1135 }
1136
1137 errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
1138 updateLogStmt);
1139 int ret = E_OK;
1140 if (errCode != E_OK) {
1141 LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
1142 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1143 return errCode != E_OK ? errCode : ret;
1144 }
1145
1146 errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
1147 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1148 errCode = E_OK;
1149 } else {
1150 LOGE("update log record failed when update cloud data:%d", errCode);
1151 }
1152 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1153 return errCode != E_OK ? errCode : ret;
1154 }
1155
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)1156 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
1157 const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
1158 sqlite3_stmt *updateLogStmt)
1159 {
1160 int errCode = CloudStorageUtils::BindUpdateLogStmtFromVBucket(vBucket, tableSchema, colNames, updateLogStmt);
1161 if (errCode != E_OK) {
1162 return errCode;
1163 }
1164 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1165 if (pkMap.empty()) {
1166 return E_OK;
1167 }
1168
1169 std::vector<uint8_t> hashKey;
1170 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
1171 if (errCode != E_OK) {
1172 return errCode;
1173 }
1174 return SQLiteUtils::BindBlobToStatement(updateLogStmt, colNames.size() + 1, hashKey);
1175 }
1176
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)1177 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
1178 const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
1179 {
1180 std::string gidStr;
1181 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1182 if (errCode != E_OK) {
1183 LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
1184 return errCode;
1185 }
1186 if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
1187 LOGE("empty or invalid char in cloud gid");
1188 return -E_CLOUD_ERROR;
1189 }
1190
1191 bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
1192 std::string deleteSql = "DELETE FROM " + tableSchema.name;
1193 deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
1194 errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
1195 if (errCode != E_OK) {
1196 LOGE("Get delete statement failed when delete data, %d", errCode);
1197 return errCode;
1198 }
1199
1200 int ret = E_OK;
1201 if (!pkSet.empty() && queryByPk) {
1202 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1203 errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
1204 if (errCode != E_OK) {
1205 LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
1206 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1207 }
1208 }
1209 return errCode != E_OK ? errCode : ret;
1210 }
1211
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1212 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1213 const TableSchema &tableSchema, const TrackerTable &trackerTable)
1214 {
1215 if (isLogicDelete_) {
1216 return LogicDeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
1217 }
1218 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1219 sqlite3_stmt *deleteStmt = nullptr;
1220 int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
1221 if (errCode != E_OK) {
1222 return errCode;
1223 }
1224 errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
1225 int ret = E_OK;
1226 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1227 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1228 LOGE("delete data failed when sync with cloud:%d", errCode);
1229 return errCode;
1230 }
1231 if (ret != E_OK) {
1232 LOGE("reset delete statement failed:%d", ret);
1233 return ret;
1234 }
1235
1236 // update log
1237 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1238 if (errCode != E_OK) {
1239 LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
1240 }
1241 return errCode;
1242 }
1243
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1244 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
1245 const TableSchema &tableSchema, OpType opType)
1246 {
1247 return UpdateLogRecord(vBucket, tableSchema, opType);
1248 }
1249
DeleteTableTrigger(const std::string & missTable) const1250 int SQLiteSingleVerRelationalStorageExecutor::DeleteTableTrigger(const std::string &missTable) const
1251 {
1252 static const char *triggerEndName[] = {
1253 "_ON_INSERT",
1254 "_ON_UPDATE",
1255 "_ON_DELETE"
1256 };
1257 std::string logTableName = DBConstant::SYSTEM_TABLE_PREFIX + missTable;
1258 for (const auto &endName : triggerEndName) {
1259 std::string deleteSql = "DROP TRIGGER IF EXISTS " + logTableName + endName + ";";
1260 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1261 if (errCode != E_OK) {
1262 LOGE("[DeleteTableTrigger] Drop trigger failed. %d", errCode);
1263 return errCode;
1264 }
1265 }
1266 return E_OK;
1267 }
1268
SetLogicDelete(bool isLogicDelete)1269 void SQLiteSingleVerRelationalStorageExecutor::SetLogicDelete(bool isLogicDelete)
1270 {
1271 isLogicDelete_ = isLogicDelete;
1272 }
1273
UpdateRecordStatus(const std::string & tableName,const std::string & status,const Key & hashKey)1274 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordStatus(const std::string &tableName,
1275 const std::string &status, const Key &hashKey)
1276 {
1277 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " + status + " WHERE hash_key = ?;";
1278 sqlite3_stmt *stmt = nullptr;
1279 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1280 if (errCode != E_OK) {
1281 LOGE("[Storage Executor] Get stmt failed when update record status, %d", errCode);
1282 return errCode;
1283 }
1284 int ret = E_OK;
1285 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey); // 1 is bind index of hashKey
1286 if (errCode != E_OK) {
1287 LOGE("[Storage Executor] Bind hashKey to update record status stmt failed, %d", errCode);
1288 SQLiteUtils::ResetStatement(stmt, true, ret);
1289 return errCode;
1290 }
1291 errCode = SQLiteUtils::StepWithRetry(stmt);
1292 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1293 errCode = E_OK;
1294 } else {
1295 LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1296 }
1297 SQLiteUtils::ResetStatement(stmt, true, ret);
1298 return errCode == E_OK ? ret : errCode;
1299 }
1300
SetUploadConfig(int32_t maxUploadCount,int32_t maxUploadSize)1301 void SQLiteSingleVerRelationalStorageExecutor::SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize)
1302 {
1303 maxUploadCount_ = maxUploadCount;
1304 maxUploadSize_ = maxUploadSize;
1305 }
1306
LogicDeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1307 int SQLiteSingleVerRelationalStorageExecutor::LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1308 const TableSchema &tableSchema, const TrackerTable &trackerTable)
1309 {
1310 LOGD("[RDBExecutor] logic delete skip delete data");
1311 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, CloudStorageUtils::GetCursorIncSql(tableName));
1312 if (errCode != E_OK) {
1313 return errCode;
1314 }
1315 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1316 if (errCode != E_OK) {
1317 return errCode;
1318 }
1319 if (!trackerTable.IsEmpty()) {
1320 return SQLiteRelationalUtils::SelectServerObserver(dbHandle_, tableName, true);
1321 }
1322 return E_OK;
1323 }
1324
InitCursorToMeta(const std::string & tableName) const1325 int SQLiteSingleVerRelationalStorageExecutor::InitCursorToMeta(const std::string &tableName) const
1326 {
1327 return SQLiteRelationalUtils::InitCursorToMeta(dbHandle_, isMemDb_, tableName);
1328 }
1329
SetTableSchema(const TableSchema & tableSchema)1330 void SQLiteSingleVerRelationalStorageExecutor::SetTableSchema(const TableSchema &tableSchema)
1331 {
1332 tableSchema_ = tableSchema;
1333 }
1334
ReviseLocalModTime(const std::string & tableName,const std::vector<ReviseModTimeInfo> & revisedData)1335 int SQLiteSingleVerRelationalStorageExecutor::ReviseLocalModTime(const std::string &tableName,
1336 const std::vector<ReviseModTimeInfo> &revisedData)
1337 {
1338 sqlite3_stmt *stmt = nullptr;
1339 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1340 " SET timestamp=? where hash_key=? AND timestamp=?";
1341 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1342 if (errCode != E_OK) {
1343 LOGE("[RDBExecutor][ReviseLocalModTime] Get stmt failed: %d", errCode);
1344 return errCode;
1345 }
1346 ResFinalizer finalizer([stmt]() {
1347 sqlite3_stmt *statement = stmt;
1348 int ret = E_OK;
1349 SQLiteUtils::ResetStatement(statement, true, ret);
1350 if (ret != E_OK) {
1351 LOGW("[RDBExecutor][ReviseLocalModTime] Reset stmt failed %d", ret);
1352 }
1353 });
1354 for (auto &data : revisedData) {
1355 int resetCode = E_OK;
1356 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, data.curTime); // 1st bind modify time
1357 if (errCode != E_OK) {
1358 LOGE("[RDBExecutor][ReviseLocalModTime] Bind revised modify time failed: %d", errCode);
1359 return errCode;
1360 }
1361 errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, data.hashKey); // 2nd bind hash key
1362 if (errCode != E_OK) {
1363 LOGE("[RDBExecutor][ReviseLocalModTime] Bind hash key failed: %d", errCode);
1364 return errCode;
1365 }
1366 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 3, data.invalidTime); // 3rd bind modify time
1367 if (errCode != E_OK) {
1368 LOGE("[RDBExecutor][ReviseLocalModTime] Bind modify time failed: %d", errCode);
1369 return errCode;
1370 }
1371 errCode = SQLiteUtils::StepWithRetry(stmt);
1372 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1373 LOGE("[RDBExecutor][ReviseLocalModTime] Revise failed: %d", errCode);
1374 return errCode;
1375 }
1376 LOGI("[RDBExecutor][ReviseLocalModTime] Local data mod time revised from %lld to %lld",
1377 data.invalidTime, data.curTime);
1378 SQLiteUtils::ResetStatement(stmt, false, resetCode);
1379 if (resetCode != E_OK) {
1380 LOGW("[RDBExecutor][ReviseLocalModTime] Reset stmt failed: %d", resetCode);
1381 return resetCode;
1382 }
1383 }
1384 return E_OK;
1385 }
1386
IsNeedUpdateAssetIdInner(sqlite3_stmt * selectStmt,const VBucket & vBucket,const Field & field,VBucket & assetInfo,bool & isNotIncCursor)1387 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetIdInner(sqlite3_stmt *selectStmt,
1388 const VBucket &vBucket, const Field &field, VBucket &assetInfo, bool &isNotIncCursor)
1389 {
1390 if (field.type == TYPE_INDEX<Asset>) {
1391 Asset asset;
1392 UpdateLocalAssetId(vBucket, field.colName, asset);
1393 Asset *assetDBPtr = std::get_if<Asset>(&assetInfo[field.colName]);
1394 if (assetDBPtr == nullptr) {
1395 isNotIncCursor = true;
1396 return true;
1397 }
1398 const Asset &assetDB = *assetDBPtr;
1399 if (assetDB.assetId != asset.assetId || asset.status != AssetStatus::NORMAL) {
1400 isNotIncCursor = true;
1401 return true;
1402 }
1403 }
1404 if (field.type == TYPE_INDEX<Assets>) {
1405 Assets assets;
1406 UpdateLocalAssetsId(vBucket, field.colName, assets);
1407 Assets *assetsDBPtr = std::get_if<Assets>(&assetInfo[field.colName]);
1408 if (assetsDBPtr == nullptr) {
1409 isNotIncCursor = true;
1410 return true;
1411 }
1412 Assets &assetsDB = *assetsDBPtr;
1413 if (assets.size() != assetsDB.size()) {
1414 isNotIncCursor = true;
1415 return true;
1416 }
1417 for (uint32_t i = 0; i < assets.size(); ++i) {
1418 if (assets[i].assetId != assetsDB[i].assetId || assets[i].status != AssetStatus::NORMAL) {
1419 isNotIncCursor = true;
1420 return true;
1421 }
1422 }
1423 }
1424 return false;
1425 }
1426
IsNeedUpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket,bool & isNotIncCursor)1427 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1428 const VBucket &vBucket, bool &isNotIncCursor)
1429 {
1430 std::vector<Field> assetFields;
1431 for (const auto &field : tableSchema.fields) {
1432 if (field.type == TYPE_INDEX<Asset>) {
1433 assetFields.push_back(field);
1434 }
1435 if (field.type == TYPE_INDEX<Assets>) {
1436 assetFields.push_back(field);
1437 }
1438 }
1439 if (assetFields.empty()) {
1440 return false;
1441 }
1442 sqlite3_stmt *selectStmt = nullptr;
1443 std::string queryAssetsSql = "SELECT ";
1444 for (const auto &field : assetFields) {
1445 queryAssetsSql += field.colName + ",";
1446 }
1447 queryAssetsSql.pop_back();
1448 queryAssetsSql += " FROM '" + tableSchema.name + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " +
1449 std::to_string(dataKey) + ";";
1450 int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1451 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1452 LOGE("Get select assets statement failed, %d.", errCode);
1453 return true;
1454 }
1455 ResFinalizer finalizer([selectStmt]() {
1456 sqlite3_stmt *statementInner = selectStmt;
1457 int ret = E_OK;
1458 SQLiteUtils::ResetStatement(statementInner, true, ret);
1459 if (ret != E_OK) {
1460 LOGW("Reset stmt failed %d when get asset", ret);
1461 }
1462 });
1463 VBucket assetInfo;
1464 errCode = GetAssetInfoOnTable(selectStmt, assetFields, assetInfo);
1465 if (errCode != E_OK) {
1466 return true;
1467 }
1468 return std::any_of(assetFields.begin(), assetFields.end(), [&](const Field &field) {
1469 return IsNeedUpdateAssetIdInner(selectStmt, vBucket, field, assetInfo, isNotIncCursor);
1470 });
1471 }
1472
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1473 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsConsistent(const std::string &tableName,
1474 const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1475 {
1476 if (downloadData.data.size() != downloadData.opType.size()) {
1477 LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1478 return -E_CLOUD_ERROR;
1479 }
1480 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1481 " SET flag=flag&(~0x20), " + CloudDbConstant::UNLOCKING_TO_UNLOCK + " WHERE cloud_gid=? and timestamp=?;";
1482 sqlite3_stmt *stmt = nullptr;
1483 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1484 if (errCode != E_OK) {
1485 LOGE("Get mark flag as consistent stmt failed, %d.", errCode);
1486 return errCode;
1487 }
1488 int ret = E_OK;
1489 int index = 0;
1490 for (const auto &data: downloadData.data) {
1491 SQLiteUtils::ResetStatement(stmt, false, ret);
1492 OpType opType = downloadData.opType[index++];
1493 if (opType == OpType::NOT_HANDLE || opType == OpType::LOCKED_NOT_HANDLE) {
1494 continue;
1495 }
1496 errCode = CloudStorageUtils::BindStepConsistentFlagStmt(stmt, data, gidFilters);
1497 if (errCode != E_OK) {
1498 break;
1499 }
1500 }
1501 SQLiteUtils::ResetStatement(stmt, true, ret);
1502 return errCode == E_OK ? ret : errCode;
1503 }
1504
MarkFlagAsAssetAsyncDownload(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1505 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsAssetAsyncDownload(const std::string &tableName,
1506 const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1507 {
1508 if (downloadData.data.empty()) {
1509 return E_OK;
1510 }
1511 if (downloadData.data.size() != downloadData.opType.size()) {
1512 LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1513 return -E_CLOUD_ERROR;
1514 }
1515 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag|0x1000 WHERE cloud_gid=?;";
1516 sqlite3_stmt *stmt = nullptr;
1517 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1518 if (errCode != E_OK) {
1519 LOGE("[Storage Executor]Get mark flag as asset async download stmt failed, %d.", errCode);
1520 return errCode;
1521 }
1522 int ret = E_OK;
1523 for (const auto &gid : gidFilters) {
1524 SQLiteUtils::ResetStatement(stmt, false, ret);
1525 if (ret != E_OK) {
1526 LOGE("[Storage Executor]Reset stmt failed:%d", ret);
1527 break;
1528 }
1529 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
1530 if (errCode != E_OK) {
1531 break;
1532 }
1533 errCode = SQLiteUtils::StepWithRetry(stmt);
1534 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1535 errCode = E_OK;
1536 } else {
1537 LOGW("[Storage Executor]Step mark flag as asset async download stmt failed %d gid %s",
1538 errCode, gid.c_str());
1539 }
1540 }
1541 SQLiteUtils::ResetStatement(stmt, true, ret);
1542 return errCode == E_OK ? ret : errCode;
1543 }
1544
FillCloudVersionForUpload(const std::string & tableName,const CloudSyncBatch & batchData)1545 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const std::string &tableName,
1546 const CloudSyncBatch &batchData)
1547 {
1548 if (batchData.extend.empty()) {
1549 return E_OK;
1550 }
1551 if (batchData.hashKey.empty() || batchData.extend.size() != batchData.hashKey.size()) {
1552 LOGE("invalid sync data for filling version.");
1553 return -E_INVALID_ARGS;
1554 }
1555 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(tableName) +
1556 "' SET version = ? WHERE hash_key = ? ";
1557 sqlite3_stmt *stmt = nullptr;
1558 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1559 if (errCode != E_OK) {
1560 return errCode;
1561 }
1562 int ret = E_OK;
1563 for (size_t i = 0; i < batchData.extend.size(); ++i) {
1564 errCode = BindUpdateVersionStatement(batchData.extend[i], batchData.hashKey[i], stmt);
1565 if (errCode != E_OK) {
1566 LOGE("bind update version stmt failed.");
1567 SQLiteUtils::ResetStatement(stmt, true, ret);
1568 return errCode;
1569 }
1570 }
1571 SQLiteUtils::ResetStatement(stmt, true, ret);
1572 return ret;
1573 }
1574
QueryCount(const std::string & tableName,int64_t & count)1575 int SQLiteSingleVerRelationalStorageExecutor::QueryCount(const std::string &tableName, int64_t &count)
1576 {
1577 return SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, count);
1578 }
1579
CheckInventoryData(const std::string & tableName)1580 int SQLiteSingleVerRelationalStorageExecutor::CheckInventoryData(const std::string &tableName)
1581 {
1582 int64_t dataCount = 0;
1583 int errCode = SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, dataCount);
1584 if (errCode != E_OK) {
1585 LOGE("Query count failed %d", errCode);
1586 return errCode;
1587 }
1588 return dataCount > 0 ? -E_WITH_INVENTORY_DATA : E_OK;
1589 }
1590
GetUploadCountInner(const Timestamp & timestamp,SqliteQueryHelper & helper,std::string & sql,int64_t & count)1591 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCountInner(const Timestamp ×tamp,
1592 SqliteQueryHelper &helper, std::string &sql, int64_t &count)
1593 {
1594 sqlite3_stmt *stmt = nullptr;
1595 int errCode = helper.GetCloudQueryStatement(false, dbHandle_, sql, stmt);
1596 if (errCode != E_OK) {
1597 LOGE("failed to get count statement %d", errCode);
1598 return errCode;
1599 }
1600 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1601 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1602 count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1603 errCode = E_OK;
1604 } else {
1605 LOGE("Failed to get the count to be uploaded. %d", errCode);
1606 }
1607 int ret = E_OK;
1608 SQLiteUtils::ResetStatement(stmt, true, ret);
1609 return errCode != E_OK ? errCode : ret;
1610 }
1611
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1612 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp ×tamp, bool isCloudForcePush,
1613 bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1614 {
1615 int errCode;
1616 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1617 if (errCode != E_OK) {
1618 return errCode;
1619 }
1620 std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask,
1621 CloudWaterType::DELETE);
1622 return GetUploadCountInner(timestamp, helper, sql, count);
1623 }
1624
GetAllUploadCount(const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1625 int SQLiteSingleVerRelationalStorageExecutor::GetAllUploadCount(const std::vector<Timestamp> ×tampVec,
1626 bool isCloudForcePush, bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1627 {
1628 std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1629 if (timestampVec.size() != typeVec.size()) {
1630 return -E_INVALID_ARGS;
1631 }
1632 int errCode;
1633 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1634 if (errCode != E_OK) {
1635 return errCode;
1636 }
1637 count = 0;
1638 for (size_t i = 0; i < typeVec.size(); i++) {
1639 std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask, typeVec[i]);
1640 int64_t tempCount = 0;
1641 helper.AppendCloudQueryToGetDiffData(sql, typeVec[i]);
1642 errCode = GetUploadCountInner(timestampVec[i], helper, sql, tempCount);
1643 if (errCode != E_OK) {
1644 return errCode;
1645 }
1646 count += tempCount;
1647 }
1648 return E_OK;
1649 }
1650
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1651 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1652 bool ignoreEmptyGid)
1653 {
1654 if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1655 cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1656 return -E_INVALID_ARGS;
1657 }
1658 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1659 + "' SET cloud_gid = ? WHERE data_key = ? ";
1660 sqlite3_stmt *stmt = nullptr;
1661 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1662 if (errCode != E_OK) {
1663 return errCode;
1664 }
1665 errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1666 int resetCode = E_OK;
1667 SQLiteUtils::ResetStatement(stmt, true, resetCode);
1668 return errCode == E_OK ? resetCode : errCode;
1669 }
1670
GetSyncCloudData(const CloudUploadRecorder & uploadRecorder,CloudSyncData & cloudDataResult,SQLiteSingleVerRelationalContinueToken & token)1671 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(const CloudUploadRecorder &uploadRecorder,
1672 CloudSyncData &cloudDataResult, SQLiteSingleVerRelationalContinueToken &token)
1673 {
1674 token.GetCloudTableSchema(tableSchema_);
1675 sqlite3_stmt *queryStmt = nullptr;
1676 bool isStepNext = false;
1677 int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult, queryStmt, isStepNext);
1678 if (errCode != E_OK) {
1679 (void)token.ReleaseCloudStatement();
1680 return errCode;
1681 }
1682 uint32_t totalSize = 0;
1683 uint32_t stepNum = -1;
1684 do {
1685 if (isStepNext) {
1686 errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1687 if (errCode != E_OK) {
1688 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1689 break;
1690 }
1691 }
1692 isStepNext = true;
1693 errCode = GetCloudDataForSync(uploadRecorder, queryStmt, cloudDataResult, ++stepNum, totalSize);
1694 } while (errCode == E_OK);
1695 if (errCode != -E_UNFINISHED) {
1696 (void)token.ReleaseCloudStatement();
1697 }
1698 return errCode;
1699 }
1700
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1701 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1702 {
1703 if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1704 Asset asset;
1705 int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1706 if (errCode != E_OK) {
1707 return errCode;
1708 }
1709 if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1710 return -E_CLOUD_ERROR;
1711 }
1712 vBucket.insert_or_assign(field.colName, asset);
1713 } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1714 Assets assets;
1715 int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1716 if (errCode != E_OK) {
1717 return errCode;
1718 }
1719 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets) || !CloudStorageUtils::CheckAssetStatus(assets)) {
1720 return -E_CLOUD_ERROR;
1721 }
1722 vBucket.insert_or_assign(field.colName, assets);
1723 } else {
1724 vBucket.insert_or_assign(field.colName, cloudValue);
1725 }
1726 return E_OK;
1727 }
1728
GetDownloadAsset(std::vector<VBucket> & assetsV,const Field & field,Type & cloudValue)1729 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAsset(std::vector<VBucket> &assetsV, const Field &field,
1730 Type &cloudValue)
1731 {
1732 if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1733 Asset asset;
1734 VBucket bucket;
1735 int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1736 if (errCode != E_OK) {
1737 return errCode;
1738 }
1739 if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1740 bucket.insert_or_assign(field.colName, asset);
1741 assetsV.push_back(bucket);
1742 }
1743 } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1744 Assets assets;
1745 int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1746 if (errCode != E_OK) {
1747 return errCode;
1748 }
1749 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1750 return E_OK;
1751 }
1752 for (const auto &asset : assets) {
1753 if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1754 VBucket bucket;
1755 bucket.insert_or_assign(field.colName, asset);
1756 assetsV.push_back(bucket);
1757 }
1758 }
1759 }
1760 return E_OK;
1761 }
1762
GetAssetInfoOnTable(sqlite3_stmt * & stmt,const std::vector<Field> & assetFields,VBucket & assetInfo)1763 int SQLiteSingleVerRelationalStorageExecutor::GetAssetInfoOnTable(sqlite3_stmt *&stmt,
1764 const std::vector<Field> &assetFields, VBucket &assetInfo)
1765 {
1766 int errCode = SQLiteUtils::StepWithRetry(stmt);
1767 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1768 int index = 0;
1769 for (const auto &field : assetFields) {
1770 Type cloudValue;
1771 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1772 if (errCode != E_OK) {
1773 break;
1774 }
1775 errCode = PutVBucketByType(assetInfo, field, cloudValue);
1776 if (errCode != E_OK) {
1777 break;
1778 }
1779 }
1780 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1781 errCode = E_OK;
1782 } else {
1783 LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1784 }
1785 return errCode;
1786 }
1787
GetLocalDataCount(const std::string & tableName,int & dataCount,int & logicDeleteDataCount)1788 int SQLiteSingleVerRelationalStorageExecutor::GetLocalDataCount(const std::string &tableName, int &dataCount,
1789 int &logicDeleteDataCount)
1790 {
1791 std::string dataCountSql = "select count(*) from " + DBCommon::GetLogTableName(tableName) + " where data_key != -1";
1792 int errCode = SQLiteUtils::GetCountBySql(dbHandle_, dataCountSql, dataCount);
1793 if (errCode != E_OK) {
1794 LOGE("[RDBExecutor] Query local data count failed: %d", errCode);
1795 return errCode;
1796 }
1797
1798 std::string logicDeleteDataCountSql = "select count(*) from " + DBCommon::GetLogTableName(tableName) +
1799 " where flag&0x08!=0 and data_key != -1";
1800 errCode = SQLiteUtils::GetCountBySql(dbHandle_, logicDeleteDataCountSql, logicDeleteDataCount);
1801 if (errCode != E_OK) {
1802 LOGE("[RDBExecutor] Query local logic delete data count failed: %d", errCode);
1803 }
1804 return errCode;
1805 }
1806
UpdateExtendField(const std::string & tableName,const std::set<std::string> & extendColNames,const std::string & condition)1807 int SQLiteSingleVerRelationalStorageExecutor::UpdateExtendField(const std::string &tableName,
1808 const std::set<std::string> &extendColNames, const std::string &condition)
1809 {
1810 bool isLogTableExist = false;
1811 int errCode = SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist);
1812 if (errCode == E_OK && !isLogTableExist) {
1813 LOGW("[RDBExecutor][UpdateExtendField] Log table of [%s [%zu]] not found!",
1814 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1815 return E_OK;
1816 }
1817 std::string sql = "update " + DBCommon::GetLogTableName(tableName) + " as log set extend_field = json_object(";
1818 for (const auto &extendColName : extendColNames) {
1819 sql += "'" + extendColName + "',data." + extendColName + ",";
1820 }
1821 sql.pop_back();
1822 sql += ") from " + tableName + " as data where log.data_key = data." + std::string(DBConstant::SQLITE_INNER_ROWID);
1823 sql += condition;
1824 sqlite3_stmt *stmt = nullptr;
1825 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1826 if (errCode != E_OK) {
1827 return errCode;
1828 }
1829
1830 ResFinalizer finalizer([stmt]() {
1831 sqlite3_stmt *statement = stmt;
1832 int ret = E_OK;
1833 SQLiteUtils::ResetStatement(statement, true, ret);
1834 if (ret != E_OK) {
1835 LOGW("[RDBExecutor][UpdateExtendField] Reset stmt failed %d", ret);
1836 }
1837 });
1838 errCode = SQLiteUtils::StepWithRetry(stmt);
1839 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1840 LOGE("[RDBExecutor][UpdateExtendField] Update [%s [%zu]] extend field failed: %d",
1841 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1842 return errCode;
1843 }
1844 return E_OK;
1845 }
1846
BuildJsonExtendField(const std::string & tableName,const std::string & lowVersionExtendColName,sqlite3 * db)1847 int BuildJsonExtendField(const std::string &tableName, const std::string &lowVersionExtendColName, sqlite3 *db)
1848 {
1849 std::string sql = "update " + DBCommon::GetLogTableName(tableName) + " set extend_field = json_object('" +
1850 lowVersionExtendColName + "',extend_field) where data_key = -1 and (json_valid(extend_field) = 0 or " +
1851 "json_extract(extend_field, '$." + lowVersionExtendColName +"') is null)";
1852 sqlite3_stmt *stmt = nullptr;
1853 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1854 if (errCode != E_OK) {
1855 return errCode;
1856 }
1857 errCode = SQLiteUtils::StepWithRetry(stmt);
1858 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1859 LOGE("[RDBExecutor][UpdateDeleteDataExtendField] Update [%s [%zu]] extend field non-JSON format failed: %d",
1860 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1861 } else {
1862 errCode = E_OK;
1863 }
1864 int ret = E_OK;
1865 SQLiteUtils::ResetStatement(stmt, true, ret);
1866 if (ret != E_OK) {
1867 LOGW("[RDBExecutor][UpdateExtendField] Reset stmt failed %d", ret);
1868 }
1869 return errCode;
1870 }
1871
GetUpdateExtendFieldSql(const std::string & tableName,const std::set<std::string> & oldExtendColNames,const std::set<std::string> & extendColNames)1872 std::string GetUpdateExtendFieldSql(const std::string &tableName, const std::set<std::string> &oldExtendColNames,
1873 const std::set<std::string> &extendColNames)
1874 {
1875 std::string sql = "update " + DBCommon::GetLogTableName(tableName) +
1876 " set extend_field = json_insert(extend_field,";
1877 bool isContainNewCol = false;
1878 for (const auto &extendColName : extendColNames) {
1879 if (oldExtendColNames.find(extendColName) != oldExtendColNames.end()) {
1880 continue;
1881 }
1882 isContainNewCol = true;
1883 sql += "'$." + extendColName + "',null,";
1884 }
1885 if (!isContainNewCol) {
1886 return "";
1887 }
1888 sql.pop_back();
1889 sql += ") where data_key = -1 and json_valid(extend_field) = 1;";
1890 return sql;
1891 }
1892
UpdateDeleteDataExtendField(const std::string & tableName,const std::string & lowVersionExtendColName,const std::set<std::string> & oldExtendColNames,const std::set<std::string> & extendColNames)1893 int SQLiteSingleVerRelationalStorageExecutor::UpdateDeleteDataExtendField(const std::string &tableName,
1894 const std::string &lowVersionExtendColName, const std::set<std::string> &oldExtendColNames,
1895 const std::set<std::string> &extendColNames)
1896 {
1897 bool isLogTableExist = false;
1898 if (SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist) == E_OK &&
1899 !isLogTableExist) {
1900 LOGW("[RDBExecutor][UpdateDeleteDataExtendField] Log table of [%s [%zu]] not found!",
1901 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1902 return E_OK;
1903 }
1904 int errCode = E_OK;
1905 if (!lowVersionExtendColName.empty()) {
1906 errCode = BuildJsonExtendField(tableName, lowVersionExtendColName, dbHandle_);
1907 if (errCode != E_OK) {
1908 LOGE("[UpdateDeleteDataExtendField] Update low version extend field of [%s [%zu]] to json failed: %d",
1909 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1910 return errCode;
1911 }
1912 }
1913 std::string sql = GetUpdateExtendFieldSql(tableName, oldExtendColNames, extendColNames);
1914 if (sql.empty()) {
1915 return E_OK;
1916 }
1917
1918 sqlite3_stmt *stmt = nullptr;
1919 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1920 if (errCode != E_OK) {
1921 return errCode;
1922 }
1923 errCode = SQLiteUtils::StepWithRetry(stmt);
1924 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1925 LOGE("[RDBExecutor][UpdateDeleteDataExtendField] Update extend field of [%s [%zu]] failed: %d",
1926 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1927 } else {
1928 errCode = E_OK;
1929 }
1930 int ret = E_OK;
1931 SQLiteUtils::ResetStatement(stmt, true, ret);
1932 if (ret != E_OK) {
1933 LOGW("[RDBExecutor][UpdateDeleteDataExtendField] Reset stmt failed %d", ret);
1934 }
1935 return errCode;
1936 }
1937
GetDownloadAssetGid(const TableSchema & tableSchema,std::vector<std::string> & gids,int64_t beginTime,bool abortWithLimit)1938 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetGid(const TableSchema &tableSchema,
1939 std::vector<std::string> &gids, int64_t beginTime, bool abortWithLimit)
1940 {
1941 std::string sql = "SELECT cloud_gid FROM " + DBCommon::GetLogTableName(tableSchema.name) +
1942 " WHERE flag&0x1000=0x1000 AND timestamp > ?;";
1943 sqlite3_stmt *stmt = nullptr;
1944 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1945 if (errCode != E_OK) {
1946 LOGE("[RDBExecutor]Get gid statement failed, %d", errCode);
1947 return errCode;
1948 }
1949 int ret = E_OK;
1950 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, beginTime);
1951 if (errCode != E_OK) {
1952 LOGE("[RDBExecutor] bind time failed %d when get download asset gid", errCode);
1953 SQLiteUtils::ResetStatement(stmt, true, ret);
1954 return errCode;
1955 }
1956 uint32_t count = 0;
1957 do {
1958 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1959 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1960 errCode = E_OK;
1961 break;
1962 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1963 LOGE("[RDBExecutor]Get downloading assets gid failed. %d", errCode);
1964 break;
1965 }
1966 std::string gid;
1967 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1968 if (errCode != E_OK) {
1969 LOGW("[RDBExecutor]Get downloading assets gid failed %d when get col", errCode);
1970 continue;
1971 }
1972 gids.push_back(gid);
1973 if (AbortGetDownloadAssetGidIfNeed(tableSchema, gid, abortWithLimit, count)) {
1974 break;
1975 }
1976 } while (errCode == E_OK);
1977 SQLiteUtils::ResetStatement(stmt, true, ret);
1978 return errCode == E_OK ? ret : errCode;
1979 }
1980
GetDownloadAssetRecordsByGid(const TableSchema & tableSchema,const std::string gid,std::vector<VBucket> & assets)1981 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetRecordsByGid(const TableSchema &tableSchema,
1982 const std::string gid, std::vector<VBucket> &assets)
1983 {
1984 std::vector<Field> assetFields;
1985 std::string sql = "SELECT";
1986 for (const auto &field: tableSchema.fields) {
1987 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1988 assetFields.emplace_back(field);
1989 sql += " b." + field.colName + ",";
1990 }
1991 }
1992 if (assetFields.empty()) {
1993 return E_OK;
1994 }
1995 sql.pop_back(); // remove last ,
1996 sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE a.cloud_gid = ?;";
1997 sqlite3_stmt *stmt = nullptr;
1998 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1999 if (errCode != E_OK) {
2000 LOGE("Get downloading asset records statement failed, %d", errCode);
2001 return errCode;
2002 }
2003 int ret = E_OK;
2004 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
2005 if (errCode != E_OK) {
2006 SQLiteUtils::ResetStatement(stmt, true, ret);
2007 return errCode;
2008 }
2009 errCode = SQLiteUtils::StepWithRetry(stmt);
2010 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2011 int index = 0;
2012 for (const auto &field: assetFields) {
2013 Type value;
2014 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, value);
2015 if (errCode != E_OK) {
2016 break;
2017 }
2018 errCode = GetDownloadAsset(assets, field, value);
2019 if (errCode != E_OK) {
2020 break;
2021 }
2022 }
2023 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2024 errCode = E_OK;
2025 } else {
2026 LOGE("step get downloading asset records statement failed %d.", errCode);
2027 }
2028 SQLiteUtils::ResetStatement(stmt, true, ret);
2029 return errCode == E_OK ? ret : errCode;
2030 }
2031
GetDownloadingCount(const std::string & tableName,int32_t & count)2032 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadingCount(const std::string &tableName, int32_t &count)
2033 {
2034 std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName) + " WHERE flag&0x1000=0x1000;";
2035 int errCode = SQLiteUtils::GetCountBySql(dbHandle_, sql, count);
2036 if (errCode != E_OK) {
2037 LOGE("[RDBExecutor] Query local data count failed: %d", errCode);
2038 }
2039 return errCode;
2040 }
2041
GetDownloadingAssetsCount(const TableSchema & tableSchema,int32_t & totalCount)2042 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadingAssetsCount(
2043 const TableSchema &tableSchema, int32_t &totalCount)
2044 {
2045 std::vector<std::string> gids;
2046 int errCode = GetDownloadAssetGid(tableSchema, gids);
2047 if (errCode != E_OK) {
2048 LOGE("[RDBExecutor]Get downloading assets gid failed: %d", errCode);
2049 return errCode;
2050 }
2051 for (const auto &gid : gids) {
2052 std::vector<VBucket> assets;
2053 errCode = GetDownloadAssetRecordsByGid(tableSchema, gid, assets);
2054 if (errCode != E_OK) {
2055 LOGE("[RDBExecutor]Get downloading assets records by gid failed: %d", errCode);
2056 return errCode;
2057 }
2058 totalCount += static_cast<int32_t>(assets.size());
2059 }
2060 return E_OK;
2061 }
2062
GetDownloadAssetRecordsInner(const TableSchema & tableSchema,int64_t beginTime,std::vector<std::string> & gids)2063 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetRecordsInner(
2064 const TableSchema &tableSchema, int64_t beginTime, std::vector<std::string> &gids)
2065 {
2066 int errCode = GetDownloadAssetGid(tableSchema, gids, beginTime, true);
2067 if (errCode != E_OK) {
2068 LOGE("[RDBExecutor]Get downloading assets gid failed: %d", errCode);
2069 }
2070 return errCode;
2071 }
2072
CleanDownloadingFlag(const std::string & tableName)2073 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadingFlag(const std::string &tableName)
2074 {
2075 std::string sql;
2076 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag&(~0x1000);";
2077 sqlite3_stmt *stmt = nullptr;
2078 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
2079 if (errCode != E_OK) {
2080 LOGE("[RDBExecutor]Get stmt failed clean downloading flag: %d, tableName: %s, length: %zu",
2081 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
2082 return errCode;
2083 }
2084 errCode = SQLiteUtils::StepWithRetry(stmt);
2085 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2086 errCode = E_OK;
2087 } else {
2088 LOGE("[RDBExecutor]Clean downloading flag failed: %d, tableName: %s, length: %zu",
2089 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
2090 }
2091 int ret = E_OK;
2092 SQLiteUtils::ResetStatement(stmt, true, ret);
2093 if (ret != E_OK) {
2094 LOGE("[RDBExecutor]Reset stmt failed clean downloading flag: %d", ret);
2095 }
2096 return errCode != E_OK ? errCode : ret;
2097 }
2098
AbortGetDownloadAssetGidIfNeed(const DistributedDB::TableSchema & tableSchema,const std::string & gid,bool abortWithLimit,uint32_t & count)2099 bool SQLiteSingleVerRelationalStorageExecutor::AbortGetDownloadAssetGidIfNeed(
2100 const DistributedDB::TableSchema &tableSchema, const std::string &gid, bool abortWithLimit,
2101 uint32_t &count)
2102 {
2103 if (!abortWithLimit) {
2104 return false;
2105 }
2106 std::vector<VBucket> assets;
2107 int errCode = GetDownloadAssetRecordsByGid(tableSchema, gid, assets);
2108 if (errCode != E_OK) {
2109 LOGW("[RDBExecutor]Get downloading assets failed %d gid %s", errCode, gid.c_str());
2110 return false;
2111 }
2112 count += assets.size();
2113 return count >= RuntimeContext::GetInstance()->GetAssetsDownloadManager()->GetMaxDownloadAssetsCount();
2114 }
2115 } // namespace DistributedDB
2116 #endif
2117