• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
16 #define RELATIONAL_SYNC_ABLE_STORAGE_H
17 #ifdef RELATIONAL_STORE
18 
19 #include "cloud/cloud_upload_recorder.h"
20 #include "cloud/schema_mgr.h"
21 #ifdef USE_FFRT
22 #include "ffrt.h"
23 #endif
24 #include "icloud_sync_storage_interface.h"
25 #include "lru_map.h"
26 #include "relational_db_sync_interface.h"
27 #include "relationaldb_properties.h"
28 #include "sqlite_single_relational_storage_engine.h"
29 #include "sqlite_single_ver_relational_continue_token.h"
30 #include "sync_able_engine.h"
31 
32 namespace DistributedDB {
33 using RelationalObserverAction =
34     std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData, Origin origin)>;
35 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface,
36     public virtual RefObject {
37 public:
38     explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
39     ~RelationalSyncAbleStorage() override;
40 
41     // Get interface type of this kvdb.
42     int GetInterfaceType() const override;
43 
44     // Get the interface ref-count, in order to access asynchronously.
45     void IncRefCount() override;
46 
47     // Drop the interface ref-count.
48     void DecRefCount() override;
49 
50     // Get the identifier of this rdb.
51     std::vector<uint8_t> GetIdentifier() const override;
52 
53     // Get the dual tuple identifier of this rdb.
54     std::vector<uint8_t> GetDualTupleIdentifier() const override;
55 
56     // Get the max timestamp of all entries in database.
57     void GetMaxTimestamp(Timestamp &stamp) const override;
58 
59     // Get the max timestamp of one table.
60     int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;
61 
62     // Get meta data associated with the given key.
63     int GetMetaData(const Key &key, Value &value) const override;
64 
65     int GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const override;
66 
67     // Put meta data as a key-value entry.
68     int PutMetaData(const Key &key, const Value &value) override;
69 
70     int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;
71 
72     // Delete multiple meta data records in a transaction.
73     int DeleteMetaData(const std::vector<Key> &keys) override;
74 
75     // Delete multiple meta data records with key prefix in a transaction.
76     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
77 
78     // Get all meta data keys.
79     int GetAllMetaKeys(std::vector<Key> &keys) const override;
80 
81     const RelationalDBProperties &GetDbProperties() const override;
82 
83     // Get the data which would be synced with query condition
84     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
85         const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
86         std::vector<SingleVerKvEntry *> &entries) const override;
87 
88     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
89         const DataSizeSpecInfo &dataSizeInfo) const override;
90 
91     int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
92         const DeviceID &deviceName) override;
93 
94     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
95 
96     RelationalSchemaObject GetSchemaInfo() const override;
97 
98     int GetSecurityOption(SecurityOption &option) const override;
99 
100     void NotifyRemotePushFinished(const std::string &deviceId) const override;
101 
102     // Get the timestamp when database created or imported
103     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
104 
105     std::vector<QuerySyncObject> GetTablesQuery() override;
106 
107     int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;
108 
109     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
110         const std::string &targetID, bool isPush) const override;
111 
112     int CheckAndInitQueryCondition(QueryObject &query) const override;
113     int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
114         const RelationalObserverAction &action);
115     int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer);
116     void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override;
117 
118     int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;
119 
120     int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;
121 
122     void NotifySchemaChanged();
123 
124     void RegisterHeartBeatListener(const std::function<void()> &listener);
125 
126     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
127 
128     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
129 
130     int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
131         ContinueToken &token) const override;
132 
133     int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override;
134 
135     int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) const override;
136 
137     void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;
138 
139     // recycling the write handle
140     void SetReusedHandle(StorageExecutor *handle);
141 
142     int StartTransaction(TransactType type, bool isAsyncDownload = false) override;
143 
144     int Commit(bool isAsyncDownload = false) override;
145 
146     int Rollback(bool isAsyncDownload = false) override;
147 
148     int GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp, bool isCloudForcePush,
149         bool isCompensatedTask, int64_t &count) override;
150 
151     int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> &timestampVec,
152         bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override;
153 
154     int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime,
155         ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
156 
157     int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
158 
159     int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush,
160         bool isCompensatedTask, std::vector<std::string> &cloudGid) override;
161 
162     int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override;
163 
164     int GetSchemaFromDB(RelationalSchemaObject &schema) override;
165 
166     int ChkSchema(const TableName &tableName) override;
167 
168     int SetCloudDbSchema(const DataBaseSchema &schema) override;
169 
170     int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override;
171 
172     int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override;
173 
174     int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
175         DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;
176 
177     int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override;
178 
179     int UpdateAssetStatusForAssetOnly(const std::string &tableName, VBucket &asset) override;
180 
181     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
182         const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override;
183 
184     int ClearCloudLogVersion(const std::vector<std::string> &tableNameList) override;
185 
186     int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;
187 
188     int FillCloudAssetForAsyncDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;
189 
190     int SetLogTriggerStatus(bool status) override;
191 
192     int SetLogTriggerStatusForAsyncDownload(bool status) override;
193 
194     int SetCursorIncFlag(bool flag) override;
195 
196     int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override;
197 
198     void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine);
199 
200     std::string GetIdentify() const override;
201 
202     void EraseDataChangeCallback(uint64_t connectionId);
203 
204     void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;
205 
206     int CheckQueryValid(const QuerySyncObject &query) override;
207 
208     int CreateTempSyncTrigger(const std::string &tableName) override;
209     int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override;
210     int ClearAllTempSyncTrigger() override;
211     bool IsSharedTable(const std::string &tableName) override;
212 
213     std::map<std::string, std::string> GetSharedTableOriginNames();
214 
215     void SetLogicDelete(bool logicDelete);
216 
217     std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
218         const Bytes &hashKey, VBucket &assets) override;
219 
220     std::pair<int, uint32_t> GetAssetsByGidOrHashKeyForAsyncDownload(
221         const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets) override;
222 
223     int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override;
224 
225     int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);
226 
227     int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override;
228 
229     int UpdateRecordFlagForAsyncDownload(const std::string &tableName, bool recordConflict,
230         const LogInfo &logInfo) override;
231 
232     int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users,
233         bool isQueryDownloadRecords) override;
234 
235     int ClearUnLockingNoNeedCompensated() override;
236 
237     int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
238         const std::set<std::string> &gidFilters) override;
239 
240     int MarkFlagAsAssetAsyncDownload(const std::string &tableName, const DownloadData &downloadData,
241         const std::set<std::string> &gidFilters) override;
242 
243     CloudSyncConfig GetCloudSyncConfig() const override;
244 
245     void SetCloudSyncConfig(const CloudSyncConfig &config);
246 
247     bool IsTableExistReference(const std::string &table) override;
248 
249     bool IsTableExistReferenceOrReferenceBy(const std::string &table) override;
250 
251     void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override;
252 
253     int GetCursor(const std::string &tableName, uint64_t &cursor) override;
254 
255     bool IsCurrentLogicDelete() const override;
256 
257     int GetLocalDataCount(const std::string &tableName, int &dataCount, int &logicDeleteDataCount) override;
258 
259     std::pair<int, std::vector<std::string>> GetDownloadAssetTable() override;
260 
261     std::pair<int, std::vector<std::string>> GetDownloadAssetRecords(const std::string &tableName,
262         int64_t beginTime) override;
263 
264     int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket, bool useTransaction,
265         DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;
266 
267     void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData,
268         Origin origin);
269 
270     void PrintCursorChange(const std::string &tableName) override;
271 
272     int GetLockStatusByGid(const std::string &tableName, const std::string &gid, LockStatus &status) override;
273 
274     bool IsExistTableContainAssets() override;
275 
276     int GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const override;
277 protected:
278     int FillReferenceData(CloudSyncData &syncData);
279 
280     int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
281         const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);
282 
283     int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
284         DownloadData &downloadData);
285 
286     virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
287         std::map<int64_t, Entries> &referenceGid);
288 
289     int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType,
290         const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid);
291 
292     static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
293         const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend);
294 
295     int ReviseLocalModTime(const std::string &tableName,
296         const std::vector<ReviseModTimeInfo> &revisedData) override;
297 
298     bool IsSetDistributedSchema(const std::string &tableName, RelationalSchemaObject &schemaObj);
299 
300 private:
301     SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
302         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
303     SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode,
304         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
305     SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransactionForAsyncDownload(bool isWrite, int &errCode,
306         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
307     void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;
308 
309     // get
310     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
311         const DataSizeSpecInfo &dataSizeInfo, RelationalSchemaObject &&filterSchema) const;
312     int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
313         std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;
314 
315     int GetTableReference(const std::string &tableName,
316         std::map<std::string, std::vector<TableReferenceProperty>> &reference);
317 
318     std::pair<std::string, int> GetSourceTableName(const std::string &tableName);
319 
320     std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName);
321     // put
322     int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
323     int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
324     void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type);
325     StoreInfo GetStoreInfo() const;
326 
327     int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
328         const std::vector<VBucket> &records);
329 
330     int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
331         const std::vector<VBucket> &records);
332 
333     int GetCloudTableWithoutShared(std::vector<TableSchema> &tables);
334 
335     int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
336         const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery, bool isQueryDownloadRecords);
337 
338     int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
339         bool flag = false);
340 
341     bool CheckTableSupportCompensatedSync(const TableSchema &table);
342 
343     void ExecuteDataChangeCallback(
344         const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
345         const std::string &deviceName, const ChangedData &changedData, bool isChangedData, Origin origin);
346 
347     void SaveCursorChange(const std::string &tableName, uint64_t currCursor);
348 
349     int CommitForAsyncDownload();
350 
351     int RollbackForAsyncDownload();
352 
353     int StartTransactionForAsyncDownload(TransactType type);
354 
355     // data
356     std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
357     std::function<void()> onSchemaChanged_;
358     mutable std::mutex onSchemaChangedMutex_;
359 #ifdef USE_FFRT
360     ffrt::mutex dataChangeDeviceMutex_;
361 #else
362     std::mutex dataChangeDeviceMutex_;
363 #endif
364     std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_;
365     std::function<void()> heartBeatListener_;
366     mutable std::mutex heartBeatMutex_;
367 
368     mutable LruMap<std::string, std::string> remoteDeviceSchema_;
369     StorageExecutor *reusedHandle_;
370     mutable std::mutex reusedHandleMutex_;
371 
372     // cache securityOption
373     mutable std::mutex securityOptionMutex_;
374     mutable SecurityOption securityOption_;
375     mutable bool isCachedOption_;
376 
377     SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr;
378     SQLiteSingleVerRelationalStorageExecutor *asyncDownloadTransactionHandle_ = nullptr;
379     mutable std::shared_mutex transactionMutex_; // used for transaction
380     mutable std::shared_mutex asyncDownloadtransactionMutex_; // used for async download transaction
381 
382     SchemaMgr schemaMgr_;
383     mutable std::shared_mutex schemaMgrMutex_;
384     std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr;
385 
386     std::atomic<bool> logicDelete_ = false;
387 
388     std::function<void (void)> syncFinishFunc_;
389     std::function<void (void)> uploadStartFunc_;
390 
391     mutable std::mutex configMutex_;
392     CloudSyncConfig cloudSyncConfig_;
393 
394     CloudUploadRecorder uploadRecorder_;
395 
396     std::map<std::string, std::pair<uint64_t, uint64_t>> cursorChangeMap_;
397 
398     std::mutex cursorChangeMutex_;
399 };
400 }  // namespace DistributedDB
401 #endif
402 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H