• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef SQLITE_SINGLE_VER_NATURAL_STORE_H
16 #define SQLITE_SINGLE_VER_NATURAL_STORE_H
17 #include <atomic>
18 #include <mutex>
19 
20 #include "sync_able_kvdb.h"
21 #include "sqlite_single_ver_storage_engine.h"
22 #include "sqlite_utils.h"
23 #include "isyncer.h"
24 #include "single_ver_natural_store_commit_notify_data.h"
25 #include "single_ver_kvdb_sync_interface.h"
26 #include "kv_store_nb_conflict_data_impl.h"
27 #include "runtime_context.h"
28 #include "sqlite_single_ver_continue_token.h"
29 
30 namespace DistributedDB {
// Natural store for a single-version KV database.
// The class derives from SyncAbleKvDB and also implements SingleVerKvDBSyncInterface. It holds a
// SQLiteSingleVerStorageEngine pointer (storageEngine_) and hands out SQLiteSingleVerStorageExecutor
// handles through GetHandle()/ReleaseHandle().
// Most int-returning methods presumably return E_OK on success and an error code otherwise
// (only CheckAndInitQueryCondition documents this explicitly) — confirm against the implementation.
// NOTE(review): this header uses std::shared_mutex, std::set, std::vector and std::string, but the only
// standard headers it includes directly are <atomic> and <mutex>; the others are presumably pulled in
// transitively by the project headers — consider including them explicitly.
31 class SQLiteSingleVerNaturalStore : public SyncAbleKvDB, public SingleVerKvDBSyncInterface {
32 public:
33     SQLiteSingleVerNaturalStore();
34     ~SQLiteSingleVerNaturalStore() override;
35 
36     // Non-copyable, non-assignable and non-movable (as the DISABLE_COPY_ASSIGN_MOVE macro name indicates).
37     DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerNaturalStore);
38 
39     // Open the database described by kvDBProp.
40     int Open(const KvDBProperties &kvDBProp) override;
41 
42     // Invoked automatically when the connection count drops to zero.
43     void Close() override;
44 
45     // Create a connection object; errCode is an output parameter.
46     GenericKvDBConnection *NewConnection(int &errCode) override;
47 
48     // Get the interface type of this kvdb.
49     int GetInterfaceType() const override;
50 
51     // Take a reference on the interface, so that it can be accessed asynchronously.
52     void IncRefCount() override;
53 
54     // Drop a reference on the interface.
55     void DecRefCount() override;
56 
57     // Get the identifier of this kvdb.
58     std::vector<uint8_t> GetIdentifier() const override;
59     // Get the dual tuple identifier of this kvdb.
60     std::vector<uint8_t> GetDualTupleIdentifier() const override;
61 
62     // Get the interface used by the syncer.
63     IKvDBSyncInterface *GetSyncInterface() override;
64 
    // Read the meta data record stored under key into value.
65     int GetMetaData(const Key &key, Value &value) const override;
66 
    // Write the meta data record key -> value.
67     int PutMetaData(const Key &key, const Value &value) override;
68 
69     // Delete multiple meta data records in a transaction.
70     int DeleteMetaData(const std::vector<Key> &keys) override;
71     // Delete multiple meta data records with key prefix in a transaction.
72     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
73 
    // Collect the keys of all meta data records into keys.
74     int GetAllMetaKeys(std::vector<Key> &keys) const override;
75 
    // GetSyncData overloads: two take a [begin, end] timestamp pair and differ only in the output
    // container (DataItem values vs. SingleVerKvEntry pointers); the third takes a query plus a time range.
    // NOTE(review): presumably continueStmtToken is set when more data remains and is then fed to
    // GetSyncDataNext / ReleaseContinueToken — confirm against the implementation.
76     int GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
77         const DataSizeSpecInfo &dataSizeInfo) const override;
78 
79     int GetSyncData(Timestamp begin, Timestamp end, std::vector<SingleVerKvEntry *> &entries,
80         ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const override;
81 
82     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo,
83         ContinueToken &continueStmtToken, std::vector<SingleVerKvEntry *> &entries) const override;
84 
    // GetSyncDataNext overloads mirror the two output containers of GetSyncData and take an existing token.
85     int GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
86         const DataSizeSpecInfo &dataSizeInfo) const override;
87 
88     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
89         const DataSizeSpecInfo &dataSizeInfo) const override;
90 
    // Release a continue token obtained from the GetSyncData / GetSyncDataNext calls.
91     void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;
92 
    // Store entries received for the given query; deviceName names the device the entries belong to
    // (presumably the remote source device — confirm).
93     int PutSyncDataWithQuery(const QueryObject &query, const std::vector<SingleVerKvEntry *> &entries,
94         const std::string &deviceName) override;
95 
    // Max-timestamp accessors. NOTE(review): presumably backed by currentMaxTimestamp_ under
    // maxTimestampMutex_ — confirm. SetMaxTimestamp is not an override.
96     void GetMaxTimestamp(Timestamp &stamp) const override;
97 
98     int SetMaxTimestamp(Timestamp timestamp);
99 
    // Database maintenance operations; passwd is the cipher password passed to each of them.
100     int Rekey(const CipherPassword &passwd) override;
101 
102     int Export(const std::string &filePath, const CipherPassword &passwd) override;
103 
104     int Import(const std::string &filePath, const CipherPassword &passwd) override;
105 
106     // Called from the sync procedure.
107     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
108 
109     // Called from the local procedure (non-override overload with an extra isInSync flag).
110     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify, bool isInSync);
111 
    // Obtain an executor handle; isWrite selects the kind of handle, errCode is an output parameter and
    // perm defaults to OperatePerm::NORMAL_PERM. Pair every successful call with ReleaseHandle().
112     SQLiteSingleVerStorageExecutor *GetHandle(bool isWrite, int &errCode,
113         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
114 
    // Give back a handle obtained from GetHandle(). The pointer is taken by reference
    // (presumably reset to nullptr on return — confirm).
115     void ReleaseHandle(SQLiteSingleVerStorageExecutor *&handle) const;
116 
    // Translate an observer / conflict type value into the RegisterFuncType written to type.
117     int TransObserverTypeToRegisterFunctionType(int observerType, RegisterFuncType &type) const override;
118 
119     int TransConflictTypeToRegisterFunctionType(int conflictType, RegisterFuncType &type) const override;
120 
121     bool CheckWritePermission() const override;
122 
123     SchemaObject GetSchemaInfo() const override;
124 
125     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
126 
127     Timestamp GetCurrentTimestamp();
128 
    // Schema accessors: the first returns a copy, the second a const reference.
129     SchemaObject GetSchemaObject() const;
130 
131     const SchemaObject &GetSchemaObjectConstRef() const;
132 
133     const KvDBProperties &GetDbProperties() const override;
134 
135     int RemoveKvDB(const KvDBProperties &properties) override;
136 
137     int GetKvDBSize(const KvDBProperties &properties, uint64_t &size) const override;
    // Mutable access to the database properties.
138     KvDBProperties &GetDbPropertyForUpdate();
139 
140     int InitDatabaseContext(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt = false);
141 
    // Life-cycle configuration. NOTE(review): presumably these feed lifeCycleNotifier_ and
    // autoLifeTime_ respectively — confirm.
142     int RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier);
143 
144     int SetAutoLifeCycleTime(uint32_t time);
145 
146     int GetSecurityOption(SecurityOption &option) const override;
147 
148     bool IsDataMigrating() const override;
149 
150     void SetConnectionFlag(bool isExisted) const override;
151 
152     int TriggerToMigrateData() const;
153 
    // Check oriValue coming from sourceType; amendValue and useAmendValue are output parameters
    // (presumably useAmendValue tells the caller to use amendValue instead of oriValue — confirm).
154     int CheckValueAndAmendIfNeed(ValueSource sourceType, const Value &oriValue, Value &amendValue,
155         bool &useAmendValue) const;
156 
157     int CheckReadDataControlled() const;
158     bool IsCacheDBMode() const;
159     bool IsExtendedCacheDBMode() const;
160 
    // Cache record version helpers (all const).
161     void IncreaseCacheRecordVersion() const;
162     uint64_t GetCacheRecordVersion() const;
163     uint64_t GetAndIncreaseCacheRecordVersion() const;
164 
165     void NotifyRemotePushFinished(const std::string &targetId) const override;
166 
167     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
168 
169     int CheckIntegrity() const override;
170 
    // Compression settings used on sync; both methods report through output parameters.
171     int GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const override;
172     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
173 
174     // Check and initialize the query object for query sync and subscribe; a flatbuffer schema always yields E_NOT_SUPPORT.
175     // Returns E_OK if the subscribe is legal, an error code on exception.
176     int CheckAndInitQueryCondition(QueryObject &query) const override;
177 
    // Pass entries exchanged between sourceID and targetID through the data interceptor
    // (presumably the one installed with SetDataInterceptor — confirm).
178     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
179         const std::string &targetID) const override;
180 
181     void SetDataInterceptor(const PushDataInterceptor &interceptor) override;
182 
183     int AddSubscribe(const std::string &subscribeId, const QueryObject &query, bool needCacheSubscribe) override;
184 
    // RemoveSubscribe overloads: a single subscribe id, or a batch of ids.
185     int RemoveSubscribe(const std::string &subscribeId) override;
186 
187     int RemoveSubscribe(const std::vector<std::string> &subscribeIds) override;
188 
    // Maximum log size accessors. NOTE(review): presumably backed by the atomic maxLogSize_ — confirm.
189     int SetMaxLogSize(uint64_t limit);
190 
191     uint64_t GetMaxLogSize() const;
192 
    // Dump state to the file descriptor fd.
193     void Dump(int fd) override;
194 
195 private:
    // One entry of a translation table: an integer index paired with a RegisterFuncType.
196     struct TransPair {
197         int index;
198         RegisterFuncType funcType;
199     };
    // Look up index in transMap, an array of len entries
    // (presumably returns the funcType of the matching entry — confirm).
200     static RegisterFuncType GetFuncType(int index, const TransPair *transMap, int32_t len);
201     int CheckDatabaseRecovery(const KvDBProperties &kvDBProp);
202 
    // committedData is taken by pointer reference (presumably released and reset here, as the name says — confirm).
203     void CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
204         bool isNeedCommit, int eventType);
205 
206     int RegisterNotification();
207 
208     void ReleaseResources();
209 
210     void InitCurrentMaxStamp();
211 
212     int SaveSyncDataItems(const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo,
213         bool checkValueContent);
214 
215     int InitStorageEngine(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt);
216 
217     void InitialLocalDataTimestamp();
218 
219     int GetSchema(SchemaObject &schema) const;
220 
    // Static helpers that work purely from the KvDBProperties passed in.
221     static void InitDataBaseOption(const KvDBProperties &kvDBProp, OpenDbProperties &option);
222 
223     static int SetUserVer(const KvDBProperties &kvDBProp, int version);
224 
225     static std::string GetDatabasePath(const KvDBProperties &kvDBProp);
226     static std::string GetSubDirPath(const KvDBProperties &kvDBProp);
227     void NotifyRemovedData(std::vector<Entry> &entries);
228 
229     // Decide whether the database is read-only based on the schema situation; isReadOnly and savedSchemaObj are outputs.
230     int DecideReadOnlyBaseOnSchema(const KvDBProperties &kvDBProp, bool &isReadOnly,
231         SchemaObject &savedSchemaObj) const;
232 
    // Life-cycle timer helpers. NOTE(review): presumably they operate on lifeTimerId_ under
    // lifeCycleMutex_ — confirm.
233     void HeartBeatForLifeCycle() const;
234 
235     int StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier) const;
236 
237     int ResetLifeCycleTimer() const;
238 
239     int StopLifeCycleTimer() const;
240     void InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *committedData);
241 
242     void AsyncDataMigration() const;
243 
244     // Amend values that should be amended, and neglect values that are incompatible.
245     void CheckAmendValueContentForSyncProcedure(std::vector<DataItem> &dataItems) const;
246 
247     int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify);
248 
249     int RemoveDeviceDataNormally(const std::string &hashDev, bool isNeedNotify);
250 
251     int SaveSyncDataToMain(const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo);
252 
253     // Currently only suitable for being called from sync, in the insert_record_from_sync procedure.
254     // Take care before calling it from any other procedure.
255     int SaveSyncItems(const QueryObject& query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo,
256         Timestamp &maxTimestamp, SingleVerNaturalStoreCommitNotifyData *commitData) const;
257 
258     int SaveSyncDataToCacheDB(const QueryObject &query, std::vector<DataItem> &dataItems,
259         const DeviceInfo &deviceInfo);
260 
261     int SaveSyncItemsInCacheMode(SQLiteSingleVerStorageExecutor *handle, const QueryObject &query,
262         std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo, Timestamp &maxTimestamp) const;
263 
264     int ClearIncompleteDatabase(const KvDBProperties &kvDBPro) const;
265 
    // Query-sync variant that works on the concrete SQLiteSingleVerContinueToken type.
266     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerContinueToken *&continueStmtToken,
267         const DataSizeSpecInfo &dataSizeInfo) const;
268 
269     int SaveCreateDBTime();
270     int SaveCreateDBTimeIfNotExisted();
271 
272     int GetAndInitStorageEngine(const KvDBProperties &kvDBProp);
273 
274     int RemoveAllSubscribe();
275 
276     int RemoveDeviceDataInner(const std::string &hashDev, bool isNeedNotify, bool isInSync);
277 
278     int GetExistsDeviceList(std::set<std::string> &devices) const;
279 
280     DECLARE_OBJECT_TAG(SQLiteSingleVerNaturalStore);
281 
    // The only data member with an in-class initializer.
282     Timestamp currentMaxTimestamp_ = 0;
283 
    // NOTE(review): presumably engineMutex_ guards storageEngine_ — confirm against the .cpp.
284     mutable std::shared_mutex engineMutex_;
285     SQLiteSingleVerStorageEngine *storageEngine_;
286 
    // NOTE(review): the members below have no in-class initializer; presumably they are all set in the
    // constructor — confirm, otherwise they are read uninitialized.
287     bool notificationEventsRegistered_;
288     bool notificationConflictEventsRegistered_;
289     bool isInitialized_;
290     bool isReadOnly_;
    // The mutexes and several members are declared mutable, so const member functions can lock and update them.
291     mutable std::mutex initialMutex_;
292     mutable std::mutex maxTimestampMutex_;
293     mutable std::mutex lifeCycleMutex_;
294     mutable DatabaseLifeCycleNotifier lifeCycleNotifier_;
295     mutable TimerId lifeTimerId_;
296     uint32_t autoLifeTime_;
297     mutable Timestamp createDBTime_;
298     mutable std::mutex createDBTimeMutex_;
299 
    // NOTE(review): presumably dataInterceptorMutex_ guards dataInterceptor_ — confirm.
300     mutable std::shared_mutex dataInterceptorMutex_;
301     PushDataInterceptor dataInterceptor_;
302     std::atomic<uint64_t> maxLogSize_;
303 };
304 } // namespace DistributedDB
305 #endif
306