/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef SQLITE_SINGLE_VER_NATURAL_STORE_H
#define SQLITE_SINGLE_VER_NATURAL_STORE_H
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "isyncer.h"
#include "kv_storage_handle.h"
#include "kv_store_nb_conflict_data_impl.h"
#include "runtime_context.h"
#include "single_ver_natural_store.h"
#include "single_ver_natural_store_commit_notify_data.h"
#include "sqlite_cloud_kv_store.h"
#include "sqlite_single_ver_continue_token.h"
#include "sqlite_single_ver_storage_engine.h"
#include "sqlite_utils.h"
30 
31 namespace DistributedDB {
32 class SQLiteSingleVerNaturalStore : public SingleVerNaturalStore, public KvStorageHandle {
33 public:
34     SQLiteSingleVerNaturalStore();
35     ~SQLiteSingleVerNaturalStore() override;
36 
37     // Delete the copy and assign constructors
38     DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerNaturalStore);
39 
40     // Open the database
41     int Open(const KvDBProperties &kvDBProp) override;
42 
43     // Invoked automatically when connection count is zero
44     void Close() override;
45 
46     // Create a connection object.
47     GenericKvDBConnection *NewConnection(int &errCode) override;
48 
49     // Get interface type of this kvdb.
50     int GetInterfaceType() const override;
51 
52     // Get the interface ref-count, in order to access asynchronously.1
53     void IncRefCount() override;
54 
55     // Drop the interface ref-count.
56     void DecRefCount() override;
57 
58     // Get the identifier of this kvdb.
59     std::vector<uint8_t> GetIdentifier() const override;
60     // Get the dual tuple identifier of this kvdb.
61     std::vector<uint8_t> GetDualTupleIdentifier() const override;
62 
63     // Get interface for syncer.
64     IKvDBSyncInterface *GetSyncInterface() override;
65 
66     int GetMetaData(const Key &key, Value &value) const override;
67 
68     int GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const override;
69 
70     int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;
71 
72     // Delete multiple meta data records in a transaction.
73     int DeleteMetaData(const std::vector<Key> &keys) override;
74     // Delete multiple meta data records with key prefix in a transaction.
75     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
76 
77     int GetAllMetaKeys(std::vector<Key> &keys) const override;
78 
79     int GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
80         const DataSizeSpecInfo &dataSizeInfo) const override;
81 
82     int GetSyncData(Timestamp begin, Timestamp end, std::vector<SingleVerKvEntry *> &entries,
83         ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const override;
84 
85     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo,
86         ContinueToken &continueStmtToken, std::vector<SingleVerKvEntry *> &entries) const override;
87 
88     int GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
89         const DataSizeSpecInfo &dataSizeInfo) const override;
90 
91     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
92         const DataSizeSpecInfo &dataSizeInfo) const override;
93 
94     int GetUnSyncTotal(Timestamp begin, Timestamp end, uint32_t &total) const override;
95 
96     int GetUnSyncTotal(QueryObject &query, const SyncTimeRange &timeRange, uint32_t &total) const override;
97 
98     void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;
99 
100     int PutSyncDataWithQuery(const QueryObject &query, const std::vector<SingleVerKvEntry *> &entries,
101         const std::string &deviceName) override;
102 
103     void GetMaxTimestamp(Timestamp &stamp) const override;
104 
105     int Rekey(const CipherPassword &passwd) override;
106 
107     int Export(const std::string &filePath, const CipherPassword &passwd) override;
108 
109     int Import(const std::string &filePath, const CipherPassword &passwd,
110         bool isNeedIntegrityCheck = false) override;
111 
112     // In sync procedure, call this function
113     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
114 
115     // In local procedure, call this function
116     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify, bool isInSync);
117 
118     // remove device data for cloud
119     int RemoveDeviceData(const std::string &deviceName, ClearMode mode);
120 
121     // remove device data for cloud and user
122     int RemoveDeviceData(const std::string &deviceName, const std::string &user, ClearMode mode);
123     SQLiteSingleVerStorageExecutor *GetHandle(bool isWrite, int &errCode,
124         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
125 
126     void ReleaseHandle(SQLiteSingleVerStorageExecutor *&handle) const;
127 
128     int TransObserverTypeToRegisterFunctionType(int observerType, RegisterFuncType &type) const override;
129 
130     int TransConflictTypeToRegisterFunctionType(int conflictType, RegisterFuncType &type) const override;
131 
132     bool CheckWritePermission() const override;
133 
134     SchemaObject GetSchemaInfo() const override;
135 
136     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
137 
138     Timestamp GetCurrentTimestamp(bool needStartSync = true) override;
139 
140     SchemaObject GetSchemaObject() const;
141 
142     const SchemaObject &GetSchemaObjectConstRef() const;
143 
144     const KvDBProperties &GetDbProperties() const override;
145 
146     int GetKvDBSize(const KvDBProperties &properties, uint64_t &size) const override;
147     KvDBProperties &GetDbPropertyForUpdate();
148 
149     int InitDatabaseContext(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt = false);
150 
151     int RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier);
152 
153     int SetAutoLifeCycleTime(uint32_t time);
154 
155     int GetSecurityOption(SecurityOption &option) const override;
156 
157     bool IsDataMigrating() const override;
158 
159     void SetConnectionFlag(bool isExisted) const override;
160 
161     int TriggerToMigrateData() const;
162 
163     int CheckValueAndAmendIfNeed(ValueSource sourceType, const Value &oriValue, Value &amendValue,
164         bool &useAmendValue) const;
165 
166     int CheckReadDataControlled() const;
167     bool IsCacheDBMode() const;
168     bool IsExtendedCacheDBMode() const;
169 
170     void IncreaseCacheRecordVersion() const;
171     uint64_t GetCacheRecordVersion() const;
172     uint64_t GetAndIncreaseCacheRecordVersion() const;
173 
174     void NotifyRemotePushFinished(const std::string &targetId) const override;
175 
176     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
177 
178     int CheckIntegrity() const override;
179 
180     int GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const override;
181     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
182 
183     // Check and init query object for query sync and subscribe, flatbuffer schema will always return E_NOT_SUPPORT.
184     // return E_OK if subscribe is legal, ERROR on exception.
185     int CheckAndInitQueryCondition(QueryObject &query) const override;
186 
187     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
188         const std::string &targetID, bool isPush) const override;
189 
190     void SetSendDataInterceptor(const PushDataInterceptor &interceptor) override;
191 
192     int AddSubscribe(const std::string &subscribeId, const QueryObject &query, bool needCacheSubscribe) override;
193 
194     int RemoveSubscribe(const std::string &subscribeId) override;
195 
196     int RemoveSubscribe(const std::vector<std::string> &subscribeIds) override;
197 
198     int SetMaxLogSize(uint64_t limit);
199 
200     uint64_t GetMaxLogSize() const;
201 
202     int SetMaxValueSize(uint32_t maxValueSize);
203 
204     uint32_t GetMaxValueSize() const override;
205 
206     void Dump(int fd) override;
207 
208     int IsSupportSubscribe() const override;
209 
210     void AbortHandle() override;
211 
212     void EnableHandle() override;
213 
214     int TryHandle() const override;
215 
216     std::pair<int, SQLiteSingleVerStorageExecutor*> GetStorageExecutor(bool isWrite) override;
217 
218     void RecycleStorageExecutor(SQLiteSingleVerStorageExecutor *executor) override;
219 
220     TimeOffset GetLocalTimeOffsetForCloud() override;
221 
222     int SetCloudDbSchema(const std::map<std::string, DataBaseSchema> &schema);
223 
224     int RegisterObserverAction(const KvStoreObserver *observer, const ObserverAction &action);
225 
226     int UnRegisterObserverAction(const KvStoreObserver *observer);
227 
228     int GetCloudVersion(const std::string &device, std::map<std::string, std::string> &versionMap);
229 
230     void SetReceiveDataInterceptor(const DataInterceptor &interceptor) override;
231 
232     int SetCloudSyncConfig(const CloudSyncConfig &config);
233 
234     CloudSyncConfig GetCloudSyncConfig() const override;
235 
236     uint64_t GetTimestampFromDB() override;
237 
238     // for test mock
GetCloudKvStore()239     const SqliteCloudKvStore* GetCloudKvStore()
240     {
241         return sqliteCloudKvStore_;
242     }
243 
244     int OperateDataStatus(uint32_t dataOperator);
245 
246 #ifdef USE_DISTRIBUTEDDB_CLOUD
247     int ClearCloudWatermark();
248     std::function<int(void)> ClearCloudWatermarkInner();
249 #endif
250 protected:
251     void AsyncDataMigration(SQLiteSingleVerStorageEngine *storageEngine) const;
252 
253     void ReleaseResources();
254 
255     std::map<std::string, DataBaseSchema> GetDataBaseSchemas() override;
256 
257 #ifdef USE_DISTRIBUTEDDB_CLOUD
258     ICloudSyncStorageInterface *GetICloudSyncInterface() const override;
259 
260     bool CheckSchemaSupportForCloudSync() const override;
261 #endif
262 private:
263 
264     int CheckDatabaseRecovery(const KvDBProperties &kvDBProp);
265 
266     int RegisterNotification();
267 
268     int SaveSyncDataItems(const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo,
269         bool checkValueContent);
270 
271     int InitStorageEngine(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt);
272 
273     void InitialLocalDataTimestamp();
274 
275     int GetSchema(SchemaObject &schema) const;
276 
277     static void InitDataBaseOption(const KvDBProperties &kvDBProp, OpenDbProperties &option);
278 
279     static int SetUserVer(const KvDBProperties &kvDBProp, int version);
280 
281     void NotifyRemovedData(std::vector<Entry> &entries);
282 
283     // Decide read only based on schema situation
284     int DecideReadOnlyBaseOnSchema(const KvDBProperties &kvDBProp, bool &isReadOnly,
285         SchemaObject &savedSchemaObj) const;
286 
287     void HeartBeatForLifeCycle() const;
288 
289     int StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier) const;
290 
291     int ResetLifeCycleTimer() const;
292 
293     int StopLifeCycleTimer() const;
294     void InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *committedData);
295 
296     // Change value that should be amended, and neglect value that is incompatible
297     void CheckAmendValueContentForSyncProcedure(std::vector<DataItem> &dataItems) const;
298 
299     int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify) const;
300 
301     int RemoveDeviceDataNormally(const std::string &hashDev, bool isNeedNotify);
302 
303     int SaveSyncDataToMain(const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo);
304 
305     // Currently, this function only suitable to be call from sync in insert_record_from_sync procedure
306     // Take attention if future coder attempt to call it in other situation procedure
307     int SaveSyncItems(const QueryObject& query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo,
308         Timestamp &maxTimestamp, SingleVerNaturalStoreCommitNotifyData *commitData) const;
309 
310     int SaveSyncDataToCacheDB(const QueryObject &query, std::vector<DataItem> &dataItems,
311         const DeviceInfo &deviceInfo);
312 
313     int SaveSyncItemsInCacheMode(SQLiteSingleVerStorageExecutor *handle, const QueryObject &query,
314         std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo, Timestamp &maxTimestamp) const;
315 
316     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerContinueToken *&continueStmtToken,
317         const DataSizeSpecInfo &dataSizeInfo) const;
318 
319     int SaveCreateDBTime();
320     int SaveCreateDBTimeIfNotExisted();
321 
322     virtual int GetAndInitStorageEngine(const KvDBProperties &kvDBProp);
323 
324     int RemoveAllSubscribe();
325 
326     int GetExistsDeviceList(std::set<std::string> &devices) const;
327 
328     int EraseAllDeviceWaterMark(const std::string &hashDev);
329 
330     std::function<int(void)> RemoveDeviceDataInner(const std::string &hashDev, bool isNeedNotify);
331 
332     std::function<int(void)> RemoveDeviceDataInner(const std::string &hashDev, ClearMode mode);
333 
334     std::function<int(void)> RemoveDeviceDataInner(const std::string &hashDev, const std::string &user, ClearMode mode);
335 
336     void GetAndResizeLocalIdentity(std::string &outTarget) const;
337 
338     DECLARE_OBJECT_TAG(SQLiteSingleVerNaturalStore);
339 
340     mutable std::shared_mutex engineMutex_;
341     SQLiteSingleVerStorageEngine *storageEngine_;
342 
343     bool notificationEventsRegistered_;
344     bool notificationConflictEventsRegistered_;
345     bool isInitialized_;
346     bool isReadOnly_;
347     mutable std::mutex initialMutex_;
348     mutable std::mutex lifeCycleMutex_;
349     mutable DatabaseLifeCycleNotifier lifeCycleNotifier_;
350     mutable TimerId lifeTimerId_;
351     uint32_t autoLifeTime_;
352     mutable Timestamp createDBTime_;
353     mutable std::mutex createDBTimeMutex_;
354 
355     mutable std::shared_mutex dataInterceptorMutex_;
356     PushDataInterceptor pushDataInterceptor_;
357     DataInterceptor receiveDataInterceptor_;
358 
359     std::atomic<uint64_t> maxLogSize_;
360 
361     mutable std::shared_mutex abortHandleMutex_;
362     OperatePerm abortPerm_;
363 
364     mutable std::mutex cloudStoreMutex_;
365     SqliteCloudKvStore *sqliteCloudKvStore_;
366 
367     Timestamp lastLocalSysTime_ = 0ULL;
368 };
369 } // namespace DistributedDB
#endif // SQLITE_SINGLE_VER_NATURAL_STORE_H