• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "storage_engine_manager.h"
17 #include "log_print.h"
18 #include "db_errno.h"
19 #include "runtime_context.h"
20 #include "sqlite_single_ver_storage_engine.h"
21 
22 namespace DistributedDB {
23 bool StorageEngineManager::isRegLockStatusListener_ = false;
24 std::mutex StorageEngineManager::instanceLock_;
25 std::atomic<StorageEngineManager *> StorageEngineManager::instance_{nullptr};
26 std::mutex StorageEngineManager::storageEnginesLock_;
27 
28 namespace {
GetIdentifier(const KvDBProperties & property)29     std::string GetIdentifier(const KvDBProperties &property)
30     {
31         return property.GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
32     }
33 
GetDatabaseType(const KvDBProperties & property)34     int GetDatabaseType(const KvDBProperties &property)
35     {
36         return property.GetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::LOCAL_TYPE);
37     }
38 }
39 
StorageEngineManager()40 StorageEngineManager::StorageEngineManager() : lockStatusListener_(nullptr)
41 {}
42 
~StorageEngineManager()43 StorageEngineManager::~StorageEngineManager()
44 {
45     if (lockStatusListener_ != nullptr) {
46         lockStatusListener_->Drop(true);
47     }
48 }
49 
GetStorageEngine(const KvDBProperties & property,int & errCode)50 StorageEngine *StorageEngineManager::GetStorageEngine(const KvDBProperties &property, int &errCode)
51 {
52     StorageEngineManager *manager = GetInstance();
53     if (manager == nullptr) {
54         LOGE("[StorageEngineManager] GetInstance failed");
55         errCode = -E_OUT_OF_MEMORY;
56         return nullptr;
57     }
58     std::string identifier = GetIdentifier(property);
59     manager->EnterGetEngineProcess(identifier);
60     auto storageEngine = manager->FindStorageEngine(identifier);
61     if (storageEngine == nullptr) {
62         storageEngine = manager->CreateStorageEngine(property, errCode);
63         if (errCode == E_OK) {
64             manager->InsertStorageEngine(identifier, storageEngine);
65         }
66     } else {
67         errCode = storageEngine->CheckEngineOption(property);
68         if (errCode != E_OK) {
69             LOGE("kvdb property mismatch engine option! errCode = [%d]", errCode);
70             storageEngine = nullptr;
71         }
72     }
73 
74     manager->ExitGetEngineProcess(identifier);
75     return storageEngine;
76 }
77 
ReleaseStorageEngine(StorageEngine * storageEngine)78 int StorageEngineManager::ReleaseStorageEngine(StorageEngine *storageEngine)
79 {
80     if (storageEngine == nullptr) {
81         LOGE("[StorageEngineManager] The engine to be released is nullptr");
82         return -E_INVALID_ARGS;
83     }
84 
85     // Clear commit notify callback function.
86     storageEngine->SetNotifiedCallback(nullptr);
87 
88     // If the cacheDB is valid, the storageEngine is not released to prevent the cache mechanism failed
89     bool isRelease = storageEngine->IsNeedTobeReleased();
90     if (!isRelease) {
91         LOGW("[StorageEngineManager] storageEngine do not need to be released.");
92         return E_OK;
93     }
94 
95     StorageEngineManager *manager = GetInstance();
96     if (manager == nullptr) {
97         LOGE("[StorageEngineManager] Release GetInstance failed");
98         return -E_OUT_OF_MEMORY;
99     }
100 
101     LOGD("[StorageEngineManager] storageEngine to be released.");
102     return manager->ReleaseEngine(storageEngine);
103 }
104 
ForceReleaseStorageEngine(const std::string & identifier)105 int StorageEngineManager::ForceReleaseStorageEngine(const std::string &identifier)
106 {
107     StorageEngineManager *manager = GetInstance();
108     if (manager == nullptr) {
109         LOGE("[StorageEngineManager] Force release GetInstance failed");
110         return -E_OUT_OF_MEMORY;
111     }
112 
113     LOGD("[StorageEngineManager] Force release engine.");
114     manager->ReleaseResources(identifier);
115     return E_OK;
116 }
117 
ExecuteMigration(StorageEngine * storageEngine)118 int StorageEngineManager::ExecuteMigration(StorageEngine *storageEngine)
119 {
120     if (storageEngine == nullptr) {
121         LOGE("storage engine is nullptr can not execute migration!");
122         return -E_INVALID_ARGS;
123     }
124     if (storageEngine->IsExistConnection()) {
125         return storageEngine->ExecuteMigrate();
126     }
127     LOGI("connection is not existed, not need execute migration!");
128     return -E_INVALID_DB;
129 }
130 
GetInstance()131 StorageEngineManager *StorageEngineManager::GetInstance()
132 {
133     // For Double-Checked Locking, we need check instance_ twice
134     if (instance_ == nullptr) {
135         std::lock_guard<std::mutex> lockGuard(instanceLock_);
136         if (instance_ == nullptr) {
137             instance_ = new (std::nothrow) StorageEngineManager();
138             if (instance_ == nullptr) {
139                 LOGE("[StorageEngineManager] Failed to alloc the engine manager!");
140                 return nullptr;
141             }
142         }
143     }
144 
145     if (!isRegLockStatusListener_) {
146         int errCode = (instance_.load())->RegisterLockStatusListener();
147         if (errCode != E_OK) {
148             LOGW("[StorageEngineManager] Failed to register lock status listener:%d", errCode);
149         } else {
150             isRegLockStatusListener_ = true;
151         }
152     }
153     return instance_;
154 }
155 
RegisterLockStatusListener()156 int StorageEngineManager::RegisterLockStatusListener()
157 {
158     int errCode = E_OK;
159     lockStatusListener_ = RuntimeContext::GetInstance()->RegisterLockStatusLister(
160         [this](void *lockStatus) {
161             if (lockStatus == nullptr) {
162                 return;
163             }
164             bool isLocked = *static_cast<bool *>(lockStatus);
165             LOGD("[StorageEngineManager] Lock status to %d", isLocked);
166             if (isLocked) {
167                 return;
168             }
169             int taskErrCode = RuntimeContext::GetInstance()->ScheduleTask(
170                 std::bind(&StorageEngineManager::LockStatusNotifier, this, isLocked));
171             if (taskErrCode != E_OK) {
172                 LOGE("[StorageEngineManager] LockStatusNotifier ScheduleTask failed : %d", taskErrCode);
173             }
174         }, errCode);
175     if (errCode != E_OK) {
176         LOGW("[StorageEngineManager] Failed to register lock status listener: %d.", errCode);
177     }
178     return errCode;
179 }
180 
LockStatusNotifier(bool isAccessControlled)181 void StorageEngineManager::LockStatusNotifier(bool isAccessControlled)
182 {
183     (void)isAccessControlled;
184     std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
185     StorageEngine *storageEngine = nullptr;
186     for (const auto &item : storageEngines_) {
187         storageEngine = item.second;
188         LOGD("Begin to migrate for lock status change");
189         (void)ExecuteMigration(storageEngine);
190     }
191 }
192 
CreateStorageEngine(const KvDBProperties & property,int & errCode)193 StorageEngine *StorageEngineManager::CreateStorageEngine(const KvDBProperties &property, int &errCode)
194 {
195     int databaseType = GetDatabaseType(property);
196     if (databaseType != KvDBProperties::SINGLE_VER_TYPE) {
197         LOGE("[StorageEngineManager] Database type error : %d", databaseType);
198         errCode = -E_NOT_SUPPORT;
199         return nullptr;
200     }
201 
202     auto storageEngine = new (std::nothrow) SQLiteSingleVerStorageEngine();
203     if (storageEngine == nullptr) {
204         LOGE("[StorageEngineManager] Create storage engine failed");
205         errCode = -E_OUT_OF_MEMORY;
206         return nullptr;
207     }
208     errCode = E_OK;
209     return storageEngine;
210 }
211 
FindStorageEngine(const std::string & identifier)212 StorageEngine *StorageEngineManager::FindStorageEngine(const std::string &identifier)
213 {
214     std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
215     auto iter = storageEngines_.find(identifier);
216     if (iter != storageEngines_.end()) {
217         auto storageEngine = iter->second;
218         if (storageEngine == nullptr) {
219             LOGE("[StorageEngineManager] storageEngine in cache is nullptr");
220             storageEngines_.erase(identifier);
221             return nullptr;
222         }
223 
224         return storageEngine;
225     }
226 
227     return nullptr;
228 }
229 
InsertStorageEngine(const std::string & identifier,StorageEngine * & storageEngine)230 void StorageEngineManager::InsertStorageEngine(const std::string &identifier, StorageEngine *&storageEngine)
231 {
232     std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
233     storageEngines_.insert(std::pair<std::string, StorageEngine *>(identifier, storageEngine));
234 }
235 
EraseStorageEngine(const std::string & identifier)236 void StorageEngineManager::EraseStorageEngine(const std::string &identifier)
237 {
238     std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
239     storageEngines_.erase(identifier);
240 }
241 
ReleaseResources(const std::string & identifier)242 void StorageEngineManager::ReleaseResources(const std::string &identifier)
243 {
244     StorageEngine *storageEngine = nullptr;
245 
246     {
247         std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
248         auto iter = storageEngines_.find(identifier);
249         if (iter != storageEngines_.end()) {
250             storageEngine = iter->second;
251             storageEngines_.erase(identifier);
252         }
253     }
254 
255     if (storageEngine != nullptr) {
256         LOGI("[StorageEngineManager] Release storage engine");
257         delete storageEngine;
258     }
259 }
260 
ReleaseEngine(StorageEngine * releaseEngine)261 int StorageEngineManager::ReleaseEngine(StorageEngine *releaseEngine)
262 {
263     const std::string identifier = releaseEngine->GetIdentifier();
264     StorageEngine *cacheEngine = nullptr;
265 
266     {
267         std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
268         auto iter = storageEngines_.find(identifier);
269         if (iter != storageEngines_.end()) {
270             cacheEngine = iter->second;
271             storageEngines_.erase(identifier);
272         }
273     }
274 
275     if (cacheEngine == nullptr) {
276         LOGE("[StorageEngineManager] cache engine is null");
277         return -E_ALREADY_RELEASE;
278     }
279     if (cacheEngine != releaseEngine) {
280         LOGE("[StorageEngineManager] cache engine is not equal the input engine");
281         return -E_INVALID_ARGS;
282     }
283 
284     delete releaseEngine;
285     return E_OK;
286 }
287 
EnterGetEngineProcess(const std::string & identifier)288 void StorageEngineManager::EnterGetEngineProcess(const std::string &identifier)
289 {
290     std::unique_lock<std::mutex> lock(getEngineMutex_);
291     getEngineCondition_.wait(lock, [this, &identifier]() {
292         return this->getEngineSet_.count(identifier) == 0;
293     });
294     (void)getEngineSet_.insert(identifier);
295 }
296 
ExitGetEngineProcess(const std::string & identifier)297 void StorageEngineManager::ExitGetEngineProcess(const std::string &identifier)
298 {
299     std::unique_lock<std::mutex> lock(getEngineMutex_);
300     (void)getEngineSet_.erase(identifier);
301     getEngineCondition_.notify_all();
302 }
303 } // namespace DistributedDB
304