/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
#define RELATIONAL_SYNC_ABLE_STORAGE_H
#ifdef RELATIONAL_STORE

// Standard headers for the std:: names this header uses directly.
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "cloud/cloud_upload_recorder.h"
#include "cloud/schema_mgr.h"
#ifdef USE_FFRT
#include "ffrt.h"
#endif
#include "icloud_sync_storage_interface.h"
#include "lru_map.h"
#include "relational_db_sync_interface.h"
#include "relationaldb_properties.h"
#include "sqlite_single_relational_storage_engine.h"
#include "sqlite_single_ver_relational_continue_token.h"
#include "sync_able_engine.h"

namespace DistributedDB {
// Callback type stored per registered observer; it receives the device name, the changed data (moved in),
// the isChangedData flag and the origin of the change.
using RelationalObserverAction =
    std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData, Origin origin)>;

// Relational storage object that implements both RelationalDBSyncInterface and ICloudSyncStorageInterface
// and is reference counted through RefObject. It is constructed from a shared
// SQLiteSingleRelationalStorageEngine.
// Unless a method is documented otherwise, an `int` return value is an error code.
// NOTE(review): the concrete error values are defined outside this header — confirm against the implementation.
class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface,
    public virtual RefObject {
public:
    explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
    ~RelationalSyncAbleStorage() override;

    // Get interface type of this kvdb.
    int GetInterfaceType() const override;

    // Get the interface ref-count, in order to access asynchronously.
    void IncRefCount() override;

    // Drop the interface ref-count.
    void DecRefCount() override;

    // Get the identifier of this rdb.
    std::vector<uint8_t> GetIdentifier() const override;

    // Get the dual tuple identifier of this rdb.
    std::vector<uint8_t> GetDualTupleIdentifier() const override;

    // Get the max timestamp of all entries in database.
    void GetMaxTimestamp(Timestamp &stamp) const override;

    // Get the max timestamp of one table.
    int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;

    // Get meta data associated with the given key.
    int GetMetaData(const Key &key, Value &value) const override;

    // Get the meta data entries selected by keyPrefix; results are returned through `data`.
    int GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const override;

    // Put meta data as a key-value entry.
    int PutMetaData(const Key &key, const Value &value) override;

    // Put meta data as a key-value entry; isInTransaction is supplied by the caller.
    // NOTE(review): presumably tells the implementation a transaction is already open — confirm.
    int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;

    // Delete multiple meta data records in a transaction.
    int DeleteMetaData(const std::vector<Key> &keys) override;

    // Delete multiple meta data records with key prefix in a transaction.
    int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;

    // Get all meta data keys.
    int GetAllMetaKeys(std::vector<Key> &keys) const override;

    const RelationalDBProperties &GetDbProperties() const override;

    // Get the data which would be synced with query condition
    int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
        const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
        std::vector<SingleVerKvEntry *> &entries) const override;

    // Continue a previous GetSyncData call using continueStmtToken.
    int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
        const DataSizeSpecInfo &dataSizeInfo) const override;

    int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
        const DeviceID &deviceName) override;

    int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;

    RelationalSchemaObject GetSchemaInfo() const override;

    int GetSecurityOption(SecurityOption &option) const override;

    void NotifyRemotePushFinished(const std::string &deviceId) const override;

    // Get the timestamp when database created or imported
    int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;

    std::vector<QuerySyncObject> GetTablesQuery() override;

    int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;

    int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
        const std::string &targetID, bool isPush) const override;

    int CheckAndInitQueryCondition(QueryObject &query) const override;

    // Observer management: an action is identified by the pair (connectionId, observer),
    // matching the key structure of dataChangeCallbackMap_.
    int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
        const RelationalObserverAction &action);
    int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer);
    void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override;

    int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;

    int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;

    void NotifySchemaChanged();

    void RegisterHeartBeatListener(const std::function<void()> &listener);

    int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;

    bool CheckCompatible(const std::string &schema, uint8_t type) const override;

    int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
        ContinueToken &token) const override;

    int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override;

    int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) const override;

    void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;

    // recycling the write handle
    void SetReusedHandle(StorageExecutor *handle);

    // Transaction control. isAsyncDownload defaults to false.
    // NOTE(review): presumably `true` routes to the *ForAsyncDownload private helpers — confirm in the .cpp.
    int StartTransaction(TransactType type, bool isAsyncDownload = false) override;

    int Commit(bool isAsyncDownload = false) override;

    int Rollback(bool isAsyncDownload = false) override;

    // Count the records to upload for `query`; the result is returned through `count`.
    int GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp, bool isCloudForcePush,
        bool isCompensatedTask, int64_t &count) override;

    // Same as GetUploadCount but takes a vector of timestamps.
    int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> &timestampVec,
        bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override;

    int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime,
        ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;

    // Continue a previous GetCloudData call using continueStmtToken.
    int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;

    int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush,
        bool isCompensatedTask, std::vector<std::string> &cloudGid) override;

    int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override;

    int GetSchemaFromDB(RelationalSchemaObject &schema) override;

    int ChkSchema(const TableName &tableName) override;

    int SetCloudDbSchema(const DataBaseSchema &schema) override;

    int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override;

    int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override;

    int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
        DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;

    int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override;

    int UpdateAssetStatusForAssetOnly(const std::string &tableName, VBucket &asset) override;

    int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
        const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override;

    int ClearCloudLogVersion(const std::vector<std::string> &tableNameList) override;

    int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;

    int FillCloudAssetForAsyncDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;

    int SetLogTriggerStatus(bool status) override;

    int SetLogTriggerStatusForAsyncDownload(bool status) override;

    int SetCursorIncFlag(bool flag) override;

    int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override;

    void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine);

    std::string GetIdentify() const override;

    void EraseDataChangeCallback(uint64_t connectionId);

    void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;

    int CheckQueryValid(const QuerySyncObject &query) override;

    int CreateTempSyncTrigger(const std::string &tableName) override;
    int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override;
    int ClearAllTempSyncTrigger() override;
    bool IsSharedTable(const std::string &tableName) override;

    std::map<std::string, std::string> GetSharedTableOriginNames();

    void SetLogicDelete(bool logicDelete);

    // Returns a pair of (int, uint32_t); the assets are returned through `assets`.
    // NOTE(review): the int is presumably an error code; the meaning of the uint32_t is not visible here.
    std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
        const Bytes &hashKey, VBucket &assets) override;

    std::pair<int, uint32_t> GetAssetsByGidOrHashKeyForAsyncDownload(
        const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets) override;

    int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override;

    int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);

    int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override;

    int UpdateRecordFlagForAsyncDownload(const std::string &tableName, bool recordConflict,
        const LogInfo &logInfo) override;

    int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users,
        bool isQueryDownloadRecords) override;

    int ClearUnLockingNoNeedCompensated() override;

    int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
        const std::set<std::string> &gidFilters) override;

    int MarkFlagAsAssetAsyncDownload(const std::string &tableName, const DownloadData &downloadData,
        const std::set<std::string> &gidFilters) override;

    CloudSyncConfig GetCloudSyncConfig() const override;

    void SetCloudSyncConfig(const CloudSyncConfig &config);

    bool IsTableExistReference(const std::string &table) override;

    bool IsTableExistReferenceOrReferenceBy(const std::string &table) override;

    void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override;

    int GetCursor(const std::string &tableName, uint64_t &cursor) override;

    bool IsCurrentLogicDelete() const override;

    int GetLocalDataCount(const std::string &tableName, int &dataCount, int &logicDeleteDataCount) override;

    std::pair<int, std::vector<std::string>> GetDownloadAssetTable() override;

    std::pair<int, std::vector<std::string>> GetDownloadAssetRecords(const std::string &tableName,
        int64_t beginTime) override;

    // Overload of GetInfoByPrimaryKeyOrGid with an additional useTransaction flag.
    int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket, bool useTransaction,
        DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;

    // Non-override overload of TriggerObserverAction that additionally carries the change origin.
    void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData,
        Origin origin);

    void PrintCursorChange(const std::string &tableName) override;

    int GetLockStatusByGid(const std::string &tableName, const std::string &gid, LockStatus &status) override;

    bool IsExistTableContainAssets() override;

    int GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const override;
protected:
    int FillReferenceData(CloudSyncData &syncData);

    // *Inner variants take the executor handle explicitly instead of acquiring one themselves.
    int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);

    int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        DownloadData &downloadData);

    // Virtual so that subclasses can replace it.
    virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
        std::map<int64_t, Entries> &referenceGid);

    int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType,
        const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid);

    static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
        const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend);

    int ReviseLocalModTime(const std::string &tableName,
        const std::vector<ReviseModTimeInfo> &revisedData) override;

    bool IsSetDistributedSchema(const std::string &tableName, RelationalSchemaObject &schemaObj);

private:
    // Handle acquisition/release. errCode is an output parameter; perm defaults to OperatePerm::NORMAL_PERM.
    SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
        OperatePerm perm = OperatePerm::NORMAL_PERM) const;
    SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode,
        OperatePerm perm = OperatePerm::NORMAL_PERM) const;
    SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransactionForAsyncDownload(bool isWrite, int &errCode,
        OperatePerm perm = OperatePerm::NORMAL_PERM) const;
    void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;

    // get
    int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
        const DataSizeSpecInfo &dataSizeInfo, RelationalSchemaObject &&filterSchema) const;
    int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
        std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;

    int GetTableReference(const std::string &tableName,
        std::map<std::string, std::vector<TableReferenceProperty>> &reference);

    std::pair<std::string, int> GetSourceTableName(const std::string &tableName);

    std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName);
    // put
    int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
    int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
    void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type);
    StoreInfo GetStoreInfo() const;

    int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const std::vector<VBucket> &records);

    int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const std::vector<VBucket> &records);

    int GetCloudTableWithoutShared(std::vector<TableSchema> &tables);

    int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
        const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery, bool isQueryDownloadRecords);

    int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        bool flag = false);

    bool CheckTableSupportCompensatedSync(const TableSchema &table);

    // `item` has the same shape as one entry of dataChangeCallbackMap_.
    void ExecuteDataChangeCallback(
        const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
        const std::string &deviceName, const ChangedData &changedData, bool isChangedData, Origin origin);

    void SaveCursorChange(const std::string &tableName, uint64_t currCursor);

    int CommitForAsyncDownload();

    int RollbackForAsyncDownload();

    int StartTransactionForAsyncDownload(TransactType type);

    // data
    std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
    std::function<void()> onSchemaChanged_;
    mutable std::mutex onSchemaChangedMutex_;
    // The mutex type is switched to ffrt::mutex when the build defines USE_FFRT.
#ifdef USE_FFRT
    ffrt::mutex dataChangeDeviceMutex_;
#else
    std::mutex dataChangeDeviceMutex_;
#endif
    // connectionId -> (observer -> action)
    std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_;
    std::function<void()> heartBeatListener_;
    mutable std::mutex heartBeatMutex_;

    mutable LruMap<std::string, std::string> remoteDeviceSchema_;
    // Default member initializer keeps the pointer well-defined even if a constructor does not set it.
    StorageExecutor *reusedHandle_ = nullptr;
    mutable std::mutex reusedHandleMutex_;

    // cache securityOption
    mutable std::mutex securityOptionMutex_;
    mutable SecurityOption securityOption_;
    // Default member initializer keeps the flag well-defined even if a constructor does not set it.
    mutable bool isCachedOption_ = false;

    SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr;
    SQLiteSingleVerRelationalStorageExecutor *asyncDownloadTransactionHandle_ = nullptr;
    mutable std::shared_mutex transactionMutex_; // used for transaction
    mutable std::shared_mutex asyncDownloadtransactionMutex_; // used for async download transaction

    SchemaMgr schemaMgr_;
    mutable std::shared_mutex schemaMgrMutex_;
    std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr;

    std::atomic<bool> logicDelete_ = false;

    std::function<void (void)> syncFinishFunc_;
    std::function<void (void)> uploadStartFunc_;

    mutable std::mutex configMutex_;
    CloudSyncConfig cloudSyncConfig_;

    CloudUploadRecorder uploadRecorder_;

    // table name -> pair of cursor values.
    // NOTE(review): the meaning of each pair element is not visible in this header — confirm in SaveCursorChange.
    std::map<std::string, std::pair<uint64_t, uint64_t>> cursorChangeMap_;

    std::mutex cursorChangeMutex_;
};
} // namespace DistributedDB
#endif
#endif // RELATIONAL_SYNC_ABLE_STORAGE_H