1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef SQLITE_SINGLE_VER_RELATIONAL_STORAGE_EXECUTOR_H 16 #define SQLITE_SINGLE_VER_RELATIONAL_STORAGE_EXECUTOR_H 17 #ifdef RELATIONAL_STORE 18 19 #include <atomic> 20 #include "cloud/asset_operation_utils.h" 21 #include "cloud/cloud_db_constant.h" 22 #include "cloud/cloud_store_types.h" 23 #include "cloud/cloud_upload_recorder.h" 24 #include "data_transformer.h" 25 #include "db_types.h" 26 #include "icloud_sync_storage_interface.h" 27 #include "macro_utils.h" 28 #include "query_object.h" 29 #include "relational_row_data.h" 30 #include "relational_store_delegate.h" 31 #include "relational_sync_data_inserter.h" 32 #include "sqlite_log_table_manager.h" 33 #include "sqlite_single_ver_relational_continue_token.h" 34 #include "sqlite_storage_executor.h" 35 #include "sqlite_utils.h" 36 #include "tracker_table.h" 37 38 namespace DistributedDB { 39 class SQLiteSingleVerRelationalStorageExecutor : public SQLiteStorageExecutor { 40 public: 41 enum class PutDataMode { 42 SYNC, 43 USER 44 }; 45 enum class MarkFlagOption { 46 DEFAULT, 47 SET_WAIT_COMPENSATED_SYNC 48 }; 49 50 struct UpdateCursorContext { 51 int errCode = E_OK; 52 uint64_t cursor; 53 }; 54 55 SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable, DistributedTableMode mode); 56 ~SQLiteSingleVerRelationalStorageExecutor() override = default; 57 58 // Delete the copy and assign constructors 59 DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerRelationalStorageExecutor); 60 61 int ResetLogStatus(std::string &tableName); 62 63 int CreateRelationalLogTable(DistributedTableMode mode, bool isUpgraded, const std::string &identity, 64 TableInfo &table); 65 66 // The parameter "identity" is a hash string that identifies a device 67 int CreateDistributedTable(DistributedTableMode mode, bool isUpgraded, const std::string &identity, 68 TableInfo &table); 69 70 int UpgradeDistributedTable(const std::string &tableName, DistributedTableMode mode, bool &schemaChanged, 71 RelationalSchemaObject &schema, TableSyncType syncType); 72 73 int StartTransaction(TransactType type); 74 int Commit(); 75 int Rollback(); 76 77 // For Get sync data 78 int GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength, const DataSizeSpecInfo &sizeInfo, 79 std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt, const TableInfo &tableInfo); 80 81 // operation of meta data 82 int GetKvData(const Key &key, Value &value) const; 83 int GetKvDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const; 84 int PutKvData(const Key &key, const Value &value) const; 85 int DeleteMetaData(const std::vector<Key> &keys) const; 86 int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const; 87 int GetAllMetaKeys(std::vector<Key> &keys) const; 88 89 // For Put sync data 90 int SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans = true); 91 92 int CheckDBModeForRelational(); 93 94 int DeleteDistributedDeviceTable(const std::string &device, const std::string &tableName); 95 96 int DeleteDistributedAllDeviceTableLog(const std::string &tableName); 97 98 int DeleteDistributedDeviceTableLog(const std::string &device, const std::string &tableName); 99 100 int DeleteDistributedLogTable(const std::string &tableName); 101 102 int CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames, 103 std::vector<std::string> &missingTables); 104 105 int CreateDistributedDeviceTable(const std::string &device, const TableInfo &baseTbl, const StoreInfo &info); 106 107 int CheckQueryObjectLegal(const TableInfo &table, QueryObject &query, const std::string &schemaVersion); 108 109 int CheckQueryObjectLegal(const QuerySyncObject &query); 110 111 int GetMaxTimestamp(const std::vector<std::string> &tablesName, Timestamp &maxTimestamp) const; 112 113 int ExecuteQueryBySqlStmt(const std::string &sql, const std::vector<std::string> &bindArgs, int packetSize, 114 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data); 115 116 int SaveSyncDataItems(RelationalSyncDataInserter &inserter); 117 118 int CheckEncryptedOrCorrupted() const; 119 120 int GetExistsDeviceList(std::set<std::string> &devices) const; 121 122 int GetUploadCount(const Timestamp ×tamp, bool isCloudForcePush, bool isCompensatedTask, 123 QuerySyncObject &query, int64_t &count); 124 125 int GetAllUploadCount(const std::vector<Timestamp> ×tampVec, bool isCloudForcePush, bool isCompensatedTask, 126 QuerySyncObject &query, int64_t &count); 127 128 int UpdateCloudLogGid(const CloudSyncData &cloudDataResult, bool ignoreEmptyGid); 129 130 int GetSyncCloudData(const CloudUploadRecorder &uploadRecorder, CloudSyncData &cloudDataResult, 131 SQLiteSingleVerRelationalContinueToken &token); 132 133 int GetSyncCloudGid(QuerySyncObject &query, const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy, 134 bool isCompensatedTask, std::vector<std::string> &cloudGid); 135 136 void SetLocalSchema(const RelationalSchemaObject &localSchema); 137 138 int GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema, const VBucket &vBucket, 139 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo); 140 141 int PutCloudSyncData(const std::string &tableName, const TableSchema &tableSchema, 142 const TrackerTable &trackerTable, DownloadData &downloadData); 143 144 int UpdateAssetStatusForAssetOnly(const TableSchema &tableSchema, VBucket &asset); 145 146 int FillCloudAssetForDownload(const TableSchema &tableSchema, VBucket &vBucket, bool isDownloadSuccess, 147 uint64_t &currCursor); 148 149 int DoCleanInner(ClearMode mode, const std::vector<std::string> &tableNameList, 150 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets, 151 std::vector<std::string> ¬ifyTableList); 152 int DoClearCloudLogVersion(const std::vector<std::string> &tableNameList); 153 154 int FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema, const CloudSyncBatch &data); 155 int FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data); 156 157 int SetLogTriggerStatus(bool status); 158 int SetCursorIncFlag(bool flag); 159 160 int GetCursor(const std::string &tableName, uint64_t &cursor); 161 162 int AnalysisTrackerTable(const TrackerTable &trackerTable, TableInfo &tableInfo); 163 int CreateTrackerTable(const TrackerTable &trackerTable, const TableInfo &table, bool checkData); 164 int GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema); 165 int ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records); 166 167 int GetClearWaterMarkTables(const std::vector<TableReferenceProperty> &tableReferenceProperty, 168 const RelationalSchemaObject &schema, std::set<std::string> &clearWaterMarkTables); 169 int CreateTempSyncTrigger(const TrackerTable &trackerTable, bool flag); 170 int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties); 171 int ClearAllTempSyncTrigger(); 172 int CleanTrackerData(const std::string &tableName, int64_t cursor, bool isOnlyTrackTable); 173 int CreateSharedTable(const TableSchema &schema); 174 int DeleteTable(const std::vector<std::string> &tableNames); 175 int UpdateSharedTable(const std::map<std::string, std::vector<Field>> &updateTableNames); 176 int AlterTableName(const std::map<std::string, std::string> &tableNames); 177 int DeleteTableTrigger(const std::string &missTable) const; 178 179 int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch, 180 const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference, 181 std::map<int64_t, Entries> &referenceGid); 182 183 int CheckIfExistUserTable(const std::string &tableName); 184 185 void SetLogicDelete(bool isLogicDelete); 186 int RenewTableTrigger(DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, 187 const std::string &localIdentity = ""); 188 189 std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid, 190 const Bytes &hashKey, VBucket &assets); 191 192 int FillHandleWithOpType(const OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid, 193 const TableSchema &tableSchema); 194 195 void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader); 196 197 int CleanResourceForDroppedTable(const std::string &tableName); 198 199 void SetPutDataMode(PutDataMode mode); 200 201 void SetMarkFlagOption(MarkFlagOption option); 202 203 int UpgradedLogForExistedData(const TableInfo &tableInfo, bool schemaChanged); 204 205 int UpdateRecordFlag(const std::string &tableName, const std::string &sql, const LogInfo &logInfo); 206 207 void MarkFlagAsUploadFinished(const std::string &tableName, const Key &hashKey, Timestamp timestamp, 208 bool isExistAssetsDownload); 209 210 int CleanUploadFinishedFlag(const std::string &tableName); 211 212 int GetWaitCompensatedSyncDataPk(const TableSchema &table, std::vector<VBucket> &data, 213 bool isQueryDownloadRecords); 214 215 int ClearUnLockingStatus(const std::string &tableName); 216 217 int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData, 218 const std::set<std::string> &gidFilters); 219 220 int MarkFlagAsAssetAsyncDownload(const std::string &tableName, const DownloadData &downloadData, 221 const std::set<std::string> &gidFilters); 222 223 int CheckInventoryData(const std::string &tableName); 224 225 int UpdateRecordStatus(const std::string &tableName, const std::string &status, const Key &hashKey); 226 227 void SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize); 228 229 int InitCursorToMeta(const std::string &tableName) const; 230 231 void SetTableSchema(const TableSchema &tableSchema); 232 233 int ReviseLocalModTime(const std::string &tableName, const std::vector<ReviseModTimeInfo> &revisedData); 234 235 int GetLocalDataCount(const std::string &tableName, int &dataCount, int &logicDeleteDataCount); 236 237 int GetCloudDataCount(const std::string &tableName, DownloadData &downloadData, int64_t &count); 238 239 int UpdateExtendField(const std::string &tableName, const std::set<std::string> &extendColNames, 240 const std::string &condition = ""); 241 242 int UpdateDeleteDataExtendField(const std::string &tableName, const std::string &lowVersionExtendColName, 243 const std::set<std::string> &oldExtendColNames, const std::set<std::string> &extendColNames); 244 245 int GetDownloadingAssetsCount(const TableSchema &tableSchema, int32_t &totalCount); 246 247 int GetDownloadingCount(const std::string &tableName, int32_t &count); 248 249 int GetDownloadAssetGid(const TableSchema &tableSchema, std::vector<std::string> &gids, 250 int64_t beginTime = 0, bool abortWithLimit = false); 251 252 int GetDownloadAssetRecordsInner(const TableSchema &tableSchema, int64_t beginTime, std::vector<std::string> &gids); 253 254 int GetDownloadAssetRecordsByGid(const TableSchema &tableSchema, const std::string gid, 255 std::vector<VBucket> &assets); 256 257 int CleanDownloadingFlag(const std::string &tableName); 258 259 int CleanDownloadingFlagByGid(const std::string &tableName, const std::string &gid, VBucket dbAssets); 260 261 void CheckAndCreateTrigger(const TableInfo &table); 262 263 int GetLockStatusByGid(const std::string &tableName, const std::string &gid, LockStatus &status); 264 265 int CompareSchemaTableColumns(const std::string &tableName); 266 267 int UpdateHashKey(DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, 268 const std::string &localIdentity); 269 270 void SetTableMode(DistributedTableMode mode); 271 272 int GetFlagIsLocalCount(const std::string &logTableName, int32_t &count); 273 274 void ClearLogOfMismatchedData(const std::string &tableName); 275 276 int IsTableOnceDropped(const std::string &tableName, bool &onceDropped); 277 278 void RecoverNullExtendLog(const TrackerSchema &trackerSchema, const TrackerTable &table); 279 private: 280 int UpdateHashKeyWithOutPk(DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, 281 const std::string &localIdentity); 282 283 int DoCleanLogs(const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema); 284 285 int DoCleanLogAndData(const std::vector<std::string> &tableNameList, 286 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets); 287 288 int CleanCloudDataOnLogTable(const std::string &logTableName, ClearMode mode); 289 290 int ClearVersionOnLogTable(const std::string &logTableName); 291 292 int CleanCloudDataAndLogOnUserTable(const std::string &tableName, const std::string &logTableName, 293 const RelationalSchemaObject &localSchema); 294 295 int ChangeCloudDataFlagOnLogTable(const std::string &logTableName); 296 297 int SetDataOnUserTableWithLogicDelete(const std::string &tableName, const std::string &logTableName); 298 299 static void UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv); 300 301 int CreateFuncUpdateCursor(UpdateCursorContext &context, 302 void(*updateCursorFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const; 303 304 int SetCursor(const std::string &tableName, uint64_t cursor); 305 306 int IncreaseCursorOnAssetData(const std::string &tableName, const std::string &gid, uint64_t &currCursor); 307 308 int GetCleanCloudDataKeys(const std::string &logTableName, std::vector<int64_t> &dataKeys, 309 bool distinguishCloudFlag); 310 311 int GetCloudAssets(const std::string &tableName, const std::vector<FieldInfo> &fieldInfos, 312 const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets); 313 314 int GetAssetOnTable(const std::string &tableName, const std::string &fieldName, 315 const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets); 316 317 int GetCloudAssetsOnTable(const std::string &tableName, const std::string &fieldName, 318 const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets); 319 320 int GetDataItemForSync(sqlite3_stmt *statement, DataItem &dataItem, bool isGettingDeletedData) const; 321 322 int CheckDataConflictDefeated(const DataItem &item, const RelationalSyncDataInserter &inserter, 323 SaveSyncDataStmt &saveStmt, DeviceSyncSaveDataInfo &saveDataInfo); 324 325 int SaveSyncDataItem(RelationalSyncDataInserter &inserter, SaveSyncDataStmt &saveStmt, DataItem &item); 326 327 int SaveSyncDataItem(const DataItem &dataItem, bool isUpdate, SaveSyncDataStmt &saveStmt, 328 RelationalSyncDataInserter &inserter, std::map<std::string, Type> &saveVals); 329 330 int DeleteSyncDataItem(const DataItem &dataItem, RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt); 331 332 int AlterAuxTableForUpgrade(const TableInfo &oldTableInfo, const TableInfo &newTableInfo); 333 334 int DeleteSyncLog(const DataItem &item, RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmLogStmt); 335 int ProcessMissQueryData(const DataItem &item, RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, 336 sqlite3_stmt *&rmLogStmt); 337 int GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item); 338 int GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData, sqlite3_stmt *queryStmt, DataItem &item, 339 Timestamp &queryTime); 340 int GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item, Timestamp &missQueryTime); 341 342 void SetTableInfo(const TableInfo &tableInfo); // When put or get sync data, must call the func first. 343 344 int GeneLogInfoForExistedData(sqlite3 *db, const std::string &identity, const TableInfo &tableInfo, 345 std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTrackerTable); 346 347 int CleanExtendAndCursorForDeleteData(const std::string &tableName); 348 349 int GetCloudDataForSync(const CloudUploadRecorder &uploadRecorder, sqlite3_stmt *statement, 350 CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize); 351 352 int PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue); 353 354 int GetDownloadAsset(std::vector<VBucket> &assetsV, const Field &field, Type &cloudValue); 355 356 int ExecutePutCloudData(const std::string &tableName, const TableSchema &tableSchema, 357 const TrackerTable &trackerTable, DownloadData &downloadData, std::map<int, int> &statisticMap); 358 359 std::string GetInsertSqlForCloudSync(const TableSchema &tableSchema); 360 361 int GetPrimaryKeyHashValue(const VBucket &vBucket, const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, 362 bool allowEmpty = false); 363 364 int GetQueryLogStatement(const TableSchema &tableSchema, const VBucket &vBucket, const std::string &querySql, 365 const Key &hashKey, sqlite3_stmt *&selectStmt); 366 367 int GetQueryLogSql(const std::string &tableName, const VBucket &vBucket, const std::set<std::string> &pkSet, 368 std::string &querySql); 369 370 int GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket, std::set<std::string> &pkSet, 371 std::vector<Field> &assetFields, std::string &querySql); 372 373 int GetFillDownloadAssetStatement(const std::string &tableName, const VBucket &vBucket, 374 const std::vector<Field> &fields, sqlite3_stmt *&statement); 375 376 int InitFillUploadAssetStatement(OpType opType, const TableSchema &tableSchema, const CloudSyncBatch &data, 377 const int &index, sqlite3_stmt *&statement); 378 379 int GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo); 380 381 int GetInfoByStatement(sqlite3_stmt *statement, const std::vector<Field> &assetFields, 382 const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo); 383 384 int InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema, const TrackerTable &trackerTable, 385 int64_t dataKey); 386 387 int InsertLogRecord(const TableSchema &tableSchema, const TrackerTable &trackerTable, VBucket &vBucket); 388 389 int BindOneField(int index, const VBucket &vBucket, const Field &field, sqlite3_stmt *updateStmt); 390 391 int BindValueToUpsertStatement(const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *upsertStmt); 392 393 int BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt, std::vector<uint8_t> &hashKey, int &index); 394 395 int BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket, const TableSchema &tableSchema, 396 const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt, int &index); 397 398 int BindShareValueToInsertLogStatement(const VBucket &vBucket, const TableSchema &tableSchema, 399 sqlite3_stmt *insertLogStmt, int &index); 400 401 int BindValueToInsertLogStatement(VBucket &vBucket, const TableSchema &tableSchema, 402 const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt); 403 404 std::string GetWhereConditionForDataTable(const std::string &gidStr, const std::set<std::string> &pkSet, 405 const std::string &tableName, bool queryByPk = true); 406 407 int GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields, const TableSchema &tableSchema, 408 const std::string &gidStr, const std::set<std::string> &pkSet, std::string &updateSql); 409 410 int GetUpdateDataTableStatement(const VBucket &vBucket, const TableSchema &tableSchema, sqlite3_stmt *&updateStmt); 411 412 int UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema); 413 414 int GetUpdateLogRecordStatement(const TableSchema &tableSchema, const VBucket &vBucket, OpType opType, 415 std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt); 416 int AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema, const VBucket &vBucket, 417 std::string &sql); 418 419 int UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema, OpType opType); 420 421 int BindValueToUpdateLogStatement(const VBucket &vBucket, const TableSchema &tableSchema, 422 const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty, sqlite3_stmt *updateLogStmt); 423 424 int GetDeleteStatementForCloudSync(const TableSchema &tableSchema, const std::set<std::string> &pkSet, 425 const VBucket &vBucket, sqlite3_stmt *&deleteStmt); 426 427 int UpdateTrackerTable(sqlite3 *db, const std::string &identity, const TableInfo &tableInfo, 428 std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTimestampOnly); 429 430 int DeleteCloudData(const std::string &tableName, const VBucket &vBucket, const TableSchema &tableSchema, 431 const TrackerTable &trackerTable); 432 433 int OnlyUpdateLogTable(const VBucket &vBucket, const TableSchema &tableSchema, OpType opType); 434 435 int BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey, sqlite3_stmt *&stmt); 436 int DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList); 437 int SetDataOnShareTableWithLogicDelete(const std::string &tableName, const std::string &logTableName); 438 int CleanShareTable(const std::string &tableName); 439 440 int GetReferenceGidInner(const std::string &sourceTable, const std::string &targetTable, 441 const CloudSyncBatch &syncBatch, const std::vector<TableReferenceProperty> &targetTableReference, 442 std::map<int64_t, Entries> &referenceGid); 443 444 int GetReferenceGidByStmt(sqlite3_stmt *statement, const CloudSyncBatch &syncBatch, 445 const std::string &targetTable, std::map<int64_t, Entries> &referenceGid); 446 447 static std::string GetReferenceGidSql(const std::string &sourceTable, const std::string &targetTable, 448 const std::vector<std::string> &sourceFields, const std::vector<std::string> &targetFields); 449 450 static std::pair<std::vector<std::string>, std::vector<std::string>> SplitReferenceByField( 451 const std::vector<TableReferenceProperty> &targetTableReference); 452 453 int BindStmtWithCloudGid(const CloudSyncData &cloudDataResult, bool ignoreEmptyGid, sqlite3_stmt *&stmt); 454 455 int GetCloudDeleteSql(const std::string &table, std::string &sql); 456 457 int RemoveDataAndLog(const std::string &tableName, int64_t dataKey); 458 459 int64_t GetLocalDataKey(size_t index, const DownloadData &downloadData); 460 461 int BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid, 462 sqlite3_stmt *&stmt, int &fillGidCount); 463 464 int DoCleanAssetId(const std::string &tableName, const RelationalSchemaObject &localSchema); 465 466 int CleanAssetId(const std::string &tableName, const std::vector<FieldInfo> &fieldInfos, 467 const std::vector<int64_t> &dataKeys); 468 469 int UpdateAssetIdOnUserTable(const std::string &tableName, const std::string &fieldName, 470 const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets); 471 472 int GetAssetsAndUpdateAssetsId(const std::string &tableName, const std::string &fieldName, 473 const std::vector<int64_t> &dataKeys); 474 475 int CleanAssetsIdOnUserTable(const std::string &tableName, const std::string &fieldName, const int64_t rowId, 476 const std::vector<uint8_t> &assetsValue); 477 478 int InitGetAssetStmt(const std::string &sql, const std::string &gid, const Bytes &hashKey, 479 sqlite3_stmt *&stmt); 480 481 int GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets); 482 483 int ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt, int beginIndex, const std::string &cloudGid); 484 485 int CleanDownloadChangedAssets(const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType); 486 487 int GetAndBindFillUploadAssetStatement(const std::string &tableName, const VBucket &assets, 488 sqlite3_stmt *&statement); 489 490 int OnlyUpdateAssetId(const std::string &tableName, const TableSchema &tableSchema, const VBucket &vBucket, 491 int64_t dataKey, OpType opType); 492 493 void UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName, Asset &asset); 494 495 void UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName, Assets &assets); 496 497 int BindAssetToBlobStatement(const Asset &asset, int index, sqlite3_stmt *&stmt); 498 499 int BindAssetsToBlobStatement(const Assets &assets, int index, sqlite3_stmt *&stmt); 500 501 int GetAssetInfoOnTable(sqlite3_stmt *&stmt, const std::vector<Field> &assetFields, VBucket &assetInfo); 502 503 int BindAssetFiledToBlobStatement(const TableSchema &tableSchema, const std::vector<Asset> &assetOfOneRecord, 504 const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt); 505 506 int UpdateAssetsIdForOneRecord(const TableSchema &tableSchema, const std::string &sql, 507 const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord); 508 509 bool IsNeedUpdateAssetIdInner(sqlite3_stmt *selectStmt, const VBucket &vBucket, const Field &field, 510 VBucket &assetInfo, bool &isNotIncCursor); 511 512 bool IsNeedUpdateAssetId(const TableSchema &tableSchema, int64_t dataKey, const VBucket &vBucket, 513 bool &isNotIncCursor); 514 515 int UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey, const VBucket &vBucket); 516 517 int64_t GetDataFlag(); 518 519 std::string GetUpdateDataFlagSql(const VBucket &data); 520 521 std::string GetDev(); 522 523 std::vector<Field> GetUpdateField(const VBucket &vBucket, const TableSchema &tableSchema); 524 525 int GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields, int startIndex, VBucket &record); 526 527 int FillCloudVersionForUpload(const std::string &tableName, const CloudSyncBatch &batchData); 528 529 int QueryCount(const std::string &tableName, int64_t &count); 530 531 int GetUploadCountInner(const Timestamp ×tamp, SqliteQueryHelper &helper, std::string &sql, int64_t &count); 532 533 int LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket, 534 const TableSchema &tableSchema, const TrackerTable &trackerTable); 535 536 bool AbortGetDownloadAssetGidIfNeed(const TableSchema &tableSchema, const std::string &gid, bool abortWithLimit, 537 uint32_t &count); 538 539 int RecoverNullExtendLogInner(const TrackerSchema &trackerSchema, const TrackerTable &table); 540 541 static constexpr const char *CONSISTENT_FLAG = "0x20"; 542 static constexpr const char *UPDATE_FLAG_CLOUD = "flag = 0"; 543 static constexpr const char *UPDATE_FLAG_WAIT_COMPENSATED_SYNC = "flag = flag | 0x10"; 544 static constexpr const char *FLAG_IS_WAIT_COMPENSATED_SYNC = 545 "(a.flag & 0x10 != 0 and a.status = 0) or a.status = 1"; 546 static constexpr const char *FLAG_IS_WAIT_COMPENSATED_CONTAIN_DOWNLOAD_SYNC = 547 "(a.flag & 0x10 != 0 and a.status = 0) or a.status = 1 or (a.flag & 0x1000 != 0 and a.status = 0)"; 548 549 std::string baseTblName_; 550 TableInfo table_; // Always operating table, user table when get, device table when put. 551 TableSchema tableSchema_; // for cloud table 552 RelationalSchemaObject localSchema_; 553 554 DistributedTableMode mode_; 555 556 std::map<int32_t, std::function<int(int, const VBucket &, const Field &, sqlite3_stmt *)>> bindCloudFieldFuncMap_; 557 std::map<int32_t, std::function<int(const VBucket &, const Field &, std::vector<uint8_t> &)>> toVectorFuncMap_; 558 559 std::atomic<bool> isLogicDelete_; 560 std::shared_ptr<IAssetLoader> assetLoader_; 561 562 PutDataMode putDataMode_; 563 MarkFlagOption markFlagOption_; 564 565 std::atomic<int32_t> maxUploadCount_; 566 std::atomic<int32_t> maxUploadSize_; 567 }; 568 } // namespace DistributedDB 569 #endif 570 #endif // SQLITE_SINGLE_VER_RELATIONAL_STORAGE_EXECUTOR_H 571