/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
#define RELATIONAL_SYNC_ABLE_STORAGE_H
#ifdef RELATIONAL_STORE

// Standard headers for the std:: types named directly in this header.
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "cloud/cloud_upload_recorder.h"
#include "cloud/schema_mgr.h"
#include "icloud_sync_storage_interface.h"
#include "lru_map.h"
#include "relational_db_sync_interface.h"
#include "relationaldb_properties.h"
#include "sqlite_single_relational_storage_engine.h"
#include "sqlite_single_ver_relational_continue_token.h"
#include "sync_able_engine.h"

namespace DistributedDB {
// Callback type stored per (connection id, StoreObserver) in dataChangeCallbackMap_.
// It receives a device string, the changed data (taken by rvalue reference) and the isChangedData flag.
using RelationalObserverAction =
    std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData)>;

/**
 * Relational store storage class implementing RelationalDBSyncInterface and ICloudSyncStorageInterface.
 *
 * The class also derives virtually from RefObject and is constructed from a shared
 * SQLiteSingleRelationalStorageEngine. Methods marked `override` implement one of the base interfaces;
 * the remaining public methods are specific to this class.
 */
class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface,
    public virtual RefObject {
public:
    explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
    ~RelationalSyncAbleStorage() override;

    // Get interface type of this kvdb.
    int GetInterfaceType() const override;

    // Get the interface ref-count, in order to access asynchronously.
    void IncRefCount() override;

    // Drop the interface ref-count.
    void DecRefCount() override;

    // Get the identifier of this rdb.
    std::vector<uint8_t> GetIdentifier() const override;

    // Get the dual tuple identifier of this rdb.
    std::vector<uint8_t> GetDualTupleIdentifier() const override;

    // Get the max timestamp of all entries in database.
    void GetMaxTimestamp(Timestamp &stamp) const override;

    // Get the max timestamp of one table.
    int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;

    // Get meta data associated with the given key.
    int GetMetaData(const Key &key, Value &value) const override;

    // Put meta data as a key-value entry.
    int PutMetaData(const Key &key, const Value &value) override;

    // Put meta data as a key-value entry; the caller states whether it is already inside a transaction.
    int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;

    // Delete multiple meta data records in a transaction.
    int DeleteMetaData(const std::vector<Key> &keys) override;

    // Delete multiple meta data records with key prefix in a transaction.
    int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;

    // Get all meta data keys.
    int GetAllMetaKeys(std::vector<Key> &keys) const override;

    const RelationalDBProperties &GetDbProperties() const override;

    // Get the data which would be synced with query condition
    int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
        const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
        std::vector<SingleVerKvEntry *> &entries) const override;

    int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
        const DataSizeSpecInfo &dataSizeInfo) const override;

    int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
        const DeviceID &deviceName) override;

    int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;

    RelationalSchemaObject GetSchemaInfo() const override;

    int GetSecurityOption(SecurityOption &option) const override;

    void NotifyRemotePushFinished(const std::string &deviceId) const override;

    // Get the timestamp when database created or imported
    int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;

    std::vector<QuerySyncObject> GetTablesQuery() override;

    int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;

    int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
        const std::string &targetID, bool isPush) const override;

    int CheckAndInitQueryCondition(QueryObject &query) const override;
    int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
        const RelationalObserverAction &action);
    int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer);
    void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override;

    int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;

    int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;

    void NotifySchemaChanged();

    void RegisterHeartBeatListener(const std::function<void()> &listener);

    int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;

    bool CheckCompatible(const std::string &schema, uint8_t type) const override;

    int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
        ContinueToken &token) const override;

    int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override;

    int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) override;

    void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;

    // recycling the write handle
    void SetReusedHandle(StorageExecutor *handle);

    int StartTransaction(TransactType type) override;

    int Commit() override;

    int Rollback() override;

    // The result is written to the `count` out-parameter; the int return value is a status code.
    int GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp, bool isCloudForcePush,
        bool isCompensatedTask, int64_t &count) override;

    // Same out-parameter convention as GetUploadCount, but takes a vector of timestamps.
    int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> &timestampVec,
        bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override;

    int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime,
        ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;

    int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;

    int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush,
        bool isCompensatedTask, std::vector<std::string> &cloudGid) override;

    int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override;

    int GetSchemaFromDB(RelationalSchemaObject &schema) override;

    int ChkSchema(const TableName &tableName) override;

    int SetCloudDbSchema(const DataBaseSchema &schema) override;

    int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override;

    int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override;

    int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
        DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;

    int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override;

    int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
        const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override;

    int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;

    int SetLogTriggerStatus(bool status) override;

    int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override;

    void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine);

    std::string GetIdentify() const override;

    void EraseDataChangeCallback(uint64_t connectionId);

    void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;

    int CheckQueryValid(const QuerySyncObject &query) override;

    int CreateTempSyncTrigger(const std::string &tableName) override;
    int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override;
    int ClearAllTempSyncTrigger() override;
    bool IsSharedTable(const std::string &tableName) override;

    std::map<std::string, std::string> GetSharedTableOriginNames();

    void SetLogicDelete(bool logicDelete);

    void SetCloudTaskConfig(const CloudTaskConfig &config) override;

    std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
        const Bytes &hashKey, VBucket &assets) override;

    int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override;

    int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);

    int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override;

    int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery) override;

    int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
        const std::set<std::string> &gidFilters) override;

    CloudSyncConfig GetCloudSyncConfig() const override;

    void SetCloudSyncConfig(const CloudSyncConfig &config);

    bool IsTableExistReference(const std::string &table) override;

    bool IsTableExistReferenceOrReferenceBy(const std::string &table) override;

    void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override;
protected:
    int FillReferenceData(CloudSyncData &syncData);

    // The *Inner variants below take the executor handle explicitly instead of acquiring one themselves.
    int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);

    int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        DownloadData &downloadData);

    // Virtual so that a derived class can replace it.
    virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
        std::map<int64_t, Entries> &referenceGid);

    int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType,
        const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid);

    int UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const CloudSyncBatch &updateData, const CloudWaterType &type, bool isLock = false);

    static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
        const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend);

private:
    // Executor handle acquisition and release; errCode is an out-parameter.
    SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
        OperatePerm perm = OperatePerm::NORMAL_PERM) const;
    SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode,
        OperatePerm perm = OperatePerm::NORMAL_PERM) const;
    void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;

    // get
    int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
        const DataSizeSpecInfo &dataSizeInfo) const;
    int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
        std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;

    int GetTableReference(const std::string &tableName,
        std::map<std::string, std::vector<TableReferenceProperty>> &reference);

    std::pair<std::string, int> GetSourceTableName(const std::string &tableName);

    std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName);
    // put
    int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
    int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
    void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type);
    StoreInfo GetStoreInfo() const;

    bool IsCurrentLogicDelete() const;

    int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const std::vector<VBucket> &records);

    int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
        const std::vector<VBucket> &records);

    int GetCloudTableWithoutShared(std::vector<TableSchema> &tables);

    int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
        const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery);

    int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName);

    bool CheckTableSupportCompensatedSync(const TableSchema &table);

    // `item` has the element type of dataChangeCallbackMap_ (connection id -> observer/action map).
    void ExecuteDataChangeCallback(
        const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
        const std::string &deviceName, const ChangedData &changedData, bool isChangedData, int &observerCnt);
    // data
    std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
    std::function<void()> onSchemaChanged_;
    mutable std::mutex onSchemaChangedMutex_;
    std::mutex dataChangeDeviceMutex_;
    // Observer actions keyed by connection id, then by observer pointer.
    std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_;
    std::function<void()> heartBeatListener_;
    mutable std::mutex heartBeatMutex_;

    LruMap<std::string, std::string> remoteDeviceSchema_;
    // In-class initializer so the pointer never holds an indeterminate value.
    StorageExecutor *reusedHandle_ = nullptr;
    mutable std::mutex reusedHandleMutex_;

    // cache securityOption
    mutable std::mutex securityOptionMutex_;
    mutable SecurityOption securityOption_;
    // In-class initializer so the flag never holds an indeterminate value.
    mutable bool isCachedOption_ = false;

    SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr;
    mutable std::shared_mutex transactionMutex_; // used for transaction

    SchemaMgr schemaMgr_;
    mutable std::shared_mutex schemaMgrMutex_;
    std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr;

    std::atomic<bool> logicDelete_ = false;
    std::atomic<bool> allowLogicDelete_ = false;

    std::function<void (void)> syncFinishFunc_;
    std::function<void (void)> uploadStartFunc_;

    mutable std::mutex configMutex_;
    CloudSyncConfig cloudSyncConfig_;

    CloudUploadRecorder uploadRecorder_;
};
} // namespace DistributedDB
#endif
#endif // RELATIONAL_SYNC_ABLE_STORAGE_H