1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "storage_engine.h"
17
18 #include <algorithm>
19
20 #include "db_common.h"
21 #include "db_errno.h"
22 #include "log_print.h"
23
24 namespace DistributedDB {
25 const int StorageEngine::MAX_WAIT_TIME = 30;
26 const int StorageEngine::MAX_WRITE_SIZE = 1;
27 const int StorageEngine::MAX_READ_SIZE = 16;
28
StorageEngine()29 StorageEngine::StorageEngine()
30 : isUpdated_(false),
31 isMigrating_(false),
32 commitNotifyFunc_(nullptr),
33 schemaChangedFunc_(nullptr),
34 isSchemaChanged_(false),
35 isEnhance_(false),
36 isInitialized_(false),
37 perm_(OperatePerm::NORMAL_PERM),
38 operateAbort_(false),
39 isExistConnection_(false),
40 engineState_(EngineState::INVALID)
41 {}
42
~StorageEngine()43 StorageEngine::~StorageEngine()
44 {
45 CloseExecutor();
46 }
47
CloseAllExecutor()48 void StorageEngine::CloseAllExecutor()
49 {
50 CloseExecutor();
51 }
52
InitAllReadWriteExecutor()53 int StorageEngine::InitAllReadWriteExecutor()
54 {
55 return InitReadWriteExecutors();
56 }
57
GetOption()58 OpenDbProperties StorageEngine::GetOption()
59 {
60 return option_;
61 }
62
InitReadWriteExecutors()63 int StorageEngine::InitReadWriteExecutors()
64 {
65 int errCode = E_OK;
66 std::scoped_lock initLock(writeMutex_, readMutex_);
67 // only for create the database avoid the minimum number is 0.
68 StorageExecutor *handle = nullptr;
69 if (engineAttr_.minReadNum == 0 && engineAttr_.minWriteNum == 0) {
70 errCode = CreateNewExecutor(true, handle);
71 if (errCode != E_OK) {
72 return errCode;
73 }
74
75 if (handle != nullptr) {
76 delete handle;
77 handle = nullptr;
78 }
79 }
80
81 for (uint32_t i = 0; i < engineAttr_.minWriteNum; i++) {
82 handle = nullptr;
83 errCode = CreateNewExecutor(true, handle);
84 if (errCode != E_OK) {
85 return errCode;
86 }
87 AddStorageExecutor(handle, false);
88 }
89
90 for (uint32_t i = 0; i < engineAttr_.minReadNum; i++) {
91 handle = nullptr;
92 errCode = CreateNewExecutor(false, handle);
93 if (errCode != E_OK) {
94 return errCode;
95 }
96 AddStorageExecutor(handle, false);
97 }
98 return E_OK;
99 }
100
101
Init(bool isEnhance)102 int StorageEngine::Init(bool isEnhance)
103 {
104 if (isInitialized_.load()) {
105 LOGD("Storage engine has been initialized!");
106 return E_OK;
107 }
108 isEnhance_ = isEnhance;
109
110 int errCode = InitReadWriteExecutors();
111 if (errCode == E_OK) {
112 isInitialized_.store(true);
113 initCondition_.notify_all();
114 return E_OK;
115 } else if (errCode == -E_EKEYREVOKED) {
116 // Assumed file system has classification function, can only get one write handle
117 std::unique_lock<std::mutex> lock(writeMutex_);
118 if (!writeIdleList_.empty() || !writeUsingList_.empty()) {
119 isInitialized_.store(true);
120 initCondition_.notify_all();
121 return E_OK;
122 }
123 }
124 initCondition_.notify_all();
125 Release();
126 return errCode;
127 }
128
ReInit()129 int StorageEngine::ReInit()
130 {
131 return E_OK;
132 }
133
FindExecutor(bool writable,OperatePerm perm,int & errCode,bool isExternal,int waitTime)134 StorageExecutor *StorageEngine::FindExecutor(bool writable, OperatePerm perm, int &errCode, bool isExternal,
135 int waitTime)
136 {
137 if (GetEngineState() == EngineState::ENGINE_BUSY) {
138 LOGI("Storage engine is busy!");
139 errCode = -E_BUSY;
140 return nullptr;
141 }
142
143 {
144 std::unique_lock<std::mutex> lock(initMutex_);
145 bool result = initCondition_.wait_for(lock, std::chrono::seconds(waitTime), [this]() {
146 return isInitialized_.load();
147 });
148 if (!result || !isInitialized_.load()) {
149 LOGE("Storage engine is not initialized");
150 errCode = -E_BUSY; // Usually in reinitialize engine, return BUSY
151 return nullptr;
152 }
153 }
154
155 if (writable) {
156 return FindWriteExecutor(perm, errCode, waitTime, isExternal);
157 }
158
159 return FindReadExecutor(perm, errCode, waitTime, isExternal);
160 }
161
FindWriteExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)162 StorageExecutor *StorageEngine::FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
163 {
164 LOGD("[FindWriteExecutor]Finding WriteExecutor");
165 std::unique_lock<std::mutex> lock(writeMutex_);
166 errCode = -E_BUSY;
167 if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
168 LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
169 return nullptr;
170 }
171 std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
172 std::list<StorageExecutor *> &writeIdleList = isExternal ? externalWriteIdleList_ : writeIdleList_;
173 if (waitTime <= 0) { // non-blocking.
174 if (writeUsingList.empty() &&
175 writeIdleList.size() + writeUsingList.size() == engineAttr_.maxWriteNum) {
176 return nullptr;
177 }
178 return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
179 }
180 // Not prohibited and there is an available handle
181 bool result = writeCondition_.wait_for(lock, std::chrono::seconds(waitTime),
182 [this, &perm, &writeUsingList, &writeIdleList]() {
183 return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) && (!writeIdleList.empty() ||
184 (writeIdleList.size() + writeUsingList.size() < engineAttr_.maxWriteNum) ||
185 operateAbort_);
186 });
187 if (operateAbort_) {
188 LOGI("Abort write executor and executor and busy for operate!");
189 return nullptr;
190 }
191 if (!result) {
192 LOGI("Get write handle result[%d], permissType[%u], operType[%u], write[%zu-%zu-%" PRIu32 "]", result,
193 static_cast<unsigned>(perm_), static_cast<unsigned>(perm), writeIdleList.size(), writeUsingList.size(),
194 engineAttr_.maxWriteNum);
195 return nullptr;
196 }
197 return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
198 }
199
FindReadExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)200 StorageExecutor *StorageEngine::FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
201 {
202 std::unique_lock<std::mutex> lock(readMutex_);
203 errCode = -E_BUSY;
204 if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
205 LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
206 return nullptr;
207 }
208
209 std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
210 std::list<StorageExecutor *> &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_;
211 if (waitTime <= 0) { // non-blocking.
212 if (readIdleList.empty() &&
213 readIdleList.size() + readUsingList.size() == engineAttr_.maxReadNum) {
214 return nullptr;
215 }
216 return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
217 }
218
219 // Not prohibited and there is an available handle
220 uint32_t maxReadHandleNum = isExternal ? 1 : engineAttr_.maxReadNum;
221 bool result = readCondition_.wait_for(lock, std::chrono::seconds(waitTime),
222 [this, &perm, &readUsingList, &readIdleList, &maxReadHandleNum]() {
223 return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) &&
224 (!readIdleList.empty() || (readIdleList.size() + readUsingList.size() < maxReadHandleNum) ||
225 operateAbort_);
226 });
227 if (operateAbort_) {
228 LOGI("Abort find read executor and busy for operate!");
229 return nullptr;
230 }
231 if (!result) {
232 LOGI("Get read handle result[%d], permissType[%u], operType[%u], read[%zu-%zu-%" PRIu32 "]", result,
233 static_cast<unsigned>(perm_), static_cast<unsigned>(perm), readIdleList.size(), readUsingList.size(),
234 engineAttr_.maxReadNum);
235 return nullptr;
236 }
237 return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
238 }
239
Recycle(StorageExecutor * & handle,bool isExternal)240 void StorageEngine::Recycle(StorageExecutor *&handle, bool isExternal)
241 {
242 if (handle == nullptr) {
243 return;
244 }
245 if (!isEnhance_) {
246 LOGD("Recycle executor[%d] for id[%.6s]", handle->GetWritable(), hashIdentifier_.c_str());
247 }
248 if (handle->GetWritable()) {
249 std::unique_lock<std::mutex> lock(writeMutex_);
250 std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
251 std::list<StorageExecutor *> &writeIdleList = isExternal ? externalWriteIdleList_ : writeIdleList_;
252 auto iter = std::find(writeUsingList.begin(), writeUsingList.end(), handle);
253 if (iter != writeUsingList.end()) {
254 writeUsingList.remove(handle);
255 if (!writeIdleList.empty()) {
256 delete handle;
257 handle = nullptr;
258 return;
259 }
260 handle->Reset();
261 writeIdleList.push_back(handle);
262 writeCondition_.notify_one();
263 idleCondition_.notify_all();
264 }
265 } else {
266 std::unique_lock<std::mutex> lock(readMutex_);
267 std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
268 std::list<StorageExecutor *> &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_;
269 auto iter = std::find(readUsingList.begin(), readUsingList.end(), handle);
270 if (iter != readUsingList.end()) {
271 readUsingList.remove(handle);
272 if (!readIdleList.empty()) {
273 delete handle;
274 handle = nullptr;
275 return;
276 }
277 handle->Reset();
278 readIdleList.push_back(handle);
279 readCondition_.notify_one();
280 }
281 }
282 handle = nullptr;
283 }
284
ClearCorruptedFlag()285 void StorageEngine::ClearCorruptedFlag()
286 {
287 return;
288 }
289
IsEngineCorrupted() const290 bool StorageEngine::IsEngineCorrupted() const
291 {
292 return false;
293 }
294
Release()295 void StorageEngine::Release()
296 {
297 CloseExecutor();
298 isInitialized_.store(false);
299 isUpdated_ = false;
300 ClearCorruptedFlag();
301 SetEngineState(EngineState::INVALID);
302 }
303
TryToDisable(bool isNeedCheckAll,OperatePerm disableType)304 int StorageEngine::TryToDisable(bool isNeedCheckAll, OperatePerm disableType)
305 {
306 if (engineState_ != EngineState::MAINDB && engineState_ != EngineState::INVALID) {
307 LOGE("Not support disable handle when cacheDB existed! state = [%d]", engineState_);
308 return(engineState_ == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
309 }
310
311 std::lock(writeMutex_, readMutex_);
312 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
313 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
314
315 if (!isNeedCheckAll) {
316 goto END;
317 }
318
319 if (!writeUsingList_.empty() || !readUsingList_.empty() || !externalWriteUsingList_.empty() ||
320 !externalReadUsingList_.empty()) {
321 LOGE("Database handle used");
322 return -E_BUSY;
323 }
324 END:
325 if (perm_ == OperatePerm::NORMAL_PERM) {
326 LOGI("database is disable for re-build:%d", static_cast<int>(disableType));
327 perm_ = disableType;
328 writeCondition_.notify_all();
329 readCondition_.notify_all();
330 }
331 return E_OK;
332 }
333
Enable(OperatePerm enableType)334 void StorageEngine::Enable(OperatePerm enableType)
335 {
336 std::lock(writeMutex_, readMutex_);
337 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
338 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
339 if (perm_ == enableType) {
340 LOGI("Re-enable the database");
341 perm_ = OperatePerm::NORMAL_PERM;
342 writeCondition_.notify_all();
343 readCondition_.notify_all();
344 }
345 }
346
Abort(OperatePerm enableType)347 void StorageEngine::Abort(OperatePerm enableType)
348 {
349 std::lock(writeMutex_, readMutex_);
350 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
351 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
352 if (perm_ == enableType) {
353 LOGI("Abort the handle occupy, release all!");
354 perm_ = OperatePerm::NORMAL_PERM;
355 operateAbort_ = true;
356
357 writeCondition_.notify_all();
358 readCondition_.notify_all();
359 }
360 }
361
IsNeedTobeReleased() const362 bool StorageEngine::IsNeedTobeReleased() const
363 {
364 EngineState engineState = GetEngineState();
365 return ((engineState == EngineState::MAINDB) || (engineState == EngineState::INVALID));
366 }
367
GetIdentifier() const368 const std::string &StorageEngine::GetIdentifier() const
369 {
370 return identifier_;
371 }
372
GetEngineState() const373 EngineState StorageEngine::GetEngineState() const
374 {
375 return engineState_;
376 }
377
SetEngineState(EngineState state)378 void StorageEngine::SetEngineState(EngineState state)
379 {
380 if (state != EngineState::MAINDB) {
381 LOGD("Storage engine state to [%d]!", state);
382 }
383 engineState_ = state;
384 }
385
ExecuteMigrate()386 int StorageEngine::ExecuteMigrate()
387 {
388 LOGW("Migration is not supported!");
389 return -E_NOT_SUPPORT;
390 }
391
SetNotifiedCallback(const std::function<void (int,KvDBCommitNotifyFilterAbleData *)> & callback)392 void StorageEngine::SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback)
393 {
394 std::unique_lock<std::shared_mutex> lock(notifyMutex_);
395 commitNotifyFunc_ = callback;
396 }
397
SetConnectionFlag(bool isExisted)398 void StorageEngine::SetConnectionFlag(bool isExisted)
399 {
400 return isExistConnection_.store(isExisted);
401 }
402
IsExistConnection() const403 bool StorageEngine::IsExistConnection() const
404 {
405 return isExistConnection_.load();
406 }
407
CheckEngineOption(const KvDBProperties & kvdbOption) const408 int StorageEngine::CheckEngineOption(const KvDBProperties &kvdbOption) const
409 {
410 return E_OK;
411 }
412
AddStorageExecutor(StorageExecutor * handle,bool isExternal)413 void StorageEngine::AddStorageExecutor(StorageExecutor *handle, bool isExternal)
414 {
415 if (handle == nullptr) {
416 return;
417 }
418
419 if (handle->GetWritable()) {
420 std::list<StorageExecutor *> &writeIdleList = isExternal ? externalWriteIdleList_ : writeIdleList_;
421 writeIdleList.push_back(handle);
422 } else {
423 std::list<StorageExecutor *> &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_;
424 readIdleList.push_back(handle);
425 }
426 }
427
ClearHandleList(std::list<StorageExecutor * > & handleList)428 void ClearHandleList(std::list<StorageExecutor *> &handleList)
429 {
430 for (auto &item : handleList) {
431 if (item != nullptr) {
432 delete item;
433 item = nullptr;
434 }
435 }
436 handleList.clear();
437 }
438
CloseExecutor()439 void StorageEngine::CloseExecutor()
440 {
441 {
442 std::lock_guard<std::mutex> lock(writeMutex_);
443 ClearHandleList(writeIdleList_);
444 ClearHandleList(externalWriteIdleList_);
445 }
446
447 {
448 std::lock_guard<std::mutex> lock(readMutex_);
449 ClearHandleList(readIdleList_);
450 ClearHandleList(externalReadIdleList_);
451 }
452 }
453
FetchStorageExecutor(bool isWrite,std::list<StorageExecutor * > & idleList,std::list<StorageExecutor * > & usingList,int & errCode,bool isExternal)454 StorageExecutor *StorageEngine::FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList,
455 std::list<StorageExecutor *> &usingList, int &errCode, bool isExternal)
456 {
457 if (idleList.empty()) {
458 StorageExecutor *handle = nullptr;
459 errCode = CreateNewExecutor(isWrite, handle);
460 if ((errCode != E_OK) || (handle == nullptr)) {
461 if (errCode != -E_EKEYREVOKED) {
462 return nullptr;
463 }
464 LOGE("Key revoked status, couldn't create the new executor");
465 if (!usingList.empty()) {
466 LOGE("Can't create new executor for revoked");
467 errCode = -E_BUSY;
468 }
469 return nullptr;
470 }
471
472 AddStorageExecutor(handle, isExternal);
473 }
474 auto item = idleList.front();
475 usingList.push_back(item);
476 idleList.remove(item);
477 if (!isEnhance_) {
478 LOGD("Get executor[%d] from [%.3s]", isWrite, hashIdentifier_.c_str());
479 }
480 errCode = E_OK;
481 return item;
482 }
483
CheckEngineAttr(const StorageEngineAttr & poolSize)484 bool StorageEngine::CheckEngineAttr(const StorageEngineAttr &poolSize)
485 {
486 return (poolSize.maxReadNum > MAX_READ_SIZE ||
487 poolSize.maxWriteNum > MAX_WRITE_SIZE ||
488 poolSize.minReadNum > poolSize.maxReadNum ||
489 poolSize.minWriteNum > poolSize.maxWriteNum);
490 }
491
IsMigrating() const492 bool StorageEngine::IsMigrating() const
493 {
494 return isMigrating_.load();
495 }
496
WaitWriteHandleIdle()497 void StorageEngine::WaitWriteHandleIdle()
498 {
499 std::unique_lock<std::mutex> autoLock(idleMutex_);
500 LOGD("Wait wHandle release id[%s]. write[%zu-%zu-%" PRIu32 "]", hashIdentifier_.c_str(),
501 writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
502 idleCondition_.wait(autoLock, [this]() {
503 return writeUsingList_.empty();
504 });
505 LOGD("Wait wHandle release finish id[%s]. write[%zu-%zu-%" PRIu32 "]",
506 hashIdentifier_.c_str(), writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
507 }
508
IncreaseCacheRecordVersion()509 void StorageEngine::IncreaseCacheRecordVersion()
510 {
511 return;
512 }
513
GetCacheRecordVersion() const514 uint64_t StorageEngine::GetCacheRecordVersion() const
515 {
516 return 0;
517 }
518
GetAndIncreaseCacheRecordVersion()519 uint64_t StorageEngine::GetAndIncreaseCacheRecordVersion()
520 {
521 return 0;
522 }
523
SetSchemaChangedCallback(const std::function<int (void)> & callback)524 void StorageEngine::SetSchemaChangedCallback(const std::function<int(void)> &callback)
525 {
526 std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
527 schemaChangedFunc_ = callback;
528 }
529 }
530