1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32
33 namespace DistributedDB {
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)34 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
35 const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
36 {
37 std::string querySql;
38 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
39 std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
40 int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
41 if (errCode != E_OK) {
42 LOGE("Get query log sql fail, %d", errCode);
43 return errCode;
44 }
45 if (!pkSet.empty()) {
46 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, dataInfoWithLog.logInfo.hashKey, true);
47 if (errCode != E_OK) {
48 LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
49 return errCode;
50 }
51 }
52 sqlite3_stmt *selectStmt = nullptr;
53 errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, dataInfoWithLog.logInfo.hashKey, selectStmt);
54 if (errCode != E_OK) {
55 LOGE("Get query log statement fail, %d", errCode);
56 return errCode;
57 }
58
59 bool alreadyFound = false;
60 do {
61 errCode = SQLiteUtils::StepWithRetry(selectStmt);
62 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
63 if (alreadyFound) {
64 LOGE("found more than one records in log table for one primary key or gid.");
65 errCode = -E_CLOUD_ERROR;
66 break;
67 }
68 alreadyFound = true;
69 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
70 errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
71 if (errCode != E_OK) {
72 LOGE("Get info by statement fail, %d", errCode);
73 break;
74 }
75 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
76 errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
77 break;
78 } else {
79 LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
80 break;
81 }
82 } while (errCode == E_OK);
83
84 int ret = E_OK;
85 SQLiteUtils::ResetStatement(selectStmt, true, ret);
86 return errCode != E_OK ? errCode : ret;
87 }
88
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)89 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
90 {
91 int index = 0;
92 logInfo.dataKey = sqlite3_column_int64(statement, index++);
93 std::vector<uint8_t> device;
94 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, device); // 1 is device
95 DBCommon::VectorToString(device, logInfo.device);
96 std::vector<uint8_t> originDev;
97 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, originDev); // 2 is originDev
98 DBCommon::VectorToString(originDev, logInfo.originDev);
99 logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 3 is timestamp
100 logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 4 is wtimestamp
101 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, index++)); // 5 is flag
102 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, logInfo.hashKey); // 6 is hash_key
103 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.cloudGid); // 7 is cloud_gid
104 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.sharingResource); // 8 is sharing_resource
105 logInfo.status = static_cast<uint64_t>(sqlite3_column_int64(statement, index++)); // 9 is status
106 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.version); // 10 is version
107 return index;
108 }
109
GetInfoByStatement(sqlite3_stmt * statement,const std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)110 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
111 const std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
112 VBucket &assetInfo)
113 {
114 int index = GetLogInfoByStatement(statement, dataInfoWithLog.logInfo); // start index of assetInfo or primary key
115 int errCode = E_OK;
116 for (const auto &field: assetFields) {
117 Type cloudValue;
118 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
119 if (errCode != E_OK) {
120 break;
121 }
122 errCode = PutVBucketByType(assetInfo, field, cloudValue);
123 if (errCode != E_OK) {
124 break;
125 }
126 }
127 if (errCode != E_OK) {
128 LOGE("set asset field failed, errCode = %d", errCode);
129 return errCode;
130 }
131
132 // fill primary key
133 for (const auto &item : pkMap) {
134 Type cloudValue;
135 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
136 if (errCode != E_OK) {
137 break;
138 }
139 errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
140 if (errCode != E_OK) {
141 break;
142 }
143 }
144 return errCode;
145 }
146
GetInsertSqlForCloudSync(const TableSchema & tableSchema)147 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
148 {
149 std::string sql = "insert into " + tableSchema.name + "(";
150 for (const auto &field : tableSchema.fields) {
151 sql += field.colName + ",";
152 }
153 sql.pop_back();
154 sql += ") values(";
155 for (size_t i = 0; i < tableSchema.fields.size(); i++) {
156 sql += "?,";
157 }
158 sql.pop_back();
159 sql += ");";
160 return sql;
161 }
162
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)163 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
164 const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
165 {
166 int errCode = E_OK;
167 TableInfo localTable = localSchema_.GetTable(tableSchema.name);
168 // table name in cloud schema is in lower case
169 if (!DBCommon::CaseInsensitiveCompare(localTable.GetTableName(), tableSchema.name)) {
170 LOGE("localSchema doesn't contain table from cloud");
171 return -E_INTERNAL_ERROR;
172 }
173
174 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
175 if (pkMap.size() == 0) {
176 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
177 std::vector<uint8_t> value;
178 DBCommon::StringToVector(std::to_string(rowid), value);
179 errCode = DBCommon::CalcValueHash(value, hashValue);
180 } else {
181 std::tie(errCode, hashValue) = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(vBucket,
182 tableSchema, localTable, pkMap, allowEmpty);
183 }
184 return errCode;
185 }
186
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,const Key & hashKey,sqlite3_stmt * & selectStmt)187 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
188 const VBucket &vBucket, const std::string &querySql, const Key &hashKey, sqlite3_stmt *&selectStmt)
189 {
190 int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
191 if (errCode != E_OK) {
192 LOGE("Get select log statement failed, %d", errCode);
193 return errCode;
194 }
195
196 std::string cloudGid;
197 int ret = E_OK;
198 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
199 if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
200 SQLiteUtils::ResetStatement(selectStmt, true, ret);
201 LOGE("Get cloud gid fail when bind query log statement.");
202 return errCode;
203 }
204
205 int index = 0;
206 if (!cloudGid.empty()) {
207 index++;
208 errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
209 if (errCode != E_OK) {
210 LOGE("Bind cloud gid to query log statement failed. %d", errCode);
211 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
212 return errCode;
213 }
214 }
215
216 index++;
217 errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashKey, true);
218 if (errCode != E_OK) {
219 LOGE("Bind hash key to query log statement failed. %d", errCode);
220 SQLiteUtils::ResetStatement(selectStmt, true, ret);
221 }
222 return errCode != E_OK ? errCode : ret;
223 }
224
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,const std::set<std::string> & pkSet,std::string & querySql)225 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
226 const std::set<std::string> &pkSet, std::string &querySql)
227 {
228 std::string cloudGid;
229 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
230 if (errCode != E_OK) {
231 LOGE("Get cloud gid fail when query log table.");
232 return errCode;
233 }
234
235 if (pkSet.empty() && cloudGid.empty()) {
236 LOGE("query log table failed because of both primary key and gid are empty.");
237 return -E_CLOUD_ERROR;
238 }
239 std::string sql = "SELECT data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid,"
240 " sharing_resource, status, version FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
241 "_log WHERE ";
242 if (!cloudGid.empty()) {
243 sql += "cloud_gid = ? OR ";
244 }
245 sql += "hash_key = ?";
246
247 querySql = sql;
248 return E_OK;
249 }
250
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData,std::map<int,int> & statisticMap)251 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
252 const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData,
253 std::map<int, int> &statisticMap)
254 {
255 int index = 0;
256 int errCode = E_OK;
257 for (OpType op : downloadData.opType) {
258 VBucket &vBucket = downloadData.data[index];
259 switch (op) {
260 case OpType::INSERT:
261 errCode = InsertCloudData(vBucket, tableSchema, trackerTable, GetLocalDataKey(index, downloadData));
262 break;
263 case OpType::UPDATE:
264 errCode = UpdateCloudData(vBucket, tableSchema);
265 break;
266 case OpType::DELETE:
267 errCode = DeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
268 break;
269 case OpType::ONLY_UPDATE_GID:
270 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
271 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
272 case OpType::UPDATE_TIMESTAMP:
273 case OpType::CLEAR_GID:
274 case OpType::LOCKED_NOT_HANDLE:
275 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
276 [[fallthrough]];
277 case OpType::NOT_HANDLE:
278 errCode = errCode == E_OK ? OnlyUpdateAssetId(tableName, tableSchema, vBucket,
279 GetLocalDataKey(index, downloadData), op) : errCode;
280 break;
281 default:
282 errCode = -E_CLOUD_ERROR;
283 break;
284 }
285 if (errCode != E_OK) {
286 LOGE("put cloud sync data fail: %d", errCode);
287 return errCode;
288 }
289 statisticMap[static_cast<int>(op)]++;
290 index++;
291 }
292 return errCode;
293 }
294
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets,std::vector<std::string> & notifyTableList)295 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
296 const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
297 std::vector<Asset> &assets, std::vector<std::string> ¬ifyTableList)
298 {
299 int errCode = SetLogTriggerStatus(false);
300 if (errCode != E_OK) {
301 LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
302 return errCode;
303 }
304 if (mode == FLAG_ONLY) {
305 errCode = DoCleanLogs(tableNameList, localSchema);
306 if (errCode != E_OK) {
307 LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
308 return errCode;
309 }
310 notifyTableList = tableNameList;
311 } else if (mode == FLAG_AND_DATA) {
312 errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
313 if (errCode != E_OK) {
314 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
315 return errCode;
316 }
317 notifyTableList = tableNameList;
318 } else if (mode == CLEAR_SHARED_TABLE) {
319 errCode = DoCleanShareTableDataAndLog(tableNameList);
320 if (errCode != E_OK) {
321 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
322 return errCode;
323 }
324 notifyTableList = tableNameList;
325 }
326 for (const auto &tableName: tableNameList) {
327 errCode = CleanDownloadingFlag(tableName);
328 if (errCode != E_OK) {
329 LOGE("Fail to clean downloading flag, %d, tableName:%s, length:%zu",
330 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
331 return errCode;
332 }
333 }
334 errCode = SetLogTriggerStatus(true);
335 if (errCode != E_OK) {
336 LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
337 }
338
339 return errCode;
340 }
341
DoCleanLogs(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)342 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList,
343 const RelationalSchemaObject &localSchema)
344 {
345 int errCode = E_OK;
346 int i = 1;
347 for (const auto &tableName: tableNameList) {
348 std::string logTableName = DBCommon::GetLogTableName(tableName);
349 LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
350 errCode = DoCleanAssetId(tableName, localSchema);
351 if (errCode != E_OK) {
352 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
353 return errCode;
354 }
355 errCode = CleanCloudDataOnLogTable(logTableName, FLAG_ONLY);
356 if (errCode != E_OK) {
357 LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
358 return errCode;
359 }
360 i++;
361 }
362
363 return errCode;
364 }
365
UpdateCursor(sqlite3_context * ctx,int argc,sqlite3_value ** argv)366 void SQLiteSingleVerRelationalStorageExecutor::UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv)
367 {
368 if (ctx == nullptr || argc != 0 || argv == nullptr) {
369 LOGW("[SqlSinRDBExe][UpdateCursor] invalid param=%d", argc);
370 return;
371 }
372 auto context = static_cast<UpdateCursorContext *>(sqlite3_user_data(ctx));
373 if (context == nullptr) {
374 LOGW("[SqlSinRDBExe][UpdateCursor] invalid context");
375 return;
376 }
377 context->cursor++;
378 sqlite3_result_int64(ctx, static_cast<sqlite3_int64>(context->cursor));
379 }
380
CreateFuncUpdateCursor(UpdateCursorContext & context,void (* updateCursor)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const381 int SQLiteSingleVerRelationalStorageExecutor::CreateFuncUpdateCursor(UpdateCursorContext &context,
382 void(*updateCursor)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
383 {
384 std::string sql = "update_cursor";
385 int errCode = sqlite3_create_function_v2(dbHandle_, sql.c_str(), 0, SQLITE_UTF8 | SQLITE_DIRECTONLY,
386 &context, updateCursor, nullptr, nullptr, nullptr);
387 if (errCode != SQLITE_OK) {
388 LOGE("[Storage Executor][UpdateCursor] Create func=updateCursor failed=%d", errCode);
389 return SQLiteUtils::MapSQLiteErrno(errCode);
390 }
391 return E_OK;
392 }
393
GetCursor(const std::string & tableName,uint64_t & cursor)394 int SQLiteSingleVerRelationalStorageExecutor::GetCursor(const std::string &tableName, uint64_t &cursor)
395 {
396 return SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, cursor);
397 }
398
SetCursor(const std::string & tableName,uint64_t cursor)399 int SQLiteSingleVerRelationalStorageExecutor::SetCursor(const std::string &tableName, uint64_t cursor)
400 {
401 std::string sql = "UPDATE " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata SET VALUE = ? where KEY = ?;";
402 sqlite3_stmt *stmt = nullptr;
403 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
404 if (errCode != E_OK) {
405 LOGE("Set cursor sql failed=%d", errCode);
406 return errCode;
407 }
408 ResFinalizer finalizer([stmt]() {
409 sqlite3_stmt *statement = stmt;
410 int ret = E_OK;
411 SQLiteUtils::ResetStatement(statement, true, ret);
412 if (ret != E_OK) {
413 LOGW("Reset stmt failed %d when set cursor", ret);
414 }
415 });
416 int index = 1;
417 errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, cursor);
418 if (errCode != E_OK) {
419 LOGE("Bind saved cursor failed:%d", errCode);
420 return errCode;
421 }
422 Key key;
423 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
424 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key, false);
425 if (errCode != E_OK) {
426 return errCode;
427 }
428 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
429 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
430 errCode = E_OK;
431 }
432 return errCode;
433 }
434
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)435 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
436 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
437 {
438 int errCode = E_OK;
439 for (size_t i = 0; i < tableNameList.size(); i++) {
440 std::string tableName = tableNameList[i];
441 std::string logTableName = DBCommon::GetLogTableName(tableName);
442 std::vector<int64_t> dataKeys;
443 errCode = GetCleanCloudDataKeys(logTableName, dataKeys, true);
444 if (errCode != E_OK) {
445 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
446 return errCode;
447 }
448
449 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
450 errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
451 if (errCode != E_OK) {
452 LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
453 return errCode;
454 }
455 if (isLogicDelete_) {
456 errCode = SetDataOnUserTableWithLogicDelete(tableName, logTableName);
457 } else {
458 errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName, localSchema);
459 }
460 if (errCode != E_OK) {
461 LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
462 return errCode;
463 }
464 }
465 return errCode;
466 }
467
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)468 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
469 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
470 {
471 int errCode = E_OK;
472 int ret = E_OK;
473 sqlite3_stmt *selectStmt = nullptr;
474 for (const auto &rowId : dataKeys) {
475 std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
476 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
477 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
478 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
479 LOGE("Get select asset statement failed, %d", errCode);
480 return errCode;
481 }
482 errCode = SQLiteUtils::StepWithRetry(selectStmt);
483 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
484 std::vector<uint8_t> blobValue;
485 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
486 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
487 LOGE("Get column blob value failed, %d", errCode);
488 goto END;
489 }
490 if (blobValue.empty()) {
491 SQLiteUtils::ResetStatement(selectStmt, true, ret);
492 continue;
493 }
494 Asset asset;
495 errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
496 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
497 LOGE("Transfer blob to asset failed, %d", errCode);
498 goto END;
499 }
500 assets.push_back(asset);
501 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
502 errCode = E_OK;
503 Asset asset;
504 assets.push_back(asset);
505 }
506 SQLiteUtils::ResetStatement(selectStmt, true, ret);
507 }
508 return errCode != E_OK ? errCode : ret;
509 END:
510 SQLiteUtils::ResetStatement(selectStmt, true, ret);
511 return errCode != E_OK ? errCode : ret;
512 }
513
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)514 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
515 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
516 {
517 int errCode = E_OK;
518 int ret = E_OK;
519 sqlite3_stmt *selectStmt = nullptr;
520 for (const auto &rowId : dataKeys) {
521 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
522 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
523 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
524 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
525 LOGE("Get select assets statement failed, %d", errCode);
526 goto END;
527 }
528 errCode = SQLiteUtils::StepWithRetry(selectStmt);
529 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
530 std::vector<uint8_t> blobValue;
531 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
532 if (errCode != E_OK) {
533 goto END;
534 }
535 Assets tmpAssets;
536 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
537 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
538 goto END;
539 }
540 for (const auto &asset: tmpAssets) {
541 assets.push_back(asset);
542 }
543 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
544 errCode = E_OK;
545 }
546 SQLiteUtils::ResetStatement(selectStmt, true, ret);
547 }
548 return errCode != E_OK ? errCode : ret;
549 END:
550 SQLiteUtils::ResetStatement(selectStmt, true, ret);
551 return errCode != E_OK ? errCode : ret;
552 }
553
GetCloudDataCount(const std::string & tableName,DownloadData & downloadData,int64_t & count)554 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataCount(const std::string &tableName,
555 DownloadData &downloadData, int64_t &count)
556 {
557 if (downloadData.data.empty()) {
558 return E_OK;
559 }
560 int errCode = E_OK;
561 sqlite3_stmt *queryStmt = nullptr;
562 std::string querySql = "SELECT COUNT(*) FROM '" + DBCommon::GetLogTableName(tableName) +
563 "' WHERE FLAG & 0x02 = 0 AND CLOUD_GID IN (";
564 for (const VBucket &vBucket : downloadData.data) {
565 std::string cloudGid;
566 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
567 if (errCode != E_OK) {
568 LOGE("get gid for query log statement failed, %d", errCode);
569 return -E_CLOUD_ERROR;
570 }
571 querySql += "'" + cloudGid + "',";
572 }
573 querySql.pop_back();
574 querySql += ");";
575 errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, queryStmt);
576 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
577 LOGE("Get query count statement failed, %d", errCode);
578 return errCode;
579 }
580 errCode = SQLiteUtils::StepWithRetry(queryStmt);
581 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
582 count = static_cast<int64_t>(sqlite3_column_int64(queryStmt, 0));
583 errCode = E_OK;
584 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
585 errCode = E_OK;
586 }
587 int ret = E_OK;
588 SQLiteUtils::ResetStatement(queryStmt, true, ret);
589 return errCode != E_OK ? errCode : ret;
590 }
591
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)592 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
593 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
594 {
595 int errCode = E_OK;
596 for (const auto &fieldInfo: fieldInfos) {
597 if (fieldInfo.IsAssetType()) {
598 errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
599 if (errCode != E_OK) {
600 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
601 return errCode;
602 }
603 } else if (fieldInfo.IsAssetsType()) {
604 errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
605 if (errCode != E_OK) {
606 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
607 return errCode;
608 }
609 }
610 }
611 return errCode;
612 }
613
SetCursorIncFlag(bool flag)614 int SQLiteSingleVerRelationalStorageExecutor::SetCursorIncFlag(bool flag)
615 {
616 std::string sql = "INSERT OR REPLACE INTO " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata" +
617 " VALUES ('cursor_inc_flag', ";
618 if (flag) {
619 sql += "'true'";
620 } else {
621 sql += "'false'";
622 }
623 sql += ");";
624 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
625 if (errCode != E_OK) {
626 LOGE("set cursor inc flag fail, errCode = %d", errCode);
627 }
628 return errCode;
629 }
630
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData)631 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
632 const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData)
633 {
634 if (downloadData.data.size() != downloadData.opType.size()) {
635 LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
636 downloadData.opType.size());
637 return -E_CLOUD_ERROR;
638 }
639
640 int errCode = SetLogTriggerStatus(false);
641 if (errCode != E_OK) {
642 LOGE("Fail to set log trigger off, %d", errCode);
643 return errCode;
644 }
645
646 std::map<int, int> statisticMap = {};
647 errCode = ExecutePutCloudData(tableName, tableSchema, trackerTable, downloadData, statisticMap);
648 if (errCode != E_OK) {
649 LOGE("ExecutePutCloudData failed, %d", errCode);
650 }
651 int ret = SetLogTriggerStatus(true);
652 if (ret != E_OK) {
653 LOGE("Fail to set log trigger on, %d", ret);
654 }
655 int64_t count = 0;
656 int errCodeCount = GetCloudDataCount(tableName, downloadData, count);
657 if (errCodeCount != E_OK) {
658 LOGW("get cloud data count failed, %d", errCodeCount);
659 }
660 LOGI("save cloud data of table %s [length %zu]:%d, cloud data count:%lld, ins:%d, upd:%d, del:%d, only gid:%d,"
661 "flag zero:%d, flag one:%d, upd timestamp:%d, clear gid:%d, not handle:%d, lock:%d",
662 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode, count,
663 statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
664 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
665 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
666 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
667 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
668 statisticMap[static_cast<int>(OpType::NOT_HANDLE)], statisticMap[static_cast<int>(OpType::LOCKED_NOT_HANDLE)]);
669 return errCode == E_OK ? ret : errCode;
670 }
671
UpdateAssetStatusForAssetOnly(const TableSchema & tableSchema,VBucket & vBucket)672 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetStatusForAssetOnly(
673 const TableSchema &tableSchema, VBucket &vBucket)
674 {
675 std::string cloudGid;
676 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
677 if (errCode != E_OK) {
678 LOGE("Miss gid when fill Asset %d.", errCode);
679 return errCode;
680 }
681 std::vector<Field> assetsField;
682 errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
683 if (errCode != E_OK) {
684 LOGE("No assets need to be filled when download assets only, err:%d.", errCode);
685 return errCode;
686 }
687
688 sqlite3_stmt *stmt = nullptr;
689 errCode = GetFillDownloadAssetStatement(tableSchema.name, vBucket, assetsField, stmt);
690 if (errCode != E_OK) {
691 LOGE("can not get assetsField from tableSchema:%s err:%d when download assets only.",
692 tableSchema.name.c_str(), errCode);
693 return errCode;
694 }
695 return ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
696 }
697
InsertCloudData(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,int64_t dataKey)698 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema,
699 const TrackerTable &trackerTable, int64_t dataKey)
700 {
701 int errCode = E_OK;
702 if (dataKey > 0) {
703 errCode = RemoveDataAndLog(tableSchema.name, dataKey);
704 if (errCode != E_OK) {
705 return errCode;
706 }
707 }
708 std::string sql = GetInsertSqlForCloudSync(tableSchema);
709 sqlite3_stmt *insertStmt = nullptr;
710 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
711 if (errCode != E_OK) {
712 LOGE("Get insert statement failed when save cloud data, %d", errCode);
713 return errCode;
714 }
715 if (putDataMode_ == PutDataMode::SYNC) {
716 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
717 }
718 errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
719 if (errCode != E_OK) {
720 SQLiteUtils::ResetStatement(insertStmt, true, errCode);
721 return errCode;
722 }
723 // insert data
724 errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
725 int ret = E_OK;
726 SQLiteUtils::ResetStatement(insertStmt, true, ret);
727 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
728 LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
729 return errCode;
730 }
731
732 // insert log
733 return InsertLogRecord(tableSchema, trackerTable, vBucket);
734 }
735
GetInsertLogSql(const std::string & logTableName,const std::set<std::string> & extendColNames)736 std::string GetInsertLogSql(const std::string &logTableName, const std::set<std::string> &extendColNames)
737 {
738 if (extendColNames.empty()) {
739 return "INSERT OR REPLACE INTO " + logTableName + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, " +
740 "CASE WHEN (SELECT status FROM " + logTableName + " WHERE hash_key=?) IS NULL THEN 0 ELSE " +
741 "(SELECT status FROM " + logTableName + " WHERE hash_key=?) " + "END)";
742 }
743 std::string sql = "INSERT OR REPLACE INTO " + logTableName + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, json_object(";
744 for (const auto &extendColName : extendColNames) {
745 sql += "'" + extendColName + "',?,";
746 }
747 sql.pop_back();
748 sql += "), 0, ?, ?, CASE WHEN (SELECT status FROM " + logTableName +
749 " WHERE hash_key=?) IS NULL THEN 0 ELSE " + "(SELECT status FROM " + logTableName +
750 " WHERE hash_key=?) " + "END)";
751 return sql;
752 }
753
InsertLogRecord(const TableSchema & tableSchema,const TrackerTable & trackerTable,VBucket & vBucket)754 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema,
755 const TrackerTable &trackerTable, VBucket &vBucket)
756 {
757 if (putDataMode_ == PutDataMode::SYNC && !CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
758 // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
759 // so we need to delete the old log record according to the gid first
760 std::string gidStr;
761 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
762 if (errCode != E_OK || gidStr.empty()) {
763 LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
764 return errCode;
765 }
766 std::string sql = "DELETE FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE cloud_gid = '"
767 + gidStr + "';";
768 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
769 if (errCode != E_OK) {
770 LOGE("delete log record according gid fail, errCode = %d", errCode);
771 return errCode;
772 }
773 }
774
775 std::string sql = GetInsertLogSql(DBCommon::GetLogTableName(tableSchema.name), trackerTable.GetExtendNames());
776 sqlite3_stmt *insertLogStmt = nullptr;
777 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
778 if (errCode != E_OK) {
779 LOGE("Get insert log statement failed when save cloud data, %d", errCode);
780 return errCode;
781 }
782
783 errCode = BindValueToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
784 if (errCode != E_OK) {
785 SQLiteUtils::ResetStatement(insertLogStmt, true, errCode);
786 return errCode;
787 }
788
789 errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
790 int ret = E_OK;
791 SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
792 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
793 return ret;
794 } else {
795 LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
796 return errCode;
797 }
798 }
799
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)800 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
801 sqlite3_stmt *updateStmt)
802 {
803 auto it = bindCloudFieldFuncMap_.find(field.type);
804 if (it == bindCloudFieldFuncMap_.end()) {
805 LOGE("unknown cloud type when bind one field.");
806 return -E_CLOUD_ERROR;
807 }
808 return it->second(index, vBucket, field, updateStmt);
809 }
810
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)811 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
812 const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
813 {
814 int errCode = E_OK;
815 int index = 0;
816 for (const auto &field : fields) {
817 index++;
818 errCode = BindOneField(index, vBucket, field, upsertStmt);
819 if (errCode != E_OK) {
820 return errCode;
821 }
822 }
823 return errCode;
824 }
825
BindStatusSubQueryHashKeyStatement(sqlite3_stmt * insertLogStmt,std::vector<uint8_t> & hashKey,int & index)826 int SQLiteSingleVerRelationalStorageExecutor::BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt,
827 std::vector<uint8_t> &hashKey, int &index)
828 {
829 int errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
830 if (errCode != E_OK) {
831 LOGE("Bind hash_key to status subQuery statement failed, %d", errCode);
832 return errCode;
833 }
834
835 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
836 if (errCode != E_OK) {
837 LOGE("Bind hash_key to status subQuery2 statement failed, %d", errCode);
838 return errCode;
839 }
840 return errCode;
841 }
842
BindExtendValue(const VBucket & vBucket,const TrackerTable & trackerTable,sqlite3_stmt * stmt,int & index)843 int BindExtendValue(const VBucket &vBucket, const TrackerTable &trackerTable, sqlite3_stmt *stmt, int &index)
844 {
845 const std::set<std::string> &extendColNames = trackerTable.GetExtendNames();
846 int errCode = E_OK;
847 int extendValueIndex = index;
848 if (extendColNames.empty()) {
849 return SQLiteUtils::BindTextToStatement(stmt, index++, "");
850 }
851 for (const auto &extendColName : extendColNames) {
852 if (vBucket.find(extendColName) == vBucket.end()) {
853 errCode = SQLiteUtils::BindTextToStatement(stmt, extendValueIndex++, "");
854 } else {
855 Type extendValue = vBucket.at(extendColName);
856 errCode = SQLiteRelationalUtils::BindStatementByType(stmt, extendValueIndex++, extendValue);
857 }
858 if (errCode != E_OK) {
859 const std::string tableName = DBCommon::StringMiddleMasking(trackerTable.GetTableName());
860 size_t nameLength = trackerTable.GetTableName().size();
861 LOGE("[%s [%zu]] Bind extend field failed: %d", tableName.c_str(), nameLength, errCode);
862 return errCode;
863 }
864 }
865 index = extendValueIndex;
866 return E_OK;
867 }
868
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt,int & index)869 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
870 const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt, int &index)
871 {
872 std::vector<uint8_t> hashKey;
873 int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
874 if (errCode != E_OK) {
875 return errCode;
876 }
877 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, index++, hashKey); // next is hash_key
878 if (errCode != E_OK) {
879 LOGE("Bind hash_key to insert log statement failed, %d", errCode);
880 return errCode;
881 }
882
883 std::string cloudGid;
884 if (putDataMode_ == PutDataMode::SYNC) {
885 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
886 if (errCode != E_OK) {
887 LOGE("get gid for insert log statement failed, %d", errCode);
888 return -E_CLOUD_ERROR;
889 }
890 }
891
892 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, index++, cloudGid); // next is cloud_gid
893 if (errCode != E_OK) {
894 LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
895 return errCode;
896 }
897
898 errCode = BindExtendValue(vBucket, trackerTable, insertLogStmt, index); // next is extend_field
899 if (errCode != E_OK) {
900 LOGE("Bind extend_field to insert log statement failed, %d", errCode);
901 return errCode;
902 }
903
904 errCode = BindShareValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt, index);
905 if (errCode != E_OK) {
906 return errCode;
907 }
908 return BindStatusSubQueryHashKeyStatement(insertLogStmt, hashKey, index);
909 }
910
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)911 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
912 const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
913 {
914 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
915 int bindIndex = 1; // 1 is rowid
916 int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, rowid);
917 if (errCode != E_OK) {
918 LOGE("Bind rowid to insert log statement failed, %d", errCode);
919 return errCode;
920 }
921
922 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, bindIndex++, GetDev()); // next is device
923 if (errCode != E_OK) {
924 LOGE("Bind device to insert log statement failed, %d", errCode);
925 return errCode;
926 }
927
928 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, bindIndex++, GetDev()); // next is ori_device
929 if (errCode != E_OK) {
930 LOGE("Bind ori_device to insert log statement failed, %d", errCode);
931 return errCode;
932 }
933
934 int64_t val = 0;
935 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
936 if (errCode != E_OK) {
937 LOGE("get modify time for insert log statement failed, %d", errCode);
938 return -E_CLOUD_ERROR;
939 }
940
941 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, val); // next is timestamp
942 if (errCode != E_OK) {
943 LOGE("Bind timestamp to insert log statement failed, %d", errCode);
944 return errCode;
945 }
946
947 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
948 if (errCode != E_OK) {
949 LOGE("get create time for insert log statement failed, %d", errCode);
950 return -E_CLOUD_ERROR;
951 }
952
953 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, bindIndex++, val); // next is wtimestamp
954 if (errCode != E_OK) {
955 LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
956 return errCode;
957 }
958
959 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, bindIndex++, GetDataFlag())); // next is flag
960 if (errCode != E_OK) {
961 LOGE("Bind flag to insert log statement failed, %d", errCode);
962 return errCode;
963 }
964
965 vBucket[DBConstant::ROWID] = rowid; // fill rowid to cloud data to notify user
966 return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt, bindIndex);
967 }
968
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)969 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
970 const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
971 {
972 std::string where = " WHERE";
973 if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
974 where += " " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (SELECT data_key FROM " +
975 DBCommon::GetLogTableName(tableName) + " WHERE cloud_gid = '" + gidStr + "')";
976 }
977 if (!pkSet.empty() && queryByPk) {
978 if (!gidStr.empty()) {
979 where += " OR";
980 }
981 where += " (1 = 1";
982 for (const auto &pk : pkSet) {
983 where += (" AND " + pk + " = ?");
984 }
985 where += ");";
986 }
987 return where;
988 }
989
GetUpdateSqlForCloudSync(const std::vector<Field> & updateFields,const TableSchema & tableSchema,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)990 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields,
991 const TableSchema &tableSchema, const std::string &gidStr, const std::set<std::string> &pkSet,
992 std::string &updateSql)
993 {
994 if (pkSet.empty() && gidStr.empty()) {
995 LOGE("update data fail because both primary key and gid is empty.");
996 return -E_CLOUD_ERROR;
997 }
998 std::string sql = "UPDATE " + tableSchema.name + " SET";
999 for (const auto &field : updateFields) {
1000 sql += " " + field.colName + " = ?,";
1001 }
1002 sql.pop_back();
1003 sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
1004 updateSql = sql;
1005 return E_OK;
1006 }
1007
IsGidValid(const std::string & gidStr)1008 static inline bool IsGidValid(const std::string &gidStr)
1009 {
1010 if (!gidStr.empty()) {
1011 return gidStr.find("'") == std::string::npos;
1012 }
1013 return true;
1014 }
1015
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)1016 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
1017 const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
1018 {
1019 std::string gidStr;
1020 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1021 if (errCode != E_OK) {
1022 LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
1023 return errCode;
1024 }
1025 if (!IsGidValid(gidStr)) {
1026 LOGE("invalid char in cloud gid");
1027 return -E_CLOUD_ERROR;
1028 }
1029
1030 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1031 auto updateFields = GetUpdateField(vBucket, tableSchema);
1032 std::string updateSql;
1033 errCode = GetUpdateSqlForCloudSync(updateFields, tableSchema, gidStr, pkSet, updateSql);
1034 if (errCode != E_OK) {
1035 return errCode;
1036 }
1037
1038 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
1039 if (errCode != E_OK) {
1040 LOGE("Get update statement failed when update cloud data, %d", errCode);
1041 return errCode;
1042 }
1043
1044 // bind value
1045 if (!pkSet.empty()) {
1046 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1047 updateFields.insert(updateFields.end(), pkFields.begin(), pkFields.end());
1048 }
1049 errCode = BindValueToUpsertStatement(vBucket, updateFields, updateStmt);
1050 if (errCode != E_OK) {
1051 LOGE("bind value to update statement failed when update cloud data, %d", errCode);
1052 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
1053 }
1054 return errCode;
1055 }
1056
UpdateCloudData(VBucket & vBucket,const TableSchema & tableSchema)1057 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema)
1058 {
1059 if (putDataMode_ == PutDataMode::SYNC) {
1060 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
1061 }
1062 sqlite3_stmt *updateStmt = nullptr;
1063 int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
1064 if (errCode != E_OK) {
1065 LOGE("Get update data table statement fail, %d", errCode);
1066 return errCode;
1067 }
1068
1069 // update data
1070 errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
1071 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1072 errCode = E_OK;
1073 } else {
1074 LOGE("update data failed when save cloud data:%d", errCode);
1075 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
1076 return errCode;
1077 }
1078 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
1079
1080 // update log
1081 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
1082 if (errCode != E_OK) {
1083 LOGE("update log record failed when update cloud data, errCode = %d", errCode);
1084 }
1085 return errCode;
1086 }
1087
IsAllowWithPrimaryKey(OpType opType)1088 static inline bool IsAllowWithPrimaryKey(OpType opType)
1089 {
1090 return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
1091 opType == OpType::ONLY_UPDATE_GID || opType == OpType::LOCKED_NOT_HANDLE);
1092 }
1093
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1094 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
1095 OpType opType)
1096 {
1097 sqlite3_stmt *updateLogStmt = nullptr;
1098 std::vector<std::string> updateColName;
1099 int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
1100 if (errCode != E_OK) {
1101 LOGE("Get update log statement failed, errCode = %d", errCode);
1102 return errCode;
1103 }
1104
1105 errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
1106 updateLogStmt);
1107 int ret = E_OK;
1108 if (errCode != E_OK) {
1109 LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
1110 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1111 return errCode != E_OK ? errCode : ret;
1112 }
1113
1114 errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
1115 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1116 errCode = E_OK;
1117 } else {
1118 LOGE("update log record failed when update cloud data:%d", errCode);
1119 }
1120 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1121 return errCode != E_OK ? errCode : ret;
1122 }
1123
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)1124 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
1125 const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
1126 sqlite3_stmt *updateLogStmt)
1127 {
1128 int errCode = CloudStorageUtils::BindUpdateLogStmtFromVBucket(vBucket, tableSchema, colNames, updateLogStmt);
1129 if (errCode != E_OK) {
1130 return errCode;
1131 }
1132 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1133 if (pkMap.empty()) {
1134 return E_OK;
1135 }
1136
1137 std::vector<uint8_t> hashKey;
1138 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
1139 if (errCode != E_OK) {
1140 return errCode;
1141 }
1142 return SQLiteUtils::BindBlobToStatement(updateLogStmt, colNames.size() + 1, hashKey);
1143 }
1144
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)1145 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
1146 const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
1147 {
1148 std::string gidStr;
1149 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1150 if (errCode != E_OK) {
1151 LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
1152 return errCode;
1153 }
1154 if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
1155 LOGE("empty or invalid char in cloud gid");
1156 return -E_CLOUD_ERROR;
1157 }
1158
1159 bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
1160 std::string deleteSql = "DELETE FROM " + tableSchema.name;
1161 deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
1162 errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
1163 if (errCode != E_OK) {
1164 LOGE("Get delete statement failed when delete data, %d", errCode);
1165 return errCode;
1166 }
1167
1168 int ret = E_OK;
1169 if (!pkSet.empty() && queryByPk) {
1170 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1171 errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
1172 if (errCode != E_OK) {
1173 LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
1174 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1175 }
1176 }
1177 return errCode != E_OK ? errCode : ret;
1178 }
1179
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1180 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1181 const TableSchema &tableSchema, const TrackerTable &trackerTable)
1182 {
1183 if (isLogicDelete_) {
1184 return LogicDeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
1185 }
1186 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1187 sqlite3_stmt *deleteStmt = nullptr;
1188 int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
1189 if (errCode != E_OK) {
1190 return errCode;
1191 }
1192 errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
1193 int ret = E_OK;
1194 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1195 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1196 LOGE("delete data failed when sync with cloud:%d", errCode);
1197 return errCode;
1198 }
1199 if (ret != E_OK) {
1200 LOGE("reset delete statement failed:%d", ret);
1201 return ret;
1202 }
1203
1204 // update log
1205 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1206 if (errCode != E_OK) {
1207 LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
1208 }
1209 return errCode;
1210 }
1211
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1212 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
1213 const TableSchema &tableSchema, OpType opType)
1214 {
1215 return UpdateLogRecord(vBucket, tableSchema, opType);
1216 }
1217
DeleteTableTrigger(const std::string & missTable) const1218 int SQLiteSingleVerRelationalStorageExecutor::DeleteTableTrigger(const std::string &missTable) const
1219 {
1220 static const char *triggerEndName[] = {
1221 "_ON_INSERT",
1222 "_ON_UPDATE",
1223 "_ON_DELETE"
1224 };
1225 std::string logTableName = DBConstant::SYSTEM_TABLE_PREFIX + missTable;
1226 for (const auto &endName : triggerEndName) {
1227 std::string deleteSql = "DROP TRIGGER IF EXISTS " + logTableName + endName + ";";
1228 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1229 if (errCode != E_OK) {
1230 LOGE("[DeleteTableTrigger] Drop trigger failed. %d", errCode);
1231 return errCode;
1232 }
1233 }
1234 return E_OK;
1235 }
1236
SetLogicDelete(bool isLogicDelete)1237 void SQLiteSingleVerRelationalStorageExecutor::SetLogicDelete(bool isLogicDelete)
1238 {
1239 isLogicDelete_ = isLogicDelete;
1240 }
1241
UpdateRecordStatus(const std::string & tableName,const std::string & status,const Key & hashKey)1242 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordStatus(const std::string &tableName,
1243 const std::string &status, const Key &hashKey)
1244 {
1245 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " + status + " WHERE hash_key = ?;";
1246 sqlite3_stmt *stmt = nullptr;
1247 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1248 if (errCode != E_OK) {
1249 LOGE("[Storage Executor] Get stmt failed when update record status, %d", errCode);
1250 return errCode;
1251 }
1252 int ret = E_OK;
1253 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey); // 1 is bind index of hashKey
1254 if (errCode != E_OK) {
1255 LOGE("[Storage Executor] Bind hashKey to update record status stmt failed, %d", errCode);
1256 SQLiteUtils::ResetStatement(stmt, true, ret);
1257 return errCode;
1258 }
1259 errCode = SQLiteUtils::StepWithRetry(stmt);
1260 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1261 errCode = E_OK;
1262 } else {
1263 LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1264 }
1265 SQLiteUtils::ResetStatement(stmt, true, ret);
1266 return errCode == E_OK ? ret : errCode;
1267 }
1268
SetUploadConfig(int32_t maxUploadCount,int32_t maxUploadSize)1269 void SQLiteSingleVerRelationalStorageExecutor::SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize)
1270 {
1271 maxUploadCount_ = maxUploadCount;
1272 maxUploadSize_ = maxUploadSize;
1273 }
1274
LogicDeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1275 int SQLiteSingleVerRelationalStorageExecutor::LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1276 const TableSchema &tableSchema, const TrackerTable &trackerTable)
1277 {
1278 LOGD("[RDBExecutor] logic delete skip delete data");
1279 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, CloudStorageUtils::GetCursorIncSql(tableName));
1280 if (errCode != E_OK) {
1281 return errCode;
1282 }
1283 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1284 if (errCode != E_OK) {
1285 return errCode;
1286 }
1287 if (!trackerTable.IsEmpty()) {
1288 return SQLiteRelationalUtils::SelectServerObserver(dbHandle_, tableName, true);
1289 }
1290 return E_OK;
1291 }
1292
InitCursorToMeta(const std::string & tableName)1293 int SQLiteSingleVerRelationalStorageExecutor::InitCursorToMeta(const std::string &tableName)
1294 {
1295 Value key;
1296 Value cursor;
1297 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
1298 int errCode = GetKvData(key, cursor);
1299 if (errCode == -E_NOT_FOUND) {
1300 DBCommon::StringToVector(std::string("0"), cursor);
1301 errCode = PutKvData(key, cursor);
1302 if (errCode != E_OK) {
1303 LOGE("Init cursor to meta table failed. %d", errCode);
1304 }
1305 return errCode;
1306 }
1307 if (errCode != E_OK) {
1308 LOGE("Get cursor from meta table failed. %d", errCode);
1309 }
1310 return errCode;
1311 }
1312
SetTableSchema(const TableSchema & tableSchema)1313 void SQLiteSingleVerRelationalStorageExecutor::SetTableSchema(const TableSchema &tableSchema)
1314 {
1315 tableSchema_ = tableSchema;
1316 }
1317
ReviseLocalModTime(const std::string & tableName,const std::vector<ReviseModTimeInfo> & revisedData)1318 int SQLiteSingleVerRelationalStorageExecutor::ReviseLocalModTime(const std::string &tableName,
1319 const std::vector<ReviseModTimeInfo> &revisedData)
1320 {
1321 sqlite3_stmt *stmt = nullptr;
1322 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1323 " SET timestamp=? where hash_key=? AND timestamp=?";
1324 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1325 if (errCode != E_OK) {
1326 LOGE("[RDBExecutor][ReviseLocalModTime] Get stmt failed: %d", errCode);
1327 return errCode;
1328 }
1329 ResFinalizer finalizer([stmt]() {
1330 sqlite3_stmt *statement = stmt;
1331 int ret = E_OK;
1332 SQLiteUtils::ResetStatement(statement, true, ret);
1333 if (ret != E_OK) {
1334 LOGW("[RDBExecutor][ReviseLocalModTime] Reset stmt failed %d", ret);
1335 }
1336 });
1337 for (auto &data : revisedData) {
1338 int resetCode = E_OK;
1339 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, data.curTime); // 1st bind modify time
1340 if (errCode != E_OK) {
1341 LOGE("[RDBExecutor][ReviseLocalModTime] Bind revised modify time failed: %d", errCode);
1342 return errCode;
1343 }
1344 errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, data.hashKey); // 2nd bind hash key
1345 if (errCode != E_OK) {
1346 LOGE("[RDBExecutor][ReviseLocalModTime] Bind hash key failed: %d", errCode);
1347 return errCode;
1348 }
1349 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 3, data.invalidTime); // 3rd bind modify time
1350 if (errCode != E_OK) {
1351 LOGE("[RDBExecutor][ReviseLocalModTime] Bind modify time failed: %d", errCode);
1352 return errCode;
1353 }
1354 errCode = SQLiteUtils::StepWithRetry(stmt);
1355 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1356 LOGE("[RDBExecutor][ReviseLocalModTime] Revise failed: %d", errCode);
1357 return errCode;
1358 }
1359 LOGI("[RDBExecutor][ReviseLocalModTime] Local data mod time revised from %lld to %lld",
1360 data.invalidTime, data.curTime);
1361 SQLiteUtils::ResetStatement(stmt, false, resetCode);
1362 if (resetCode != E_OK) {
1363 LOGW("[RDBExecutor][ReviseLocalModTime] Reset stmt failed: %d", resetCode);
1364 return resetCode;
1365 }
1366 }
1367 return E_OK;
1368 }
1369
IsNeedUpdateAssetIdInner(sqlite3_stmt * selectStmt,const VBucket & vBucket,const Field & field,VBucket & assetInfo,bool & isNotIncCursor)1370 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetIdInner(sqlite3_stmt *selectStmt,
1371 const VBucket &vBucket, const Field &field, VBucket &assetInfo, bool &isNotIncCursor)
1372 {
1373 if (field.type == TYPE_INDEX<Asset>) {
1374 Asset asset;
1375 UpdateLocalAssetId(vBucket, field.colName, asset);
1376 Asset *assetDBPtr = std::get_if<Asset>(&assetInfo[field.colName]);
1377 if (assetDBPtr == nullptr) {
1378 isNotIncCursor = true;
1379 return true;
1380 }
1381 const Asset &assetDB = *assetDBPtr;
1382 if (assetDB.assetId != asset.assetId || asset.status != AssetStatus::NORMAL) {
1383 isNotIncCursor = true;
1384 return true;
1385 }
1386 }
1387 if (field.type == TYPE_INDEX<Assets>) {
1388 Assets assets;
1389 UpdateLocalAssetsId(vBucket, field.colName, assets);
1390 Assets *assetsDBPtr = std::get_if<Assets>(&assetInfo[field.colName]);
1391 if (assetsDBPtr == nullptr) {
1392 isNotIncCursor = true;
1393 return true;
1394 }
1395 Assets &assetsDB = *assetsDBPtr;
1396 if (assets.size() != assetsDB.size()) {
1397 isNotIncCursor = true;
1398 return true;
1399 }
1400 for (uint32_t i = 0; i < assets.size(); ++i) {
1401 if (assets[i].assetId != assetsDB[i].assetId || assets[i].status != AssetStatus::NORMAL) {
1402 isNotIncCursor = true;
1403 return true;
1404 }
1405 }
1406 }
1407 return false;
1408 }
1409
IsNeedUpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket,bool & isNotIncCursor)1410 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1411 const VBucket &vBucket, bool &isNotIncCursor)
1412 {
1413 std::vector<Field> assetFields;
1414 for (const auto &field : tableSchema.fields) {
1415 if (field.type == TYPE_INDEX<Asset>) {
1416 assetFields.push_back(field);
1417 }
1418 if (field.type == TYPE_INDEX<Assets>) {
1419 assetFields.push_back(field);
1420 }
1421 }
1422 if (assetFields.empty()) {
1423 return false;
1424 }
1425 sqlite3_stmt *selectStmt = nullptr;
1426 std::string queryAssetsSql = "SELECT ";
1427 for (const auto &field : assetFields) {
1428 queryAssetsSql += field.colName + ",";
1429 }
1430 queryAssetsSql.pop_back();
1431 queryAssetsSql += " FROM '" + tableSchema.name + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " +
1432 std::to_string(dataKey) + ";";
1433 int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1434 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1435 LOGE("Get select assets statement failed, %d.", errCode);
1436 return true;
1437 }
1438 ResFinalizer finalizer([selectStmt]() {
1439 sqlite3_stmt *statementInner = selectStmt;
1440 int ret = E_OK;
1441 SQLiteUtils::ResetStatement(statementInner, true, ret);
1442 if (ret != E_OK) {
1443 LOGW("Reset stmt failed %d when get asset", ret);
1444 }
1445 });
1446 VBucket assetInfo;
1447 errCode = GetAssetInfoOnTable(selectStmt, assetFields, assetInfo);
1448 if (errCode != E_OK) {
1449 return true;
1450 }
1451 return std::any_of(assetFields.begin(), assetFields.end(), [&](const Field &field) {
1452 return IsNeedUpdateAssetIdInner(selectStmt, vBucket, field, assetInfo, isNotIncCursor);
1453 });
1454 }
1455
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1456 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsConsistent(const std::string &tableName,
1457 const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1458 {
1459 if (downloadData.data.size() != downloadData.opType.size()) {
1460 LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1461 return -E_CLOUD_ERROR;
1462 }
1463 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1464 " SET flag=flag&(~0x20), " + CloudDbConstant::UNLOCKING_TO_UNLOCK + " WHERE cloud_gid=? and timestamp=?;";
1465 sqlite3_stmt *stmt = nullptr;
1466 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1467 if (errCode != E_OK) {
1468 LOGE("Get mark flag as consistent stmt failed, %d.", errCode);
1469 return errCode;
1470 }
1471 int ret = E_OK;
1472 int index = 0;
1473 for (const auto &data: downloadData.data) {
1474 SQLiteUtils::ResetStatement(stmt, false, ret);
1475 OpType opType = downloadData.opType[index++];
1476 if (opType == OpType::NOT_HANDLE || opType == OpType::LOCKED_NOT_HANDLE) {
1477 continue;
1478 }
1479 errCode = CloudStorageUtils::BindStepConsistentFlagStmt(stmt, data, gidFilters);
1480 if (errCode != E_OK) {
1481 break;
1482 }
1483 }
1484 SQLiteUtils::ResetStatement(stmt, true, ret);
1485 return errCode == E_OK ? ret : errCode;
1486 }
1487
MarkFlagAsAssetAsyncDownload(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1488 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsAssetAsyncDownload(const std::string &tableName,
1489 const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1490 {
1491 if (downloadData.data.empty()) {
1492 return E_OK;
1493 }
1494 if (downloadData.data.size() != downloadData.opType.size()) {
1495 LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1496 return -E_CLOUD_ERROR;
1497 }
1498 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag|0x1000 WHERE cloud_gid=?;";
1499 sqlite3_stmt *stmt = nullptr;
1500 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1501 if (errCode != E_OK) {
1502 LOGE("[Storage Executor]Get mark flag as asset async download stmt failed, %d.", errCode);
1503 return errCode;
1504 }
1505 int ret = E_OK;
1506 for (const auto &gid : gidFilters) {
1507 SQLiteUtils::ResetStatement(stmt, false, ret);
1508 if (ret != E_OK) {
1509 LOGE("[Storage Executor]Reset stmt failed:%d", ret);
1510 break;
1511 }
1512 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
1513 if (errCode != E_OK) {
1514 break;
1515 }
1516 errCode = SQLiteUtils::StepWithRetry(stmt);
1517 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1518 errCode = E_OK;
1519 } else {
1520 LOGW("[Storage Executor]Step mark flag as asset async download stmt failed %d gid %s",
1521 errCode, gid.c_str());
1522 }
1523 }
1524 SQLiteUtils::ResetStatement(stmt, true, ret);
1525 return errCode == E_OK ? ret : errCode;
1526 }
1527
FillCloudVersionForUpload(const std::string & tableName,const CloudSyncBatch & batchData)1528 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const std::string &tableName,
1529 const CloudSyncBatch &batchData)
1530 {
1531 if (batchData.extend.empty()) {
1532 return E_OK;
1533 }
1534 if (batchData.hashKey.empty() || batchData.extend.size() != batchData.hashKey.size()) {
1535 LOGE("invalid sync data for filling version.");
1536 return -E_INVALID_ARGS;
1537 }
1538 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(tableName) +
1539 "' SET version = ? WHERE hash_key = ? ";
1540 sqlite3_stmt *stmt = nullptr;
1541 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1542 if (errCode != E_OK) {
1543 return errCode;
1544 }
1545 int ret = E_OK;
1546 for (size_t i = 0; i < batchData.extend.size(); ++i) {
1547 errCode = BindUpdateVersionStatement(batchData.extend[i], batchData.hashKey[i], stmt);
1548 if (errCode != E_OK) {
1549 LOGE("bind update version stmt failed.");
1550 SQLiteUtils::ResetStatement(stmt, true, ret);
1551 return errCode;
1552 }
1553 }
1554 SQLiteUtils::ResetStatement(stmt, true, ret);
1555 return ret;
1556 }
1557
QueryCount(const std::string & tableName,int64_t & count)1558 int SQLiteSingleVerRelationalStorageExecutor::QueryCount(const std::string &tableName, int64_t &count)
1559 {
1560 return SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, count);
1561 }
1562
CheckInventoryData(const std::string & tableName)1563 int SQLiteSingleVerRelationalStorageExecutor::CheckInventoryData(const std::string &tableName)
1564 {
1565 int64_t dataCount = 0;
1566 int errCode = SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, dataCount);
1567 if (errCode != E_OK) {
1568 LOGE("Query count failed %d", errCode);
1569 return errCode;
1570 }
1571 return dataCount > 0 ? -E_WITH_INVENTORY_DATA : E_OK;
1572 }
1573
GetUploadCountInner(const Timestamp & timestamp,SqliteQueryHelper & helper,std::string & sql,int64_t & count)1574 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCountInner(const Timestamp ×tamp,
1575 SqliteQueryHelper &helper, std::string &sql, int64_t &count)
1576 {
1577 sqlite3_stmt *stmt = nullptr;
1578 int errCode = helper.GetCloudQueryStatement(false, dbHandle_, sql, stmt);
1579 if (errCode != E_OK) {
1580 LOGE("failed to get count statement %d", errCode);
1581 return errCode;
1582 }
1583 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1584 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1585 count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1586 errCode = E_OK;
1587 } else {
1588 LOGE("Failed to get the count to be uploaded. %d", errCode);
1589 }
1590 SQLiteUtils::ResetStatement(stmt, true, errCode);
1591 return errCode;
1592 }
1593
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1594 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp ×tamp, bool isCloudForcePush,
1595 bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1596 {
1597 int errCode;
1598 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1599 if (errCode != E_OK) {
1600 return errCode;
1601 }
1602 std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask,
1603 CloudWaterType::DELETE);
1604 return GetUploadCountInner(timestamp, helper, sql, count);
1605 }
1606
GetAllUploadCount(const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1607 int SQLiteSingleVerRelationalStorageExecutor::GetAllUploadCount(const std::vector<Timestamp> ×tampVec,
1608 bool isCloudForcePush, bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1609 {
1610 std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1611 if (timestampVec.size() != typeVec.size()) {
1612 return -E_INVALID_ARGS;
1613 }
1614 int errCode;
1615 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1616 if (errCode != E_OK) {
1617 return errCode;
1618 }
1619 count = 0;
1620 for (size_t i = 0; i < typeVec.size(); i++) {
1621 std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask, typeVec[i]);
1622 int64_t tempCount = 0;
1623 helper.AppendCloudQueryToGetDiffData(sql, typeVec[i]);
1624 errCode = GetUploadCountInner(timestampVec[i], helper, sql, tempCount);
1625 if (errCode != E_OK) {
1626 return errCode;
1627 }
1628 count += tempCount;
1629 }
1630 return E_OK;
1631 }
1632
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1633 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1634 bool ignoreEmptyGid)
1635 {
1636 if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1637 cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1638 return -E_INVALID_ARGS;
1639 }
1640 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1641 + "' SET cloud_gid = ? WHERE data_key = ? ";
1642 sqlite3_stmt *stmt = nullptr;
1643 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1644 if (errCode != E_OK) {
1645 return errCode;
1646 }
1647 errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1648 int resetCode = E_OK;
1649 SQLiteUtils::ResetStatement(stmt, true, resetCode);
1650 return errCode == E_OK ? resetCode : errCode;
1651 }
1652
GetSyncCloudData(const CloudUploadRecorder & uploadRecorder,CloudSyncData & cloudDataResult,SQLiteSingleVerRelationalContinueToken & token)1653 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(const CloudUploadRecorder &uploadRecorder,
1654 CloudSyncData &cloudDataResult, SQLiteSingleVerRelationalContinueToken &token)
1655 {
1656 token.GetCloudTableSchema(tableSchema_);
1657 sqlite3_stmt *queryStmt = nullptr;
1658 bool isStepNext = false;
1659 int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult, queryStmt, isStepNext);
1660 if (errCode != E_OK) {
1661 (void)token.ReleaseCloudStatement();
1662 return errCode;
1663 }
1664 uint32_t totalSize = 0;
1665 uint32_t stepNum = -1;
1666 do {
1667 if (isStepNext) {
1668 errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1669 if (errCode != E_OK) {
1670 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1671 break;
1672 }
1673 }
1674 isStepNext = true;
1675 errCode = GetCloudDataForSync(uploadRecorder, queryStmt, cloudDataResult, ++stepNum, totalSize);
1676 } while (errCode == E_OK);
1677 if (errCode != -E_UNFINISHED) {
1678 (void)token.ReleaseCloudStatement();
1679 }
1680 return errCode;
1681 }
1682
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1683 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1684 {
1685 if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1686 Asset asset;
1687 int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1688 if (errCode != E_OK) {
1689 return errCode;
1690 }
1691 if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1692 return -E_CLOUD_ERROR;
1693 }
1694 vBucket.insert_or_assign(field.colName, asset);
1695 } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1696 Assets assets;
1697 int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1698 if (errCode != E_OK) {
1699 return errCode;
1700 }
1701 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets) || !CloudStorageUtils::CheckAssetStatus(assets)) {
1702 return -E_CLOUD_ERROR;
1703 }
1704 vBucket.insert_or_assign(field.colName, assets);
1705 } else {
1706 vBucket.insert_or_assign(field.colName, cloudValue);
1707 }
1708 return E_OK;
1709 }
1710
GetDownloadAsset(std::vector<VBucket> & assetsV,const Field & field,Type & cloudValue)1711 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAsset(std::vector<VBucket> &assetsV, const Field &field,
1712 Type &cloudValue)
1713 {
1714 if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1715 Asset asset;
1716 VBucket bucket;
1717 int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1718 if (errCode != E_OK) {
1719 return errCode;
1720 }
1721 if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1722 bucket.insert_or_assign(field.colName, asset);
1723 assetsV.push_back(bucket);
1724 }
1725 } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1726 Assets assets;
1727 int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1728 if (errCode != E_OK) {
1729 return errCode;
1730 }
1731 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1732 return E_OK;
1733 }
1734 for (const auto &asset : assets) {
1735 if (AssetOperationUtils::IsAssetNeedDownload(asset)) {
1736 VBucket bucket;
1737 bucket.insert_or_assign(field.colName, asset);
1738 assetsV.push_back(bucket);
1739 }
1740 }
1741 }
1742 return E_OK;
1743 }
1744
GetAssetInfoOnTable(sqlite3_stmt * & stmt,const std::vector<Field> & assetFields,VBucket & assetInfo)1745 int SQLiteSingleVerRelationalStorageExecutor::GetAssetInfoOnTable(sqlite3_stmt *&stmt,
1746 const std::vector<Field> &assetFields, VBucket &assetInfo)
1747 {
1748 int errCode = SQLiteUtils::StepWithRetry(stmt);
1749 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1750 int index = 0;
1751 for (const auto &field : assetFields) {
1752 Type cloudValue;
1753 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1754 if (errCode != E_OK) {
1755 break;
1756 }
1757 errCode = PutVBucketByType(assetInfo, field, cloudValue);
1758 if (errCode != E_OK) {
1759 break;
1760 }
1761 }
1762 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1763 errCode = E_OK;
1764 } else {
1765 LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1766 }
1767 return errCode;
1768 }
1769
GetLocalDataCount(const std::string & tableName,int & dataCount,int & logicDeleteDataCount)1770 int SQLiteSingleVerRelationalStorageExecutor::GetLocalDataCount(const std::string &tableName, int &dataCount,
1771 int &logicDeleteDataCount)
1772 {
1773 std::string dataCountSql = "select count(*) from " + DBCommon::GetLogTableName(tableName) + " where data_key != -1";
1774 int errCode = SQLiteUtils::GetCountBySql(dbHandle_, dataCountSql, dataCount);
1775 if (errCode != E_OK) {
1776 LOGE("[RDBExecutor] Query local data count failed: %d", errCode);
1777 return errCode;
1778 }
1779
1780 std::string logicDeleteDataCountSql = "select count(*) from " + DBCommon::GetLogTableName(tableName) +
1781 " where flag&0x08!=0 and data_key != -1";
1782 errCode = SQLiteUtils::GetCountBySql(dbHandle_, logicDeleteDataCountSql, logicDeleteDataCount);
1783 if (errCode != E_OK) {
1784 LOGE("[RDBExecutor] Query local logic delete data count failed: %d", errCode);
1785 }
1786 return errCode;
1787 }
1788
UpdateExtendField(const std::string & tableName,const std::set<std::string> & extendColNames)1789 int SQLiteSingleVerRelationalStorageExecutor::UpdateExtendField(const std::string &tableName,
1790 const std::set<std::string> &extendColNames)
1791 {
1792 bool isLogTableExist = false;
1793 int errCode = SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist);
1794 if (errCode == E_OK && !isLogTableExist) {
1795 LOGW("[RDBExecutor][UpdateExtendField] Log table of [%s [%zu]] not found!",
1796 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1797 return E_OK;
1798 }
1799 std::string sql = "update " + DBCommon::GetLogTableName(tableName) + " as log set extend_field = json_object(";
1800 for (const auto &extendColName : extendColNames) {
1801 sql += "'" + extendColName + "',data." + extendColName + ",";
1802 }
1803 sql.pop_back();
1804 sql += ") from " + tableName + " as data where log.data_key = data." + std::string(DBConstant::SQLITE_INNER_ROWID);
1805 sqlite3_stmt *stmt = nullptr;
1806 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1807 if (errCode != E_OK) {
1808 return errCode;
1809 }
1810
1811 ResFinalizer finalizer([stmt]() {
1812 sqlite3_stmt *statement = stmt;
1813 int ret = E_OK;
1814 SQLiteUtils::ResetStatement(statement, true, ret);
1815 if (ret != E_OK) {
1816 LOGW("[RDBExecutor][UpdateExtendField] Reset stmt failed %d", ret);
1817 }
1818 });
1819 errCode = SQLiteUtils::StepWithRetry(stmt);
1820 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1821 LOGE("[RDBExecutor][UpdateExtendField] Update [%s [%zu]] extend field failed: %d",
1822 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1823 return errCode;
1824 }
1825 return E_OK;
1826 }
1827
BuildJsonExtendField(const std::string & tableName,const std::string & lowVersionExtendColName,sqlite3 * db)1828 int BuildJsonExtendField(const std::string &tableName, const std::string &lowVersionExtendColName, sqlite3 *db)
1829 {
1830 std::string sql = "update " + DBCommon::GetLogTableName(tableName) + " set extend_field = json_object('" +
1831 lowVersionExtendColName + "',extend_field) where data_key = -1 and (json_valid(extend_field) = 0 or " +
1832 "json_extract(extend_field, '$." + lowVersionExtendColName +"') is null)";
1833 sqlite3_stmt *stmt = nullptr;
1834 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1835 if (errCode != E_OK) {
1836 return errCode;
1837 }
1838 errCode = SQLiteUtils::StepWithRetry(stmt);
1839 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1840 LOGE("[RDBExecutor][UpdateDeleteDataExtendField] Update [%s [%zu]] extend field non-JSON format failed: %d",
1841 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1842 } else {
1843 errCode = E_OK;
1844 }
1845 int ret = E_OK;
1846 SQLiteUtils::ResetStatement(stmt, true, ret);
1847 if (ret != E_OK) {
1848 LOGW("[RDBExecutor][UpdateExtendField] Reset stmt failed %d", ret);
1849 }
1850 return errCode;
1851 }
1852
GetUpdateExtendFieldSql(const std::string & tableName,const std::set<std::string> & oldExtendColNames,const std::set<std::string> & extendColNames)1853 std::string GetUpdateExtendFieldSql(const std::string &tableName, const std::set<std::string> &oldExtendColNames,
1854 const std::set<std::string> &extendColNames)
1855 {
1856 std::string sql = "update " + DBCommon::GetLogTableName(tableName) +
1857 " set extend_field = json_insert(extend_field,";
1858 bool isContainNewCol = false;
1859 for (const auto &extendColName : extendColNames) {
1860 if (oldExtendColNames.find(extendColName) != oldExtendColNames.end()) {
1861 continue;
1862 }
1863 isContainNewCol = true;
1864 sql += "'$." + extendColName + "',null,";
1865 }
1866 if (!isContainNewCol) {
1867 return "";
1868 }
1869 sql.pop_back();
1870 sql += ") where data_key = -1 and json_valid(extend_field) = 1;";
1871 return sql;
1872 }
1873
UpdateDeleteDataExtendField(const std::string & tableName,const std::string & lowVersionExtendColName,const std::set<std::string> & oldExtendColNames,const std::set<std::string> & extendColNames)1874 int SQLiteSingleVerRelationalStorageExecutor::UpdateDeleteDataExtendField(const std::string &tableName,
1875 const std::string &lowVersionExtendColName, const std::set<std::string> &oldExtendColNames,
1876 const std::set<std::string> &extendColNames)
1877 {
1878 bool isLogTableExist = false;
1879 if (SQLiteUtils::CheckTableExists(dbHandle_, DBCommon::GetLogTableName(tableName), isLogTableExist) == E_OK &&
1880 !isLogTableExist) {
1881 LOGW("[RDBExecutor][UpdateDeleteDataExtendField] Log table of [%s [%zu]] not found!",
1882 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1883 return E_OK;
1884 }
1885 int errCode = E_OK;
1886 if (!lowVersionExtendColName.empty()) {
1887 errCode = BuildJsonExtendField(tableName, lowVersionExtendColName, dbHandle_);
1888 if (errCode != E_OK) {
1889 LOGE("[UpdateDeleteDataExtendField] Update low version extend field of [%s [%zu]] to json failed: %d",
1890 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1891 return errCode;
1892 }
1893 }
1894 std::string sql = GetUpdateExtendFieldSql(tableName, oldExtendColNames, extendColNames);
1895 if (sql.empty()) {
1896 return E_OK;
1897 }
1898
1899 sqlite3_stmt *stmt = nullptr;
1900 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1901 if (errCode != E_OK) {
1902 return errCode;
1903 }
1904 errCode = SQLiteUtils::StepWithRetry(stmt);
1905 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1906 LOGE("[RDBExecutor][UpdateDeleteDataExtendField] Update extend field of [%s [%zu]] failed: %d",
1907 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), errCode);
1908 } else {
1909 errCode = E_OK;
1910 }
1911 int ret = E_OK;
1912 SQLiteUtils::ResetStatement(stmt, true, ret);
1913 if (ret != E_OK) {
1914 LOGW("[RDBExecutor][UpdateDeleteDataExtendField] Reset stmt failed %d", ret);
1915 }
1916 return errCode;
1917 }
1918
GetDownloadAssetGid(const TableSchema & tableSchema,std::vector<std::string> & gids,int64_t beginTime,bool abortWithLimit)1919 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetGid(const TableSchema &tableSchema,
1920 std::vector<std::string> &gids, int64_t beginTime, bool abortWithLimit)
1921 {
1922 std::string sql = "SELECT cloud_gid FROM " + DBCommon::GetLogTableName(tableSchema.name) +
1923 " WHERE flag&0x1000=0x1000 AND timestamp >= ?;";
1924 sqlite3_stmt *stmt = nullptr;
1925 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1926 if (errCode != E_OK) {
1927 LOGE("[RDBExecutor]Get gid statement failed, %d", errCode);
1928 return errCode;
1929 }
1930 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, beginTime);
1931 if (errCode != E_OK) {
1932 LOGE("[RDBExecutor] bind time failed %d when get download asset gid", errCode);
1933 SQLiteUtils::ResetStatement(stmt, true, errCode);
1934 return errCode;
1935 }
1936 uint32_t count = 0;
1937 do {
1938 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1939 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1940 errCode = E_OK;
1941 break;
1942 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1943 LOGE("[RDBExecutor]Get downloading assets gid failed. %d", errCode);
1944 break;
1945 }
1946 std::string gid;
1947 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1948 if (errCode != E_OK) {
1949 LOGW("[RDBExecutor]Get downloading assets gid failed %d when get col", errCode);
1950 continue;
1951 }
1952 gids.push_back(gid);
1953 if (AbortGetDownloadAssetGidIfNeed(tableSchema, gid, abortWithLimit, count)) {
1954 break;
1955 }
1956 } while (errCode == E_OK);
1957 int ret = E_OK;
1958 SQLiteUtils::ResetStatement(stmt, true, ret);
1959 return errCode == E_OK ? ret : errCode;
1960 }
1961
GetDownloadAssetRecordsByGid(const TableSchema & tableSchema,const std::string gid,std::vector<VBucket> & assets)1962 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetRecordsByGid(const TableSchema &tableSchema,
1963 const std::string gid, std::vector<VBucket> &assets)
1964 {
1965 std::vector<Field> assetFields;
1966 std::string sql = "SELECT";
1967 for (const auto &field: tableSchema.fields) {
1968 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1969 assetFields.emplace_back(field);
1970 sql += " b." + field.colName + ",";
1971 }
1972 }
1973 if (assetFields.empty()) {
1974 return E_OK;
1975 }
1976 sql.pop_back(); // remove last ,
1977 sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE a.cloud_gid = ?;";
1978 sqlite3_stmt *stmt = nullptr;
1979 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1980 if (errCode != E_OK) {
1981 LOGE("Get downloading asset records statement failed, %d", errCode);
1982 return errCode;
1983 }
1984 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid);
1985 if (errCode != E_OK) {
1986 SQLiteUtils::ResetStatement(stmt, true, errCode);
1987 return errCode;
1988 }
1989 errCode = SQLiteUtils::StepWithRetry(stmt);
1990 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1991 int index = 0;
1992 for (const auto &field: assetFields) {
1993 Type value;
1994 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, value);
1995 if (errCode != E_OK) {
1996 break;
1997 }
1998 errCode = GetDownloadAsset(assets, field, value);
1999 if (errCode != E_OK) {
2000 break;
2001 }
2002 }
2003 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2004 errCode = E_OK;
2005 } else {
2006 LOGE("step get downloading asset records statement failed %d.", errCode);
2007 }
2008 int ret = E_OK;
2009 SQLiteUtils::ResetStatement(stmt, true, ret);
2010 return errCode == E_OK ? ret : errCode;
2011 }
2012
GetDownloadingCount(const std::string & tableName,int32_t & count)2013 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadingCount(const std::string &tableName, int32_t &count)
2014 {
2015 std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName) + " WHERE flag&0x1000=0x1000;";
2016 int errCode = SQLiteUtils::GetCountBySql(dbHandle_, sql, count);
2017 if (errCode != E_OK) {
2018 LOGE("[RDBExecutor] Query local data count failed: %d", errCode);
2019 }
2020 return errCode;
2021 }
2022
GetDownloadingAssetsCount(const TableSchema & tableSchema,int32_t & totalCount)2023 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadingAssetsCount(
2024 const TableSchema &tableSchema, int32_t &totalCount)
2025 {
2026 std::vector<std::string> gids;
2027 int errCode = GetDownloadAssetGid(tableSchema, gids);
2028 if (errCode != E_OK) {
2029 LOGE("[RDBExecutor]Get downloading assets gid failed: %d", errCode);
2030 return errCode;
2031 }
2032 for (const auto &gid : gids) {
2033 std::vector<VBucket> assets;
2034 errCode = GetDownloadAssetRecordsByGid(tableSchema, gid, assets);
2035 if (errCode != E_OK) {
2036 LOGE("[RDBExecutor]Get downloading assets records by gid failed: %d", errCode);
2037 return errCode;
2038 }
2039 totalCount += static_cast<int32_t>(assets.size());
2040 }
2041 return E_OK;
2042 }
2043
GetDownloadAssetRecordsInner(const TableSchema & tableSchema,int64_t beginTime,std::vector<std::string> & gids)2044 int SQLiteSingleVerRelationalStorageExecutor::GetDownloadAssetRecordsInner(
2045 const TableSchema &tableSchema, int64_t beginTime, std::vector<std::string> &gids)
2046 {
2047 int errCode = GetDownloadAssetGid(tableSchema, gids, beginTime, true);
2048 if (errCode != E_OK) {
2049 LOGE("[RDBExecutor]Get downloading assets gid failed: %d", errCode);
2050 }
2051 return errCode;
2052 }
2053
CleanDownloadingFlag(const std::string & tableName)2054 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadingFlag(const std::string &tableName)
2055 {
2056 std::string sql;
2057 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag=flag&(~0x1000);";
2058 sqlite3_stmt *stmt = nullptr;
2059 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
2060 if (errCode != E_OK) {
2061 LOGE("[RDBExecutor]Get stmt failed clean downloading flag: %d, tableName: %s, length: %zu",
2062 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
2063 return errCode;
2064 }
2065 errCode = SQLiteUtils::StepWithRetry(stmt);
2066 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2067 errCode = E_OK;
2068 } else {
2069 LOGE("[RDBExecutor]Clean downloading flag failed: %d, tableName: %s, length: %zu",
2070 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
2071 }
2072 int ret = E_OK;
2073 SQLiteUtils::ResetStatement(stmt, true, ret);
2074 if (ret != E_OK) {
2075 LOGE("[RDBExecutor]Reset stmt failed clean downloading flag: %d", ret);
2076 }
2077 return errCode != E_OK ? errCode : ret;
2078 }
2079
AbortGetDownloadAssetGidIfNeed(const DistributedDB::TableSchema & tableSchema,const std::string & gid,bool abortWithLimit,uint32_t & count)2080 bool SQLiteSingleVerRelationalStorageExecutor::AbortGetDownloadAssetGidIfNeed(
2081 const DistributedDB::TableSchema &tableSchema, const std::string &gid, bool abortWithLimit,
2082 uint32_t &count)
2083 {
2084 if (!abortWithLimit) {
2085 return false;
2086 }
2087 std::vector<VBucket> assets;
2088 int errCode = GetDownloadAssetRecordsByGid(tableSchema, gid, assets);
2089 if (errCode != E_OK) {
2090 LOGW("[RDBExecutor]Get downloading assets failed %d gid %s", errCode, gid.c_str());
2091 return false;
2092 }
2093 count += assets.size();
2094 return count >= RuntimeContext::GetInstance()->GetAssetsDownloadManager()->GetMaxDownloadAssetsCount();
2095 }
2096 } // namespace DistributedDB
2097 #endif
2098