• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "storage_engine.h"
17 
18 #include <algorithm>
19 
20 #include "db_common.h"
21 #include "db_errno.h"
22 #include "log_print.h"
23 
24 namespace DistributedDB {
25 const int StorageEngine::MAX_WAIT_TIME = 30;
26 const int StorageEngine::MAX_WRITE_SIZE = 1;
27 const int StorageEngine::MAX_READ_SIZE = 16;
28 
StorageEngine()29 StorageEngine::StorageEngine()
30     : isUpdated_(false),
31       isMigrating_(false),
32       commitNotifyFunc_(nullptr),
33       schemaChangedFunc_(nullptr),
34       isSchemaChanged_(false),
35       isInitialized_(false),
36       perm_(OperatePerm::NORMAL_PERM),
37       operateAbort_(false),
38       isExistConnection_(false),
39       engineState_(EngineState::INVALID)
40 {}
41 
~StorageEngine()42 StorageEngine::~StorageEngine()
43 {
44     CloseExecutor();
45 }
46 
CloseAllExecutor()47 void StorageEngine::CloseAllExecutor()
48 {
49     CloseExecutor();
50 }
51 
InitAllReadWriteExecutor()52 int StorageEngine::InitAllReadWriteExecutor()
53 {
54     return InitReadWriteExecutors();
55 }
56 
GetOption()57 OpenDbProperties StorageEngine::GetOption()
58 {
59     return option_;
60 }
61 
InitReadWriteExecutors()62 int StorageEngine::InitReadWriteExecutors()
63 {
64     int errCode = E_OK;
65     std::scoped_lock initLock(writeMutex_, readMutex_);
66     // only for create the database avoid the minimum number is 0.
67     StorageExecutor *handle = nullptr;
68     if (engineAttr_.minReadNum == 0 && engineAttr_.minWriteNum == 0) {
69         errCode = CreateNewExecutor(true, handle);
70         if (errCode != E_OK) {
71             return errCode;
72         }
73 
74         if (handle != nullptr) {
75             delete handle;
76             handle = nullptr;
77         }
78     }
79 
80     for (uint32_t i = 0; i < engineAttr_.minWriteNum; i++) {
81         handle = nullptr;
82         errCode = CreateNewExecutor(true, handle);
83         if (errCode != E_OK) {
84             return errCode;
85         }
86         AddStorageExecutor(handle, false);
87     }
88 
89     for (uint32_t i = 0; i < engineAttr_.minReadNum; i++) {
90         handle = nullptr;
91         errCode = CreateNewExecutor(false, handle);
92         if (errCode != E_OK) {
93             return errCode;
94         }
95         AddStorageExecutor(handle, false);
96     }
97     return E_OK;
98 }
99 
100 
Init()101 int StorageEngine::Init()
102 {
103     if (isInitialized_.load()) {
104         LOGD("Storage engine has been initialized!");
105         return E_OK;
106     }
107 
108     int errCode = InitReadWriteExecutors();
109     if (errCode == E_OK) {
110         isInitialized_.store(true);
111         initCondition_.notify_all();
112         return E_OK;
113     } else if (errCode == -E_EKEYREVOKED) {
114         // Assumed file system has classification function, can only get one write handle
115         std::unique_lock<std::mutex> lock(writeMutex_);
116         if (!writeIdleList_.empty() || !writeUsingList_.empty()) {
117             isInitialized_.store(true);
118             initCondition_.notify_all();
119             return E_OK;
120         }
121     }
122     initCondition_.notify_all();
123     Release();
124     return errCode;
125 }
126 
ReInit()127 int StorageEngine::ReInit()
128 {
129     return E_OK;
130 }
131 
FindExecutor(bool writable,OperatePerm perm,int & errCode,bool isExternal,int waitTime)132 StorageExecutor *StorageEngine::FindExecutor(bool writable, OperatePerm perm, int &errCode, bool isExternal,
133     int waitTime)
134 {
135     if (GetEngineState() == EngineState::ENGINE_BUSY) {
136         LOGI("Storage engine is busy!");
137         errCode = -E_BUSY;
138         return nullptr;
139     }
140 
141     {
142         std::unique_lock<std::mutex> lock(initMutex_);
143         bool result = initCondition_.wait_for(lock, std::chrono::seconds(waitTime), [this]() {
144             return isInitialized_.load();
145         });
146         if (!result || !isInitialized_.load()) {
147             LOGE("Storage engine is not initialized");
148             errCode = -E_BUSY; // Usually in reinitialize engine, return BUSY
149             return nullptr;
150         }
151     }
152 
153     if (writable) {
154         return FindWriteExecutor(perm, errCode, waitTime, isExternal);
155     }
156 
157     return FindReadExecutor(perm, errCode, waitTime, isExternal);
158 }
159 
FindWriteExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)160 StorageExecutor *StorageEngine::FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
161 {
162     LOGD("[FindWriteExecutor]Finding WriteExecutor");
163     std::unique_lock<std::mutex> lock(writeMutex_);
164     errCode = -E_BUSY;
165     if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
166         LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
167         return nullptr;
168     }
169     std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
170     std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
171     if (waitTime <= 0) { // non-blocking.
172         if (writeUsingList.empty() &&
173                 writeIdleList.size() + writeUsingList.size() == engineAttr_.maxWriteNum) {
174             return nullptr;
175         }
176         return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
177     }
178     // Not prohibited and there is an available handle
179     bool result = writeCondition_.wait_for(lock, std::chrono::seconds(waitTime),
180         [this, &perm, &writeUsingList, &writeIdleList]() {
181             return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) && (!writeIdleList.empty() ||
182                 (writeIdleList.size() + writeUsingList.size() < engineAttr_.maxWriteNum) ||
183                 operateAbort_);
184         });
185     if (operateAbort_) {
186         LOGI("Abort write executor and executor and busy for operate!");
187         return nullptr;
188     }
189     if (!result) {
190         LOGI("Get write handle result[%d], permissType[%u], operType[%u], write[%zu-%zu-%" PRIu32 "]", result,
191             static_cast<unsigned>(perm_), static_cast<unsigned>(perm), writeIdleList.size(), writeUsingList.size(),
192             engineAttr_.maxWriteNum);
193         return nullptr;
194     }
195     return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
196 }
197 
FindReadExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)198 StorageExecutor *StorageEngine::FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
199 {
200     std::unique_lock<std::mutex> lock(readMutex_);
201     errCode = -E_BUSY;
202     if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
203         LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
204         return nullptr;
205     }
206 
207     std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
208     std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
209     if (waitTime <= 0) { // non-blocking.
210         if (readIdleList.empty() &&
211             readIdleList.size() + readUsingList.size() == engineAttr_.maxReadNum) {
212             return nullptr;
213         }
214         return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
215     }
216 
217     // Not prohibited and there is an available handle
218     bool result = readCondition_.wait_for(lock, std::chrono::seconds(waitTime),
219         [this, &perm, &readUsingList, &readIdleList]() {
220             return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) &&
221                 (!readIdleList.empty() || (readIdleList.size() + readUsingList.size() < engineAttr_.maxReadNum) ||
222                 operateAbort_);
223         });
224     if (operateAbort_) {
225         LOGI("Abort find read executor and busy for operate!");
226         return nullptr;
227     }
228     if (!result) {
229         LOGI("Get read handle result[%d], permissType[%u], operType[%u], read[%zu-%zu-%" PRIu32 "]", result,
230             static_cast<unsigned>(perm_), static_cast<unsigned>(perm), readIdleList.size(), readUsingList.size(),
231             engineAttr_.maxReadNum);
232         return nullptr;
233     }
234     return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
235 }
236 
Recycle(StorageExecutor * & handle,bool isExternal)237 void StorageEngine::Recycle(StorageExecutor *&handle, bool isExternal)
238 {
239     if (handle == nullptr) {
240         return;
241     }
242     LOGD("Recycle executor[%d] for id[%.6s]", handle->GetWritable(), hashIdentifier_.c_str());
243     std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
244     std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
245     std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
246     std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
247     if (handle->GetWritable()) {
248         std::unique_lock<std::mutex> lock(writeMutex_);
249         auto iter = std::find(writeUsingList.begin(), writeUsingList.end(), handle);
250         if (iter != writeUsingList.end()) {
251             writeUsingList.remove(handle);
252             if (!writeIdleList.empty()) {
253                 delete handle;
254                 handle = nullptr;
255                 return;
256             }
257             handle->Reset();
258             writeIdleList.push_back(handle);
259             writeCondition_.notify_one();
260             idleCondition_.notify_all();
261         }
262     } else {
263         std::unique_lock<std::mutex> lock(readMutex_);
264         auto iter = std::find(readUsingList.begin(), readUsingList.end(), handle);
265         if (iter != readUsingList.end()) {
266             readUsingList.remove(handle);
267             if (!readIdleList.empty()) {
268                 delete handle;
269                 handle = nullptr;
270                 return;
271             }
272             handle->Reset();
273             readIdleList.push_back(handle);
274             readCondition_.notify_one();
275         }
276     }
277     handle = nullptr;
278 }
279 
ClearCorruptedFlag()280 void StorageEngine::ClearCorruptedFlag()
281 {
282     return;
283 }
284 
IsEngineCorrupted() const285 bool StorageEngine::IsEngineCorrupted() const
286 {
287     return false;
288 }
289 
Release()290 void StorageEngine::Release()
291 {
292     CloseExecutor();
293     isInitialized_.store(false);
294     isUpdated_ = false;
295     ClearCorruptedFlag();
296     SetEngineState(EngineState::INVALID);
297 }
298 
TryToDisable(bool isNeedCheckAll,OperatePerm disableType)299 int StorageEngine::TryToDisable(bool isNeedCheckAll, OperatePerm disableType)
300 {
301     if (engineState_ != EngineState::MAINDB && engineState_ != EngineState::INVALID) {
302         LOGE("Not support disable handle when cacheDB existed! state = [%d]", engineState_);
303         return(engineState_ == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
304     }
305 
306     std::lock(writeMutex_, readMutex_);
307     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
308     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
309 
310     if (!isNeedCheckAll) {
311         goto END;
312     }
313 
314     if (!writeUsingList_.empty() || !readUsingList_.empty() || !externalWriteUsingList_.empty() ||
315         !externalReadUsingList_.empty()) {
316         LOGE("Database handle used");
317         return -E_BUSY;
318     }
319 END:
320     if (perm_ == OperatePerm::NORMAL_PERM) {
321         LOGI("database is disable for re-build:%d", static_cast<int>(disableType));
322         perm_ = disableType;
323         writeCondition_.notify_all();
324         readCondition_.notify_all();
325     }
326     return E_OK;
327 }
328 
Enable(OperatePerm enableType)329 void StorageEngine::Enable(OperatePerm enableType)
330 {
331     std::lock(writeMutex_, readMutex_);
332     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
333     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
334     if (perm_ == enableType) {
335         LOGI("Re-enable the database");
336         perm_ = OperatePerm::NORMAL_PERM;
337         writeCondition_.notify_all();
338         readCondition_.notify_all();
339     }
340 }
341 
Abort(OperatePerm enableType)342 void StorageEngine::Abort(OperatePerm enableType)
343 {
344     std::lock(writeMutex_, readMutex_);
345     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
346     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
347     if (perm_ == enableType) {
348         LOGI("Abort the handle occupy, release all!");
349         perm_ = OperatePerm::NORMAL_PERM;
350         operateAbort_ = true;
351 
352         writeCondition_.notify_all();
353         readCondition_.notify_all();
354     }
355 }
356 
IsNeedTobeReleased() const357 bool StorageEngine::IsNeedTobeReleased() const
358 {
359     EngineState engineState = GetEngineState();
360     return ((engineState == EngineState::MAINDB) || (engineState == EngineState::INVALID));
361 }
362 
GetIdentifier() const363 const std::string &StorageEngine::GetIdentifier() const
364 {
365     return identifier_;
366 }
367 
GetEngineState() const368 EngineState StorageEngine::GetEngineState() const
369 {
370     return engineState_;
371 }
372 
SetEngineState(EngineState state)373 void StorageEngine::SetEngineState(EngineState state)
374 {
375     LOGI("Storage engine state to [%d]!", state);
376     engineState_ = state;
377 }
378 
ExecuteMigrate()379 int StorageEngine::ExecuteMigrate()
380 {
381     LOGW("Migration is not supported!");
382     return -E_NOT_SUPPORT;
383 }
384 
SetNotifiedCallback(const std::function<void (int,KvDBCommitNotifyFilterAbleData *)> & callback)385 void StorageEngine::SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback)
386 {
387     std::unique_lock<std::shared_mutex> lock(notifyMutex_);
388     commitNotifyFunc_ = callback;
389 }
390 
SetConnectionFlag(bool isExisted)391 void StorageEngine::SetConnectionFlag(bool isExisted)
392 {
393     return isExistConnection_.store(isExisted);
394 }
395 
IsExistConnection() const396 bool StorageEngine::IsExistConnection() const
397 {
398     return isExistConnection_.load();
399 }
400 
CheckEngineOption(const KvDBProperties & kvdbOption) const401 int StorageEngine::CheckEngineOption(const KvDBProperties &kvdbOption) const
402 {
403     return E_OK;
404 }
405 
AddStorageExecutor(StorageExecutor * handle,bool isExternal)406 void StorageEngine::AddStorageExecutor(StorageExecutor *handle, bool isExternal)
407 {
408     if (handle == nullptr) {
409         return;
410     }
411 
412     std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
413     std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
414     if (handle->GetWritable()) {
415         writeIdleList.push_back(handle);
416     } else {
417         readIdleList.push_back(handle);
418     }
419 }
420 
ClearHandleList(std::list<StorageExecutor * > & handleList)421 void ClearHandleList(std::list<StorageExecutor *> &handleList)
422 {
423     for (auto &item : handleList) {
424         if (item != nullptr) {
425             delete item;
426             item = nullptr;
427         }
428     }
429     handleList.clear();
430 }
431 
CloseExecutor()432 void StorageEngine::CloseExecutor()
433 {
434     {
435         std::lock_guard<std::mutex> lock(writeMutex_);
436         ClearHandleList(writeIdleList_);
437         ClearHandleList(externalWriteIdleList_);
438     }
439 
440     {
441         std::lock_guard<std::mutex> lock(readMutex_);
442         ClearHandleList(readIdleList_);
443         ClearHandleList(externalReadIdleList_);
444     }
445 }
446 
FetchStorageExecutor(bool isWrite,std::list<StorageExecutor * > & idleList,std::list<StorageExecutor * > & usingList,int & errCode,bool isExternal)447 StorageExecutor *StorageEngine::FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList,
448     std::list<StorageExecutor *> &usingList, int &errCode, bool isExternal)
449 {
450     if (idleList.empty()) {
451         StorageExecutor *handle = nullptr;
452         errCode = CreateNewExecutor(isWrite, handle);
453         if ((errCode != E_OK) || (handle == nullptr)) {
454             if (errCode != -E_EKEYREVOKED) {
455                 return nullptr;
456             }
457             LOGE("Key revoked status, couldn't create the new executor");
458             if (!usingList.empty()) {
459                 LOGE("Can't create new executor for revoked");
460                 errCode = -E_BUSY;
461             }
462             return nullptr;
463         }
464 
465         AddStorageExecutor(handle, isExternal);
466     }
467     auto item = idleList.front();
468     usingList.push_back(item);
469     idleList.remove(item);
470     LOGD("Get executor[%d] from [%.3s]", isWrite, hashIdentifier_.c_str());
471     errCode = E_OK;
472     return item;
473 }
474 
CheckEngineAttr(const StorageEngineAttr & poolSize)475 bool StorageEngine::CheckEngineAttr(const StorageEngineAttr &poolSize)
476 {
477     return (poolSize.maxReadNum > MAX_READ_SIZE ||
478             poolSize.maxWriteNum > MAX_WRITE_SIZE ||
479             poolSize.minReadNum > poolSize.maxReadNum ||
480             poolSize.minWriteNum > poolSize.maxWriteNum);
481 }
482 
IsMigrating() const483 bool StorageEngine::IsMigrating() const
484 {
485     return isMigrating_.load();
486 }
487 
WaitWriteHandleIdle()488 void StorageEngine::WaitWriteHandleIdle()
489 {
490     std::unique_lock<std::mutex> autoLock(idleMutex_);
491     LOGD("Wait wHandle release id[%s]. write[%zu-%zu-%" PRIu32 "]", hashIdentifier_.c_str(),
492         writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
493     idleCondition_.wait(autoLock, [this]() {
494         return writeUsingList_.empty();
495     });
496     LOGD("Wait wHandle release finish id[%s]. write[%zu-%zu-%" PRIu32 "]",
497         hashIdentifier_.c_str(), writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
498 }
499 
IncreaseCacheRecordVersion()500 void StorageEngine::IncreaseCacheRecordVersion()
501 {
502     return;
503 }
504 
GetCacheRecordVersion() const505 uint64_t StorageEngine::GetCacheRecordVersion() const
506 {
507     return 0;
508 }
509 
GetAndIncreaseCacheRecordVersion()510 uint64_t StorageEngine::GetAndIncreaseCacheRecordVersion()
511 {
512     return 0;
513 }
514 
SetSchemaChangedCallback(const std::function<int (void)> & callback)515 void StorageEngine::SetSchemaChangedCallback(const std::function<int(void)> &callback)
516 {
517     std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
518     schemaChangedFunc_ = callback;
519 }
520 }
521