• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
17 #define SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
18 
19 #include "macro_utils.h"
20 #include "db_types.h"
21 #include "query_object.h"
22 #include "sqlite_utils.h"
23 #include "sqlite_storage_executor.h"
24 #include "single_ver_natural_store_commit_notify_data.h"
25 
26 namespace DistributedDB {
// Selects which category of single-version KV data an operation targets.
// The executor's own comments refer to these as meta, local and sync data.
27 enum class SingleVerDataType {
28     META_TYPE,
29     LOCAL_TYPE,
30     SYNC_TYPE,
31 };
32 
// Existence state of a record. Used as the type of DataOperStatus::preStatus.
// NOTE(review): presumably describes the stored record as found before a write - confirm in the .cpp.
33 enum class DataStatus {
34     NOEXISTED,
35     DELETED,
36     EXISTED,
37 };
38 
// State tag stored in SQLiteSingleVerStorageExecutor::executorState_.
// Only INVALID has an explicit value (-1); the remaining enumerators follow from 0 upwards.
39 enum class ExecutorState {
40     INVALID = -1,
41     MAINDB,
42     CACHEDB,
43     MAIN_ATTACH_CACHE, // After process crash and cacheDb existed
44     CACHE_ATTACH_MAIN, // while cacheDb migrating to mainDb
45 };
46 
// Status flags describing a data write; returned by JudgeSyncSaveType and carried
// inside NotifyConflictAndObserverData. All fields default to the "nothing happened" state.
47 struct DataOperStatus {
48     DataStatus preStatus = DataStatus::NOEXISTED; // defaults to NOEXISTED
49     bool isDeleted = false; // NOTE(review): presumably set when the operation is a delete - confirm
50     bool isDefeated = false; // whether the put data is defeated.
51 };
52 
// One sync data record; filled in as the output of GetKvDataByHashKey().
// Numeric fields default to 0; the remaining fields are default-constructed.
53 struct SingleVerRecord {
54     Key key;
55     Value value;
56     Timestamp timestamp = 0;
57     uint64_t flag = 0;
58     std::string device;
59     std::string origDevice;
60     Key hashKey;
61     Timestamp writeTimestamp = 0;
62 };
63 
// Describes the device associated with a data item; passed by const reference to the
// save / conflict-notification methods of SQLiteSingleVerStorageExecutor.
64 struct DeviceInfo {
65     bool isLocal = false; // defaults to false, i.e. not the local device
66     std::string deviceName;
67 };
68 
// A local data record as handed to PutLocalDataToCacheDB() and BindLocalDataInCacheMode().
// Numeric fields default to 0.
69 struct LocalDataItem {
70     Key key;
71     Value value;
72     Timestamp timestamp = 0;
73     Key hashKey;
74     uint64_t flag = 0;
75 };
76 
// Bundle passed to PrepareForNotifyConflictAndObserver() and
// PutIntoConflictAndCommitForMigrateCache() to collect conflict / observer notification state.
77 struct NotifyConflictAndObserverData {
78     SingleVerNaturalStoreCommitNotifyData *committedData = nullptr; // raw pointer; ownership is not expressed here
79     DataItem getData; // NOTE(review): presumably the item read back from storage - confirm
80     Key hashKey;
81     DataOperStatus dataStatus;
82 };
83 
// Per-batch state passed to MigrateSyncDataByVersion(), MigrateDataItems() and MigrateDataItem().
84 struct NotifyMigrateSyncData {
85     bool isRemote = false;
86     bool isRemoveDeviceData = false;
87     bool isPermitForceWrite = true; // note: defaults to true, unlike the other flags
88     SingleVerNaturalStoreCommitNotifyData *committedData = nullptr; // raw pointer; ownership is not expressed here
89     std::vector<Entry> entries{};
90 };
91 
// Pair of device strings passed to BindSavedSyncData().
// NOTE(review): origDev / dev presumably map to the origDev / deviceName parameters of
// BindDevForSavedSyncData() - confirm in the .cpp.
92 struct SyncDataDevices {
93     std::string origDev;
94     std::string dev;
95 };
96 
97 class SQLiteSingleVerStorageExecutor : public SQLiteStorageExecutor {
98 public:
99     SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb);
100     SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb, ExecutorState executorState);
101     ~SQLiteSingleVerStorageExecutor() override;
102 
103     // Delete the copy and assign constructors
104     DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerStorageExecutor);
105 
106     // Get the Kv data according the type(sync, meta, local data).
107     int GetKvData(SingleVerDataType type, const Key &key, Value &value, Timestamp &timestamp) const;
108 
109     // Get the sync data record by hash key.
110     int GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const;
111 
112     // Put the Kv data according the type(meta and the local data).
113     int PutKvData(SingleVerDataType type, const Key &key, const Value &value,
114         Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData);
115 
116     int GetEntries(SingleVerDataType type, const Key &keyPrefix, std::vector<Entry> &entries) const;
117 
118     int GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const;
119 
120     int GetCount(QueryObject &queryObj, int &count) const;
121 
122     // Get all the meta keys.
123     int GetAllMetaKeys(std::vector<Key> &keys) const;
124 
125     int GetAllSyncedEntries(const std::string &hashDev, std::vector<Entry> &entries) const;
126 
127     int SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo,
128         Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite = true);
129 
130     int DeleteLocalKvData(const Key &key, SingleVerNaturalStoreCommitNotifyData *committedData, Value &value,
131         Timestamp &timestamp);
132 
133     // delete a row data by hashKey, with no tombstone left.
134     int EraseSyncData(const Key &hashKey);
135 
136     int RemoveDeviceData(const std::string &deviceName);
137 
138     int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify, uint64_t recordVersion) const;
139 
140     void InitCurrentMaxStamp(Timestamp &maxStamp);
141 
142     void ReleaseContinueStatement();
143 
144     int GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendedLength, Timestamp begin,
145         Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const;
146     int GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendedLength, Timestamp begin,
147         Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const;
148 
149     int GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier);
150 
151     int OpenResultSet(const Key &keyPrefix, int &count);
152 
153     int OpenResultSet(QueryObject &queryObj, int &count);
154 
155     int OpenResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache,
156         uint32_t cacheLimit, int &count);
157 
158     int OpenResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache,
159         uint32_t cacheLimit, int &count);
160 
161     int ReloadResultSet(const Key &keyPrefix);
162 
163     int ReloadResultSet(QueryObject &queryObj);
164 
165     int ReloadResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache,
166         uint32_t cacheLimit, uint32_t cacheStartPos);
167 
168     int ReloadResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache,
169         uint32_t cacheLimit, uint32_t cacheStartPos);
170 
171     int GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy);
172 
173     int GetEntryByRowId(int64_t rowId, Entry &entry);
174 
175     void CloseResultSet();
176 
177     int StartTransaction(TransactType type);
178 
179     int Commit();
180 
181     int Rollback();
182 
183     bool CheckIfKeyExisted(const Key &key, bool isLocal, Value &value, Timestamp &timestamp) const;
184 
185     int PrepareForSavingData(SingleVerDataType type);
186 
187     int ResetForSavingData(SingleVerDataType type);
188 
189     int Reset() override;
190 
191     int UpdateLocalDataTimestamp(Timestamp timestamp);
192 
193     void SetAttachMetaMode(bool attachMetaMode);
194 
195     int PutLocalDataToCacheDB(const LocalDataItem &dataItem) const;
196 
197     int SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo, Timestamp &maxStamp,
198         uint64_t recordVersion, const QueryObject &query);
199 
200     int PrepareForSavingCacheData(SingleVerDataType type);
201     int ResetForSavingCacheData(SingleVerDataType type);
202 
203     int MigrateLocalData();
204 
205     int MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
206         std::vector<DataItem> &dataItems);
207     int GetMinVersionCacheData(std::vector<DataItem> &dataItems, uint64_t &maxVerIncurCacheDb) const;
208 
209     int GetMaxVersionInCacheDb(uint64_t &maxVersion) const;
210     int AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
211         const std::string &attachDbAbsPath, EngineState engineState);
212 
213     // Clear migrating data.
214     void ClearMigrateData();
215 
216     // Get current max timestamp.
217     int GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const;
218 
219     void SetConflictResolvePolicy(int policy);
220 
221     // Delete multiple meta data records in a transaction.
222     int DeleteMetaData(const std::vector<Key> &keys);
223     // Delete multiple meta data records with key prefix in a transaction.
224     int DeleteMetaDataByPrefixKey(const Key &keyPrefix);
225 
226     int CheckIntegrity() const;
227 
228     int CheckQueryObjectLegal(QueryObject &queryObj) const;
229 
230     int CheckDataWithQuery(QueryObject query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo);
231 
232     static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen);
233 
234     int AddSubscribeTrigger(QueryObject &query, const std::string &subscribeId);
235 
236     int RemoveSubscribeTrigger(const std::vector<std::string> &subscribeIds);
237 
238     int RemoveSubscribeTriggerWaterMark(const std::vector<std::string> &subscribeIds);
239 
240     int GetTriggers(const std::string &namePreFix, std::vector<std::string> &triggerNames);
241 
242     int RemoveTrigger(const std::vector<std::string> &triggers);
243 
244     int GetSyncDataWithQuery(const QueryObject &query, size_t appendLength, const DataSizeSpecInfo &dataSizeInfo,
245         const std::pair<Timestamp, Timestamp> &timeRange, std::vector<DataItem> &dataItems) const;
246 
247     int ForceCheckPoint() const;
248 
249     uint64_t GetLogFileSize() const;
250 
251     int UpdateKey(const UpdateKeyCallback &callback);
252 
253     int GetExistsDevicesFromMeta(std::set<std::string> &devices) const;
254 private:
255     struct SaveRecordStatements {
256         sqlite3_stmt *queryStatement = nullptr;
257         sqlite3_stmt *insertStatement = nullptr;
258         sqlite3_stmt *updateStatement = nullptr;
259 
260         int ResetStatement();
261 
GetDataSaveStatementSaveRecordStatements262         inline sqlite3_stmt *GetDataSaveStatement(bool isUpdate) const
263         {
264             return isUpdate ? updateStatement : insertStatement;
265         }
266     };
267 
268     struct UpdateContext {
269         int errCode = E_OK;
270         Key newKey;
271         UpdateKeyCallback callback;
272     };
273 
274     void PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet, const DataOperStatus &status,
275         const Key &hashKey, SingleVerNaturalStoreCommitNotifyData *committedData);
276 
277     static int BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const Key &hashKey,
278         const SyncDataDevices &devices, bool isUpdate);
279 
280     static int BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const std::string &origDev,
281         const std::string &deviceName);
282 
283     static void PutConflictData(const DataItem &itemPut, const DataItem &itemGet, const DeviceInfo &deviceInfo,
284         const DataOperStatus &dataStatus, SingleVerNaturalStoreCommitNotifyData *commitData);
285 
286     static DataOperStatus JudgeSyncSaveType(DataItem &dataItem, const DataItem &itemGet,
287         const std::string &devName, bool isHashKeyExisted, bool isPermitForceWrite = true);
288 
289     static std::string GetOriginDevName(const DataItem &dataItem, const std::string &origDevGet);
290 
291     int GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet, Key &hashKey) const;
292 
293     int GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet, const DataOperStatus &dataStatus) const;
294 
295     int GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const;
296 
297     int PrepareForSyncDataByTime(Timestamp begin, Timestamp end, sqlite3_stmt *&statement, bool getDeletedData = false)
298         const;
299 
300     int StepForResultEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const;
301 
302     int InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt);
303 
304     int InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt);
305 
306     int InitResultSetContent(QueryObject &queryObj);
307 
308     int InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt);
309 
310     int GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const;
311 
312     int BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value, Timestamp timestamp,
313         SingleVerDataType type);
314 
315     int SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey, const std::string &origDev,
316         const std::string &deviceName, bool isUpdate);
317 
318     int SaveKvData(SingleVerDataType type, const Key &key, const Value &value, Timestamp timestamp);
319 
320     int DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData,
321         const Key &key, const Value &value);
322 
323     int PrepareForSavingData(const std::string &readSql, const std::string &insertSql,
324         const std::string &updateSql, SaveRecordStatements &statements) const;
325 
326     int OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count);
327 
328     int ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit,
329         uint32_t cacheStartPos, int &count);
330 
331     void FinalizeAllStatements();
332     int ResetSaveSyncStatements(int errCode);
333 
334     int BindSyncDataInCacheMode(sqlite3_stmt *statement,
335         const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const;
336 
337     int BindPrimaryKeySyncDataInCacheMode(
338         sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const;
339 
340     int BindTimestampSyncDataInCacheMode(sqlite3_stmt *statement, const DataItem &dataItem) const;
341 
342     int BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
343         const std::string &origDev, const std::string &deviceName) const;
344 
345     int SaveSyncDataToCacheDatabase(const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const;
346 
347     int GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem,
348         uint64_t &verInCurCacheDb, bool isCacheDb) const;
349     int GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems,
350         uint64_t &verInCurCacheDb, bool isCacheDb) const;
351     int DelCacheDbDataByVersion(uint64_t version) const;
352 
353     // use for migrating data
354     int BindLocalDataInCacheMode(sqlite3_stmt *statement, const LocalDataItem &dataItem) const;
355 
356     // Process timestamp for syncdata in cacheDB when migrating.
357     int ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems);
358 
359     // Get migrateTimeOffset_.
360     int InitMigrateTimestampOffset();
361 
362     // Get min timestamp of local data in sync_data, cacheDB.
363     int GetMinTimestampInCacheDB(Timestamp &minStamp) const;
364 
365     // Prepare conflict notify and commit notify data.
366     int PrepareForNotifyConflictAndObserver(DataItem &dataItem, const DeviceInfo &deviceInfo,
367         NotifyConflictAndObserverData &notify, bool isPermitForceWrite = true);
368 
369     // Put observer and conflict data into commit notify when migrating cacheDB.
370     int PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem, const DeviceInfo &deviceInfo,
371         NotifyConflictAndObserverData &notify, bool isPermitForceWrite);
372 
373     int MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData);
374 
375     int MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData);
376 
377     int GetEntriesForNotifyRemoveDevData(const DataItem &item, std::vector<Entry> &entries) const;
378 
379     // Reset migrateSyncStatements_.
380     int ResetForMigrateCacheData();
381 
382     // Init migrating data.
383     int InitMigrateData();
384 
385     int MigrateRmDevData(const DataItem &dataItem) const;
386     int VacuumLocalData() const;
387 
388     int GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement,
389         size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const;
390 
391     int GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt,
392         size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const;
393 
394     int CheckMissQueryDataItems(sqlite3_stmt *&stmt, const SqliteQueryHelper &helper, const DeviceInfo &deviceInfo,
395         std::vector<DataItem> &dataItems);
396 
397     int CheckDataWithQuery(std::vector<DataItem> &dataItems);
398 
399     int GetExpandedCheckSql(QueryObject query, DataItem &dataItem);
400 
401     int CheckMissQueryDataItem(sqlite3_stmt *stmt, const std::string &deviceName, DataItem &item);
402 
403     int CreateFuncUpdateKey(UpdateContext &context,
404         void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv),
405         void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const;
406 
407     static void Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv);
408 
409     static void CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv);
410 
411     sqlite3_stmt *getSyncStatement_;
412     sqlite3_stmt *getResultRowIdStatement_;
413     sqlite3_stmt *getResultEntryStatement_;
414     SaveRecordStatements saveSyncStatements_;
415     SaveRecordStatements saveLocalStatements_;
416 
417     // Used for migrating sync_data.
418     SaveRecordStatements migrateSyncStatements_;
419     bool isTransactionOpen_;
420     bool attachMetaMode_; // true for attach meta mode
421     ExecutorState executorState_;
422 
423     // Max timestamp in mainDB. Used for migrating.
424     Timestamp maxTimestampInMainDB_;
425 
426     // The offset between min timestamp in cacheDB and max timestamp in mainDB. Used for migrating.
427     TimeOffset migrateTimeOffset_;
428 
429     // Migrating sync flag. When the flag is true, mainDB and cacheDB are attached, migrateSyncStatements_ is set,
430     // maxTimestampInMainDB_ and migrateTimeOffset_ is meaningful.
431     bool isSyncMigrating_;
432     int conflictResolvePolicy_;
433 };
434 } // namespace DistributedDB
435 
436 #endif // SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
437