• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
16 #define RELATIONAL_SYNC_ABLE_STORAGE_H
17 #ifdef RELATIONAL_STORE
18 
19 #include "lru_map.h"
20 #include "icloud_sync_storage_interface.h"
21 #include "relational_db_sync_interface.h"
22 #include "relationaldb_properties.h"
23 #include "cloud/schema_mgr.h"
24 #include "sqlite_single_relational_storage_engine.h"
25 #include "sqlite_single_ver_relational_continue_token.h"
26 #include "sync_able_engine.h"
27 
28 namespace DistributedDB {
29 using RelationalObserverAction =
30     std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData)>;
31 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface,
32     public virtual RefObject {
33 public:
34     explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
35     ~RelationalSyncAbleStorage() override;
36 
37     // Get interface type of this kvdb.
38     int GetInterfaceType() const override;
39 
40     // Get the interface ref-count, in order to access asynchronously.
41     void IncRefCount() override;
42 
43     // Drop the interface ref-count.
44     void DecRefCount() override;
45 
46     // Get the identifier of this rdb.
47     std::vector<uint8_t> GetIdentifier() const override;
48 
49     // Get the dual tuple identifier of this rdb.
50     std::vector<uint8_t> GetDualTupleIdentifier() const override;
51 
52     // Get the max timestamp of all entries in database.
53     void GetMaxTimestamp(Timestamp &stamp) const override;
54 
55     // Get the max timestamp of one table.
56     int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;
57 
58     // Get meta data associated with the given key.
59     int GetMetaData(const Key &key, Value &value) const override;
60 
61     // Put meta data as a key-value entry.
62     int PutMetaData(const Key &key, const Value &value) override;
63 
64     // Delete multiple meta data records in a transaction.
65     int DeleteMetaData(const std::vector<Key> &keys) override;
66 
67     // Delete multiple meta data records with key prefix in a transaction.
68     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
69 
70     // Get all meta data keys.
71     int GetAllMetaKeys(std::vector<Key> &keys) const override;
72 
73     const RelationalDBProperties &GetDbProperties() const override;
74 
75     // Get the data which would be synced with query condition
76     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
77         const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
78         std::vector<SingleVerKvEntry *> &entries) const override;
79 
80     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
81         const DataSizeSpecInfo &dataSizeInfo) const override;
82 
83     int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
84         const DeviceID &deviceName) override;
85 
86     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
87 
88     RelationalSchemaObject GetSchemaInfo() const override;
89 
90     int GetSecurityOption(SecurityOption &option) const override;
91 
92     void NotifyRemotePushFinished(const std::string &deviceId) const override;
93 
94     // Get the timestamp when database created or imported
95     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
96 
97     std::vector<QuerySyncObject> GetTablesQuery() override;
98 
99     int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;
100 
101     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
102         const std::string &targetID) const override;
103 
104     int CheckAndInitQueryCondition(QueryObject &query) const override;
105     void RegisterObserverAction(uint64_t connectionId, const RelationalObserverAction &action);
106     void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override;
107 
108     int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;
109 
110     int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;
111 
112     void NotifySchemaChanged();
113 
114     void RegisterHeartBeatListener(const std::function<void()> &listener);
115 
116     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
117 
118     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
119 
120     int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
121         ContinueToken &token) const override;
122 
123     int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override;
124 
125     int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) override;
126 
127     void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;
128 
129     int StartTransaction(TransactType type) override;
130 
131     int Commit() override;
132 
133     int Rollback() override;
134 
135     int GetUploadCount(const std::string &tableName, const Timestamp &timestamp, const bool isCloudForcePush,
136         int64_t &count) override;
137 
138     int FillCloudGid(const CloudSyncData &data) override;
139 
140     int GetCloudData(const TableSchema &tableSchema, const Timestamp &beginTime,
141         ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
142 
143     int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
144 
145     int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override;
146 
147     int ChkSchema(const TableName &tableName) override;
148 
149     int SetCloudDbSchema(const DataBaseSchema &schema) override;
150 
151     int GetCloudDbSchema(DataBaseSchema &cloudSchema) override;
152 
153     int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override;
154 
155     int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
156         DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;
157 
158     int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override;
159 
160     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
161         const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override;
162 
163     int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;
164 
165     int SetLogTriggerStatus(bool status) override;
166 
167     int FillCloudGidAndAsset(OpType opType, const CloudSyncData &data) override;
168 
169     void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine);
170 
171     std::string GetIdentify() const override;
172 
173     void EraseDataChangeCallback(uint64_t connectionId);
174 
175     void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;
176 
177 private:
178     SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
179         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
180     SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode,
181         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
182     void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;
183 
184     // get
185     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
186         const DataSizeSpecInfo &dataSizeInfo) const;
187     int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
188         std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;
189 
190     // put
191     int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
192     int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
193 
194     // data
195     std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
196     std::function<void()> onSchemaChanged_;
197     mutable std::mutex onSchemaChangedMutex_;
198     std::mutex dataChangeDeviceMutex_;
199     std::map<uint64_t, RelationalObserverAction> dataChangeCallbackMap_;
200     std::function<void()> heartBeatListener_;
201     mutable std::mutex heartBeatMutex_;
202 
203     LruMap<std::string, std::string> remoteDeviceSchema_;
204 
205     // cache securityOption
206     mutable std::mutex securityOptionMutex_;
207     mutable SecurityOption securityOption_;
208     mutable bool isCachedOption_;
209 
210     SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr;
211     mutable std::shared_mutex transactionMutex_; // used for transaction
212 
213     SchemaMgr schemaMgr_;
214     mutable std::shared_mutex schemaMgrMutex_;
215     std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr;
216 };
217 }  // namespace DistributedDB
218 #endif
219 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H