1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H 17 #define SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H 18 19 #include "macro_utils.h" 20 #include "db_types.h" 21 #include "query_object.h" 22 #include "sqlite_utils.h" 23 #include "sqlite_storage_executor.h" 24 #include "single_ver_natural_store_commit_notify_data.h" 25 26 namespace DistributedDB { 27 enum class SingleVerDataType { 28 META_TYPE, 29 LOCAL_TYPE, 30 SYNC_TYPE, 31 }; 32 33 enum class DataStatus { 34 NOEXISTED, 35 DELETED, 36 EXISTED, 37 }; 38 39 enum class ExecutorState { 40 INVALID = -1, 41 MAINDB, 42 CACHEDB, 43 MAIN_ATTACH_CACHE, // After process crash and cacheDb existed 44 CACHE_ATTACH_MAIN, // while cacheDb migrating to mainDb 45 }; 46 47 struct DataOperStatus { 48 DataStatus preStatus = DataStatus::NOEXISTED; 49 bool isDeleted = false; 50 bool isDefeated = false; // whether the put data is defeated. 51 }; 52 53 struct SingleVerRecord { 54 Key key; 55 Value value; 56 Timestamp timestamp = 0; 57 uint64_t flag = 0; 58 std::string device; 59 std::string origDevice; 60 Key hashKey; 61 Timestamp writeTimestamp = 0; 62 }; 63 64 struct DeviceInfo { 65 bool isLocal = false; 66 std::string deviceName; 67 }; 68 69 struct LocalDataItem { 70 Key key; 71 Value value; 72 Timestamp timestamp = 0; 73 Key hashKey; 74 uint64_t flag = 0; 75 }; 76 77 struct NotifyConflictAndObserverData { 78 SingleVerNaturalStoreCommitNotifyData *committedData = nullptr; 79 DataItem getData; 80 Key hashKey; 81 DataOperStatus dataStatus; 82 }; 83 84 struct NotifyMigrateSyncData { 85 bool isRemote = false; 86 bool isRemoveDeviceData = false; 87 bool isPermitForceWrite = true; 88 SingleVerNaturalStoreCommitNotifyData *committedData = nullptr; 89 std::vector<Entry> entries{}; 90 }; 91 92 struct SyncDataDevices { 93 std::string origDev; 94 std::string dev; 95 }; 96 97 class SQLiteSingleVerStorageExecutor : public SQLiteStorageExecutor { 98 public: 99 SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb); 100 SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb, ExecutorState executorState); 101 ~SQLiteSingleVerStorageExecutor() override; 102 103 // Delete the copy and assign constructors 104 DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerStorageExecutor); 105 106 // Get the Kv data according the type(sync, meta, local data). 107 int GetKvData(SingleVerDataType type, const Key &key, Value &value, Timestamp ×tamp) const; 108 109 // Get the sync data record by hash key. 110 int GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const; 111 112 // Put the Kv data according the type(meta and the local data). 113 int PutKvData(SingleVerDataType type, const Key &key, const Value &value, 114 Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData); 115 116 int GetEntries(bool isGetValue, SingleVerDataType type, const Key &keyPrefix, std::vector<Entry> &entries) const; 117 118 int GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const; 119 120 int GetCount(QueryObject &queryObj, int &count) const; 121 122 // Get all the meta keys. 123 int GetAllMetaKeys(std::vector<Key> &keys) const; 124 125 int GetAllSyncedEntries(const std::string &hashDev, std::vector<Entry> &entries) const; 126 127 int SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo, 128 Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite = true); 129 130 int DeleteLocalKvData(const Key &key, SingleVerNaturalStoreCommitNotifyData *committedData, Value &value, 131 Timestamp ×tamp); 132 133 // delete a row data by hashKey, with no tombstone left. 134 int EraseSyncData(const Key &hashKey); 135 136 int RemoveDeviceData(const std::string &deviceName); 137 138 int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify, uint64_t recordVersion) const; 139 140 void InitCurrentMaxStamp(Timestamp &maxStamp); 141 142 void ReleaseContinueStatement(); 143 144 int GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength, Timestamp begin, 145 Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const; 146 int GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength, Timestamp begin, 147 Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const; 148 149 int GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier); 150 151 int OpenResultSet(const Key &keyPrefix, int &count); 152 153 int OpenResultSet(QueryObject &queryObj, int &count); 154 155 int OpenResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache, 156 uint32_t cacheLimit, int &count); 157 158 int OpenResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache, 159 uint32_t cacheLimit, int &count); 160 161 int ReloadResultSet(const Key &keyPrefix); 162 163 int ReloadResultSet(QueryObject &queryObj); 164 165 int ReloadResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache, 166 uint32_t cacheLimit, uint32_t cacheStartPos); 167 168 int ReloadResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache, 169 uint32_t cacheLimit, uint32_t cacheStartPos); 170 171 int GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy); 172 173 int GetEntryByRowId(int64_t rowId, Entry &entry); 174 175 void CloseResultSet(); 176 177 int StartTransaction(TransactType type); 178 179 int Commit(); 180 181 int Rollback(); 182 183 bool CheckIfKeyExisted(const Key &key, bool isLocal, Value &value, Timestamp ×tamp) const; 184 185 int PrepareForSavingData(SingleVerDataType type); 186 187 int ResetForSavingData(SingleVerDataType type); 188 189 int Reset() override; 190 191 int UpdateLocalDataTimestamp(Timestamp timestamp); 192 193 void SetAttachMetaMode(bool attachMetaMode); 194 195 int PutLocalDataToCacheDB(const LocalDataItem &dataItem) const; 196 197 int SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo, Timestamp &maxStamp, 198 uint64_t recordVersion, const QueryObject &query); 199 200 int PrepareForSavingCacheData(SingleVerDataType type); 201 int ResetForSavingCacheData(SingleVerDataType type); 202 203 int MigrateLocalData(); 204 205 int MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData, 206 std::vector<DataItem> &dataItems); 207 int GetMinVersionCacheData(std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const; 208 209 int GetMaxVersionInCacheDb(uint64_t &maxVersion) const; 210 int AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd, 211 const std::string &attachDbAbsPath, EngineState engineState); 212 213 // Clear migrating data. 214 void ClearMigrateData(); 215 216 // Get current max timestamp. 217 int GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const; 218 219 void SetConflictResolvePolicy(int policy); 220 221 // Delete multiple meta data records in a transaction. 222 int DeleteMetaData(const std::vector<Key> &keys); 223 // Delete multiple meta data records with key prefix in a transaction. 224 int DeleteMetaDataByPrefixKey(const Key &keyPrefix); 225 226 int CheckIntegrity() const; 227 228 int CheckQueryObjectLegal(QueryObject &queryObj) const; 229 230 int CheckDataWithQuery(QueryObject query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo); 231 232 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen); 233 234 int AddSubscribeTrigger(QueryObject &query, const std::string &subscribeId); 235 236 int RemoveSubscribeTrigger(const std::vector<std::string> &subscribeIds); 237 238 int RemoveSubscribeTriggerWaterMark(const std::vector<std::string> &subscribeIds); 239 240 int GetTriggers(const std::string &namePreFix, std::vector<std::string> &triggerNames); 241 242 int RemoveTrigger(const std::vector<std::string> &triggers); 243 244 int GetSyncDataWithQuery(const QueryObject &query, size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, 245 const std::pair<Timestamp, Timestamp> &timeRange, std::vector<DataItem> &dataItems) const; 246 247 int ForceCheckPoint() const; 248 249 uint64_t GetLogFileSize() const; 250 251 int GetExistsDevicesFromMeta(std::set<std::string> &devices); 252 253 int UpdateKey(const UpdateKeyCallback &callback); 254 255 private: 256 struct SaveRecordStatements { 257 sqlite3_stmt *queryStatement = nullptr; 258 sqlite3_stmt *insertStatement = nullptr; 259 sqlite3_stmt *updateStatement = nullptr; 260 261 int ResetStatement(); 262 GetDataSaveStatementSaveRecordStatements263 inline sqlite3_stmt *GetDataSaveStatement(bool isUpdate) const 264 { 265 return isUpdate ? updateStatement : insertStatement; 266 } 267 }; 268 269 struct UpdateContext { 270 int errCode = E_OK; 271 Key newKey; 272 UpdateKeyCallback callback; 273 }; 274 275 void PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet, const DataOperStatus &status, 276 SingleVerNaturalStoreCommitNotifyData *committedData); 277 278 static int BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const Key &hashKey, 279 const SyncDataDevices &devices, bool isUpdate); 280 281 static int BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const std::string &origDev, 282 const std::string &deviceName); 283 284 static void PutConflictData(const DataItem &itemPut, const DataItem &itemGet, const DeviceInfo &deviceInfo, 285 const DataOperStatus &dataStatus, SingleVerNaturalStoreCommitNotifyData *commitData); 286 287 static DataOperStatus JudgeSyncSaveType(DataItem &dataItem, const DataItem &itemGet, 288 const std::string &devName, bool isHashKeyExisted, bool isPermitForceWrite = true); 289 290 static std::string GetOriginDevName(const DataItem &dataItem, const std::string &origDevGet); 291 292 int GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet, Key &hashKey) const; 293 294 int GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet, const DataOperStatus &dataStatus) const; 295 296 int GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const; 297 298 int PrepareForSyncDataByTime(Timestamp begin, Timestamp end, sqlite3_stmt *&statement, bool getDeletedData = false) 299 const; 300 301 int StepForResultEntries(bool isGetValue, sqlite3_stmt *statement, std::vector<Entry> &entries) const; 302 303 int InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt); 304 305 int InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt); 306 307 int InitResultSetContent(QueryObject &queryObj); 308 309 int InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt); 310 311 int GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const; 312 313 int BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value, Timestamp timestamp, 314 SingleVerDataType type); 315 316 int SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey, const std::string &origDev, 317 const std::string &deviceName, bool isUpdate); 318 319 int SaveKvData(SingleVerDataType type, const Key &key, const Value &value, Timestamp timestamp); 320 321 int DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData, 322 const Key &key, const Value &value); 323 324 int PrepareForSavingData(const std::string &readSql, const std::string &insertSql, 325 const std::string &updateSql, SaveRecordStatements &statements) const; 326 327 int OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count); 328 329 int ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, 330 uint32_t cacheStartPos, int &count); 331 332 void FinalizeAllStatements(); 333 int ResetSaveSyncStatements(int errCode); 334 335 int BindSyncDataInCacheMode(sqlite3_stmt *statement, 336 const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const; 337 338 int BindPrimaryKeySyncDataInCacheMode( 339 sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const; 340 341 int BindTimestampSyncDataInCacheMode(sqlite3_stmt *statement, const DataItem &dataItem) const; 342 343 int BindDevSyncDataInCacheMode(sqlite3_stmt *statement, 344 const std::string &origDev, const std::string &deviceName) const; 345 346 int SaveSyncDataToCacheDatabase(const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const; 347 348 int GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem, 349 uint64_t &verInCurCacheDb, bool isCacheDb) const; 350 int GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems, 351 uint64_t &verInCurCacheDb, bool isCacheDb) const; 352 int DelCacheDbDataByVersion(uint64_t version) const; 353 354 // use for migrating data 355 int BindLocalDataInCacheMode(sqlite3_stmt *statement, const LocalDataItem &dataItem) const; 356 357 // Process timestamp for syncdata in cacheDB when migrating. 358 int ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems); 359 360 // Get migrateTimeOffset_. 361 int InitMigrateTimestampOffset(); 362 363 // Get min timestamp of local data in sync_data, cacheDB. 364 int GetMinTimestampInCacheDB(Timestamp &minStamp) const; 365 366 // Prepare conflict notify and commit notify data. 367 int PrepareForNotifyConflictAndObserver(DataItem &dataItem, const DeviceInfo &deviceInfo, 368 NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite = true); 369 370 // Put observer and conflict data into commit notify when migrating cacheDB. 371 int PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem, const DeviceInfo &deviceInfo, 372 NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite); 373 374 int MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData); 375 376 int MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData); 377 378 int GetEntriesForNotifyRemoveDevData(const DataItem &item, std::vector<Entry> &entries) const; 379 380 // Reset migrateSyncStatements_. 381 int ResetForMigrateCacheData(); 382 383 // Init migrating data. 384 int InitMigrateData(); 385 386 int MigrateRmDevData(const DataItem &dataItem) const; 387 int VacuumLocalData() const; 388 389 int GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement, 390 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const; 391 392 int GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt, 393 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const; 394 395 int CheckMissQueryDataItems(sqlite3_stmt *&stmt, const SqliteQueryHelper &helper, const DeviceInfo &deviceInfo, 396 std::vector<DataItem> &dataItems); 397 398 int CheckDataWithQuery(std::vector<DataItem> &dataItems); 399 400 int GetExpandedCheckSql(QueryObject query, DataItem &dataItem); 401 402 int CheckMissQueryDataItem(sqlite3_stmt *stmt, const std::string &deviceName, DataItem &item); 403 404 int CreateFuncUpdateKey(UpdateContext &context, 405 void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv), 406 void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const; 407 408 static void Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv); 409 410 static void CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv); 411 412 sqlite3_stmt *getSyncStatement_; 413 sqlite3_stmt *getResultRowIdStatement_; 414 sqlite3_stmt *getResultEntryStatement_; 415 SaveRecordStatements saveSyncStatements_; 416 SaveRecordStatements saveLocalStatements_; 417 418 // Used for migrating sync_data. 419 SaveRecordStatements migrateSyncStatements_; 420 bool isTransactionOpen_; 421 bool attachMetaMode_; // true for attach meta mode 422 ExecutorState executorState_; 423 424 // Max timestamp in mainDB. Used for migrating. 425 Timestamp maxTimestampInMainDB_; 426 427 // The offset between min timestamp in cacheDB and max timestamp in mainDB. Used for migrating. 428 TimeOffset migrateTimeOffset_; 429 430 // Migrating sync flag. When the flag is true, mainDB and cacheDB are attached, migrateSyncStatements_ is set, 431 // maxTimestampInMainDB_ and migrateTimeOffset_ is meaningful. 432 bool isSyncMigrating_; 433 int conflictResolvePolicy_; 434 }; 435 } // namespace DistributedDB 436 437 #endif // SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H 438