• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
16 #define RELATIONAL_SYNC_ABLE_STORAGE_H
17 #ifdef RELATIONAL_STORE
18 
19 #include "cloud/cloud_upload_recorder.h"
20 #include "cloud/schema_mgr.h"
21 #ifdef USE_FFRT
22 #include "ffrt.h"
23 #endif
24 #include "icloud_sync_storage_interface.h"
25 #include "lru_map.h"
26 #include "relational_db_sync_interface.h"
27 #include "relationaldb_properties.h"
28 #include "sqlite_single_relational_storage_engine.h"
29 #include "sqlite_single_ver_relational_continue_token.h"
30 #include "sync_able_engine.h"
31 
32 namespace DistributedDB {
33 using RelationalObserverAction =
34     std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData, Origin origin)>;
35 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface,
36     public virtual RefObject {
37 public:
38     explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
39     ~RelationalSyncAbleStorage() override;
40 
41     // Get interface type of this kvdb.
42     int GetInterfaceType() const override;
43 
44     // Get the interface ref-count, in order to access asynchronously.
45     void IncRefCount() override;
46 
47     // Drop the interface ref-count.
48     void DecRefCount() override;
49 
50     // Get the identifier of this rdb.
51     std::vector<uint8_t> GetIdentifier() const override;
52 
53     // Get the dual tuple identifier of this rdb.
54     std::vector<uint8_t> GetDualTupleIdentifier() const override;
55 
56     // Get the max timestamp of all entries in database.
57     void GetMaxTimestamp(Timestamp &stamp) const override;
58 
59     // Get the max timestamp of one table.
60     int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;
61 
62     // Get meta data associated with the given key.
63     int GetMetaData(const Key &key, Value &value) const override;
64 
65     // Put meta data as a key-value entry.
66     int PutMetaData(const Key &key, const Value &value) override;
67 
68     int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;
69 
70     // Delete multiple meta data records in a transaction.
71     int DeleteMetaData(const std::vector<Key> &keys) override;
72 
73     // Delete multiple meta data records with key prefix in a transaction.
74     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
75 
76     // Get all meta data keys.
77     int GetAllMetaKeys(std::vector<Key> &keys) const override;
78 
79     const RelationalDBProperties &GetDbProperties() const override;
80 
81     // Get the data which would be synced with query condition
82     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
83         const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
84         std::vector<SingleVerKvEntry *> &entries) const override;
85 
86     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
87         const DataSizeSpecInfo &dataSizeInfo) const override;
88 
89     int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
90         const DeviceID &deviceName) override;
91 
92     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
93 
94     RelationalSchemaObject GetSchemaInfo() const override;
95 
96     int GetSecurityOption(SecurityOption &option) const override;
97 
98     void NotifyRemotePushFinished(const std::string &deviceId) const override;
99 
100     // Get the timestamp when database created or imported
101     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
102 
103     std::vector<QuerySyncObject> GetTablesQuery() override;
104 
105     int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;
106 
107     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
108         const std::string &targetID, bool isPush) const override;
109 
110     int CheckAndInitQueryCondition(QueryObject &query) const override;
111     int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
112         const RelationalObserverAction &action);
113     int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer);
114     void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override;
115 
116     int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;
117 
118     int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;
119 
120     void NotifySchemaChanged();
121 
122     void RegisterHeartBeatListener(const std::function<void()> &listener);
123 
124     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
125 
126     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
127 
128     int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
129         ContinueToken &token) const override;
130 
131     int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override;
132 
133     int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) const override;
134 
135     void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;
136 
137     // recycling the write handle
138     void SetReusedHandle(StorageExecutor *handle);
139 
140     int StartTransaction(TransactType type) override;
141 
142     int Commit() override;
143 
144     int Rollback() override;
145 
146     int GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp, bool isCloudForcePush,
147         bool isCompensatedTask, int64_t &count) override;
148 
149     int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> &timestampVec,
150         bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override;
151 
152     int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime,
153         ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
154 
155     int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
156 
157     int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush,
158         bool isCompensatedTask, std::vector<std::string> &cloudGid) override;
159 
160     int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override;
161 
162     int GetSchemaFromDB(RelationalSchemaObject &schema) override;
163 
164     int ChkSchema(const TableName &tableName) override;
165 
166     int SetCloudDbSchema(const DataBaseSchema &schema) override;
167 
168     int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override;
169 
170     int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override;
171 
172     int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
173         DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;
174 
175     int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override;
176 
177     int UpdateAssetStatusForAssetOnly(const std::string &tableName, VBucket &asset) override;
178 
179     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
180         const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override;
181 
182     int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;
183 
184     int FillCloudAssetForAsyncDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;
185 
186     int SetLogTriggerStatus(bool status) override;
187 
188     int SetLogTriggerStatusForAsyncDownload(bool status) override;
189 
190     int SetCursorIncFlag(bool flag) override;
191 
192     int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override;
193 
194     void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine);
195 
196     std::string GetIdentify() const override;
197 
198     void EraseDataChangeCallback(uint64_t connectionId);
199 
200     void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;
201 
202     int CheckQueryValid(const QuerySyncObject &query) override;
203 
204     int CreateTempSyncTrigger(const std::string &tableName) override;
205     int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override;
206     int ClearAllTempSyncTrigger() override;
207     bool IsSharedTable(const std::string &tableName) override;
208 
209     std::map<std::string, std::string> GetSharedTableOriginNames();
210 
211     void SetLogicDelete(bool logicDelete);
212 
213     std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
214         const Bytes &hashKey, VBucket &assets) override;
215 
216     std::pair<int, uint32_t> GetAssetsByGidOrHashKeyForAsyncDownload(
217         const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets) override;
218 
219     int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override;
220 
221     int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);
222 
223     int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override;
224 
225     int UpdateRecordFlagForAsyncDownload(const std::string &tableName, bool recordConflict,
226         const LogInfo &logInfo) override;
227 
228     int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users,
229         bool isQueryDownloadRecords) override;
230 
231     int ClearUnLockingNoNeedCompensated() override;
232 
233     int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
234         const std::set<std::string> &gidFilters) override;
235 
236     int MarkFlagAsAssetAsyncDownload(const std::string &tableName, const DownloadData &downloadData,
237         const std::set<std::string> &gidFilters) override;
238 
239     CloudSyncConfig GetCloudSyncConfig() const override;
240 
241     void SetCloudSyncConfig(const CloudSyncConfig &config);
242 
243     bool IsTableExistReference(const std::string &table) override;
244 
245     bool IsTableExistReferenceOrReferenceBy(const std::string &table) override;
246 
247     void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override;
248 
249     int GetCursor(const std::string &tableName, uint64_t &cursor) override;
250 
251     bool IsCurrentLogicDelete() const override;
252 
253     int GetLocalDataCount(const std::string &tableName, int &dataCount, int &logicDeleteDataCount) override;
254 
255     std::pair<int, std::vector<std::string>> GetDownloadAssetTable() override;
256 
257     std::pair<int, std::vector<std::string>> GetDownloadAssetRecords(const std::string &tableName,
258         int64_t beginTime) override;
259 
260     int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket, bool useTransaction,
261         DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;
262 
263     void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData,
264         Origin origin);
265 
266     void PrintCursorChange(const std::string &tableName) override;
267 
268     int GetLockStatusByGid(const std::string &tableName, const std::string &gid, LockStatus &status) override;
269 
270     bool IsExistTableContainAssets() override;
271 protected:
272     int FillReferenceData(CloudSyncData &syncData);
273 
274     int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
275         const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);
276 
277     int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
278         DownloadData &downloadData);
279 
280     virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
281         std::map<int64_t, Entries> &referenceGid);
282 
283     int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType,
284         const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid);
285 
286     static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
287         const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend);
288 
289     int ReviseLocalModTime(const std::string &tableName,
290         const std::vector<ReviseModTimeInfo> &revisedData) override;
291 
292 private:
293     SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
294         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
295     SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode,
296         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
297     void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;
298 
299     // get
300     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
301         const DataSizeSpecInfo &dataSizeInfo, RelationalSchemaObject &&filterSchema) const;
302     int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
303         std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;
304 
305     int GetTableReference(const std::string &tableName,
306         std::map<std::string, std::vector<TableReferenceProperty>> &reference);
307 
308     std::pair<std::string, int> GetSourceTableName(const std::string &tableName);
309 
310     std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName);
311     // put
312     int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
313     int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
314     void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type);
315     StoreInfo GetStoreInfo() const;
316 
317     int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
318         const std::vector<VBucket> &records);
319 
320     int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
321         const std::vector<VBucket> &records);
322 
323     int GetCloudTableWithoutShared(std::vector<TableSchema> &tables);
324 
325     int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
326         const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery, bool isQueryDownloadRecords);
327 
328     int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
329         bool flag = false);
330 
331     bool CheckTableSupportCompensatedSync(const TableSchema &table);
332 
333     void ExecuteDataChangeCallback(
334         const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
335         const std::string &deviceName, const ChangedData &changedData, bool isChangedData, Origin origin);
336 
337     void SaveCursorChange(const std::string &tableName, uint64_t currCursor);
338 
339     // data
340     std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
341     std::function<void()> onSchemaChanged_;
342     mutable std::mutex onSchemaChangedMutex_;
343 #ifdef USE_FFRT
344     ffrt::mutex dataChangeDeviceMutex_;
345 #else
346     std::mutex dataChangeDeviceMutex_;
347 #endif
348     std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_;
349     std::function<void()> heartBeatListener_;
350     mutable std::mutex heartBeatMutex_;
351 
352     mutable LruMap<std::string, std::string> remoteDeviceSchema_;
353     StorageExecutor *reusedHandle_;
354     mutable std::mutex reusedHandleMutex_;
355 
356     // cache securityOption
357     mutable std::mutex securityOptionMutex_;
358     mutable SecurityOption securityOption_;
359     mutable bool isCachedOption_;
360 
361     SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr;
362     mutable std::shared_mutex transactionMutex_; // used for transaction
363 
364     SchemaMgr schemaMgr_;
365     mutable std::shared_mutex schemaMgrMutex_;
366     std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr;
367 
368     std::atomic<bool> logicDelete_ = false;
369 
370     std::function<void (void)> syncFinishFunc_;
371     std::function<void (void)> uploadStartFunc_;
372 
373     mutable std::mutex configMutex_;
374     CloudSyncConfig cloudSyncConfig_;
375 
376     CloudUploadRecorder uploadRecorder_;
377 
378     std::map<std::string, std::pair<uint64_t, uint64_t>> cursorChangeMap_;
379 };
380 }  // namespace DistributedDB
381 #endif
382 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H