1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "storage_engine.h"
17
18 #include <algorithm>
19
20 #include "db_common.h"
21 #include "db_errno.h"
22 #include "log_print.h"
23
24 namespace DistributedDB {
25 const int StorageEngine::MAX_WAIT_TIME = 30;
26 const int StorageEngine::MAX_WRITE_SIZE = 1;
27 const int StorageEngine::MAX_READ_SIZE = 16;
28
StorageEngine()29 StorageEngine::StorageEngine()
30 : isUpdated_(false),
31 isMigrating_(false),
32 engineState_(EngineState::INVALID),
33 commitNotifyFunc_(nullptr),
34 isInitialized_(false),
35 perm_(OperatePerm::NORMAL_PERM),
36 operateAbort_(false),
37 isExistConnection_(false)
38 {}
39
~StorageEngine()40 StorageEngine::~StorageEngine()
41 {
42 Release();
43 }
44
Init()45 int StorageEngine::Init()
46 {
47 if (isInitialized_) {
48 LOGD("Storage engine has been initialized!");
49 return E_OK;
50 }
51
52 // only for create the database avoid the minimum number is 0.
53 int errCode = E_OK;
54 StorageExecutor *handle = nullptr;
55 if (engineAttr_.minReadNum == 0 && engineAttr_.minWriteNum == 0) {
56 errCode = CreateNewExecutor(true, handle);
57 if (errCode != E_OK) {
58 goto ERROR;
59 }
60
61 if (handle != nullptr) {
62 delete handle;
63 handle = nullptr;
64 }
65 }
66
67 for (uint32_t i = 0; i < engineAttr_.minWriteNum; i++) {
68 handle = nullptr;
69 errCode = CreateNewExecutor(true, handle);
70 if (errCode != E_OK) {
71 goto ERROR;
72 }
73 AddStorageExecutor(handle);
74 }
75
76 for (uint32_t i = 0; i < engineAttr_.minReadNum; i++) {
77 handle = nullptr;
78 errCode = CreateNewExecutor(false, handle);
79 if (errCode != E_OK) {
80 goto ERROR;
81 }
82 AddStorageExecutor(handle);
83 }
84 isInitialized_ = true;
85
86 ERROR:
87 if (errCode != E_OK) {
88 // Assumed file system has classification function, can only get one write handle
89 if (errCode == -E_EKEYREVOKED && !writeIdleList_.empty()) {
90 return E_OK;
91 }
92 Release();
93 }
94 return errCode;
95 }
96
FindExecutor(bool writable,OperatePerm perm,int & errCode,int waitTime)97 StorageExecutor *StorageEngine::FindExecutor(bool writable, OperatePerm perm, int &errCode, int waitTime)
98 {
99 if (GetEngineState() == EngineState::ENGINE_BUSY) {
100 LOGI("Storage engine is busy!");
101 errCode = -E_BUSY;
102 return nullptr;
103 }
104
105 if (writable) {
106 return FindWriteExecutor(perm, errCode, waitTime);
107 }
108
109 return FindReadExecutor(perm, errCode, waitTime);
110 }
111
FindWriteExecutor(OperatePerm perm,int & errCode,int waitTime)112 StorageExecutor *StorageEngine::FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime)
113 {
114 std::unique_lock<std::mutex> lock(writeMutex_);
115 errCode = -E_BUSY;
116 if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
117 LOGI("Not permitted to get the executor[%d]", perm_);
118 return nullptr;
119 }
120 if (waitTime <= 0) { // non-blocking.
121 if (writeIdleList_.empty() &&
122 writeIdleList_.size() + writeUsingList_.size() == engineAttr_.maxWriteNum) {
123 return nullptr;
124 }
125 return FetchStorageExecutor(true, writeIdleList_, writeUsingList_, errCode);
126 }
127
128 // Not prohibited and there is an available handle
129 bool result = writeCondition_.wait_for(lock, std::chrono::seconds(waitTime),
130 [this, &perm]() {
131 return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) && (!writeIdleList_.empty() ||
132 (writeIdleList_.size() + writeUsingList_.size() < engineAttr_.maxWriteNum) ||
133 operateAbort_);
134 });
135 if (operateAbort_) {
136 LOGI("Abort write executor and executor and busy for operate!");
137 return nullptr;
138 }
139 if (!result) {
140 LOGI("Get write handle result[%d], permissType[%d], operType[%d], write[%d-%d-%d]",
141 result, perm_, perm, writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
142 return nullptr;
143 }
144 return FetchStorageExecutor(true, writeIdleList_, writeUsingList_, errCode);
145 }
146
FindReadExecutor(OperatePerm perm,int & errCode,int waitTime)147 StorageExecutor *StorageEngine::FindReadExecutor(OperatePerm perm, int &errCode, int waitTime)
148 {
149 std::unique_lock<std::mutex> lock(readMutex_);
150 errCode = -E_BUSY;
151 if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
152 LOGI("Not permitted to get the executor[%d]", perm_);
153 return nullptr;
154 }
155
156 if (waitTime <= 0) { // non-blocking.
157 if (readIdleList_.empty() &&
158 readIdleList_.size() + readUsingList_.size() == engineAttr_.maxReadNum) {
159 return nullptr;
160 }
161 return FetchStorageExecutor(false, readIdleList_, readUsingList_, errCode);
162 }
163
164 // Not prohibited and there is an available handle
165 bool result = readCondition_.wait_for(lock, std::chrono::seconds(waitTime),
166 [this, &perm]() {
167 return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) &&
168 (!readIdleList_.empty() || (readIdleList_.size() + readUsingList_.size() < engineAttr_.maxReadNum) ||
169 operateAbort_);
170 });
171 if (operateAbort_) {
172 LOGI("Abort find read executor and busy for operate!");
173 return nullptr;
174 }
175 if (!result) {
176 LOGI("Get read handle result[%d], permissType[%d], operType[%d], read[%d-%d-%d]",
177 result, perm_, perm, readIdleList_.size(), readUsingList_.size(), engineAttr_.maxReadNum);
178 return nullptr;
179 }
180 return FetchStorageExecutor(false, readIdleList_, readUsingList_, errCode);
181 }
182
Recycle(StorageExecutor * & handle)183 void StorageEngine::Recycle(StorageExecutor *&handle)
184 {
185 if (handle == nullptr) {
186 return;
187 }
188 std::string id = DBCommon::TransferStringToHex(identifier_);
189 LOGD("Recycle executor[%d] for id[%.6s]", handle->GetWritable(), id.c_str());
190 if (handle->GetWritable()) {
191 std::unique_lock<std::mutex> lock(writeMutex_);
192 auto iter = std::find(writeUsingList_.begin(), writeUsingList_.end(), handle);
193 if (iter != writeUsingList_.end()) {
194 writeUsingList_.remove(handle);
195 if (writeIdleList_.size() >= 1) {
196 delete handle;
197 handle = nullptr;
198 return;
199 }
200 handle->Reset();
201 writeIdleList_.push_back(handle);
202 writeCondition_.notify_one();
203 }
204 } else {
205 std::unique_lock<std::mutex> lock(readMutex_);
206 auto iter = std::find(readUsingList_.begin(), readUsingList_.end(), handle);
207 if (iter != readUsingList_.end()) {
208 readUsingList_.remove(handle);
209 if (readIdleList_.size() >= 1) {
210 delete handle;
211 handle = nullptr;
212 return;
213 }
214 handle->Reset();
215 readIdleList_.push_back(handle);
216 readCondition_.notify_one();
217 }
218 }
219 handle = nullptr;
220 }
221
ClearCorruptedFlag()222 void StorageEngine::ClearCorruptedFlag()
223 {
224 return;
225 }
226
Release()227 void StorageEngine::Release()
228 {
229 CloseExecutor();
230 isInitialized_ = false;
231 isUpdated_ = false;
232 ClearCorruptedFlag();
233 SetEngineState(EngineState::INVALID);
234 }
235
TryToDisable(bool isNeedCheckAll,OperatePerm disableType)236 int StorageEngine::TryToDisable(bool isNeedCheckAll, OperatePerm disableType)
237 {
238 if (engineState_ != EngineState::MAINDB && engineState_ != EngineState::INVALID) {
239 LOGE("Not support disable handle when cacheDB existed! state = [%d]", engineState_);
240 return(engineState_ == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
241 }
242
243 std::lock(writeMutex_, readMutex_);
244 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
245 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
246
247 if (!isNeedCheckAll) {
248 goto END;
249 }
250
251 if (!writeUsingList_.empty() || !readUsingList_.empty()) {
252 LOGE("Database handle used");
253 return -E_BUSY;
254 }
255 END:
256 if (perm_ == OperatePerm::NORMAL_PERM) {
257 LOGI("database is disable for re-build:%d", static_cast<int>(disableType));
258 perm_ = disableType;
259 writeCondition_.notify_all();
260 readCondition_.notify_all();
261 }
262 return E_OK;
263 }
264
Enable(OperatePerm enableType)265 void StorageEngine::Enable(OperatePerm enableType)
266 {
267 std::lock(writeMutex_, readMutex_);
268 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
269 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
270 if (perm_ == enableType) {
271 LOGI("Re-enable the database");
272 perm_ = OperatePerm::NORMAL_PERM;
273 writeCondition_.notify_all();
274 readCondition_.notify_all();
275 }
276 }
277
Abort(OperatePerm enableType)278 void StorageEngine::Abort(OperatePerm enableType)
279 {
280 std::lock(writeMutex_, readMutex_);
281 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
282 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
283 if (perm_ == enableType) {
284 LOGI("Abort the handle occupy, release all!");
285 perm_ = OperatePerm::NORMAL_PERM;
286 operateAbort_ = true;
287
288 writeCondition_.notify_all();
289 readCondition_.notify_all();
290 }
291 }
292
IsNeedTobeReleased() const293 bool StorageEngine::IsNeedTobeReleased() const
294 {
295 return true;
296 }
297
GetIdentifier() const298 const std::string &StorageEngine::GetIdentifier() const
299 {
300 return identifier_;
301 }
302
GetEngineState() const303 EngineState StorageEngine::GetEngineState() const
304 {
305 return engineState_;
306 }
307
SetEngineState(EngineState state)308 void StorageEngine::SetEngineState(EngineState state)
309 {
310 LOGI("Storage engine state to [%d]!", state);
311 engineState_ = state;
312 }
313
IsNeedMigrate() const314 bool StorageEngine::IsNeedMigrate() const
315 {
316 LOGI("No need to migrate!");
317 return false;
318 }
319
ExecuteMigrate()320 int StorageEngine::ExecuteMigrate()
321 {
322 LOGW("Migration is not supported!");
323 return -E_NOT_SUPPORT;
324 }
325
SetNotifiedCallback(const std::function<void (int,KvDBCommitNotifyFilterAbleData *)> & callback)326 void StorageEngine::SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback)
327 {
328 std::unique_lock<std::shared_mutex> lock(notifyMutex_);
329 commitNotifyFunc_ = callback;
330 return;
331 }
332
SetConnectionFlag(bool isExisted)333 void StorageEngine::SetConnectionFlag(bool isExisted)
334 {
335 return isExistConnection_.store(isExisted);
336 }
337
IsExistConnection() const338 bool StorageEngine::IsExistConnection() const
339 {
340 return isExistConnection_.load();
341 }
342
ClearEnginePasswd()343 void StorageEngine::ClearEnginePasswd()
344 {
345 return;
346 }
347
CheckEngineOption(const KvDBProperties & kvdbOption) const348 int StorageEngine::CheckEngineOption(const KvDBProperties &kvdbOption) const
349 {
350 return E_OK;
351 }
352
AddStorageExecutor(StorageExecutor * handle)353 void StorageEngine::AddStorageExecutor(StorageExecutor *handle)
354 {
355 if (handle == nullptr) {
356 return;
357 }
358
359 if (handle->GetWritable()) {
360 writeIdleList_.push_back(handle);
361 } else {
362 readIdleList_.push_back(handle);
363 }
364 }
365
CloseExecutor()366 void StorageEngine::CloseExecutor()
367 {
368 {
369 std::lock_guard<std::mutex> lock(writeMutex_);
370 for (auto &item : writeIdleList_) {
371 if (item != nullptr) {
372 delete item;
373 item = nullptr;
374 }
375 }
376 writeIdleList_.clear();
377 }
378
379 {
380 std::lock_guard<std::mutex> lock(readMutex_);
381 for (auto &item : readIdleList_) {
382 if (item != nullptr) {
383 delete item;
384 item = nullptr;
385 }
386 }
387 readIdleList_.clear();
388 }
389 }
390
FetchStorageExecutor(bool isWrite,std::list<StorageExecutor * > & idleList,std::list<StorageExecutor * > & usingList,int & errCode)391 StorageExecutor *StorageEngine::FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList,
392 std::list<StorageExecutor *> &usingList, int &errCode)
393 {
394 if (idleList.empty()) {
395 StorageExecutor *handle = nullptr;
396 errCode = CreateNewExecutor(isWrite, handle);
397 if ((errCode != E_OK) || (handle == nullptr)) {
398 if (errCode != -E_EKEYREVOKED) {
399 return nullptr;
400 }
401 LOGE("Key revoked status, couldn't create the new executor");
402 if (!usingList.empty()) {
403 LOGE("Can't create new executor for revoked");
404 errCode = -E_BUSY;
405 }
406 return nullptr;
407 }
408
409 AddStorageExecutor(handle);
410 }
411 auto item = idleList.front();
412 usingList.push_back(item);
413 idleList.remove(item);
414 LOGD("Get executor[%d] from [%.6s], using[%d]", isWrite,
415 DBCommon::TransferStringToHex(identifier_).c_str(), usingList.size());
416 errCode = E_OK;
417 return item;
418 }
419
CheckEngineAttr(const StorageEngineAttr & poolSize)420 bool StorageEngine::CheckEngineAttr(const StorageEngineAttr &poolSize)
421 {
422 return (poolSize.maxReadNum > MAX_READ_SIZE ||
423 poolSize.maxWriteNum > MAX_WRITE_SIZE ||
424 poolSize.minReadNum > poolSize.maxReadNum ||
425 poolSize.minWriteNum > poolSize.maxWriteNum);
426 }
427
IsMigrating() const428 bool StorageEngine::IsMigrating() const
429 {
430 return isMigrating_.load();
431 }
432 }
433