/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
#define SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H

#include <cstddef>
#include <cstdint>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "macro_utils.h"
#include "db_types.h"
#include "query_object.h"
#include "sqlite_utils.h"
#include "sqlite_storage_executor.h"
#include "single_ver_natural_store_commit_notify_data.h"

namespace DistributedDB {
// Selects which kind of single-version KV data an operation addresses: meta data, local data or
// sync data (see e.g. SQLiteSingleVerStorageExecutor::GetKvData / PutKvData).
enum class SingleVerDataType {
    META_TYPE,
    LOCAL_TYPE,
    SYNC_TYPE,
};

// Existence state of a record, used as DataOperStatus::preStatus.
// NOTE(review): DELETED presumably means a deleted record (tombstone) is still stored — confirm
// against the implementation.
enum class DataStatus {
    NOEXISTED,
    DELETED,
    EXISTED,
};

// Describes which database(s) an executor handle is working against.
enum class ExecutorState {
    INVALID = -1,
    MAINDB,
    CACHEDB,
    MAIN_ATTACH_CACHE, // After process crash and cacheDb existed
    CACHE_ATTACH_MAIN, // while cacheDb migrating to mainDb
};

47 struct DataOperStatus {
48     DataStatus preStatus = DataStatus::NOEXISTED;
49     bool isDeleted = false;
50     bool isDefeated = false; // whether the put data is defeated.
51 };
52 
53 struct SingleVerRecord {
54     Key key;
55     Value value;
56     Timestamp timestamp = 0;
57     uint64_t flag = 0;
58     std::string device;
59     std::string origDevice;
60     Key hashKey;
61     Timestamp writeTimestamp = 0;
62 };
63 
// Identifies the device a data item is associated with when it is saved or checked.
struct DeviceInfo {
    bool isLocal = false;   // NOTE(review): presumably true when the data comes from this device
    std::string deviceName; // empty by default
};

69 struct LocalDataItem {
70     Key key;
71     Value value;
72     Timestamp timestamp = 0;
73     Key hashKey;
74     uint64_t flag = 0;
75 };
76 
77 struct NotifyConflictAndObserverData {
78     SingleVerNaturalStoreCommitNotifyData *committedData = nullptr;
79     DataItem getData;
80     Key hashKey;
81     DataOperStatus dataStatus;
82 };
83 
84 struct NotifyMigrateSyncData {
85     bool isRemote = false;
86     bool isRemoveDeviceData = false;
87     bool isPermitForceWrite = true;
88     SingleVerNaturalStoreCommitNotifyData *committedData = nullptr;
89     std::vector<Entry> entries{};
90 };
91 
// Pair of device names bound when saving sync data
// (see SQLiteSingleVerStorageExecutor::BindSavedSyncData()).
struct SyncDataDevices {
    std::string origDev; // original device name
    std::string dev;     // device name
};

97 class SQLiteSingleVerStorageExecutor : public SQLiteStorageExecutor {
98 public:
99     SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb);
100     SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb, ExecutorState executorState);
101     ~SQLiteSingleVerStorageExecutor() override;
102 
103     // Delete the copy and assign constructors
104     DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerStorageExecutor);
105 
106     // Get the Kv data according the type(sync, meta, local data).
107     int GetKvData(SingleVerDataType type, const Key &key, Value &value, Timestamp &timestamp) const;
108 
109     // Get the sync data record by hash key.
110     int GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const;
111 
112     // Put the Kv data according the type(meta and the local data).
113     int PutKvData(SingleVerDataType type, const Key &key, const Value &value,
114         Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData);
115 
116     int GetEntries(bool isGetValue, SingleVerDataType type, const Key &keyPrefix, std::vector<Entry> &entries) const;
117 
118     int GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const;
119 
120     int GetCount(QueryObject &queryObj, int &count) const;
121 
122     // Get all the meta keys.
123     int GetAllMetaKeys(std::vector<Key> &keys) const;
124 
125     int GetAllSyncedEntries(const std::string &hashDev, std::vector<Entry> &entries) const;
126 
127     int SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo,
128         Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite = true);
129 
130     int DeleteLocalKvData(const Key &key, SingleVerNaturalStoreCommitNotifyData *committedData, Value &value,
131         Timestamp &timestamp);
132 
133     // delete a row data by hashKey, with no tombstone left.
134     int EraseSyncData(const Key &hashKey);
135 
136     int RemoveDeviceData(const std::string &deviceName);
137 
138     int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify, uint64_t recordVersion) const;
139 
140     void InitCurrentMaxStamp(Timestamp &maxStamp);
141 
142     void ReleaseContinueStatement();
143 
144     int GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength, Timestamp begin,
145         Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const;
146     int GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength, Timestamp begin,
147         Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const;
148 
149     int GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier);
150 
151     int OpenResultSet(const Key &keyPrefix, int &count);
152 
153     int OpenResultSet(QueryObject &queryObj, int &count);
154 
155     int OpenResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache,
156         uint32_t cacheLimit, int &count);
157 
158     int OpenResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache,
159         uint32_t cacheLimit, int &count);
160 
161     int ReloadResultSet(const Key &keyPrefix);
162 
163     int ReloadResultSet(QueryObject &queryObj);
164 
165     int ReloadResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache,
166         uint32_t cacheLimit, uint32_t cacheStartPos);
167 
168     int ReloadResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache,
169         uint32_t cacheLimit, uint32_t cacheStartPos);
170 
171     int GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy);
172 
173     int GetEntryByRowId(int64_t rowId, Entry &entry);
174 
175     void CloseResultSet();
176 
177     int StartTransaction(TransactType type);
178 
179     int Commit();
180 
181     int Rollback();
182 
183     bool CheckIfKeyExisted(const Key &key, bool isLocal, Value &value, Timestamp &timestamp) const;
184 
185     int PrepareForSavingData(SingleVerDataType type);
186 
187     int ResetForSavingData(SingleVerDataType type);
188 
189     int Reset() override;
190 
191     int UpdateLocalDataTimestamp(Timestamp timestamp);
192 
193     void SetAttachMetaMode(bool attachMetaMode);
194 
195     int PutLocalDataToCacheDB(const LocalDataItem &dataItem) const;
196 
197     int SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo, Timestamp &maxStamp,
198         uint64_t recordVersion, const QueryObject &query);
199 
200     int PrepareForSavingCacheData(SingleVerDataType type);
201     int ResetForSavingCacheData(SingleVerDataType type);
202 
203     int MigrateLocalData();
204 
205     int MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
206         std::vector<DataItem> &dataItems);
207     int GetMinVersionCacheData(std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const;
208 
209     int GetMaxVersionInCacheDb(uint64_t &maxVersion) const;
210     int AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
211         const std::string &attachDbAbsPath, EngineState engineState);
212 
213     // Clear migrating data.
214     void ClearMigrateData();
215 
216     // Get current max timestamp.
217     int GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const;
218 
219     void SetConflictResolvePolicy(int policy);
220 
221     // Delete multiple meta data records in a transaction.
222     int DeleteMetaData(const std::vector<Key> &keys);
223     // Delete multiple meta data records with key prefix in a transaction.
224     int DeleteMetaDataByPrefixKey(const Key &keyPrefix);
225 
226     int CheckIntegrity() const;
227 
228     int CheckQueryObjectLegal(QueryObject &queryObj) const;
229 
230     int CheckDataWithQuery(QueryObject query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo);
231 
232     static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen);
233 
234     int AddSubscribeTrigger(QueryObject &query, const std::string &subscribeId);
235 
236     int RemoveSubscribeTrigger(const std::vector<std::string> &subscribeIds);
237 
238     int RemoveSubscribeTriggerWaterMark(const std::vector<std::string> &subscribeIds);
239 
240     int GetTriggers(const std::string &namePreFix, std::vector<std::string> &triggerNames);
241 
242     int RemoveTrigger(const std::vector<std::string> &triggers);
243 
244     int GetSyncDataWithQuery(const QueryObject &query, size_t appendLength, const DataSizeSpecInfo &dataSizeInfo,
245         const std::pair<Timestamp, Timestamp> &timeRange, std::vector<DataItem> &dataItems) const;
246 
247     int ForceCheckPoint() const;
248 
249     uint64_t GetLogFileSize() const;
250 
251     int GetExistsDevicesFromMeta(std::set<std::string> &devices);
252 
253     int UpdateKey(const UpdateKeyCallback &callback);
254 
255 private:
256     struct SaveRecordStatements {
257         sqlite3_stmt *queryStatement = nullptr;
258         sqlite3_stmt *insertStatement = nullptr;
259         sqlite3_stmt *updateStatement = nullptr;
260 
261         int ResetStatement();
262 
GetDataSaveStatementSaveRecordStatements263         inline sqlite3_stmt *GetDataSaveStatement(bool isUpdate) const
264         {
265             return isUpdate ? updateStatement : insertStatement;
266         }
267     };
268 
269     struct UpdateContext {
270         int errCode = E_OK;
271         Key newKey;
272         UpdateKeyCallback callback;
273     };
274 
275     void PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet, const DataOperStatus &status,
276         SingleVerNaturalStoreCommitNotifyData *committedData);
277 
278     static int BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const Key &hashKey,
279         const SyncDataDevices &devices, bool isUpdate);
280 
281     static int BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const std::string &origDev,
282         const std::string &deviceName);
283 
284     static void PutConflictData(const DataItem &itemPut, const DataItem &itemGet, const DeviceInfo &deviceInfo,
285         const DataOperStatus &dataStatus, SingleVerNaturalStoreCommitNotifyData *commitData);
286 
287     static DataOperStatus JudgeSyncSaveType(DataItem &dataItem, const DataItem &itemGet,
288         const std::string &devName, bool isHashKeyExisted, bool isPermitForceWrite = true);
289 
290     static std::string GetOriginDevName(const DataItem &dataItem, const std::string &origDevGet);
291 
292     int GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet, Key &hashKey) const;
293 
294     int GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet, const DataOperStatus &dataStatus) const;
295 
296     int GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const;
297 
298     int PrepareForSyncDataByTime(Timestamp begin, Timestamp end, sqlite3_stmt *&statement, bool getDeletedData = false)
299         const;
300 
301     int StepForResultEntries(bool isGetValue, sqlite3_stmt *statement, std::vector<Entry> &entries) const;
302 
303     int InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt);
304 
305     int InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt);
306 
307     int InitResultSetContent(QueryObject &queryObj);
308 
309     int InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt);
310 
311     int GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const;
312 
313     int BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value, Timestamp timestamp,
314         SingleVerDataType type);
315 
316     int SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey, const std::string &origDev,
317         const std::string &deviceName, bool isUpdate);
318 
319     int SaveKvData(SingleVerDataType type, const Key &key, const Value &value, Timestamp timestamp);
320 
321     int DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData,
322         const Key &key, const Value &value);
323 
324     int PrepareForSavingData(const std::string &readSql, const std::string &insertSql,
325         const std::string &updateSql, SaveRecordStatements &statements) const;
326 
327     int OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count);
328 
329     int ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit,
330         uint32_t cacheStartPos, int &count);
331 
332     void FinalizeAllStatements();
333     int ResetSaveSyncStatements(int errCode);
334 
335     int BindSyncDataInCacheMode(sqlite3_stmt *statement,
336         const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const;
337 
338     int BindPrimaryKeySyncDataInCacheMode(
339         sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const;
340 
341     int BindTimestampSyncDataInCacheMode(sqlite3_stmt *statement, const DataItem &dataItem) const;
342 
343     int BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
344         const std::string &origDev, const std::string &deviceName) const;
345 
346     int SaveSyncDataToCacheDatabase(const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const;
347 
348     int GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem,
349         uint64_t &verInCurCacheDb, bool isCacheDb) const;
350     int GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems,
351         uint64_t &verInCurCacheDb, bool isCacheDb) const;
352     int DelCacheDbDataByVersion(uint64_t version) const;
353 
354     // use for migrating data
355     int BindLocalDataInCacheMode(sqlite3_stmt *statement, const LocalDataItem &dataItem) const;
356 
357     // Process timestamp for syncdata in cacheDB when migrating.
358     int ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems);
359 
360     // Get migrateTimeOffset_.
361     int InitMigrateTimestampOffset();
362 
363     // Get min timestamp of local data in sync_data, cacheDB.
364     int GetMinTimestampInCacheDB(Timestamp &minStamp) const;
365 
366     // Prepare conflict notify and commit notify data.
367     int PrepareForNotifyConflictAndObserver(DataItem &dataItem, const DeviceInfo &deviceInfo,
368         NotifyConflictAndObserverData &notify, bool isPermitForceWrite = true);
369 
370     // Put observer and conflict data into commit notify when migrating cacheDB.
371     int PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem, const DeviceInfo &deviceInfo,
372         NotifyConflictAndObserverData &notify, bool isPermitForceWrite);
373 
374     int MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData);
375 
376     int MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData);
377 
378     int GetEntriesForNotifyRemoveDevData(const DataItem &item, std::vector<Entry> &entries) const;
379 
380     // Reset migrateSyncStatements_.
381     int ResetForMigrateCacheData();
382 
383     // Init migrating data.
384     int InitMigrateData();
385 
386     int MigrateRmDevData(const DataItem &dataItem) const;
387     int VacuumLocalData() const;
388 
389     int GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement,
390         size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const;
391 
392     int GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt,
393         size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const;
394 
395     int CheckMissQueryDataItems(sqlite3_stmt *&stmt, const SqliteQueryHelper &helper, const DeviceInfo &deviceInfo,
396         std::vector<DataItem> &dataItems);
397 
398     int CheckDataWithQuery(std::vector<DataItem> &dataItems);
399 
400     int GetExpandedCheckSql(QueryObject query, DataItem &dataItem);
401 
402     int CheckMissQueryDataItem(sqlite3_stmt *stmt, const std::string &deviceName, DataItem &item);
403 
404     int CreateFuncUpdateKey(UpdateContext &context,
405         void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv),
406         void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const;
407 
408     static void Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv);
409 
410     static void CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv);
411 
412     sqlite3_stmt *getSyncStatement_;
413     sqlite3_stmt *getResultRowIdStatement_;
414     sqlite3_stmt *getResultEntryStatement_;
415     SaveRecordStatements saveSyncStatements_;
416     SaveRecordStatements saveLocalStatements_;
417 
418     // Used for migrating sync_data.
419     SaveRecordStatements migrateSyncStatements_;
420     bool isTransactionOpen_;
421     bool attachMetaMode_; // true for attach meta mode
422     ExecutorState executorState_;
423 
424     // Max timestamp in mainDB. Used for migrating.
425     Timestamp maxTimestampInMainDB_;
426 
427     // The offset between min timestamp in cacheDB and max timestamp in mainDB. Used for migrating.
428     TimeOffset migrateTimeOffset_;
429 
430     // Migrating sync flag. When the flag is true, mainDB and cacheDB are attached, migrateSyncStatements_ is set,
431     // maxTimestampInMainDB_ and migrateTimeOffset_ is meaningful.
432     bool isSyncMigrating_;
433     int conflictResolvePolicy_;
434 };
} // namespace DistributedDB

#endif // SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H