/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
#define RELATIONAL_SYNC_ABLE_STORAGE_H
#ifdef RELATIONAL_STORE

// Standard headers for the std:: names used directly in this file, so the header
// does not depend on what the project headers below happen to pull in.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

#include "lru_map.h"
#include "relational_db_sync_interface.h"
#include "relationaldb_properties.h"
#include "runtime_context.h"
#include "sqlite_single_relational_storage_engine.h"
#include "sqlite_single_ver_relational_continue_token.h"

namespace DistributedDB {
// Callback type stored via RegisterObserverAction(); it receives a device string.
using RelationalObserverAction = std::function<void(const std::string &device)>;

// Implementation of RelationalDBSyncInterface that is constructed from a
// SQLiteSingleRelationalStorageEngine and is reference counted through RefObject.
// NOTE(review): only declarations are visible here; behavioural details live in the .cpp.
class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public virtual RefObject {
public:
    // Build the storage on top of the given engine. NOTE(review): presumably kept in storageEngine_ - confirm.
    explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
    ~RelationalSyncAbleStorage() override;

    // Get interface type of this relational db.
    int GetInterfaceType() const override;

    // Take a reference on the interface, in order to access it asynchronously.
    void IncRefCount() override;

    // Drop the interface ref-count.
    void DecRefCount() override;

    // Get the identifier of this rdb.
    std::vector<uint8_t> GetIdentifier() const override;

    // Get the dual tuple identifier of this rdb.
    std::vector<uint8_t> GetDualTupleIdentifier() const override;

    // Get the max timestamp of all entries in database.
    void GetMaxTimestamp(Timestamp &stamp) const override;

    // Get the max timestamp of one table.
    int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;

    // Get meta data associated with the given key.
    int GetMetaData(const Key &key, Value &value) const override;

    // Put meta data as a key-value entry.
    int PutMetaData(const Key &key, const Value &value) override;

    // Delete multiple meta data records in a transaction.
    int DeleteMetaData(const std::vector<Key> &keys) override;

    // Delete multiple meta data records with key prefix in a transaction.
    int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;

    // Get all meta data keys.
    int GetAllMetaKeys(std::vector<Key> &keys) const override;

    const RelationalDBProperties &GetDbProperties() const override;

    // Get the data which would be synced with query condition.
    // The result is returned through `entries`; `continueStmtToken` is an in/out token
    // (NOTE(review): presumably consumed by GetSyncDataNext() to resume - confirm).
    int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
        const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
        std::vector<SingleVerKvEntry *> &entries) const override;

    // Counterpart of GetSyncData() that takes an existing continue token.
    int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
        const DataSizeSpecInfo &dataSizeInfo) const override;

    // Store `entries` from `deviceName` for the given query object.
    int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
        const DeviceID &deviceName) override;

    int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;

    RelationalSchemaObject GetSchemaInfo() const override;

    int GetSecurityOption(SecurityOption &option) const override;

    void NotifyRemotePushFinished(const std::string &deviceId) const override;

    // Get the timestamp when database created or imported.
    int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;

    // Get batch meta data associated with the given keys.
    int GetBatchMetaData(const std::vector<Key> &keys, std::vector<Entry> &entries) const override;

    // Put batch meta data as a key-value entry vector.
    int PutBatchMetaData(std::vector<Entry> &entries) override;

    std::vector<QuerySyncObject> GetTablesQuery() override;

    int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;

    // No interception is done at this layer: the arguments are ignored and E_OK is always returned.
    int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
        const std::string &targetID) const override
    {
        return E_OK;
    }

    int CheckAndInitQueryCondition(QueryObject &query) const override;

    // Register / fire the RelationalObserverAction callback.
    void RegisterObserverAction(const RelationalObserverAction &action);
    void TriggerObserverAction(const std::string &deviceName);

    int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;

    int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;

    void NotifySchemaChanged();

    void RegisterHeartBeatListener(const std::function<void()> &listener);

    int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;

    bool CheckCompatible(const std::string &schema, uint8_t type) const override;

    // Run a prepared statement; rows are returned through `data`, `token` is an in/out continue token.
    int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
        ContinueToken &token) const override;

    const RelationalDBProperties &GetRelationalDbProperties() const override;

    void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;

private:
    // Obtain an executor handle; the error code is reported through `errCode`.
    // A handle obtained here is handed back through ReleaseHandle().
    SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
        OperatePerm perm = OperatePerm::NORMAL_PERM) const;
    void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;

    // get
    int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
        const DataSizeSpecInfo &dataSizeInfo) const;
    int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
        std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;

    // put
    int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
    int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);

    // data
    std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
    std::function<void()> onSchemaChanged_;
    mutable std::mutex onSchemaChangedMutex_; // NOTE(review): presumably guards onSchemaChanged_ - confirm in .cpp
    std::mutex dataChangeDeviceMutex_; // NOTE(review): presumably guards dataChangeDeviceCallback_ - confirm in .cpp
    RelationalObserverAction dataChangeDeviceCallback_;
    std::function<void()> heartBeatListener_;
    mutable std::mutex heartBeatMutex_; // NOTE(review): presumably guards heartBeatListener_ - confirm in .cpp

    // cache securityOption
    mutable std::mutex securityOptionMutex_;
    mutable SecurityOption securityOption_;
    // Default-initialised so the flag is never read with an indeterminate value;
    // a constructor mem-initializer, if present, still takes precedence.
    mutable bool isCachedOption_ = false;
};
} // namespace DistributedDB
#endif
#endif // RELATIONAL_SYNC_ABLE_STORAGE_H