• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
16 #define RELATIONAL_SYNC_ABLE_STORAGE_H
17 #ifdef RELATIONAL_STORE
18 
19 #include "cloud/cloud_upload_recorder.h"
20 #include "cloud/schema_mgr.h"
21 #ifdef USE_FFRT
22 #include "ffrt.h"
23 #endif
24 #include "icloud_sync_storage_interface.h"
25 #include "lru_map.h"
26 #include "relational_db_sync_interface.h"
27 #include "relationaldb_properties.h"
28 #include "sqlite_single_relational_storage_engine.h"
29 #include "sqlite_single_ver_relational_continue_token.h"
30 #include "sync_able_engine.h"
31 
32 namespace DistributedDB {
// Callback signature used for relational store observers: it is invoked with the device string,
// the ChangedData payload (taken by rvalue reference) and the isChangedData flag, and returns nothing.
33 using RelationalObserverAction =
34     std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData)>;
35 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface,
36     public virtual RefObject {
37 public:
38     explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
39     ~RelationalSyncAbleStorage() override;
40 
41     // Get interface type of this kvdb.
42     int GetInterfaceType() const override;
43 
44     // Get the interface ref-count, in order to access asynchronously.
45     void IncRefCount() override;
46 
47     // Drop the interface ref-count.
48     void DecRefCount() override;
49 
50     // Get the identifier of this rdb.
51     std::vector<uint8_t> GetIdentifier() const override;
52 
53     // Get the dual tuple identifier of this rdb.
54     std::vector<uint8_t> GetDualTupleIdentifier() const override;
55 
56     // Get the max timestamp of all entries in database.
57     void GetMaxTimestamp(Timestamp &stamp) const override;
58 
59     // Get the max timestamp of one table.
60     int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;
61 
62     // Get meta data associated with the given key.
63     int GetMetaData(const Key &key, Value &value) const override;
64 
65     // Put meta data as a key-value entry.
66     int PutMetaData(const Key &key, const Value &value) override;
67 
68     int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;
69 
70     // Delete multiple meta data records in a transaction.
71     int DeleteMetaData(const std::vector<Key> &keys) override;
72 
73     // Delete multiple meta data records with key prefix in a transaction.
74     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
75 
76     // Get all meta data keys.
77     int GetAllMetaKeys(std::vector<Key> &keys) const override;
78 
79     const RelationalDBProperties &GetDbProperties() const override;
80 
81     // Get the data which would be synced with query condition
82     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
83         const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
84         std::vector<SingleVerKvEntry *> &entries) const override;
85 
86     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
87         const DataSizeSpecInfo &dataSizeInfo) const override;
88 
89     int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
90         const DeviceID &deviceName) override;
91 
92     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
93 
94     RelationalSchemaObject GetSchemaInfo() const override;
95 
96     int GetSecurityOption(SecurityOption &option) const override;
97 
98     void NotifyRemotePushFinished(const std::string &deviceId) const override;
99 
100     // Get the timestamp when database created or imported
101     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
102 
103     std::vector<QuerySyncObject> GetTablesQuery() override;
104 
105     int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;
106 
107     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
108         const std::string &targetID, bool isPush) const override;
109 
110     int CheckAndInitQueryCondition(QueryObject &query) const override;
111     int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
112         const RelationalObserverAction &action);
113     int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer);
114     void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override;
115 
116     int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;
117 
118     int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;
119 
120     void NotifySchemaChanged();
121 
122     void RegisterHeartBeatListener(const std::function<void()> &listener);
123 
124     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
125 
126     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
127 
128     int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
129         ContinueToken &token) const override;
130 
131     int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override;
132 
133     int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) override;
134 
135     void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;
136 
137     // recycling the write handle
138     void SetReusedHandle(StorageExecutor *handle);
139 
140     int StartTransaction(TransactType type) override;
141 
142     int Commit() override;
143 
144     int Rollback() override;
145 
146     int GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp, bool isCloudForcePush,
147         bool isCompensatedTask, int64_t &count) override;
148 
149     int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> &timestampVec,
150         bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override;
151 
152     int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime,
153         ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
154 
155     int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
156 
157     int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush,
158         bool isCompensatedTask, std::vector<std::string> &cloudGid) override;
159 
160     int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override;
161 
162     int GetSchemaFromDB(RelationalSchemaObject &schema) override;
163 
164     int ChkSchema(const TableName &tableName) override;
165 
166     int SetCloudDbSchema(const DataBaseSchema &schema) override;
167 
168     int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override;
169 
170     int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override;
171 
172     int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
173         DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;
174 
175     int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override;
176 
177     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
178         const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override;
179 
180     int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;
181 
182     int SetLogTriggerStatus(bool status) override;
183     int SetCursorIncFlag(bool flag) override;
184 
185     int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override;
186 
187     void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine);
188 
189     std::string GetIdentify() const override;
190 
191     void EraseDataChangeCallback(uint64_t connectionId);
192 
193     void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;
194 
195     int CheckQueryValid(const QuerySyncObject &query) override;
196 
197     int CreateTempSyncTrigger(const std::string &tableName) override;
198     int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override;
199     int ClearAllTempSyncTrigger() override;
200     bool IsSharedTable(const std::string &tableName) override;
201 
202     std::map<std::string, std::string> GetSharedTableOriginNames();
203 
204     void SetLogicDelete(bool logicDelete);
205 
206     void SetCloudTaskConfig(const CloudTaskConfig &config) override;
207 
208     std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
209         const Bytes &hashKey, VBucket &assets) override;
210 
211     int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override;
212 
213     int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);
214 
215     int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override;
216 
217     int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users) override;
218 
219     int ClearUnLockingNoNeedCompensated() override;
220 
221     int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
222         const std::set<std::string> &gidFilters) override;
223 
224     CloudSyncConfig GetCloudSyncConfig() const override;
225 
226     void SetCloudSyncConfig(const CloudSyncConfig &config);
227 
228     bool IsTableExistReference(const std::string &table) override;
229 
230     bool IsTableExistReferenceOrReferenceBy(const std::string &table) override;
231 
232     void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override;
233 protected:
234     int FillReferenceData(CloudSyncData &syncData);
235 
236     int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
237         const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);
238 
239     int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
240         DownloadData &downloadData);
241 
242     virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
243         std::map<int64_t, Entries> &referenceGid);
244 
245     int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType,
246         const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid);
247 
248     int UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
249         const CloudSyncBatch &updateData, const CloudWaterType &type, bool isLock = false);
250 
251     static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
252         const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend);
253 
254 private:
255     SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
256         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
257     SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode,
258         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
259     void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;
260 
261     // get
262     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
263         const DataSizeSpecInfo &dataSizeInfo) const;
264     int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
265         std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;
266 
267     int GetTableReference(const std::string &tableName,
268         std::map<std::string, std::vector<TableReferenceProperty>> &reference);
269 
270     std::pair<std::string, int> GetSourceTableName(const std::string &tableName);
271 
272     std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName);
273     // put
274     int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
275     int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
276     void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type);
277     StoreInfo GetStoreInfo() const;
278 
279     bool IsCurrentLogicDelete() const;
280 
281     int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
282         const std::vector<VBucket> &records);
283 
284     int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
285         const std::vector<VBucket> &records);
286 
287     int GetCloudTableWithoutShared(std::vector<TableSchema> &tables);
288 
289     int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
290         const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery);
291 
292     int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
293         bool flag = false);
294 
295     bool CheckTableSupportCompensatedSync(const TableSchema &table);
296 
297     void ExecuteDataChangeCallback(
298         const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
299         const std::string &deviceName, const ChangedData &changedData, bool isChangedData, int &observerCnt);
300     // data
301     std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
302     std::function<void()> onSchemaChanged_;
303     mutable std::mutex onSchemaChangedMutex_;
304 #ifdef USE_FFRT
305     ffrt::mutex dataChangeDeviceMutex_;
306 #else
307     std::mutex dataChangeDeviceMutex_;
308 #endif
309     std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_;
310     std::function<void()> heartBeatListener_;
311     mutable std::mutex heartBeatMutex_;
312 
313     LruMap<std::string, std::string> remoteDeviceSchema_;
314     StorageExecutor *reusedHandle_;
315     mutable std::mutex reusedHandleMutex_;
316 
317     // cache securityOption
318     mutable std::mutex securityOptionMutex_;
319     mutable SecurityOption securityOption_;
320     mutable bool isCachedOption_;
321 
322     SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr;
323     mutable std::shared_mutex transactionMutex_; // used for transaction
324 
325     SchemaMgr schemaMgr_;
326     mutable std::shared_mutex schemaMgrMutex_;
327     std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr;
328 
329     std::atomic<bool> logicDelete_ = false;
330     std::atomic<bool> allowLogicDelete_ = false;
331 
332     std::function<void (void)> syncFinishFunc_;
333     std::function<void (void)> uploadStartFunc_;
334 
335     mutable std::mutex configMutex_;
336     CloudSyncConfig cloudSyncConfig_;
337 
338     CloudUploadRecorder uploadRecorder_;
339 };
340 }  // namespace DistributedDB
341 #endif
342 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H
343