• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
17 #define SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
18 
19 #include "macro_utils.h"
20 #include "db_types.h"
21 #include "query_object.h"
22 #include "sqlite_utils.h"
23 #include "sqlite_storage_executor.h"
24 #include "single_ver_natural_store_commit_notify_data.h"
25 #include "time_helper.h"
26 
27 namespace DistributedDB {
// SQLiteSingleVerStorageExecutor: storage executor derived from SQLiteStorageExecutor.
// The declarations below cover KV reads/writes selected by SingleVerDataType, sync-data
// queries by timestamp range or QueryObject, result-set handling, transactions, cache-DB
// migration, subscribe triggers and cloud-related helpers.
// NOTE(review): this header only declares the interface. Apart from the one inline helper in
// SaveRecordStatements, behaviour notes below come from the pre-existing comments, the
// signatures and the identifier names; confirm details against the implementation file.
// NOTE(review): the decimal number at the start of every line looks like a code-browser line
// index carried in by extraction, not C++ source -- confirm against the upstream file.
28 class SQLiteSingleVerStorageExecutor : public SQLiteStorageExecutor {
29 public:
       // Both constructors take the sqlite3 handle plus the writable / isMemDb flags; the second
       // overload additionally takes an ExecutorState.
30     SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb);
31     SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb, ExecutorState executorState);
32     ~SQLiteSingleVerStorageExecutor() override;
33 
34     // Disable copy, assignment and move for this class (via the project macro).
35     DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerStorageExecutor);
36 
37     // Get the KV data according to the type (sync, meta, local data).
       // value and timestamp are non-const references, i.e. output parameters.
38     virtual int GetKvData(SingleVerDataType type, const Key &key, Value &value, Timestamp &timestamp) const;
39 
40     // Get the sync data record by hash key.
41     int GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const;
42 
43     // Put the KV data according to the type (meta and the local data).
       // committedData is passed as a raw pointer; NOTE(review): whether nullptr is accepted
       // cannot be told from this header -- confirm in the implementation.
44     virtual int PutKvData(SingleVerDataType type, const Key &key, const Value &value,
45         Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData);
46 
       // Entry retrieval by key prefix; isGetValue presumably selects whether values are read
       // as well as keys -- TODO confirm.
47     virtual int GetEntries(bool isGetValue, SingleVerDataType type, const Key &keyPrefix,
48         std::vector<Entry> &entries) const;
49 
       // Entry retrieval / counting driven by a QueryObject (taken by non-const reference).
50     int GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const;
51 
52     int GetCount(QueryObject &queryObj, int &count) const;
53 
54     // Get all the meta keys.
55     int GetAllMetaKeys(std::vector<Key> &keys) const;
56 
57     int GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const;
58 
59     int GetAllSyncedEntries(const std::string &hashDev, std::vector<Entry> &entries) const;
60 
       // Save one sync data item. dataItem and maxStamp are non-const references, so the call
       // may modify them. isPermitForceWrite defaults to true.
61     int SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo,
62         Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite = true);
63 
64     virtual int DeleteLocalKvData(const Key &key, SingleVerNaturalStoreCommitNotifyData *committedData, Value &value,
65         Timestamp &timestamp);
66 
67     // Delete a row of data by hashKey, leaving no tombstone behind.
68     int EraseSyncData(const Key &hashKey);
69 
       // RemoveDeviceData overloads: by device name only, by device name + ClearMode, and by
       // device name + user + ClearMode.
70     int RemoveDeviceData(const std::string &deviceName);
71 
72     int RemoveDeviceData(const std::string &deviceName, ClearMode mode);
73 
74     int RemoveDeviceData(const std::string &deviceName, const std::string &user, ClearMode mode);
75 
76     int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify, uint64_t recordVersion) const;
77 
78     void InitCurrentMaxStamp(Timestamp &maxStamp);
79 
80     void ReleaseContinueStatement();
81 
       // Sync-data retrieval for the timestamp range given by begin / end, with a
       // DataSizeSpecInfo argument (presumably a size limit -- TODO confirm); the second
       // variant is for deleted data, per its name.
82     int GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength, Timestamp begin,
83         Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const;
84     int GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength, Timestamp begin,
85         Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const;
86 
       // Counting counterparts of the queries above; the count is returned through total.
87     int GetUnSyncTotalByTimestamp(Timestamp begin, Timestamp end, uint32_t &total) const;
88 
89     int GetDeletedSyncTotalByTimestamp(Timestamp begin, Timestamp end, uint32_t &total) const;
90 
91     int GetSyncTotalWithQuery(QueryObject &query, const std::pair<Timestamp, Timestamp> &timeRange,
92         uint32_t &total) const;
93 
94     int GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier);
95 
       // Result-set API (open / reload / fetch / close, judging by the names). Each open and
       // reload operation exists in a key-prefix flavour and a QueryObject flavour; the
       // "CacheRowIdMode" variants additionally fill a caller-provided row-id vector.
96     int OpenResultSet(const Key &keyPrefix, int &count);
97 
98     int OpenResultSet(QueryObject &queryObj, int &count);
99 
100     int OpenResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache,
101         uint32_t cacheLimit, int &count);
102 
103     int OpenResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache,
104         uint32_t cacheLimit, int &count);
105 
106     int ReloadResultSet(const Key &keyPrefix);
107 
108     int ReloadResultSet(QueryObject &queryObj);
109 
110     int ReloadResultSetForCacheRowIdMode(const Key &keyPrefix, std::vector<int64_t> &rowIdCache,
111         uint32_t cacheLimit, uint32_t cacheStartPos);
112 
113     int ReloadResultSetForCacheRowIdMode(QueryObject &queryObj, std::vector<int64_t> &rowIdCache,
114         uint32_t cacheLimit, uint32_t cacheStartPos);
115 
116     int GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy);
117 
118     int GetEntryByRowId(int64_t rowId, Entry &entry);
119 
120     void CloseResultSet();
121 
        // Transaction control.
122     int StartTransaction(TransactType type);
123 
124     int Commit();
125 
126     int Rollback();
127 
128     bool CheckIfKeyExisted(const Key &key, bool isLocal, Value &value, Timestamp &timestamp) const;
129 
        // Prepare / reset pair used around saving data of the given type.
130     int PrepareForSavingData(SingleVerDataType type);
131 
132     int ResetForSavingData(SingleVerDataType type);
133 
        // Overrides a virtual Reset() declared in a base class.
134     int Reset() override;
135 
136     int UpdateLocalDataTimestamp(Timestamp timestamp);
137 
138     void SetAttachMetaMode(bool attachMetaMode);
139 
        // Cache-DB ("cache mode") counterparts of the save / prepare / reset operations.
140     int PutLocalDataToCacheDB(const LocalDataItem &dataItem) const;
141 
142     int SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo, Timestamp &maxStamp,
143         uint64_t recordVersion, const QueryObject &query);
144 
145     int PrepareForSavingCacheData(SingleVerDataType type);
146     int ResetForSavingCacheData(SingleVerDataType type);
147 
        // Migration entry points (cache DB -> main DB, judging by the names and by the member
        // comments further down -- TODO confirm).
148     int MigrateLocalData();
149 
150     int MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
151         std::vector<DataItem> &dataItems);
152     int GetMinVersionCacheData(std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const;
153 
154     int GetMaxVersionInCacheDb(uint64_t &maxVersion) const;
155     int AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
156         const std::string &attachDbAbsPath, EngineState engineState);
157 
158     // Clear migrating data.
159     void ClearMigrateData();
160 
161     // Get the current max timestamp during migration.
162     int GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const;
163 
164     void SetConflictResolvePolicy(int policy);
165 
166     // Delete multiple meta data records in a transaction.
167     int DeleteMetaData(const std::vector<Key> &keys);
168     // Delete multiple meta data records with the given key prefix in a transaction.
169     int DeleteMetaDataByPrefixKey(const Key &keyPrefix);
170 
171     int CheckIntegrity() const;
172 
173     int CheckQueryObjectLegal(QueryObject &queryObj) const;
174 
        // Note: query is taken by value here (a copy is made at the call site).
175     int CheckDataWithQuery(QueryObject query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo);
176 
        // Static helper; presumably the serialized size of item plus appendLen -- TODO confirm.
177     static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen);
178 
        // Subscribe-trigger management.
179     int AddSubscribeTrigger(QueryObject &query, const std::string &subscribeId);
180 
181     int RemoveSubscribeTrigger(const std::vector<std::string> &subscribeIds);
182 
183     int RemoveSubscribeTriggerWaterMark(const std::vector<std::string> &subscribeIds);
184 
185     int GetTriggers(const std::string &namePreFix, std::vector<std::string> &triggerNames);
186 
187     int RemoveTrigger(const std::vector<std::string> &triggers);
188 
189     int GetSyncDataWithQuery(const QueryObject &query, size_t appendLength, const DataSizeSpecInfo &dataSizeInfo,
190         const std::pair<Timestamp, Timestamp> &timeRange, std::vector<DataItem> &dataItems) const;
191 
192     int ForceCheckPoint() const;
193 
194     uint64_t GetLogFileSize() const;
195 
196     int GetExistsDevicesFromMeta(std::set<std::string> &devices);
197 
198     int UpdateKey(const UpdateKeyCallback &callback);
199 
200     int CreateCloudLogTable();
201 
        // Third GetEntries overload: selects by a device string instead of prefix / query.
202     int GetEntries(const std::string &device, std::vector<Entry> &entries) const;
203 
204     int ClearCloudWatermark();
205 protected:
        // Virtual hooks available to derived classes.
206     virtual int SaveKvData(SingleVerDataType type, const Key &key, const Value &value, Timestamp timestamp);
207 
208     virtual int DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData,
209         const Key &key, const Value &value);
210 
211 private:
        // Bundle of three statement handles (query / insert / update), each defaulting to nullptr.
212     struct SaveRecordStatements {
213         sqlite3_stmt *queryStatement = nullptr;
214         sqlite3_stmt *insertStatement = nullptr;
215         sqlite3_stmt *updateStatement = nullptr;
216 
217         int ResetStatement();
218 
            // Returns updateStatement when isUpdate is true, insertStatement otherwise.
            // NOTE(review): the text fused in front of "inline" on the next line
            // (function name + struct name + index) looks like a code-browser scope
            // annotation picked up during extraction, not source -- confirm upstream.
GetDataSaveStatementSaveRecordStatements219         inline sqlite3_stmt *GetDataSaveStatement(bool isUpdate) const
220         {
221             return isUpdate ? updateStatement : insertStatement;
222         }
223     };
224 
        // State handed to CreateFuncUpdateKey: an error code (initially E_OK), a key and the
        // UpdateKeyCallback.
225     struct UpdateContext {
226         int errCode = E_OK;
227         Key newKey;
228         UpdateKeyCallback callback;
229     };
230 
231     void PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet, const DataOperStatus &status,
232         SingleVerNaturalStoreCommitNotifyData *committedData);
233 
        // Statement-binding helpers for saving sync data.
234     int BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const Key &hashKey,
235         const SyncDataDevices &devices, bool isUpdate);
236 
237     static int BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem, const std::string &origDev,
238         const std::string &deviceName);
239 
240     static void PutConflictData(const DataItem &itemPut, const DataItem &itemGet, const DeviceInfo &deviceInfo,
241         const DataOperStatus &dataStatus, SingleVerNaturalStoreCommitNotifyData *commitData);
242 
        // Returns a DataOperStatus for an incoming item versus the stored one (itemGet).
        // dataItem is non-const, so it may be adjusted. isPermitForceWrite defaults to true.
243     DataOperStatus JudgeSyncSaveType(DataItem &dataItem, const DataItem &itemGet,
244         const DeviceInfo &deviceInfo, bool isHashKeyExisted, bool isPermitForceWrite = true);
245 
246     static std::string GetOriginDevName(const DataItem &dataItem, const std::string &origDevGet);
247 
248     int GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet, Key &hashKey) const;
249 
250     int GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet, const DataOperStatus &dataStatus) const;
251 
252     int GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const;
253 
        // Statement preparation for the timestamp-range queries. The statement is returned
        // through a pointer reference; getDeletedData defaults to false.
254     int PrepareForSyncDataByTime(Timestamp begin, Timestamp end, sqlite3_stmt *&statement, bool getDeletedData = false)
255         const;
256 
257     int PrepareForUnSyncTotalByTime(Timestamp begin, Timestamp end, sqlite3_stmt *&statement,
258         bool getDeletedData = false) const;
259 
260     int GetCountValue(sqlite3_stmt *&countStatement, uint32_t &total) const;
261 
262     int StepForResultEntries(bool isGetValue, sqlite3_stmt *statement, std::vector<Entry> &entries) const;
263 
        // Result-set initialisation helpers (key-prefix and QueryObject flavours).
264     int InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt);
265 
266     int InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt);
267 
268     int InitResultSetContent(QueryObject &queryObj);
269 
270     int InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt);
271 
272     int GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const;
273 
274     int BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value, Timestamp timestamp,
275         SingleVerDataType type);
276 
277     int SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey, const std::string &origDev,
278         const std::string &deviceName, bool isUpdate);
279 
        // Private overload: takes three SQL strings (read / insert / update) and a
        // SaveRecordStatements reference. NOTE(review): presumably prepares one statement per
        // SQL string into that bundle -- confirm in the implementation.
280     int PrepareForSavingData(const std::string &readSql, const std::string &insertSql,
281         const std::string &updateSql, SaveRecordStatements &statements) const;
282 
283     int OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count);
284 
285     int ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit,
286         uint32_t cacheStartPos, int &count);
287 
288     void FinalizeAllStatements();
289     int ResetSaveSyncStatements(int errCode);
290 
        // Binding helpers for sync data in cache mode.
291     int BindSyncDataInCacheMode(sqlite3_stmt *statement,
292         const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const;
293 
294     int BindPrimaryKeySyncDataInCacheMode(
295         sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const;
296 
297     int BindTimestampSyncDataInCacheMode(sqlite3_stmt *statement, const DataItem &dataItem) const;
298 
299     int BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
300         const std::string &origDev, const std::string &deviceName) const;
301 
302     int SaveSyncDataToCacheDatabase(const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const;
303 
304     int GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem,
305         uint64_t &verInCurCacheDb, bool isCacheDb) const;
306     int GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems,
307         uint64_t &verInCurCacheDb, bool isCacheDb) const;
308     int DelCacheDbDataByVersion(uint64_t version) const;
309 
310     // Used for migrating data.
311     int BindLocalDataInCacheMode(sqlite3_stmt *statement, const LocalDataItem &dataItem) const;
312 
313     // Process timestamps of sync data in cacheDB when migrating.
314     int ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems);
315 
316     // Initialise migrateTimeOffset_.
317     int InitMigrateTimestampOffset();
318 
319     // Get the min timestamp of local data in sync_data, cacheDB.
320     int GetMinTimestampInCacheDB(Timestamp &minStamp) const;
321 
322     // Prepare conflict notify and commit notify data.
323     int PrepareForNotifyConflictAndObserver(DataItem &dataItem, const DeviceInfo &deviceInfo,
324         NotifyConflictAndObserverData &notify, bool isPermitForceWrite = true);
325 
326     // Put observer and conflict data into commit notify when migrating cacheDB.
327     int PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem, const DeviceInfo &deviceInfo,
328         NotifyConflictAndObserverData &notify, bool isPermitForceWrite);
329 
330     int MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData);
331 
332     int MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData);
333 
334     int GetEntriesForNotifyRemoveDevData(const DataItem &item, std::vector<Entry> &entries) const;
335 
336     // Reset migrateSyncStatements_.
337     int ResetForMigrateCacheData();
338 
339     // Initialise migrating data.
340     int InitMigrateData();
341 
342     int MigrateRmDevData(const DataItem &dataItem) const;
343     int VacuumLocalData() const;
344 
345     int GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement,
346         size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const;
347 
        // Private overload working on two statements (fullStmt and queryStmt) supplied by the
        // caller.
348     int GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt,
349         size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const;
350 
351     int CheckMissQueryDataItems(sqlite3_stmt *&stmt, const SqliteQueryHelper &helper, const DeviceInfo &deviceInfo,
352         std::vector<DataItem> &dataItems);
353 
354     int CheckDataWithQuery(std::vector<DataItem> &dataItems);
355 
        // Note: query is taken by value here as well.
356     int GetExpandedCheckSql(QueryObject query, DataItem &dataItem);
357 
358     int CheckMissQueryDataItem(sqlite3_stmt *stmt, const std::string &deviceName, DataItem &item);
359 
        // Takes an UpdateContext and two function pointers with the
        // (sqlite3_context *, int, sqlite3_value **) signature; the static Translate and
        // CalHashKey members below have exactly that signature.
        // NOTE(review): presumably these get registered as SQL functions -- confirm.
360     int CreateFuncUpdateKey(UpdateContext &context,
361         void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv),
362         void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const;
363 
364     static void Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv);
365 
366     static void CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv);
367 
        // NOTE(review): probably tied to the log-throttling members at the end of the class
        // (startTime_, logCount_, droppedCount_) -- confirm.
368     bool IsPrintTimestamp();
369 
370     int BindSyncDataTime(sqlite3_stmt *statement, const DataItem &dataItem, bool isUpdate);
371 
        // Cloud-related helpers that take a SQL string plus device name and user.
        // isUserBlobType defaults to false.
372     int CloudExcuteRemoveOrUpdate(const std::string &sql, const std::string &deviceName, const std::string &user,
373         bool isUserBlobType = false);
374 
375     int CloudCheckDataExist(const std::string &sql, const std::string &deviceName, const std::string &user,
376         bool &isExist);
377 
        // Inner overloads behind RemoveDeviceData: with / without device name, with / without user.
378     int RemoveDeviceDataInner(ClearMode mode);
379 
380     int RemoveDeviceDataInner(const std::string &deviceName, ClearMode mode);
381 
382     int RemoveDeviceDataWithUserInner(const std::string &user, ClearMode mode);
383 
384     int RemoveDeviceDataWithUserInner(const std::string &deviceName, const std::string &user, ClearMode mode);
385 
386     int RemoveCloudUploadFlag(const std::vector<uint8_t> &hashKey);
387 
388     bool IsFromDataOwner(const DataItem &itemGet, const std::string &syncDev);
        // Statement handles owned by this executor.
        // NOTE(review): these three pointers have no in-class initializer; presumably the
        // constructors set them -- confirm in the implementation file.
389     sqlite3_stmt *getSyncStatement_;
390     sqlite3_stmt *getResultRowIdStatement_;
391     sqlite3_stmt *getResultEntryStatement_;
392     SaveRecordStatements saveSyncStatements_;
393     SaveRecordStatements saveLocalStatements_;
394 
395     // Used for migrating sync_data.
396     SaveRecordStatements migrateSyncStatements_;
        // NOTE(review): the flags and state below also lack in-class initializers -- confirm
        // they are set in the constructors.
397     bool isTransactionOpen_;
398     bool attachMetaMode_; // true for attach meta mode
399     ExecutorState executorState_;
400 
401     // Max timestamp in mainDB. Used for migrating.
402     Timestamp maxTimestampInMainDB_;
403 
404     // The offset between min timestamp in cacheDB and max timestamp in mainDB. Used for migrating.
405     TimeOffset migrateTimeOffset_;
406 
407     // Migrating sync flag. When the flag is true, mainDB and cacheDB are attached, migrateSyncStatements_ is set,
408     // and maxTimestampInMainDB_ and migrateTimeOffset_ are meaningful.
409     bool isSyncMigrating_;
410     int conflictResolvePolicy_;
411 
412     // Log print bookkeeping: a per-second cap constant, an interval constant and counters.
        // printIntervalSeconds evaluates to 1000 * TimeHelper::MS_TO_100_NS; presumably one
        // second expressed in 100 ns units -- TODO confirm against TimeHelper.
413     static constexpr uint64_t maxLogTimesPerSecond = 100;
414     static constexpr Timestamp printIntervalSeconds = (1000*TimeHelper::MS_TO_100_NS);
        // The three counters below start at 0.
415     Timestamp startTime_ = 0;
416     uint64_t logCount_ = 0;
417     uint64_t droppedCount_ = 0;
418 };
419 } // namespace DistributedDB
420 
421 #endif // SQLITE_SINGLE_VER_STORAGE_EXECUTOR_H
422