1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H 17 #define SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H 18 19 #include "macro_utils.h" 20 #include "db_types.h" 21 #include "query_object.h" 22 #include "sqlite_utils.h" 23 #include "sqlite_storage_executor.h" 24 #include "single_ver_natural_store_commit_notify_data.h" 25 26 namespace DistributedDB { 27 enum class SingleVerDataType { 28 META_TYPE, 29 LOCAL_TYPE, 30 SYNC_TYPE, 31 }; 32 33 enum class DataStatus { 34 NOEXISTED, 35 DELETED, 36 EXISTED, 37 }; 38 39 enum class ExecutorState { 40 INVALID = -1, 41 MAINDB, 42 CACHEDB, 43 MAIN_ATTACH_CACHE, // After process crash and cacheDb existed 44 CACHE_ATTACH_MAIN, // while cacheDb migrating to mainDb 45 }; 46 47 struct DataOperStatus { 48 DataStatus preStatus = DataStatus::NOEXISTED; 49 bool isDeleted = false; 50 bool isDefeated = false; // whether the put data is defeated. 51 }; 52 53 struct SingleVerRecord { 54 Key key; 55 Value value; 56 Timestamp timestamp = 0; 57 uint64_t flag = 0; 58 std::string device; 59 std::string origDevice; 60 Key hashKey; 61 Timestamp writeTimestamp = 0; 62 }; 63 64 struct DeviceInfo { 65 bool isLocal = false; 66 std::string deviceName; 67 }; 68 69 struct LocalDataItem { 70 Key key; 71 Value value; 72 Timestamp timestamp = 0; 73 Key hashKey; 74 uint64_t flag = 0; 75 }; 76 77 struct NotifyConflictAndObserverData { 78 SingleVerNaturalStoreCommitNotifyData *committedData = nullptr; 79 DataItem getData; 80 Key hashKey; 81 DataOperStatus dataStatus; 82 }; 83 84 struct NotifyMigrateSyncData { 85 bool isRemote = false; 86 bool isRemoveDeviceData = false; 87 bool isPermitForceWrite = true; 88 SingleVerNaturalStoreCommitNotifyData *committedData = nullptr; 89 std::vector<Entry> entries{}; 90 }; 91 92 struct SyncDataDevices { 93 std::string origDev; 94 std::string dev; 95 }; 96 97 class SQLiteSingleVerStorageExecutor : public SQLiteStorageExecutor { 98 public: 99 SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb); 100 SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb, ExecutorState executorState); 101 ~SQLiteSingleVerStorageExecutor() override; 102 103 // Delete the copy and assign constructors 104 DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerStorageExecutor); 105 106 // Get the Kv data according the type(sync, meta, local data). 107 int GetKvData(SingleVerDataType type, const Key &key, Value &value, Timestamp ×tamp) const; 108 109 // Get the sync data record by hash key. 110 int GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const; 111 112 // Put the Kv data according the type(meta and the local data). 113 int PutKvData(SingleVerDataType type, const Key &key, const Value &value, 114 Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData); 115 116 int GetEntries(SingleVerDataType type, const Key &keyPrefix, std::vector<Entry> &entries) const; 117 118 int GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const; 119 120 int GetCount(QueryObject &queryObj, int &count) const; 121 122 // Get all the meta keys. 123 int GetAllMetaKeys(std::vector<Key> &keys) const; 124 125 int GetAllSyncedEntries(const std::string &hashDev, std::vector<Entry> &entries) const; 126 127 int SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo, 128 Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite = true); 129 130 int DeleteLocalKvData(const Key &key, SingleVerNaturalStoreCommitNotifyData *committedData, Value &value, 131 Timestamp ×tamp); 132 133 // delete a row data by hashKey, with no tombstone left. 134 int EraseSyncData(const Key &hashKey); 135 136 int RemoveDeviceData(const std::string &deviceName); 137 138 int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify, uint64_t recordVersion) const; 139 140 void InitCurrentMaxStamp(Timestamp &maxStamp); 141 142 void ReleaseContinueStatement(); 143 144 int GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendedLength, Timestamp begin, 145 Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const; 146 int GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendedLength, Timestamp begin, 147 Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const; 148 149 int GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier); 150 151 int OpenResultSet(const Key &keyPrefix, int &count); 152 153 int OpenResultSet(QueryObject &queryObj, int &count); 154 155 int OpenResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache, 156 uint32_t cacheLimit, int &count); 157 158 int OpenResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache, 159 uint32_t cacheLimit, int &count); 160 161 int ReloadResultSet(const Key &keyPrefix); 162 163 int ReloadResultSet(QueryObject &queryObj); 164 165 int ReloadResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache, 166 uint32_t cacheLimit, uint32_t cacheStartPos); 167 168 int ReloadResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache, 169 uint32_t cacheLimit, uint32_t cacheStartPos); 170 171 int GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy); 172 173 int GetEntryByRowId(int64_t rowId, Entry &entry); 174 175 void CloseResultSet(); 176 177 int StartTransaction(TransactType type); 178 179 int Commit(); 180 181 int Rollback(); 182 183 bool CheckIfKeyExisted(const Key &key, bool isLocal, Value &value, Timestamp ×tamp) const; 184 185 int PrepareForSavingData(SingleVerDataType type); 186 187 int ResetForSavingData(SingleVerDataType type); 188 189 int Reset() override; 190 191 int UpdateLocalDataTimestamp(Timestamp timestamp); 192 193 void SetAttachMetaMode(bool attachMetaMode); 194 195 int PutLocalDataToCacheDB(const LocalDataItem &dataItem) const; 196 197 int SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo, Timestamp &maxStamp, 198 uint64_t recordVersion, const QueryObject &query); 199 200 int PrepareForSavingCacheData(SingleVerDataType type); 201 int ResetForSavingCacheData(SingleVerDataType type); 202 203 int MigrateLocalData(); 204 205 int MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData, 206 std::vector<DataItem> &dataItems); 207 int GetMinVersionCacheData(std::vector<DataItem> &dataItems, uint64_t &maxVerIncurCacheDb) const; 208 209 int GetMaxVersionInCacheDb(uint64_t &maxVersion) const; 210 int AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd, 211 const std::string &attachDbAbsPath, EngineState engineState); 212 213 // Clear migrating data. 214 void ClearMigrateData(); 215 216 // Get current max timestamp. 217 int GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const; 218 219 void SetConflictResolvePolicy(int policy); 220 221 // Delete multiple meta data records in a transaction. 222 int DeleteMetaData(const std::vector<Key> &keys); 223 // Delete multiple meta data records with key prefix in a transaction. 224 int DeleteMetaDataByPrefixKey(const Key &keyPrefix); 225 226 int CheckIntegrity() const; 227 228 int CheckQueryObjectLegal(QueryObject &queryObj) const; 229 230 int CheckDataWithQuery(QueryObject query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo); 231 232 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen); 233 234 int AddSubscribeTrigger(QueryObject &query, const std::string &subscribeId); 235 236 int RemoveSubscribeTrigger(const std::vector<std::string> &subscribeIds); 237 238 int RemoveSubscribeTriggerWaterMark(const std::vector<std::string> &subscribeIds); 239 240 int GetTriggers(const std::string &namePreFix, std::vector<std::string> &triggerNames); 241 242 int RemoveTrigger(const std::vector<std::string> &triggers); 243 244 int GetSyncDataWithQuery(const QueryObject &query, size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, 245 const std::pair<Timestamp, Timestamp> &timeRange, std::vector<DataItem> &dataItems) const; 246 247 int ForceCheckPoint() const; 248 249 uint64_t GetLogFileSize() const; 250 251 int UpdateKey(const UpdateKeyCallback &callback); 252 253 int GetExistsDevicesFromMeta(std::set<std::string> &devices) const; 254 private: 255 struct SaveRecordStatements { 256 sqlite3_stmt *queryStatement = nullptr; 257 sqlite3_stmt *insertStatement = nullptr; 258 sqlite3_stmt *updateStatement = nullptr; 259 260 int ResetStatement(); 261 GetDataSaveStatementSaveRecordStatements262 inline sqlite3_stmt *GetDataSaveStatement(bool isUpdate) const 263 { 264 return isUpdate ? updateStatement : insertStatement; 265 } 266 }; 267 268 struct UpdateContext { 269 int errCode = E_OK; 270 Key newKey; 271 UpdateKeyCallback callback; 272 }; 273 274 void PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet, const DataOperStatus &status, 275 const Key &hashKey, SingleVerNaturalStoreCommitNotifyData *committedData); 276 277 static int BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const Key &hashKey, 278 const SyncDataDevices &devices, bool isUpdate); 279 280 static int BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const std::string &origDev, 281 const std::string &deviceName); 282 283 static void PutConflictData(const DataItem &itemPut, const DataItem &itemGet, const DeviceInfo &deviceInfo, 284 const DataOperStatus &dataStatus, SingleVerNaturalStoreCommitNotifyData *commitData); 285 286 static DataOperStatus JudgeSyncSaveType(DataItem &dataItem, const DataItem &itemGet, 287 const std::string &devName, bool isHashKeyExisted, bool isPermitForceWrite = true); 288 289 static std::string GetOriginDevName(const DataItem &dataItem, const std::string &origDevGet); 290 291 int GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet, Key &hashKey) const; 292 293 int GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet, const DataOperStatus &dataStatus) const; 294 295 int GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const; 296 297 int PrepareForSyncDataByTime(Timestamp begin, Timestamp end, sqlite3_stmt *&statement, bool getDeletedData = false) 298 const; 299 300 int StepForResultEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const; 301 302 int InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt); 303 304 int InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt); 305 306 int InitResultSetContent(QueryObject &queryObj); 307 308 int InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt); 309 310 int GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const; 311 312 int BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value, Timestamp timestamp, 313 SingleVerDataType type); 314 315 int SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey, const std::string &origDev, 316 const std::string &deviceName, bool isUpdate); 317 318 int SaveKvData(SingleVerDataType type, const Key &key, const Value &value, Timestamp timestamp); 319 320 int DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData, 321 const Key &key, const Value &value); 322 323 int PrepareForSavingData(const std::string &readSql, const std::string &insertSql, 324 const std::string &updateSql, SaveRecordStatements &statements) const; 325 326 int OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count); 327 328 int ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, 329 uint32_t cacheStartPos, int &count); 330 331 void FinalizeAllStatements(); 332 int ResetSaveSyncStatements(int errCode); 333 334 int BindSyncDataInCacheMode(sqlite3_stmt *statement, 335 const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const; 336 337 int BindPrimaryKeySyncDataInCacheMode( 338 sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const; 339 340 int BindTimestampSyncDataInCacheMode(sqlite3_stmt *statement, const DataItem &dataItem) const; 341 342 int BindDevSyncDataInCacheMode(sqlite3_stmt *statement, 343 const std::string &origDev, const std::string &deviceName) const; 344 345 int SaveSyncDataToCacheDatabase(const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const; 346 347 int GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem, 348 uint64_t &verInCurCacheDb, bool isCacheDb) const; 349 int GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems, 350 uint64_t &verInCurCacheDb, bool isCacheDb) const; 351 int DelCacheDbDataByVersion(uint64_t version) const; 352 353 // use for migrating data 354 int BindLocalDataInCacheMode(sqlite3_stmt *statement, const LocalDataItem &dataItem) const; 355 356 // Process timestamp for syncdata in cacheDB when migrating. 357 int ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems); 358 359 // Get migrateTimeOffset_. 360 int InitMigrateTimestampOffset(); 361 362 // Get min timestamp of local data in sync_data, cacheDB. 363 int GetMinTimestampInCacheDB(Timestamp &minStamp) const; 364 365 // Prepare conflict notify and commit notify data. 366 int PrepareForNotifyConflictAndObserver(DataItem &dataItem, const DeviceInfo &deviceInfo, 367 NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite = true); 368 369 // Put observer and conflict data into commit notify when migrating cacheDB. 370 int PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem, const DeviceInfo &deviceInfo, 371 NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite); 372 373 int MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData); 374 375 int MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData); 376 377 int GetEntriesForNotifyRemoveDevData(const DataItem &item, std::vector<Entry> &entries) const; 378 379 // Reset migrateSyncStatements_. 380 int ResetForMigrateCacheData(); 381 382 // Init migrating data. 383 int InitMigrateData(); 384 385 int MigrateRmDevData(const DataItem &dataItem) const; 386 int VacuumLocalData() const; 387 388 int GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement, 389 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const; 390 391 int GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt, 392 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const; 393 394 int CheckMissQueryDataItems(sqlite3_stmt *&stmt, const SqliteQueryHelper &helper, const DeviceInfo &deviceInfo, 395 std::vector<DataItem> &dataItems); 396 397 int CheckDataWithQuery(std::vector<DataItem> &dataItems); 398 399 int GetExpandedCheckSql(QueryObject query, DataItem &dataItem); 400 401 int CheckMissQueryDataItem(sqlite3_stmt *stmt, const std::string &deviceName, DataItem &item); 402 403 int CreateFuncUpdateKey(UpdateContext &context, 404 void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv), 405 void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const; 406 407 static void Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv); 408 409 static void CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv); 410 411 sqlite3_stmt *getSyncStatement_; 412 sqlite3_stmt *getResultRowIdStatement_; 413 sqlite3_stmt *getResultEntryStatement_; 414 SaveRecordStatements saveSyncStatements_; 415 SaveRecordStatements saveLocalStatements_; 416 417 // Used for migrating sync_data. 418 SaveRecordStatements migrateSyncStatements_; 419 bool isTransactionOpen_; 420 bool attachMetaMode_; // true for attach meta mode 421 ExecutorState executorState_; 422 423 // Max timestamp in mainDB. Used for migrating. 424 Timestamp maxTimestampInMainDB_; 425 426 // The offset between min timestamp in cacheDB and max timestamp in mainDB. Used for migrating. 427 TimeOffset migrateTimeOffset_; 428 429 // Migrating sync flag. When the flag is true, mainDB and cacheDB are attached, migrateSyncStatements_ is set, 430 // maxTimestampInMainDB_ and migrateTimeOffset_ is meaningful. 431 bool isSyncMigrating_; 432 int conflictResolvePolicy_; 433 }; 434 } // namespace DistributedDB 435 436 #endif // SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H 437