• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "storage_engine.h"
17 
18 #include <algorithm>
19 
20 #include "db_common.h"
21 #include "db_errno.h"
22 #include "log_print.h"
23 
24 namespace DistributedDB {
// Static limits of StorageEngine.
// MAX_WRITE_SIZE / MAX_READ_SIZE are the upper bounds CheckEngineAttr() applies to
// maxWriteNum / maxReadNum.
// NOTE(review): MAX_WAIT_TIME is not used in the code visible here; presumably the default
// wait (in seconds) for FindExecutor() -- confirm against the header.
const int StorageEngine::MAX_WAIT_TIME = 30;
const int StorageEngine::MAX_WRITE_SIZE = 1;
const int StorageEngine::MAX_READ_SIZE = 16;
28 
StorageEngine()29 StorageEngine::StorageEngine()
30     : isUpdated_(false),
31       isMigrating_(false),
32       commitNotifyFunc_(nullptr),
33       schemaChangedFunc_(nullptr),
34       isSchemaChanged_(false),
35       isEnhance_(false),
36       isInitialized_(false),
37       perm_(OperatePerm::NORMAL_PERM),
38       operateAbort_(false),
39       isExistConnection_(false),
40       engineState_(EngineState::INVALID)
41 {}
42 
// Destroys the engine; the idle executors still held in the pools are deleted via CloseExecutor().
StorageEngine::~StorageEngine()
{
    CloseExecutor();
}
47 
// Thin wrapper around CloseExecutor(): deletes the idle executors of all pools.
void StorageEngine::CloseAllExecutor()
{
    CloseExecutor();
}
52 
// Thin wrapper around InitReadWriteExecutors(); returns its error code unchanged.
int StorageEngine::InitAllReadWriteExecutor()
{
    return InitReadWriteExecutors();
}
57 
// Returns a copy of the open properties stored in option_.
OpenDbProperties StorageEngine::GetOption()
{
    return option_;
}
62 
InitReadWriteExecutors()63 int StorageEngine::InitReadWriteExecutors()
64 {
65     int errCode = E_OK;
66     std::scoped_lock initLock(writeMutex_, readMutex_);
67     // only for create the database avoid the minimum number is 0.
68     StorageExecutor *handle = nullptr;
69     if (engineAttr_.minReadNum == 0 && engineAttr_.minWriteNum == 0) {
70         errCode = CreateNewExecutor(true, handle);
71         if (errCode != E_OK) {
72             return errCode;
73         }
74 
75         if (handle != nullptr) {
76             delete handle;
77             handle = nullptr;
78         }
79     }
80 
81     for (uint32_t i = 0; i < engineAttr_.minWriteNum; i++) {
82         handle = nullptr;
83         errCode = CreateNewExecutor(true, handle);
84         if (errCode != E_OK) {
85             return errCode;
86         }
87         AddStorageExecutor(handle, false);
88     }
89 
90     for (uint32_t i = 0; i < engineAttr_.minReadNum; i++) {
91         handle = nullptr;
92         errCode = CreateNewExecutor(false, handle);
93         if (errCode != E_OK) {
94             return errCode;
95         }
96         AddStorageExecutor(handle, false);
97     }
98     return E_OK;
99 }
100 
101 
Init(bool isEnhance)102 int StorageEngine::Init(bool isEnhance)
103 {
104     if (isInitialized_.load()) {
105         LOGD("Storage engine has been initialized!");
106         return E_OK;
107     }
108     isEnhance_ = isEnhance;
109 
110     int errCode = InitReadWriteExecutors();
111     if (errCode == E_OK) {
112         isInitialized_.store(true);
113         initCondition_.notify_all();
114         return E_OK;
115     } else if (errCode == -E_EKEYREVOKED) {
116         // Assumed file system has classification function, can only get one write handle
117         std::unique_lock<std::mutex> lock(writeMutex_);
118         if (!writeIdleList_.empty() || !writeUsingList_.empty()) {
119             isInitialized_.store(true);
120             initCondition_.notify_all();
121             return E_OK;
122         }
123     }
124     initCondition_.notify_all();
125     Release();
126     return errCode;
127 }
128 
// Stub in this class: performs no re-initialization and always returns E_OK.
int StorageEngine::ReInit()
{
    return E_OK;
}
133 
FindExecutor(bool writable,OperatePerm perm,int & errCode,bool isExternal,int waitTime)134 StorageExecutor *StorageEngine::FindExecutor(bool writable, OperatePerm perm, int &errCode, bool isExternal,
135     int waitTime)
136 {
137     if (GetEngineState() == EngineState::ENGINE_BUSY) {
138         LOGI("Storage engine is busy!");
139         errCode = -E_BUSY;
140         return nullptr;
141     }
142 
143     {
144         std::unique_lock<std::mutex> lock(initMutex_);
145         bool result = initCondition_.wait_for(lock, std::chrono::seconds(waitTime), [this]() {
146             return isInitialized_.load();
147         });
148         if (!result || !isInitialized_.load()) {
149             LOGE("Storage engine is not initialized");
150             errCode = -E_BUSY; // Usually in reinitialize engine, return BUSY
151             return nullptr;
152         }
153     }
154 
155     if (writable) {
156         return FindWriteExecutor(perm, errCode, waitTime, isExternal);
157     }
158 
159     return FindReadExecutor(perm, errCode, waitTime, isExternal);
160 }
161 
FindWriteExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)162 StorageExecutor *StorageEngine::FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
163 {
164     LOGD("[FindWriteExecutor]Finding WriteExecutor");
165     std::unique_lock<std::mutex> lock(writeMutex_);
166     errCode = -E_BUSY;
167     if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
168         LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
169         return nullptr;
170     }
171     std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
172     std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
173     if (waitTime <= 0) { // non-blocking.
174         if (writeUsingList.empty() &&
175                 writeIdleList.size() + writeUsingList.size() == engineAttr_.maxWriteNum) {
176             return nullptr;
177         }
178         return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
179     }
180     // Not prohibited and there is an available handle
181     bool result = writeCondition_.wait_for(lock, std::chrono::seconds(waitTime),
182         [this, &perm, &writeUsingList, &writeIdleList]() {
183             return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) && (!writeIdleList.empty() ||
184                 (writeIdleList.size() + writeUsingList.size() < engineAttr_.maxWriteNum) ||
185                 operateAbort_);
186         });
187     if (operateAbort_) {
188         LOGI("Abort write executor and executor and busy for operate!");
189         return nullptr;
190     }
191     if (!result) {
192         LOGI("Get write handle result[%d], permissType[%u], operType[%u], write[%zu-%zu-%" PRIu32 "]", result,
193             static_cast<unsigned>(perm_), static_cast<unsigned>(perm), writeIdleList.size(), writeUsingList.size(),
194             engineAttr_.maxWriteNum);
195         return nullptr;
196     }
197     return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
198 }
199 
FindReadExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)200 StorageExecutor *StorageEngine::FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
201 {
202     std::unique_lock<std::mutex> lock(readMutex_);
203     errCode = -E_BUSY;
204     if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
205         LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
206         return nullptr;
207     }
208 
209     std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
210     std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
211     if (waitTime <= 0) { // non-blocking.
212         if (readIdleList.empty() &&
213             readIdleList.size() + readUsingList.size() == engineAttr_.maxReadNum) {
214             return nullptr;
215         }
216         return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
217     }
218 
219     // Not prohibited and there is an available handle
220     uint32_t maxReadHandleNum = isExternal ? 1 : engineAttr_.maxReadNum;
221     bool result = readCondition_.wait_for(lock, std::chrono::seconds(waitTime),
222         [this, &perm, &readUsingList, &readIdleList, &maxReadHandleNum]() {
223             return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) &&
224                 (!readIdleList.empty() || (readIdleList.size() + readUsingList.size() < maxReadHandleNum) ||
225                 operateAbort_);
226         });
227     if (operateAbort_) {
228         LOGI("Abort find read executor and busy for operate!");
229         return nullptr;
230     }
231     if (!result) {
232         LOGI("Get read handle result[%d], permissType[%u], operType[%u], read[%zu-%zu-%" PRIu32 "]", result,
233             static_cast<unsigned>(perm_), static_cast<unsigned>(perm), readIdleList.size(), readUsingList.size(),
234             engineAttr_.maxReadNum);
235         return nullptr;
236     }
237     return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
238 }
239 
Recycle(StorageExecutor * & handle,bool isExternal)240 void StorageEngine::Recycle(StorageExecutor *&handle, bool isExternal)
241 {
242     if (handle == nullptr) {
243         return;
244     }
245     if (!isEnhance_) {
246         LOGD("Recycle executor[%d] for id[%.6s]", handle->GetWritable(), hashIdentifier_.c_str());
247     }
248     if (handle->GetWritable()) {
249         std::unique_lock<std::mutex> lock(writeMutex_);
250         std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
251         std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
252         auto iter = std::find(writeUsingList.begin(), writeUsingList.end(), handle);
253         if (iter != writeUsingList.end()) {
254             writeUsingList.remove(handle);
255             if (!writeIdleList.empty()) {
256                 delete handle;
257                 handle = nullptr;
258                 return;
259             }
260             handle->Reset();
261             writeIdleList.push_back(handle);
262             writeCondition_.notify_one();
263             idleCondition_.notify_all();
264         }
265     } else {
266         std::unique_lock<std::mutex> lock(readMutex_);
267         std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
268         std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
269         auto iter = std::find(readUsingList.begin(), readUsingList.end(), handle);
270         if (iter != readUsingList.end()) {
271             readUsingList.remove(handle);
272             if (!readIdleList.empty()) {
273                 delete handle;
274                 handle = nullptr;
275                 return;
276             }
277             handle->Reset();
278             readIdleList.push_back(handle);
279             readCondition_.notify_one();
280         }
281     }
282     handle = nullptr;
283 }
284 
ClearCorruptedFlag()285 void StorageEngine::ClearCorruptedFlag()
286 {
287     return;
288 }
289 
// Stub in this class: always reports the engine as not corrupted.
bool StorageEngine::IsEngineCorrupted() const
{
    return false;
}
294 
// Returns the engine to its uninitialized state: deletes the idle executors, clears the
// initialized/updated flags and the corrupted flag, and sets the state to INVALID.
void StorageEngine::Release()
{
    CloseExecutor();
    isInitialized_.store(false);
    isUpdated_ = false;
    ClearCorruptedFlag();
    SetEngineState(EngineState::INVALID);
}
303 
TryToDisable(bool isNeedCheckAll,OperatePerm disableType)304 int StorageEngine::TryToDisable(bool isNeedCheckAll, OperatePerm disableType)
305 {
306     if (engineState_ != EngineState::MAINDB && engineState_ != EngineState::INVALID) {
307         LOGE("Not support disable handle when cacheDB existed! state = [%d]", engineState_);
308         return(engineState_ == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
309     }
310 
311     std::lock(writeMutex_, readMutex_);
312     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
313     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
314 
315     if (!isNeedCheckAll) {
316         goto END;
317     }
318 
319     if (!writeUsingList_.empty() || !readUsingList_.empty() || !externalWriteUsingList_.empty() ||
320         !externalReadUsingList_.empty()) {
321         LOGE("Database handle used");
322         return -E_BUSY;
323     }
324 END:
325     if (perm_ == OperatePerm::NORMAL_PERM) {
326         LOGI("database is disable for re-build:%d", static_cast<int>(disableType));
327         perm_ = disableType;
328         writeCondition_.notify_all();
329         readCondition_.notify_all();
330     }
331     return E_OK;
332 }
333 
Enable(OperatePerm enableType)334 void StorageEngine::Enable(OperatePerm enableType)
335 {
336     std::lock(writeMutex_, readMutex_);
337     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
338     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
339     if (perm_ == enableType) {
340         LOGI("Re-enable the database");
341         perm_ = OperatePerm::NORMAL_PERM;
342         writeCondition_.notify_all();
343         readCondition_.notify_all();
344     }
345 }
346 
Abort(OperatePerm enableType)347 void StorageEngine::Abort(OperatePerm enableType)
348 {
349     std::lock(writeMutex_, readMutex_);
350     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
351     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
352     if (perm_ == enableType) {
353         LOGI("Abort the handle occupy, release all!");
354         perm_ = OperatePerm::NORMAL_PERM;
355         operateAbort_ = true;
356 
357         writeCondition_.notify_all();
358         readCondition_.notify_all();
359     }
360 }
361 
IsNeedTobeReleased() const362 bool StorageEngine::IsNeedTobeReleased() const
363 {
364     EngineState engineState = GetEngineState();
365     return ((engineState == EngineState::MAINDB) || (engineState == EngineState::INVALID));
366 }
367 
// Returns a reference to the stored identifier_; the reference is tied to this engine object.
const std::string &StorageEngine::GetIdentifier() const
{
    return identifier_;
}
372 
// Returns the current engine state as stored in engineState_.
EngineState StorageEngine::GetEngineState() const
{
    return engineState_;
}
377 
SetEngineState(EngineState state)378 void StorageEngine::SetEngineState(EngineState state)
379 {
380     if (state != EngineState::MAINDB) {
381         LOGD("Storage engine state to [%d]!", state);
382     }
383     engineState_ = state;
384 }
385 
// Stub in this class: logs a warning and always returns -E_NOT_SUPPORT.
int StorageEngine::ExecuteMigrate()
{
    LOGW("Migration is not supported!");
    return -E_NOT_SUPPORT;
}
391 
SetNotifiedCallback(const std::function<void (int,KvDBCommitNotifyFilterAbleData *)> & callback)392 void StorageEngine::SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback)
393 {
394     std::unique_lock<std::shared_mutex> lock(notifyMutex_);
395     commitNotifyFunc_ = callback;
396 }
397 
SetConnectionFlag(bool isExisted)398 void StorageEngine::SetConnectionFlag(bool isExisted)
399 {
400     return isExistConnection_.store(isExisted);
401 }
402 
// Returns the flag last written by SetConnectionFlag().
bool StorageEngine::IsExistConnection() const
{
    return isExistConnection_.load();
}
407 
// Stub in this class: kvdbOption is not inspected and E_OK is always returned.
int StorageEngine::CheckEngineOption(const KvDBProperties &kvdbOption) const
{
    return E_OK;
}
412 
AddStorageExecutor(StorageExecutor * handle,bool isExternal)413 void StorageEngine::AddStorageExecutor(StorageExecutor *handle, bool isExternal)
414 {
415     if (handle == nullptr) {
416         return;
417     }
418 
419     if (handle->GetWritable()) {
420         std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
421         writeIdleList.push_back(handle);
422     } else {
423         std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
424         readIdleList.push_back(handle);
425     }
426 }
427 
ClearHandleList(std::list<StorageExecutor * > & handleList)428 void ClearHandleList(std::list<StorageExecutor *> &handleList)
429 {
430     for (auto &item : handleList) {
431         if (item != nullptr) {
432             delete item;
433             item = nullptr;
434         }
435     }
436     handleList.clear();
437 }
438 
CloseExecutor()439 void StorageEngine::CloseExecutor()
440 {
441     {
442         std::lock_guard<std::mutex> lock(writeMutex_);
443         ClearHandleList(writeIdleList_);
444         ClearHandleList(externalWriteIdleList_);
445     }
446 
447     {
448         std::lock_guard<std::mutex> lock(readMutex_);
449         ClearHandleList(readIdleList_);
450         ClearHandleList(externalReadIdleList_);
451     }
452 }
453 
FetchStorageExecutor(bool isWrite,std::list<StorageExecutor * > & idleList,std::list<StorageExecutor * > & usingList,int & errCode,bool isExternal)454 StorageExecutor *StorageEngine::FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList,
455     std::list<StorageExecutor *> &usingList, int &errCode, bool isExternal)
456 {
457     if (idleList.empty()) {
458         StorageExecutor *handle = nullptr;
459         errCode = CreateNewExecutor(isWrite, handle);
460         if ((errCode != E_OK) || (handle == nullptr)) {
461             if (errCode != -E_EKEYREVOKED) {
462                 return nullptr;
463             }
464             LOGE("Key revoked status, couldn't create the new executor");
465             if (!usingList.empty()) {
466                 LOGE("Can't create new executor for revoked");
467                 errCode = -E_BUSY;
468             }
469             return nullptr;
470         }
471 
472         AddStorageExecutor(handle, isExternal);
473     }
474     auto item = idleList.front();
475     usingList.push_back(item);
476     idleList.remove(item);
477     if (!isEnhance_) {
478         LOGD("Get executor[%d] from [%.3s]", isWrite, hashIdentifier_.c_str());
479     }
480     errCode = E_OK;
481     return item;
482 }
483 
CheckEngineAttr(const StorageEngineAttr & poolSize)484 bool StorageEngine::CheckEngineAttr(const StorageEngineAttr &poolSize)
485 {
486     return (poolSize.maxReadNum > MAX_READ_SIZE ||
487             poolSize.maxWriteNum > MAX_WRITE_SIZE ||
488             poolSize.minReadNum > poolSize.maxReadNum ||
489             poolSize.minWriteNum > poolSize.maxWriteNum);
490 }
491 
// Returns the current value of the isMigrating_ flag.
bool StorageEngine::IsMigrating() const
{
    return isMigrating_.load();
}
496 
WaitWriteHandleIdle()497 void StorageEngine::WaitWriteHandleIdle()
498 {
499     std::unique_lock<std::mutex> autoLock(idleMutex_);
500     LOGD("Wait wHandle release id[%s]. write[%zu-%zu-%" PRIu32 "]", hashIdentifier_.c_str(),
501         writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
502     idleCondition_.wait(autoLock, [this]() {
503         return writeUsingList_.empty();
504     });
505     LOGD("Wait wHandle release finish id[%s]. write[%zu-%zu-%" PRIu32 "]",
506         hashIdentifier_.c_str(), writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
507 }
508 
IncreaseCacheRecordVersion()509 void StorageEngine::IncreaseCacheRecordVersion()
510 {
511     return;
512 }
513 
// Stub in this class: always returns 0.
uint64_t StorageEngine::GetCacheRecordVersion() const
{
    return 0;
}
518 
// Stub in this class: nothing is incremented and 0 is always returned.
uint64_t StorageEngine::GetAndIncreaseCacheRecordVersion()
{
    return 0;
}
523 
SetSchemaChangedCallback(const std::function<int (void)> & callback)524 void StorageEngine::SetSchemaChangedCallback(const std::function<int(void)> &callback)
525 {
526     std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
527     schemaChangedFunc_ = callback;
528 }
529 }
530