1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "storage_engine_manager.h"
17 #include "log_print.h"
18 #include "db_errno.h"
19 #include "runtime_context.h"
20 #include "sqlite_single_ver_storage_engine.h"
21
22 namespace DistributedDB {
23 bool StorageEngineManager::isRegLockStatusListener_ = false;
24 std::mutex StorageEngineManager::instanceLock_;
25 std::atomic<StorageEngineManager *> StorageEngineManager::instance_{nullptr};
26 std::mutex StorageEngineManager::storageEnginesLock_;
27
28 namespace {
GetIdentifier(const KvDBProperties & property)29 std::string GetIdentifier(const KvDBProperties &property)
30 {
31 return property.GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
32 }
33
GetDatabaseType(const KvDBProperties & property)34 int GetDatabaseType(const KvDBProperties &property)
35 {
36 return property.GetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::LOCAL_TYPE);
37 }
38 }
39
StorageEngineManager()40 StorageEngineManager::StorageEngineManager() : lockStatusListener_(nullptr)
41 {}
42
~StorageEngineManager()43 StorageEngineManager::~StorageEngineManager()
44 {
45 if (lockStatusListener_ != nullptr) {
46 lockStatusListener_->Drop(true);
47 }
48 }
49
GetStorageEngine(const KvDBProperties & property,int & errCode)50 StorageEngine *StorageEngineManager::GetStorageEngine(const KvDBProperties &property, int &errCode)
51 {
52 StorageEngineManager *manager = GetInstance();
53 if (manager == nullptr) {
54 LOGE("[StorageEngineManager] GetInstance failed");
55 errCode = -E_OUT_OF_MEMORY;
56 return nullptr;
57 }
58 std::string identifier = GetIdentifier(property);
59 manager->EnterGetEngineProcess(identifier);
60 auto storageEngine = manager->FindStorageEngine(identifier);
61 if (storageEngine == nullptr) {
62 storageEngine = manager->CreateStorageEngine(property, errCode);
63 if (errCode == E_OK) {
64 manager->InsertStorageEngine(identifier, storageEngine);
65 }
66 } else {
67 errCode = storageEngine->CheckEngineOption(property);
68 if (errCode != E_OK) {
69 LOGE("kvdb property mismatch engine option! errCode = [%d]", errCode);
70 storageEngine = nullptr;
71 }
72 }
73
74 manager->ExitGetEngineProcess(identifier);
75 return storageEngine;
76 }
77
ReleaseStorageEngine(StorageEngine * storageEngine)78 int StorageEngineManager::ReleaseStorageEngine(StorageEngine *storageEngine)
79 {
80 if (storageEngine == nullptr) {
81 LOGE("[StorageEngineManager] The engine to be released is nullptr");
82 return -E_INVALID_ARGS;
83 }
84
85 // Clear commit notify callback function.
86 storageEngine->SetNotifiedCallback(nullptr);
87
88 // If the cacheDB is valid, the storageEngine is not released to prevent the cache mechanism failed
89 bool isRelease = storageEngine->IsNeedTobeReleased();
90 if (!isRelease) {
91 LOGW("[StorageEngineManager] storageEngine do not need to be released.");
92 return E_OK;
93 }
94
95 StorageEngineManager *manager = GetInstance();
96 if (manager == nullptr) {
97 LOGE("[StorageEngineManager] Release GetInstance failed");
98 return -E_OUT_OF_MEMORY;
99 }
100
101 LOGD("[StorageEngineManager] storageEngine to be released.");
102 return manager->ReleaseEngine(storageEngine);
103 }
104
ForceReleaseStorageEngine(const std::string & identifier)105 int StorageEngineManager::ForceReleaseStorageEngine(const std::string &identifier)
106 {
107 StorageEngineManager *manager = GetInstance();
108 if (manager == nullptr) {
109 LOGE("[StorageEngineManager] Force release GetInstance failed");
110 return -E_OUT_OF_MEMORY;
111 }
112
113 LOGD("[StorageEngineManager] Force release engine.");
114 manager->ReleaseResources(identifier);
115 return E_OK;
116 }
117
ExecuteMigration(StorageEngine * storageEngine)118 int StorageEngineManager::ExecuteMigration(StorageEngine *storageEngine)
119 {
120 if (storageEngine == nullptr) {
121 LOGE("storage engine is nullptr can not execute migration!");
122 return -E_INVALID_ARGS;
123 }
124 if (storageEngine->IsExistConnection()) {
125 return storageEngine->ExecuteMigrate();
126 }
127 LOGI("connection is not existed, not need execute migration!");
128 return -E_INVALID_DB;
129 }
130
GetInstance()131 StorageEngineManager *StorageEngineManager::GetInstance()
132 {
133 // For Double-Checked Locking, we need check instance_ twice
134 if (instance_ == nullptr) {
135 std::lock_guard<std::mutex> lockGuard(instanceLock_);
136 if (instance_ == nullptr) {
137 instance_ = new (std::nothrow) StorageEngineManager();
138 if (instance_ == nullptr) {
139 LOGE("[StorageEngineManager] Failed to alloc the engine manager!");
140 return nullptr;
141 }
142 }
143 }
144
145 if (!isRegLockStatusListener_) {
146 int errCode = (instance_.load())->RegisterLockStatusListener();
147 if (errCode != E_OK) {
148 LOGW("[StorageEngineManager] Failed to register lock status listener:%d", errCode);
149 } else {
150 isRegLockStatusListener_ = true;
151 }
152 }
153 return instance_;
154 }
155
RegisterLockStatusListener()156 int StorageEngineManager::RegisterLockStatusListener()
157 {
158 int errCode = E_OK;
159 lockStatusListener_ = RuntimeContext::GetInstance()->RegisterLockStatusLister(
160 [this](void *lockStatus) {
161 if (lockStatus == nullptr) {
162 return;
163 }
164 bool isLocked = *static_cast<bool *>(lockStatus);
165 LOGD("[StorageEngineManager] Lock status to %d", isLocked);
166 if (isLocked) {
167 return;
168 }
169 int taskErrCode = RuntimeContext::GetInstance()->ScheduleTask(
170 std::bind(&StorageEngineManager::LockStatusNotifier, this, isLocked));
171 if (taskErrCode != E_OK) {
172 LOGE("[StorageEngineManager] LockStatusNotifier ScheduleTask failed : %d", taskErrCode);
173 }
174 }, errCode);
175 if (errCode != E_OK) {
176 LOGW("[StorageEngineManager] Failed to register lock status listener: %d.", errCode);
177 }
178 return errCode;
179 }
180
LockStatusNotifier(bool isAccessControlled)181 void StorageEngineManager::LockStatusNotifier(bool isAccessControlled)
182 {
183 (void)isAccessControlled;
184 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
185 StorageEngine *storageEngine = nullptr;
186 for (const auto &item : storageEngines_) {
187 storageEngine = item.second;
188 LOGD("Begin to migrate for lock status change");
189 (void)ExecuteMigration(storageEngine);
190 }
191 }
192
RemoveEngineFromCache(const std::string & identifier)193 void StorageEngineManager::RemoveEngineFromCache(const std::string &identifier)
194 {
195 StorageEngineManager *manager = GetInstance();
196 if (manager != nullptr) {
197 manager->EraseStorageEngine(identifier);
198 }
199 }
200
CreateStorageEngine(const KvDBProperties & property,int & errCode)201 StorageEngine *StorageEngineManager::CreateStorageEngine(const KvDBProperties &property, int &errCode)
202 {
203 int databaseType = GetDatabaseType(property);
204 if (databaseType != KvDBProperties::SINGLE_VER_TYPE) {
205 LOGE("[StorageEngineManager] Database type error : %d", databaseType);
206 errCode = -E_NOT_SUPPORT;
207 return nullptr;
208 }
209
210 auto storageEngine = new (std::nothrow) SQLiteSingleVerStorageEngine();
211 if (storageEngine == nullptr) {
212 LOGE("[StorageEngineManager] Create storage engine failed");
213 errCode = -E_OUT_OF_MEMORY;
214 return nullptr;
215 }
216 errCode = E_OK;
217 return storageEngine;
218 }
219
FindStorageEngine(const std::string & identifier)220 StorageEngine *StorageEngineManager::FindStorageEngine(const std::string &identifier)
221 {
222 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
223 auto iter = storageEngines_.find(identifier);
224 if (iter != storageEngines_.end()) {
225 auto storageEngine = iter->second;
226 if (storageEngine == nullptr) {
227 LOGE("[StorageEngineManager] storageEngine in cache is nullptr");
228 storageEngines_.erase(identifier);
229 return nullptr;
230 }
231
232 return storageEngine;
233 }
234
235 return nullptr;
236 }
237
InsertStorageEngine(const std::string & identifier,StorageEngine * & storageEngine)238 void StorageEngineManager::InsertStorageEngine(const std::string &identifier, StorageEngine *&storageEngine)
239 {
240 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
241 storageEngines_.insert(std::pair<std::string, StorageEngine *>(identifier, storageEngine));
242 }
243
EraseStorageEngine(const std::string & identifier)244 void StorageEngineManager::EraseStorageEngine(const std::string &identifier)
245 {
246 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
247 storageEngines_.erase(identifier);
248 }
249
ReleaseResources(const std::string & identifier)250 void StorageEngineManager::ReleaseResources(const std::string &identifier)
251 {
252 StorageEngine *storageEngine = nullptr;
253
254 {
255 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
256 auto iter = storageEngines_.find(identifier);
257 if (iter != storageEngines_.end()) {
258 storageEngine = iter->second;
259 storageEngines_.erase(identifier);
260 }
261 }
262
263 if (storageEngine != nullptr) {
264 LOGI("[StorageEngineManager] Release storage engine");
265 delete storageEngine;
266 storageEngine = nullptr;
267 }
268
269 return;
270 }
271
ReleaseEngine(StorageEngine * releaseEngine)272 int StorageEngineManager::ReleaseEngine(StorageEngine *releaseEngine)
273 {
274 const std::string identifier = releaseEngine->GetIdentifier();
275 StorageEngine *cacheEngine = nullptr;
276
277 {
278 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
279 auto iter = storageEngines_.find(identifier);
280 if (iter != storageEngines_.end()) {
281 cacheEngine = iter->second;
282 storageEngines_.erase(identifier);
283 }
284 }
285
286 if (cacheEngine == nullptr) {
287 LOGE("[StorageEngineManager] cache engine is null");
288 return -E_ALREADY_RELEASE;
289 }
290 if (cacheEngine != releaseEngine) {
291 LOGE("[StorageEngineManager] cache engine is not equal the input engine");
292 return -E_INVALID_ARGS;
293 }
294
295 delete releaseEngine;
296 releaseEngine = nullptr;
297 return E_OK;
298 }
299
EnterGetEngineProcess(const std::string & identifier)300 void StorageEngineManager::EnterGetEngineProcess(const std::string &identifier)
301 {
302 std::unique_lock<std::mutex> lock(getEngineMutex_);
303 getEngineCondition_.wait(lock, [this, &identifier]() {
304 return this->getEngineSet_.count(identifier) == 0;
305 });
306 (void)getEngineSet_.insert(identifier);
307 }
308
ExitGetEngineProcess(const std::string & identifier)309 void StorageEngineManager::ExitGetEngineProcess(const std::string &identifier)
310 {
311 std::unique_lock<std::mutex> lock(getEngineMutex_);
312 (void)getEngineSet_.erase(identifier);
313 getEngineCondition_.notify_all();
314 }
315 } // namespace DistributedDB
316