1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H 16 #define RELATIONAL_SYNC_ABLE_STORAGE_H 17 #ifdef RELATIONAL_STORE 18 19 #include "lru_map.h" 20 #include "icloud_sync_storage_interface.h" 21 #include "relational_db_sync_interface.h" 22 #include "relationaldb_properties.h" 23 #include "cloud/schema_mgr.h" 24 #include "sqlite_single_relational_storage_engine.h" 25 #include "sqlite_single_ver_relational_continue_token.h" 26 #include "sync_able_engine.h" 27 28 namespace DistributedDB { 29 using RelationalObserverAction = 30 std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData)>; 31 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface, 32 public virtual RefObject { 33 public: 34 explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine); 35 ~RelationalSyncAbleStorage() override; 36 37 // Get interface type of this kvdb. 38 int GetInterfaceType() const override; 39 40 // Get the interface ref-count, in order to access asynchronously. 41 void IncRefCount() override; 42 43 // Drop the interface ref-count. 44 void DecRefCount() override; 45 46 // Get the identifier of this rdb. 47 std::vector<uint8_t> GetIdentifier() const override; 48 49 // Get the dual tuple identifier of this rdb. 50 std::vector<uint8_t> GetDualTupleIdentifier() const override; 51 52 // Get the max timestamp of all entries in database. 53 void GetMaxTimestamp(Timestamp &stamp) const override; 54 55 // Get the max timestamp of one table. 56 int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override; 57 58 // Get meta data associated with the given key. 59 int GetMetaData(const Key &key, Value &value) const override; 60 61 // Put meta data as a key-value entry. 62 int PutMetaData(const Key &key, const Value &value) override; 63 64 // Delete multiple meta data records in a transaction. 65 int DeleteMetaData(const std::vector<Key> &keys) override; 66 67 // Delete multiple meta data records with key prefix in a transaction. 68 int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override; 69 70 // Get all meta data keys. 71 int GetAllMetaKeys(std::vector<Key> &keys) const override; 72 73 const RelationalDBProperties &GetDbProperties() const override; 74 75 // Get the data which would be synced with query condition 76 int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, 77 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken, 78 std::vector<SingleVerKvEntry *> &entries) const override; 79 80 int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken, 81 const DataSizeSpecInfo &dataSizeInfo) const override; 82 83 int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries, 84 const DeviceID &deviceName) override; 85 86 int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override; 87 88 RelationalSchemaObject GetSchemaInfo() const override; 89 90 int GetSecurityOption(SecurityOption &option) const override; 91 92 void NotifyRemotePushFinished(const std::string &deviceId) const override; 93 94 // Get the timestamp when database created or imported 95 int GetDatabaseCreateTimestamp(Timestamp &outTime) const override; 96 97 std::vector<QuerySyncObject> GetTablesQuery() override; 98 99 int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override; 100 101 int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID, 102 const std::string &targetID) const override; 103 104 int CheckAndInitQueryCondition(QueryObject &query) const override; 105 void RegisterObserverAction(uint64_t connectionId, const RelationalObserverAction &action); 106 void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override; 107 108 int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override; 109 110 int RegisterSchemaChangedCallback(const std::function<void()> &callback) override; 111 112 void NotifySchemaChanged(); 113 114 void RegisterHeartBeatListener(const std::function<void()> &listener); 115 116 int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override; 117 118 bool CheckCompatible(const std::string &schema, uint8_t type) const override; 119 120 int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data, 121 ContinueToken &token) const override; 122 123 int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override; 124 125 int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) override; 126 127 void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override; 128 129 int StartTransaction(TransactType type) override; 130 131 int Commit() override; 132 133 int Rollback() override; 134 135 int GetUploadCount(const std::string &tableName, const Timestamp ×tamp, const bool isCloudForcePush, 136 int64_t &count) override; 137 138 int FillCloudGid(const CloudSyncData &data) override; 139 140 int GetCloudData(const TableSchema &tableSchema, const Timestamp &beginTime, 141 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override; 142 143 int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override; 144 145 int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override; 146 147 int ChkSchema(const TableName &tableName) override; 148 149 int SetCloudDbSchema(const DataBaseSchema &schema) override; 150 151 int GetCloudDbSchema(DataBaseSchema &cloudSchema) override; 152 153 int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override; 154 155 int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket, 156 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override; 157 158 int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override; 159 160 int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList, 161 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override; 162 163 int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override; 164 165 int SetLogTriggerStatus(bool status) override; 166 167 int FillCloudGidAndAsset(OpType opType, const CloudSyncData &data) override; 168 169 void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine); 170 171 std::string GetIdentify() const override; 172 173 void EraseDataChangeCallback(uint64_t connectionId); 174 175 void ReleaseContinueToken(ContinueToken &continueStmtToken) const override; 176 177 private: 178 SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode, 179 OperatePerm perm = OperatePerm::NORMAL_PERM) const; 180 SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode, 181 OperatePerm perm = OperatePerm::NORMAL_PERM) const; 182 void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const; 183 184 // get 185 int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token, 186 const DataSizeSpecInfo &dataSizeInfo) const; 187 int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize, 188 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const; 189 190 // put 191 int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName); 192 int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName); 193 194 // data 195 std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr; 196 std::function<void()> onSchemaChanged_; 197 mutable std::mutex onSchemaChangedMutex_; 198 std::mutex dataChangeDeviceMutex_; 199 std::map<uint64_t, RelationalObserverAction> dataChangeCallbackMap_; 200 std::function<void()> heartBeatListener_; 201 mutable std::mutex heartBeatMutex_; 202 203 LruMap<std::string, std::string> remoteDeviceSchema_; 204 205 // cache securityOption 206 mutable std::mutex securityOptionMutex_; 207 mutable SecurityOption securityOption_; 208 mutable bool isCachedOption_; 209 210 SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr; 211 mutable std::shared_mutex transactionMutex_; // used for transaction 212 213 SchemaMgr schemaMgr_; 214 mutable std::shared_mutex schemaMgrMutex_; 215 std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr; 216 }; 217 } // namespace DistributedDB 218 #endif 219 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H