1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H 16 #define RELATIONAL_SYNC_ABLE_STORAGE_H 17 #ifdef RELATIONAL_STORE 18 19 #include "cloud/cloud_upload_recorder.h" 20 #include "cloud/schema_mgr.h" 21 #ifdef USE_FFRT 22 #include "ffrt.h" 23 #endif 24 #include "icloud_sync_storage_interface.h" 25 #include "lru_map.h" 26 #include "relational_db_sync_interface.h" 27 #include "relationaldb_properties.h" 28 #include "sqlite_single_relational_storage_engine.h" 29 #include "sqlite_single_ver_relational_continue_token.h" 30 #include "sync_able_engine.h" 31 32 namespace DistributedDB { 33 using RelationalObserverAction = 34 std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData)>; 35 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface, 36 public virtual RefObject { 37 public: 38 explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine); 39 ~RelationalSyncAbleStorage() override; 40 41 // Get interface type of this kvdb. 42 int GetInterfaceType() const override; 43 44 // Get the interface ref-count, in order to access asynchronously. 45 void IncRefCount() override; 46 47 // Drop the interface ref-count. 48 void DecRefCount() override; 49 50 // Get the identifier of this rdb. 51 std::vector<uint8_t> GetIdentifier() const override; 52 53 // Get the dual tuple identifier of this rdb. 54 std::vector<uint8_t> GetDualTupleIdentifier() const override; 55 56 // Get the max timestamp of all entries in database. 57 void GetMaxTimestamp(Timestamp &stamp) const override; 58 59 // Get the max timestamp of one table. 60 int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override; 61 62 // Get meta data associated with the given key. 63 int GetMetaData(const Key &key, Value &value) const override; 64 65 // Put meta data as a key-value entry. 66 int PutMetaData(const Key &key, const Value &value) override; 67 68 int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override; 69 70 // Delete multiple meta data records in a transaction. 71 int DeleteMetaData(const std::vector<Key> &keys) override; 72 73 // Delete multiple meta data records with key prefix in a transaction. 74 int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override; 75 76 // Get all meta data keys. 77 int GetAllMetaKeys(std::vector<Key> &keys) const override; 78 79 const RelationalDBProperties &GetDbProperties() const override; 80 81 // Get the data which would be synced with query condition 82 int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, 83 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken, 84 std::vector<SingleVerKvEntry *> &entries) const override; 85 86 int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken, 87 const DataSizeSpecInfo &dataSizeInfo) const override; 88 89 int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries, 90 const DeviceID &deviceName) override; 91 92 int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override; 93 94 RelationalSchemaObject GetSchemaInfo() const override; 95 96 int GetSecurityOption(SecurityOption &option) const override; 97 98 void NotifyRemotePushFinished(const std::string &deviceId) const override; 99 100 // Get the timestamp when database created or imported 101 int GetDatabaseCreateTimestamp(Timestamp &outTime) const override; 102 103 std::vector<QuerySyncObject> GetTablesQuery() override; 104 105 int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override; 106 107 int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID, 108 const std::string &targetID, bool isPush) const override; 109 110 int CheckAndInitQueryCondition(QueryObject &query) const override; 111 int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer, 112 const RelationalObserverAction &action); 113 int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer); 114 void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override; 115 116 int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override; 117 118 int RegisterSchemaChangedCallback(const std::function<void()> &callback) override; 119 120 void NotifySchemaChanged(); 121 122 void RegisterHeartBeatListener(const std::function<void()> &listener); 123 124 int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override; 125 126 bool CheckCompatible(const std::string &schema, uint8_t type) const override; 127 128 int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data, 129 ContinueToken &token) const override; 130 131 int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override; 132 133 int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) override; 134 135 void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override; 136 137 // recycling the write handle 138 void SetReusedHandle(StorageExecutor *handle); 139 140 int StartTransaction(TransactType type) override; 141 142 int Commit() override; 143 144 int Rollback() override; 145 146 int GetUploadCount(const QuerySyncObject &query, const Timestamp ×tamp, bool isCloudForcePush, 147 bool isCompensatedTask, int64_t &count) override; 148 149 int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> ×tampVec, 150 bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override; 151 152 int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime, 153 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override; 154 155 int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override; 156 157 int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush, 158 bool isCompensatedTask, std::vector<std::string> &cloudGid) override; 159 160 int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override; 161 162 int GetSchemaFromDB(RelationalSchemaObject &schema) override; 163 164 int ChkSchema(const TableName &tableName) override; 165 166 int SetCloudDbSchema(const DataBaseSchema &schema) override; 167 168 int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override; 169 170 int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override; 171 172 int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket, 173 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override; 174 175 int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override; 176 177 int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList, 178 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override; 179 180 int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override; 181 182 int SetLogTriggerStatus(bool status) override; 183 int SetCursorIncFlag(bool flag) override; 184 185 int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override; 186 187 void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine); 188 189 std::string GetIdentify() const override; 190 191 void EraseDataChangeCallback(uint64_t connectionId); 192 193 void ReleaseContinueToken(ContinueToken &continueStmtToken) const override; 194 195 int CheckQueryValid(const QuerySyncObject &query) override; 196 197 int CreateTempSyncTrigger(const std::string &tableName) override; 198 int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override; 199 int ClearAllTempSyncTrigger() override; 200 bool IsSharedTable(const std::string &tableName) override; 201 202 std::map<std::string, std::string> GetSharedTableOriginNames(); 203 204 void SetLogicDelete(bool logicDelete); 205 206 void SetCloudTaskConfig(const CloudTaskConfig &config) override; 207 208 std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid, 209 const Bytes &hashKey, VBucket &assets) override; 210 211 int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override; 212 213 int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records); 214 215 int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override; 216 217 int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users) override; 218 219 int ClearUnLockingNoNeedCompensated() override; 220 221 int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData, 222 const std::set<std::string> &gidFilters) override; 223 224 CloudSyncConfig GetCloudSyncConfig() const override; 225 226 void SetCloudSyncConfig(const CloudSyncConfig &config); 227 228 bool IsTableExistReference(const std::string &table) override; 229 230 bool IsTableExistReferenceOrReferenceBy(const std::string &table) override; 231 232 void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override; 233 protected: 234 int FillReferenceData(CloudSyncData &syncData); 235 236 int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 237 const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo); 238 239 int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 240 DownloadData &downloadData); 241 242 virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch, 243 std::map<int64_t, Entries> &referenceGid); 244 245 int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType, 246 const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid); 247 248 int UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 249 const CloudSyncBatch &updateData, const CloudWaterType &type, bool isLock = false); 250 251 static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid, 252 const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend); 253 254 private: 255 SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode, 256 OperatePerm perm = OperatePerm::NORMAL_PERM) const; 257 SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode, 258 OperatePerm perm = OperatePerm::NORMAL_PERM) const; 259 void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const; 260 261 // get 262 int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token, 263 const DataSizeSpecInfo &dataSizeInfo) const; 264 int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize, 265 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const; 266 267 int GetTableReference(const std::string &tableName, 268 std::map<std::string, std::vector<TableReferenceProperty>> &reference); 269 270 std::pair<std::string, int> GetSourceTableName(const std::string &tableName); 271 272 std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName); 273 // put 274 int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName); 275 int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName); 276 void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type); 277 StoreInfo GetStoreInfo() const; 278 279 bool IsCurrentLogicDelete() const; 280 281 int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 282 const std::vector<VBucket> &records); 283 284 int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 285 const std::vector<VBucket> &records); 286 287 int GetCloudTableWithoutShared(std::vector<TableSchema> &tables); 288 289 int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle, 290 const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery); 291 292 int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 293 bool flag = false); 294 295 bool CheckTableSupportCompensatedSync(const TableSchema &table); 296 297 void ExecuteDataChangeCallback( 298 const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item, 299 const std::string &deviceName, const ChangedData &changedData, bool isChangedData, int &observerCnt); 300 // data 301 std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr; 302 std::function<void()> onSchemaChanged_; 303 mutable std::mutex onSchemaChangedMutex_; 304 #ifdef USE_FFRT 305 ffrt::mutex dataChangeDeviceMutex_; 306 #else 307 std::mutex dataChangeDeviceMutex_; 308 #endif 309 std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_; 310 std::function<void()> heartBeatListener_; 311 mutable std::mutex heartBeatMutex_; 312 313 LruMap<std::string, std::string> remoteDeviceSchema_; 314 StorageExecutor *reusedHandle_; 315 mutable std::mutex reusedHandleMutex_; 316 317 // cache securityOption 318 mutable std::mutex securityOptionMutex_; 319 mutable SecurityOption securityOption_; 320 mutable bool isCachedOption_; 321 322 SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr; 323 mutable std::shared_mutex transactionMutex_; // used for transaction 324 325 SchemaMgr schemaMgr_; 326 mutable std::shared_mutex schemaMgrMutex_; 327 std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr; 328 329 std::atomic<bool> logicDelete_ = false; 330 std::atomic<bool> allowLogicDelete_ = false; 331 332 std::function<void (void)> syncFinishFunc_; 333 std::function<void (void)> uploadStartFunc_; 334 335 mutable std::mutex configMutex_; 336 CloudSyncConfig cloudSyncConfig_; 337 338 CloudUploadRecorder uploadRecorder_; 339 }; 340 } // namespace DistributedDB 341 #endif 342 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H 343