/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
#define RELATIONAL_SYNC_ABLE_STORAGE_H
#ifdef RELATIONAL_STORE

// Standard headers for the std:: types named directly in this header, so it does not
// depend on the project headers below pulling them in transitively.
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "cloud/cloud_upload_recorder.h"
#include "cloud/schema_mgr.h"
#ifdef USE_FFRT
#include "ffrt.h"
#endif
#include "icloud_sync_storage_interface.h"
#include "lru_map.h"
#include "relational_db_sync_interface.h"
#include "relationaldb_properties.h"
#include "sqlite_single_relational_storage_engine.h"
#include "sqlite_single_ver_relational_continue_token.h"
#include "sync_able_engine.h"

namespace DistributedDB {
// Callback type stored per observer: receives the device name, the changed data (passed as an rvalue),
// a flag telling whether changedData carries data, and the origin of the change.
using RelationalObserverAction =
    std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData, Origin origin)>;

// Storage object that implements both RelationalDBSyncInterface and ICloudSyncStorageInterface.
// It is constructed from a shared SQLiteSingleRelationalStorageEngine and is reference counted via RefObject.
class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface,
    public virtual RefObject {
public:
    explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
    ~RelationalSyncAbleStorage() override;

    // Get interface type of this kvdb.
    int GetInterfaceType() const override;

    // Get the interface ref-count, in order to access asynchronously.
    void IncRefCount() override;

    // Drop the interface ref-count.
    void DecRefCount() override;

    // Get the identifier of this rdb.
    std::vector<uint8_t> GetIdentifier() const override;

    // Get the dual tuple identifier of this rdb.
    std::vector<uint8_t> GetDualTupleIdentifier() const override;

    // Get the max timestamp of all entries in database.
    void GetMaxTimestamp(Timestamp &stamp) const override;

    // Get the max timestamp of one table.
    int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;

    // Get meta data associated with the given key.
    int GetMetaData(const Key &key, Value &value) const override;

    // Put meta data as a key-value entry.
    int PutMetaData(const Key &key, const Value &value) override;

    // Overload of PutMetaData that additionally takes an isInTransaction flag.
    int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;

    // Delete multiple meta data records in a transaction.
    int DeleteMetaData(const std::vector<Key> &keys) override;

    // Delete multiple meta data records with key prefix in a transaction.
    int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;

    // Get all meta data keys.
    int GetAllMetaKeys(std::vector<Key> &keys) const override;

    const RelationalDBProperties &GetDbProperties() const override;

    // Get the data which would be synced with query condition
    int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
        const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
        std::vector<SingleVerKvEntry *> &entries) const override;

    int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
        const DataSizeSpecInfo &dataSizeInfo) const override;

    int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
        const DeviceID &deviceName) override;

    int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;

    RelationalSchemaObject GetSchemaInfo() const override;

    int GetSecurityOption(SecurityOption &option) const override;

    void NotifyRemotePushFinished(const std::string &deviceId) const override;

    // Get the timestamp when database created or imported
    int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;

    std::vector<QuerySyncObject> GetTablesQuery() override;

    int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;

    int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
        const std::string &targetID, bool isPush) const override;

    int CheckAndInitQueryCondition(QueryObject &query) const override;
    int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
        const RelationalObserverAction &action);
    int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer);
    void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override;

    int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;

    int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;

    void NotifySchemaChanged();

    void RegisterHeartBeatListener(const std::function<void()> &listener);

    int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;

    bool CheckCompatible(const std::string &schema, uint8_t type) const override;

    int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
        ContinueToken &token) const override;

    int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override;

    int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) const override;

    void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;

    // recycling the write handle
    void SetReusedHandle(StorageExecutor *handle);

    int StartTransaction(TransactType type) override;

    int Commit() override;

    int Rollback() override;

    int GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp, bool isCloudForcePush,
        bool isCompensatedTask, int64_t &count) override;

    int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> &timestampVec,
        bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override;

    int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime,
        ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;

    int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;

    int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush,
        bool isCompensatedTask, std::vector<std::string> &cloudGid) override;

    int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override;

    int GetSchemaFromDB(RelationalSchemaObject &schema) override;

    int ChkSchema(const TableName &tableName) override;

    int SetCloudDbSchema(const DataBaseSchema &schema) override;

    int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override;

    int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override;

    int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
        DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;

    int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override;

    int UpdateAssetStatusForAssetOnly(const std::string &tableName, VBucket &asset) override;

    int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
        const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override;

    int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;

    int FillCloudAssetForAsyncDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;

    int SetLogTriggerStatus(bool status) override;

    int SetLogTriggerStatusForAsyncDownload(bool status) override;

    int SetCursorIncFlag(bool flag) override;

    int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override;

    void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine);

    std::string GetIdentify() const override;

    void EraseDataChangeCallback(uint64_t connectionId);

    void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;

    int CheckQueryValid(const QuerySyncObject &query) override;

    int CreateTempSyncTrigger(const std::string &tableName) override;
    int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override;
    int ClearAllTempSyncTrigger() override;
    bool IsSharedTable(const std::string &tableName) override;

    std::map<std::string, std::string> GetSharedTableOriginNames();

    void SetLogicDelete(bool logicDelete);

    std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
        const Bytes &hashKey, VBucket &assets) override;

    std::pair<int, uint32_t> GetAssetsByGidOrHashKeyForAsyncDownload(
        const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets) override;

    int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override;

    int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);

    int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override;

    int UpdateRecordFlagForAsyncDownload(const std::string &tableName, bool recordConflict,
        const LogInfo &logInfo) override;

    int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users,
        bool isQueryDownloadRecords) override;

    int ClearUnLockingNoNeedCompensated() override;

    int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
        const std::set<std::string> &gidFilters) override;

    int MarkFlagAsAssetAsyncDownload(const std::string &tableName, const DownloadData &downloadData,
        const std::set<std::string> &gidFilters) override;

    CloudSyncConfig GetCloudSyncConfig() const override;

    void SetCloudSyncConfig(const CloudSyncConfig &config);

    bool IsTableExistReference(const std::string &table) override;

    bool IsTableExistReferenceOrReferenceBy(const std::string &table) override;

    void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override;

    int GetCursor(const std::string &tableName, uint64_t &cursor) override;

    bool IsCurrentLogicDelete() const override;

    int GetLocalDataCount(const std::string &tableName, int &dataCount, int &logicDeleteDataCount) override;

    std::pair<int, std::vector<std::string>> GetDownloadAssetTable() override;

    std::pair<int, std::vector<std::string>> GetDownloadAssetRecords(const std::string &tableName,
        int64_t beginTime) override;

    // Overload of GetInfoByPrimaryKeyOrGid that additionally takes a useTransaction flag.
    int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket, bool useTransaction,
        DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;

    // Non-override overload of TriggerObserverAction that additionally takes the change origin.
    void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData,
        Origin origin);

    void PrintCursorChange(const std::string &tableName) override;

    int GetLockStatusByGid(const std::string &tableName, const std::string &gid, LockStatus &status) override;

    bool IsExistTableContainAssets() override;
protected:
    int FillReferenceData(CloudSyncData &syncData);

    // The *Inner helpers below take the executor handle to operate on as an explicit argument.
    int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);

    int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        DownloadData &downloadData);

    // Virtual, so a derived class may replace it.
    virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
        std::map<int64_t, Entries> &referenceGid);

    int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType,
        const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid);

    static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
        const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend);

    int ReviseLocalModTime(const std::string &tableName,
        const std::vector<ReviseModTimeInfo> &revisedData) override;

private:
    // Handle acquisition/release. errCode is an out-parameter; perm defaults to OperatePerm::NORMAL_PERM.
    SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
        OperatePerm perm = OperatePerm::NORMAL_PERM) const;
    SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode,
        OperatePerm perm = OperatePerm::NORMAL_PERM) const;
    void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;

    // get
    int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
        const DataSizeSpecInfo &dataSizeInfo, RelationalSchemaObject &&filterSchema) const;
    int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
        std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;

    int GetTableReference(const std::string &tableName,
        std::map<std::string, std::vector<TableReferenceProperty>> &reference);

    std::pair<std::string, int> GetSourceTableName(const std::string &tableName);

    std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName);
    // put
    int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
    int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
    void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type);
    StoreInfo GetStoreInfo() const;

    int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const std::vector<VBucket> &records);

    int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const std::vector<VBucket> &records);

    int GetCloudTableWithoutShared(std::vector<TableSchema> &tables);

    int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
        const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery, bool isQueryDownloadRecords);

    int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        bool flag = false);

    bool CheckTableSupportCompensatedSync(const TableSchema &table);

    // item has the same shape as one element of dataChangeCallbackMap_ (connection id -> observer actions).
    void ExecuteDataChangeCallback(
        const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
        const std::string &deviceName, const ChangedData &changedData, bool isChangedData, Origin origin);

    void SaveCursorChange(const std::string &tableName, uint64_t currCursor);

    // data
    std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
    std::function<void()> onSchemaChanged_;
    mutable std::mutex onSchemaChangedMutex_;
    // Mutex type depends on the build: ffrt::mutex when USE_FFRT is defined, std::mutex otherwise.
    // NOTE(review): presumably guards dataChangeCallbackMap_ - confirm against the implementation.
#ifdef USE_FFRT
    ffrt::mutex dataChangeDeviceMutex_;
#else
    std::mutex dataChangeDeviceMutex_;
#endif
    // Observer actions keyed by connection id, then by observer pointer.
    std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_;
    std::function<void()> heartBeatListener_;
    mutable std::mutex heartBeatMutex_;

    mutable LruMap<std::string, std::string> remoteDeviceSchema_;
    // Default-initialized so the pointer never holds an indeterminate value, whatever the constructor does.
    StorageExecutor *reusedHandle_ = nullptr;
    mutable std::mutex reusedHandleMutex_;

    // cache securityOption
    mutable std::mutex securityOptionMutex_;
    mutable SecurityOption securityOption_;
    // Default-initialized for the same reason as reusedHandle_.
    mutable bool isCachedOption_ = false;

    SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr;
    mutable std::shared_mutex transactionMutex_; // used for transaction

    SchemaMgr schemaMgr_;
    mutable std::shared_mutex schemaMgrMutex_;
    std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr;

    std::atomic<bool> logicDelete_ = false;

    std::function<void (void)> syncFinishFunc_;
    std::function<void (void)> uploadStartFunc_;

    mutable std::mutex configMutex_;
    CloudSyncConfig cloudSyncConfig_;

    CloudUploadRecorder uploadRecorder_;

    // Keyed by table name. NOTE(review): meaning of the two uint64_t values is not visible here - confirm
    // against SaveCursorChange / PrintCursorChange.
    std::map<std::string, std::pair<uint64_t, uint64_t>> cursorChangeMap_;
};
} // namespace DistributedDB
#endif
#endif // RELATIONAL_SYNC_ABLE_STORAGE_H