1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef SQLITE_SINGLE_VER_NATURAL_STORE_H 16 #define SQLITE_SINGLE_VER_NATURAL_STORE_H 17 #include <atomic> 18 #include <mutex> 19 20 #include "isyncer.h" 21 #include "kv_storage_handle.h" 22 #include "kv_store_nb_conflict_data_impl.h" 23 #include "runtime_context.h" 24 #include "single_ver_natural_store.h" 25 #include "single_ver_natural_store_commit_notify_data.h" 26 #include "sqlite_cloud_kv_store.h" 27 #include "sqlite_single_ver_continue_token.h" 28 #include "sqlite_single_ver_storage_engine.h" 29 #include "sqlite_utils.h" 30 31 namespace DistributedDB { 32 class SQLiteSingleVerNaturalStore : public SingleVerNaturalStore, public KvStorageHandle { 33 public: 34 SQLiteSingleVerNaturalStore(); 35 ~SQLiteSingleVerNaturalStore() override; 36 37 // Delete the copy and assign constructors 38 DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerNaturalStore); 39 40 // Open the database 41 int Open(const KvDBProperties &kvDBProp) override; 42 43 // Invoked automatically when connection count is zero 44 void Close() override; 45 46 // Create a connection object. 47 GenericKvDBConnection *NewConnection(int &errCode) override; 48 49 // Get interface type of this kvdb. 50 int GetInterfaceType() const override; 51 52 // Get the interface ref-count, in order to access asynchronously.1 53 void IncRefCount() override; 54 55 // Drop the interface ref-count. 56 void DecRefCount() override; 57 58 // Get the identifier of this kvdb. 59 std::vector<uint8_t> GetIdentifier() const override; 60 // Get the dual tuple identifier of this kvdb. 61 std::vector<uint8_t> GetDualTupleIdentifier() const override; 62 63 // Get interface for syncer. 64 IKvDBSyncInterface *GetSyncInterface() override; 65 66 int GetMetaData(const Key &key, Value &value) const override; 67 68 int GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const override; 69 70 int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override; 71 72 // Delete multiple meta data records in a transaction. 73 int DeleteMetaData(const std::vector<Key> &keys) override; 74 // Delete multiple meta data records with key prefix in a transaction. 75 int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override; 76 77 int GetAllMetaKeys(std::vector<Key> &keys) const override; 78 79 int GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken, 80 const DataSizeSpecInfo &dataSizeInfo) const override; 81 82 int GetSyncData(Timestamp begin, Timestamp end, std::vector<SingleVerKvEntry *> &entries, 83 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const override; 84 85 int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo, 86 ContinueToken &continueStmtToken, std::vector<SingleVerKvEntry *> &entries) const override; 87 88 int GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken, 89 const DataSizeSpecInfo &dataSizeInfo) const override; 90 91 int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken, 92 const DataSizeSpecInfo &dataSizeInfo) const override; 93 94 int GetUnSyncTotal(Timestamp begin, Timestamp end, uint32_t &total) const override; 95 96 int GetUnSyncTotal(QueryObject &query, const SyncTimeRange &timeRange, uint32_t &total) const override; 97 98 void ReleaseContinueToken(ContinueToken &continueStmtToken) const override; 99 100 int PutSyncDataWithQuery(const QueryObject &query, const std::vector<SingleVerKvEntry *> &entries, 101 const std::string &deviceName) override; 102 103 void GetMaxTimestamp(Timestamp &stamp) const override; 104 105 int Rekey(const CipherPassword &passwd) override; 106 107 int Export(const std::string &filePath, const CipherPassword &passwd) override; 108 109 int Import(const std::string &filePath, const CipherPassword &passwd, 110 bool isNeedIntegrityCheck = false) override; 111 112 // In sync procedure, call this function 113 int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override; 114 115 // In local procedure, call this function 116 int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify, bool isInSync); 117 118 // remove device data for cloud 119 int RemoveDeviceData(const std::string &deviceName, ClearMode mode); 120 121 // remove device data for cloud and user 122 int RemoveDeviceData(const std::string &deviceName, const std::string &user, ClearMode mode); 123 SQLiteSingleVerStorageExecutor *GetHandle(bool isWrite, int &errCode, 124 OperatePerm perm = OperatePerm::NORMAL_PERM) const; 125 126 void ReleaseHandle(SQLiteSingleVerStorageExecutor *&handle) const; 127 128 int TransObserverTypeToRegisterFunctionType(int observerType, RegisterFuncType &type) const override; 129 130 int TransConflictTypeToRegisterFunctionType(int conflictType, RegisterFuncType &type) const override; 131 132 bool CheckWritePermission() const override; 133 134 SchemaObject GetSchemaInfo() const override; 135 136 bool CheckCompatible(const std::string &schema, uint8_t type) const override; 137 138 Timestamp GetCurrentTimestamp(bool needStartSync = true) override; 139 140 SchemaObject GetSchemaObject() const; 141 142 const SchemaObject &GetSchemaObjectConstRef() const; 143 144 const KvDBProperties &GetDbProperties() const override; 145 146 int GetKvDBSize(const KvDBProperties &properties, uint64_t &size) const override; 147 KvDBProperties &GetDbPropertyForUpdate(); 148 149 int InitDatabaseContext(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt = false); 150 151 int RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier ¬ifier); 152 153 int SetAutoLifeCycleTime(uint32_t time); 154 155 int GetSecurityOption(SecurityOption &option) const override; 156 157 bool IsDataMigrating() const override; 158 159 void SetConnectionFlag(bool isExisted) const override; 160 161 int TriggerToMigrateData() const; 162 163 int CheckValueAndAmendIfNeed(ValueSource sourceType, const Value &oriValue, Value &amendValue, 164 bool &useAmendValue) const; 165 166 int CheckReadDataControlled() const; 167 bool IsCacheDBMode() const; 168 bool IsExtendedCacheDBMode() const; 169 170 void IncreaseCacheRecordVersion() const; 171 uint64_t GetCacheRecordVersion() const; 172 uint64_t GetAndIncreaseCacheRecordVersion() const; 173 174 void NotifyRemotePushFinished(const std::string &targetId) const override; 175 176 int GetDatabaseCreateTimestamp(Timestamp &outTime) const override; 177 178 int CheckIntegrity() const override; 179 180 int GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const override; 181 int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override; 182 183 // Check and init query object for query sync and subscribe, flatbuffer schema will always return E_NOT_SUPPORT. 184 // return E_OK if subscribe is legal, ERROR on exception. 185 int CheckAndInitQueryCondition(QueryObject &query) const override; 186 187 int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID, 188 const std::string &targetID, bool isPush) const override; 189 190 void SetSendDataInterceptor(const PushDataInterceptor &interceptor) override; 191 192 int AddSubscribe(const std::string &subscribeId, const QueryObject &query, bool needCacheSubscribe) override; 193 194 int RemoveSubscribe(const std::string &subscribeId) override; 195 196 int RemoveSubscribe(const std::vector<std::string> &subscribeIds) override; 197 198 int SetMaxLogSize(uint64_t limit); 199 200 uint64_t GetMaxLogSize() const; 201 202 int SetMaxValueSize(uint32_t maxValueSize); 203 204 uint32_t GetMaxValueSize() const override; 205 206 void Dump(int fd) override; 207 208 int IsSupportSubscribe() const override; 209 210 void AbortHandle() override; 211 212 void EnableHandle() override; 213 214 int TryHandle() const override; 215 216 std::pair<int, SQLiteSingleVerStorageExecutor*> GetStorageExecutor(bool isWrite) override; 217 218 void RecycleStorageExecutor(SQLiteSingleVerStorageExecutor *executor) override; 219 220 TimeOffset GetLocalTimeOffsetForCloud() override; 221 222 int SetCloudDbSchema(const std::map<std::string, DataBaseSchema> &schema); 223 224 int RegisterObserverAction(const KvStoreObserver *observer, const ObserverAction &action); 225 226 int UnRegisterObserverAction(const KvStoreObserver *observer); 227 228 int GetCloudVersion(const std::string &device, std::map<std::string, std::string> &versionMap); 229 230 void SetReceiveDataInterceptor(const DataInterceptor &interceptor) override; 231 232 int SetCloudSyncConfig(const CloudSyncConfig &config); 233 234 CloudSyncConfig GetCloudSyncConfig() const override; 235 236 uint64_t GetTimestampFromDB() override; 237 238 // for test mock GetCloudKvStore()239 const SqliteCloudKvStore* GetCloudKvStore() 240 { 241 return sqliteCloudKvStore_; 242 } 243 244 int OperateDataStatus(uint32_t dataOperator); 245 246 #ifdef USE_DISTRIBUTEDDB_CLOUD 247 int ClearCloudWatermark(); 248 std::function<int(void)> ClearCloudWatermarkInner(); 249 #endif 250 protected: 251 void AsyncDataMigration(SQLiteSingleVerStorageEngine *storageEngine) const; 252 253 void ReleaseResources(); 254 255 std::map<std::string, DataBaseSchema> GetDataBaseSchemas() override; 256 257 #ifdef USE_DISTRIBUTEDDB_CLOUD 258 ICloudSyncStorageInterface *GetICloudSyncInterface() const override; 259 260 bool CheckSchemaSupportForCloudSync() const override; 261 #endif 262 private: 263 264 int CheckDatabaseRecovery(const KvDBProperties &kvDBProp); 265 266 int RegisterNotification(); 267 268 int SaveSyncDataItems(const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo, 269 bool checkValueContent); 270 271 int InitStorageEngine(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt); 272 273 void InitialLocalDataTimestamp(); 274 275 int GetSchema(SchemaObject &schema) const; 276 277 static void InitDataBaseOption(const KvDBProperties &kvDBProp, OpenDbProperties &option); 278 279 static int SetUserVer(const KvDBProperties &kvDBProp, int version); 280 281 void NotifyRemovedData(std::vector<Entry> &entries); 282 283 // Decide read only based on schema situation 284 int DecideReadOnlyBaseOnSchema(const KvDBProperties &kvDBProp, bool &isReadOnly, 285 SchemaObject &savedSchemaObj) const; 286 287 void HeartBeatForLifeCycle() const; 288 289 int StartLifeCycleTimer(const DatabaseLifeCycleNotifier ¬ifier) const; 290 291 int ResetLifeCycleTimer() const; 292 293 int StopLifeCycleTimer() const; 294 void InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *committedData); 295 296 // Change value that should be amended, and neglect value that is incompatible 297 void CheckAmendValueContentForSyncProcedure(std::vector<DataItem> &dataItems) const; 298 299 int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify) const; 300 301 int RemoveDeviceDataNormally(const std::string &hashDev, bool isNeedNotify); 302 303 int SaveSyncDataToMain(const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo); 304 305 // Currently, this function only suitable to be call from sync in insert_record_from_sync procedure 306 // Take attention if future coder attempt to call it in other situation procedure 307 int SaveSyncItems(const QueryObject& query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo, 308 Timestamp &maxTimestamp, SingleVerNaturalStoreCommitNotifyData *commitData) const; 309 310 int SaveSyncDataToCacheDB(const QueryObject &query, std::vector<DataItem> &dataItems, 311 const DeviceInfo &deviceInfo); 312 313 int SaveSyncItemsInCacheMode(SQLiteSingleVerStorageExecutor *handle, const QueryObject &query, 314 std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo, Timestamp &maxTimestamp) const; 315 316 int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerContinueToken *&continueStmtToken, 317 const DataSizeSpecInfo &dataSizeInfo) const; 318 319 int SaveCreateDBTime(); 320 int SaveCreateDBTimeIfNotExisted(); 321 322 virtual int GetAndInitStorageEngine(const KvDBProperties &kvDBProp); 323 324 int RemoveAllSubscribe(); 325 326 int GetExistsDeviceList(std::set<std::string> &devices) const; 327 328 int EraseAllDeviceWaterMark(const std::string &hashDev); 329 330 std::function<int(void)> RemoveDeviceDataInner(const std::string &hashDev, bool isNeedNotify); 331 332 std::function<int(void)> RemoveDeviceDataInner(const std::string &hashDev, ClearMode mode); 333 334 std::function<int(void)> RemoveDeviceDataInner(const std::string &hashDev, const std::string &user, ClearMode mode); 335 336 void GetAndResizeLocalIdentity(std::string &outTarget) const; 337 338 DECLARE_OBJECT_TAG(SQLiteSingleVerNaturalStore); 339 340 mutable std::shared_mutex engineMutex_; 341 SQLiteSingleVerStorageEngine *storageEngine_; 342 343 bool notificationEventsRegistered_; 344 bool notificationConflictEventsRegistered_; 345 bool isInitialized_; 346 bool isReadOnly_; 347 mutable std::mutex initialMutex_; 348 mutable std::mutex lifeCycleMutex_; 349 mutable DatabaseLifeCycleNotifier lifeCycleNotifier_; 350 mutable TimerId lifeTimerId_; 351 uint32_t autoLifeTime_; 352 mutable Timestamp createDBTime_; 353 mutable std::mutex createDBTimeMutex_; 354 355 mutable std::shared_mutex dataInterceptorMutex_; 356 PushDataInterceptor pushDataInterceptor_; 357 DataInterceptor receiveDataInterceptor_; 358 359 std::atomic<uint64_t> maxLogSize_; 360 361 mutable std::shared_mutex abortHandleMutex_; 362 OperatePerm abortPerm_; 363 364 mutable std::mutex cloudStoreMutex_; 365 SqliteCloudKvStore *sqliteCloudKvStore_; 366 367 Timestamp lastLocalSysTime_ = 0ULL; 368 }; 369 } // namespace DistributedDB 370 #endif // SQLITE_SINGLE_VER_NATURAL_STORE_H 371