1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef STORAGE_ENGINE_H 17 #define STORAGE_ENGINE_H 18 19 #include <condition_variable> 20 #include <list> 21 #include <mutex> 22 #include <shared_mutex> 23 24 #include "db_types.h" 25 #include "macro_utils.h" 26 #include "sqlite_utils.h" 27 #include "storage_executor.h" 28 #include "kvdb_commit_notify_filterable_data.h" 29 30 namespace DistributedDB { 31 struct StorageEngineAttr { 32 uint32_t minWriteNum = 1; 33 uint32_t maxWriteNum = 1; 34 uint32_t minReadNum = 1; 35 uint32_t maxReadNum = 1; 36 }; 37 38 class StorageEngine : public RefObject { 39 public: 40 StorageEngine(); 41 ~StorageEngine() override; 42 43 // Delete the copy and assign constructors 44 DISABLE_COPY_ASSIGN_MOVE(StorageEngine); 45 46 int Init(bool isEnhance = false); 47 48 virtual int ReInit(); 49 50 StorageExecutor *FindExecutor(bool writable, OperatePerm perm, int &errCode, bool isExternal = false, 51 int waitTime = MAX_WAIT_TIME); 52 53 void Recycle(StorageExecutor *&handle, bool isExternal = false); 54 55 virtual bool IsEngineCorrupted() const; 56 57 void Release(); 58 59 int TryToDisable(bool isNeedCheckAll, OperatePerm disableType = OperatePerm::DISABLE_PERM); 60 61 void Enable(OperatePerm enableType = OperatePerm::NORMAL_PERM); 62 63 void Abort(OperatePerm enableType = OperatePerm::NORMAL_PERM); 64 65 virtual bool IsNeedTobeReleased() const; 66 67 virtual const std::string &GetIdentifier() const; 68 69 EngineState GetEngineState() const; 70 71 void SetEngineState(EngineState state); 72 73 virtual int ExecuteMigrate(); 74 75 virtual void SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback); 76 77 void SetConnectionFlag(bool isExisted); 78 79 bool IsExistConnection() const; 80 81 virtual int CheckEngineOption(const KvDBProperties &kvdbOption) const; 82 83 virtual bool IsMigrating() const; 84 85 void WaitWriteHandleIdle(); 86 87 virtual void IncreaseCacheRecordVersion(); 88 virtual uint64_t GetCacheRecordVersion() const; 89 virtual uint64_t GetAndIncreaseCacheRecordVersion(); 90 91 virtual void SetSchemaChangedCallback(const std::function<int(void)> &callback); 92 93 void CloseAllExecutor(); 94 95 int InitAllReadWriteExecutor(); 96 97 OpenDbProperties GetOption() const; 98 99 protected: 100 virtual int CreateNewExecutor(bool isWrite, StorageExecutor *&handle) = 0; 101 102 void CloseExecutor(); 103 104 virtual void AddStorageExecutor(StorageExecutor *handle, bool isExternal); 105 106 static bool CheckEngineAttr(const StorageEngineAttr &poolSize); 107 108 int InitReadWriteExecutors(); 109 110 void SetUri(const std::string &uri); 111 void SetSQL(const std::vector<std::string> &sql); 112 void SetSecurityOption(const SecurityOption &option); 113 void SetCreateIfNecessary(bool isCreateIfNecessary); 114 115 mutable std::mutex optionMutex_; 116 OpenDbProperties option_; 117 118 StorageEngineAttr engineAttr_; 119 bool isUpdated_; 120 std::atomic<bool> isMigrating_; 121 std::string identifier_; 122 std::string hashIdentifier_; 123 124 // Mutex for commitNotifyFunc_. 125 mutable std::shared_mutex notifyMutex_; 126 127 // Callback function for commit notify. 128 std::function<void(int, KvDBCommitNotifyFilterAbleData *)> commitNotifyFunc_; 129 130 // Mutex for schemaChangedFunc_. 131 mutable std::shared_mutex schemaChangedMutex_; 132 133 // Callback function for schema changed. 134 std::function<int(void)> schemaChangedFunc_; 135 136 bool isSchemaChanged_; 137 138 bool isEnhance_; 139 140 private: 141 StorageExecutor *FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList, 142 std::list<StorageExecutor *> &usingList, int &errCode, bool isExternal = false); 143 144 StorageExecutor *FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal = false); 145 StorageExecutor *FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal = false); 146 147 StorageExecutor *FetchReadStorageExecutor(int &errCode, bool isExternal, bool isNeedCreate); 148 149 virtual void ClearCorruptedFlag(); 150 151 void PrintDbFileMsg(bool isOpen); 152 153 static const int MAX_WAIT_TIME; 154 static const int MAX_WRITE_SIZE; 155 static const int MAX_READ_SIZE; 156 157 std::mutex initMutex_; 158 std::condition_variable initCondition_; 159 std::atomic<bool> isInitialized_; 160 OperatePerm perm_; 161 bool operateAbort_; 162 163 std::mutex readMutex_; 164 std::mutex writeMutex_; 165 std::condition_variable writeCondition_; 166 std::condition_variable readCondition_; 167 std::list<StorageExecutor *> writeUsingList_; 168 std::list<StorageExecutor *> writeIdleList_; 169 std::list<StorageExecutor *> readUsingList_; 170 std::list<StorageExecutor *> readIdleList_; 171 std::list<StorageExecutor *> externalWriteUsingList_; 172 std::list<StorageExecutor *> externalWriteIdleList_; 173 std::list<StorageExecutor *> externalReadUsingList_; 174 std::list<StorageExecutor *> externalReadIdleList_; 175 std::atomic<bool> isExistConnection_; 176 177 std::mutex idleMutex_; 178 std::condition_variable idleCondition_; 179 180 std::atomic<int> readPendingCount_; 181 std::atomic<int> externalReadPendingCount_; 182 183 EngineState engineState_; 184 }; 185 } // namespace DistributedDB 186 #endif // STORAGE_ENGINE_H 187