1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef SQLITE_RELATIONAL_STORE_H 16 #define SQLITE_RELATIONAL_STORE_H 17 #ifdef RELATIONAL_STORE 18 19 #include <functional> 20 #include <memory> 21 #include <vector> 22 23 #include "irelational_store.h" 24 #include "sqlite_single_relational_storage_engine.h" 25 #include "isyncer.h" 26 #include "sync_able_engine.h" 27 #include "relational_sync_able_storage.h" 28 #include "runtime_context.h" 29 #include "cloud/cloud_syncer.h" 30 31 namespace DistributedDB { 32 using RelationalObserverAction = 33 std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData, Origin origin)>; 34 class SQLiteRelationalStore : public IRelationalStore { 35 public: 36 SQLiteRelationalStore() = default; 37 ~SQLiteRelationalStore() override; 38 39 // Delete the copy and assign constructors 40 DISABLE_COPY_ASSIGN_MOVE(SQLiteRelationalStore); 41 42 RelationalStoreConnection *GetDBConnection(int &errCode) override; 43 int Open(const RelationalDBProperties &properties) override; 44 void OnClose(const std::function<void(void)> ¬ifier); 45 46 SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode) const; 47 void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const; 48 49 int Sync(const ISyncer::SyncParam &syncParam, uint64_t connectionId); 50 51 void ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection); 52 53 void WakeUpSyncer() override; 54 55 // for test mock GetStorageEngine()56 RelationalSyncAbleStorage *GetStorageEngine() 57 { 58 return storageEngine_; 59 } 60 61 int CreateDistributedTable(const std::string &tableName, TableSyncType syncType, bool trackerSchemaChanged = false); 62 63 int RemoveDeviceData(); 64 int RemoveDeviceData(const std::string &device, const std::string &tableName); 65 66 int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer, 67 const RelationalObserverAction &action); 68 int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer); 69 int RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier ¬ifier); 70 71 std::string GetStorePath() const override; 72 73 RelationalDBProperties GetProperties() const override; 74 75 void StopSync(uint64_t connectionId); 76 77 void Dump(int fd) override; 78 79 int RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout, 80 uint64_t connectionId, std::shared_ptr<ResultSet> &result); 81 82 int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader); 83 84 int ChkSchema(const TableName &tableName); 85 86 int SetTrackerTable(const TrackerSchema &trackerSchema); 87 88 int ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records); 89 90 int CleanTrackerData(const std::string &tableName, int64_t cursor); 91 92 int SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty); 93 94 std::function<int(void)> CleanWaterMark(const std::set<std::string> clearWaterMarkTables); 95 96 int Pragma(PragmaCmd cmd, PragmaData &pragmaData); 97 98 int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records); 99 100 int SetDistributedSchema(const DistributedSchema &schema, bool isForceUpgrade); 101 102 int GetDownloadingAssetsCount(int32_t &count); 103 104 int SetTableMode(DistributedTableMode tableMode); 105 106 #ifdef USE_DISTRIBUTEDDB_CLOUD 107 int PrepareAndSetCloudDbSchema(const DataBaseSchema &schema); 108 109 int32_t GetCloudSyncTaskCount(); 110 111 int CleanCloudData(ClearMode mode); 112 113 int ClearCloudWatermark(const std::set<std::string> &tableNames); 114 115 int SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb); 116 117 int Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId); 118 119 int SetCloudSyncConfig(const CloudSyncConfig &config); 120 121 SyncProcess GetCloudTaskStatus(uint64_t taskId); 122 #endif 123 124 int OperateDataStatus(uint32_t dataOperator); 125 126 int32_t GetDeviceSyncTaskCount() const; 127 protected: 128 void ReleaseResources(); 129 130 // 1 store 1 connection 131 void DecreaseConnectionCounter(uint64_t connectionId); 132 int CheckDBMode(); 133 int GetSchemaFromMeta(RelationalSchemaObject &schema); 134 int CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet); 135 int SaveTableModeToMeta(DistributedTableMode mode); 136 int CheckProperties(RelationalDBProperties properties); 137 138 int SaveLogTableVersionToMeta(); 139 140 int CleanDistributedDeviceTable(); 141 142 int StopLifeCycleTimer(); 143 int StartLifeCycleTimer(const DatabaseLifeCycleNotifier ¬ifier); 144 void HeartBeat(); 145 int ResetLifeCycleTimer(); 146 147 void IncreaseConnectionCounter(); 148 int InitStorageEngine(const RelationalDBProperties &properties); 149 150 int EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList); 151 152 std::string GetDevTableName(const std::string &device, const std::string &hashDev) const; 153 154 int GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const; 155 156 int RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device, 157 const std::string &tableName, bool isNeedHash); 158 159 int GetExistDevices(std::set<std::string> &hashDevices) const; 160 161 std::vector<std::string> GetAllDistributedTableName(TableSyncType tableSyncType = DEVICE_COOPERATION); 162 163 int CheckBeforeSync(const CloudSyncOption &option); 164 165 int CheckAssetsOnlyValid(const QuerySyncObject &querySyncObject, const CloudSyncOption &option); 166 167 int CheckQueryValid(const CloudSyncOption &option); 168 169 int CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object, bool isFromTable); 170 171 int CheckTableName(const std::vector<std::string> &tableNames); 172 173 int CleanWaterMarkInner(SQLiteSingleVerRelationalStorageExecutor *&handle, 174 const std::set<std::string> &clearWaterMarkTable); 175 176 int InitTrackerSchemaFromMeta(); 177 178 void AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields, 179 std::vector<Field> &addFields); 180 181 bool CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo, std::vector<Field> &addFields); 182 183 bool PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames, 184 std::map<std::string, std::vector<Field>> &updateTableNames, 185 std::map<std::string, std::string> &alterTableNames); 186 187 int ExecuteCreateSharedTable(const DataBaseSchema &schema); 188 189 int CheckParamForUpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records); 190 191 int CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records); 192 193 int InitSQLiteStorageEngine(const RelationalDBProperties &properties); 194 195 static int ReFillSyncInfoTable(const std::vector<std::string> &actualTable, CloudSyncer::CloudTaskInfo &info); 196 197 int CheckTrackerTable(const TrackerSchema &trackerSchema, TableInfo &table, bool &isNoTableInSchema, 198 bool &isFirstCreate); 199 200 int SetReferenceInner(const std::vector<TableReferenceProperty> &tableReferenceProperty, 201 std::set<std::string> &clearWaterMarkTables); 202 203 #ifdef USE_DISTRIBUTEDDB_CLOUD 204 void FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess, 205 CloudSyncer::CloudTaskInfo &info); 206 207 int CheckCloudSchema(const DataBaseSchema &schema); 208 #endif 209 210 int OperateDataStatusInner(const std::vector<std::string> &tables, uint64_t virtualTime) const; 211 212 void CleanDirtyLogIfNeed(const std::string &tableName) const; 213 214 RelationalSchemaObject GetSchemaObj() const; 215 // use for sync Interactive 216 std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr; // For storage operate sync function 217 // use ref obj same as kv 218 RelationalSyncAbleStorage *storageEngine_ = nullptr; // For storage operate data 219 std::shared_ptr<SQLiteSingleRelationalStorageEngine> sqliteStorageEngine_; 220 CloudSyncer *cloudSyncer_ = nullptr; 221 222 std::mutex connectMutex_; 223 std::atomic<int> connectionCount_ = 0; 224 std::vector<std::function<void(void)>> closeNotifiers_; 225 226 mutable std::mutex initalMutex_; 227 bool isInitialized_ = false; 228 229 // lifeCycle 230 std::mutex lifeCycleMutex_; 231 DatabaseLifeCycleNotifier lifeCycleNotifier_; 232 TimerId lifeTimerId_ {}; 233 }; 234 } // namespace DistributedDB 235 #endif 236 #endif // SQLITE_RELATIONAL_STORE_H