• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
17 #define SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
18 
19 #include "macro_utils.h"
20 #include "db_types.h"
21 #include "query_object.h"
22 #include "sqlite_utils.h"
23 #include "sqlite_storage_executor.h"
24 #include "single_ver_natural_store_commit_notify_data.h"
25 
26 namespace DistributedDB {
27 enum class SingleVerDataType {
28     META_TYPE,
29     LOCAL_TYPE,
30     SYNC_TYPE,
31 };
32 
33 enum class DataStatus {
34     NOEXISTED,
35     DELETED,
36     EXISTED,
37 };
38 
39 enum class ExecutorState {
40     INVALID = -1,
41     MAINDB,
42     CACHEDB,
43     MAIN_ATTACH_CACHE, // After process crash and cacheDb existed
44     CACHE_ATTACH_MAIN, // while cacheDb migrating to mainDb
45 };
46 
47 struct DataOperStatus {
48     DataStatus preStatus = DataStatus::NOEXISTED;
49     bool isDeleted = false;
50     bool isDefeated = false; // whether the put data is defeated.
51 };
52 
53 struct SingleVerRecord {
54     Key key;
55     Value value;
56     Timestamp timestamp = 0;
57     uint64_t flag = 0;
58     std::string device;
59     std::string origDevice;
60     Key hashKey;
61     Timestamp writeTimestamp = 0;
62 };
63 
64 struct DeviceInfo {
65     bool isLocal = false;
66     std::string deviceName;
67 };
68 
69 struct LocalDataItem {
70     Key key;
71     Value value;
72     Timestamp timestamp = 0;
73     Key hashKey;
74     uint64_t flag = 0;
75 };
76 
77 struct NotifyConflictAndObserverData {
78     SingleVerNaturalStoreCommitNotifyData *committedData = nullptr;
79     DataItem getData;
80     Key hashKey;
81     DataOperStatus dataStatus;
82 };
83 
84 struct NotifyMigrateSyncData {
85     bool isRemote = false;
86     bool isRemoveDeviceData = false;
87     bool isPermitForceWrite = true;
88     SingleVerNaturalStoreCommitNotifyData *committedData = nullptr;
89     std::vector<Entry> entries{};
90 };
91 
92 struct SyncDataDevices {
93     std::string origDev;
94     std::string dev;
95 };
96 
97 class SQLiteSingleVerStorageExecutor : public SQLiteStorageExecutor {
98 public:
99     SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb);
100     SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb, ExecutorState executorState);
101     ~SQLiteSingleVerStorageExecutor() override;
102 
103     // Delete the copy and assign constructors
104     DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerStorageExecutor);
105 
106     // Get the Kv data according the type(sync, meta, local data).
107     int GetKvData(SingleVerDataType type, const Key &key, Value &value, Timestamp &timestamp) const;
108 
109     // Get the sync data record by hash key.
110     int GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const;
111 
112     // Put the Kv data according the type(meta and the local data).
113     int PutKvData(SingleVerDataType type, const Key &key, const Value &value,
114         Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData);
115 
116     int GetEntries(SingleVerDataType type, const Key &keyPrefix, std::vector<Entry> &entries) const;
117 
118     int GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const;
119 
120     int GetCount(QueryObject &queryObj, int &count) const;
121 
122     // Get all the meta keys.
123     int GetAllMetaKeys(std::vector<Key> &keys) const;
124 
125     int GetAllSyncedEntries(const std::string &deviceName, std::vector<Entry> &entries) const;
126 
127     int SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo,
128         Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite = true);
129 
130     int DeleteLocalKvData(const Key &key, SingleVerNaturalStoreCommitNotifyData *committedData, Value &value,
131         Timestamp &timestamp);
132 
133     // delete a row data by hashKey, with no tombstone left.
134     int EraseSyncData(const Key &hashKey);
135 
136     int RemoveDeviceData(const std::string &deviceName);
137 
138     int RemoveDeviceDataInCacheMode(const std::string &deviceName, bool isNeedNotify, uint64_t recordVersion) const;
139 
140     void InitCurrentMaxStamp(Timestamp &maxStamp);
141 
142     void ReleaseContinueStatement();
143 
144     int GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendedLength, Timestamp begin,
145         Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const;
146     int GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendedLength, Timestamp begin,
147         Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const;
148 
149     int GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier);
150 
151     int OpenResultSet(const Key &keyPrefix, int &count);
152 
153     int OpenResultSet(QueryObject &queryObj, int &count);
154 
155     int OpenResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache,
156         uint32_t cacheLimit, int &count);
157 
158     int OpenResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache,
159         uint32_t cacheLimit, int &count);
160 
161     int ReloadResultSet(const Key &keyPrefix);
162 
163     int ReloadResultSet(QueryObject &queryObj);
164 
165     int ReloadResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache,
166         uint32_t cacheLimit, uint32_t cacheStartPos);
167 
168     int ReloadResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache,
169         uint32_t cacheLimit, uint32_t cacheStartPos);
170 
171     int GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy);
172 
173     int GetEntryByRowId(int64_t rowId, Entry &entry);
174 
175     void CloseResultSet();
176 
177     int StartTransaction(TransactType type);
178 
179     int Commit();
180 
181     int Rollback();
182 
183     bool CheckIfKeyExisted(const Key &key, bool isLocal, Value &value, Timestamp &timestamp) const;
184 
185     int PrepareForSavingData(SingleVerDataType type);
186 
187     int ResetForSavingData(SingleVerDataType type);
188 
189     int Reset() override;
190 
191     int UpdateLocalDataTimestamp(Timestamp timestamp);
192 
193     void SetAttachMetaMode(bool attachMetaMode);
194 
195     int PutLocalDataToCacheDB(const LocalDataItem &dataItem) const;
196 
197     int SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo, Timestamp &maxStamp,
198         uint64_t recordVersion, const QueryObject &query);
199 
200     int PrepareForSavingCacheData(SingleVerDataType type);
201     int ResetForSavingCacheData(SingleVerDataType type);
202 
203     int MigrateLocalData();
204 
205     int MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
206         std::vector<DataItem> &dataItems);
207     int GetMinVersionCacheData(std::vector<DataItem> &dataItems, uint64_t &maxVerIncurCacheDb) const;
208 
209     int GetMaxVersionInCacheDb(uint64_t &maxVersion) const;
210     int AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
211         const std::string &attachDbAbsPath, EngineState engineState);
212 
213     // Clear migrating data.
214     void ClearMigrateData();
215 
216     // Get current max timestamp.
217     int GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const;
218 
219     void SetConflictResolvePolicy(int policy);
220 
221     // Delete multiple meta data records in a transaction.
222     int DeleteMetaData(const std::vector<Key> &keys);
223     // Delete multiple meta data records with key prefix in a transaction.
224     int DeleteMetaDataByPrefixKey(const Key &keyPrefix);
225 
226     int CheckIntegrity() const;
227 
228     int CheckQueryObjectLegal(QueryObject &queryObj) const;
229 
230     int CheckDataWithQuery(QueryObject query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo);
231 
232     static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen);
233 
234     int AddSubscribeTrigger(QueryObject &query, const std::string &subscribeId);
235 
236     int RemoveSubscribeTrigger(const std::vector<std::string> &subscribeIds);
237 
238     int RemoveSubscribeTriggerWaterMark(const std::vector<std::string> &subscribeIds);
239 
240     int GetTriggers(const std::string &namePreFix, std::vector<std::string> &triggerNames);
241 
242     int RemoveTrigger(const std::vector<std::string> &triggers);
243 
244     int GetSyncDataWithQuery(const QueryObject &query, size_t appendLength, const DataSizeSpecInfo &dataSizeInfo,
245         const std::pair<Timestamp, Timestamp> &timeRange, std::vector<DataItem> &dataItems) const;
246 
247     int ForceCheckPoint() const;
248 
249     uint64_t GetLogFileSize() const;
250 
251 private:
252     struct SaveRecordStatements {
253         sqlite3_stmt *queryStatement = nullptr;
254         sqlite3_stmt *insertStatement = nullptr;
255         sqlite3_stmt *updateStatement = nullptr;
256 
257         int ResetStatement();
258 
GetDataSaveStatementSaveRecordStatements259         inline sqlite3_stmt *GetDataSaveStatement(bool isUpdate) const
260         {
261             return isUpdate ? updateStatement : insertStatement;
262         }
263     };
264 
265     void PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet, const DataOperStatus &status,
266         const Key &hashKey, SingleVerNaturalStoreCommitNotifyData *committedData);
267 
268     static int BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const Key &hashKey,
269         const SyncDataDevices &devices, bool isUpdate);
270 
271     static int BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const std::string &origDev,
272         const std::string &deviceName);
273 
274     static void PutConflictData(const DataItem &itemPut, const DataItem &itemGet, const DeviceInfo &deviceInfo,
275         const DataOperStatus &dataStatus, SingleVerNaturalStoreCommitNotifyData *commitData);
276 
277     static DataOperStatus JudgeSyncSaveType(DataItem &dataItem, const DataItem &itemGet,
278         const std::string &devName, bool isHashKeyExisted, bool isPermitForceWrite = true);
279 
280     static std::string GetOriginDevName(const DataItem &dataItem, const std::string &origDevGet);
281 
282     int GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet, Key &hashKey) const;
283 
284     int GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet, const DataOperStatus &dataStatus) const;
285 
286     int GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const;
287 
288     int PrepareForSyncDataByTime(Timestamp begin, Timestamp end, sqlite3_stmt *&statement, bool getDeletedData = false)
289         const;
290 
291     int StepForResultEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const;
292 
293     int InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt);
294 
295     int InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt);
296 
297     int InitResultSetContent(QueryObject &queryObj);
298 
299     int InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt);
300 
301     int GetAllKeys(sqlite3_stmt *statement, std::vector<Key> &keys) const;
302 
303     int GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const;
304 
305     int BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value, Timestamp timestamp,
306         SingleVerDataType type);
307 
308     int SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey, const std::string &origDev,
309         const std::string &deviceName, bool isUpdate);
310 
311     int SaveKvData(SingleVerDataType type, const Key &key, const Value &value, Timestamp timestamp);
312 
313     int DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData,
314         const Key &key, const Value &value);
315 
316     int PrepareForSavingData(const std::string &readSql, const std::string &insertSql,
317         const std::string &updateSql, SaveRecordStatements &statements) const;
318 
319     int OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count);
320 
321     int ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit,
322         uint32_t cacheStartPos, int &count);
323 
324     void FinalizeAllStatements();
325     int ResetSaveSyncStatements(int errCode);
326 
327     int BindSyncDataInCacheMode(sqlite3_stmt *statement,
328         const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const;
329 
330     int BindPrimaryKeySyncDataInCacheMode(
331         sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const;
332 
333     int BindTimestampSyncDataInCacheMode(sqlite3_stmt *statement, const DataItem &dataItem) const;
334 
335     int BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
336         const std::string &origDev, const std::string &deviceName) const;
337 
338     int SaveSyncDataToCacheDatabase(const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const;
339 
340     int GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem,
341         uint64_t &verInCurCacheDb, bool isCacheDb) const;
342     int GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems,
343         uint64_t &verInCurCacheDb, bool isCacheDb) const;
344     int DelCacheDbDataByVersion(uint64_t version) const;
345 
346     // use for migrating data
347     int BindLocalDataInCacheMode(sqlite3_stmt *statement, const LocalDataItem &dataItem) const;
348 
349     // Process timestamp for syncdata in cacheDB when migrating.
350     int ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems);
351 
352     // Get migrateTimeOffset_.
353     int InitMigrateTimestampOffset();
354 
355     // Get min timestamp of local data in sync_data, cacheDB.
356     int GetMinTimestampInCacheDB(Timestamp &minStamp) const;
357 
358     // Prepare conflict notify and commit notify data.
359     int PrepareForNotifyConflictAndObserver(DataItem &dataItem, const DeviceInfo &deviceInfo,
360         NotifyConflictAndObserverData &notify, bool isPermitForceWrite = true);
361 
362     // Put observer and conflict data into commit notify when migrating cacheDB.
363     int PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem, const DeviceInfo &deviceInfo,
364         NotifyConflictAndObserverData &notify, bool isPermitForceWrite);
365 
366     int MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData);
367 
368     int MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData);
369 
370     int GetEntriesForNotifyRemoveDevData(const DataItem &item, std::vector<Entry> &entries) const;
371 
372     // Reset migrateSyncStatements_.
373     int ResetForMigrateCacheData();
374 
375     // Init migrating data.
376     int InitMigrateData();
377 
378     int MigrateRmDevData(const DataItem &dataItem) const;
379     int VacuumLocalData() const;
380 
381     int GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement,
382         size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const;
383 
384     int GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt,
385         size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const;
386 
387     int CheckMissQueryDataItems(sqlite3_stmt *&stmt, const SqliteQueryHelper &helper, const DeviceInfo &deviceInfo,
388         std::vector<DataItem> &dataItems);
389 
390     int CheckDataWithQuery(std::vector<DataItem> &dataItems);
391 
392     int GetExpandedCheckSql(QueryObject query, DataItem &dataItem);
393 
394     int CheckMissQueryDataItem(sqlite3_stmt *stmt, const std::string &deviceName, DataItem &item);
395 
396     sqlite3_stmt *getSyncStatement_;
397     sqlite3_stmt *getResultRowIdStatement_;
398     sqlite3_stmt *getResultEntryStatement_;
399     SaveRecordStatements saveSyncStatements_;
400     SaveRecordStatements saveLocalStatements_;
401 
402     // Used for migrating sync_data.
403     SaveRecordStatements migrateSyncStatements_;
404     bool isTransactionOpen_;
405     bool attachMetaMode_; // true for attach meta mode
406     ExecutorState executorState_;
407 
408     // Max timestamp in mainDB. Used for migrating.
409     Timestamp maxTimestampInMainDB_;
410 
411     // The offset between min timestamp in cacheDB and max timestamp in mainDB. Used for migrating.
412     TimeOffset migrateTimeOffset_;
413 
414     // Migrating sync flag. When the flag is true, mainDB and cacheDB are attached, migrateSyncStatements_ is set,
415     // maxTimestampInMainDB_ and migrateTimeOffset_ is meaningful.
416     bool isSyncMigrating_;
417     int conflictResolvePolicy_;
418 };
419 } // namespace DistributedDB
420 
421 #endif // SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
422