/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #ifndef STORAGE_ENGINE_H
17 #define STORAGE_ENGINE_H
18 
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <mutex>
#include <shared_mutex>
#include <string>

#include "db_types.h"
#include "macro_utils.h"
#include "storage_executor.h"
#include "kvdb_commit_notify_filterable_data.h"
28 
29 namespace DistributedDB {
// Minimum/maximum counts for the write and read sides of a storage engine.
// Every bound defaults to 1.
// NOTE(review): presumably these are executor pool-size limits (CheckEngineAttr names its
// parameter `poolSize`) - confirm against the implementation.
struct StorageEngineAttr {
    uint32_t minWriteNum = 1;
    uint32_t maxWriteNum = 1;
    uint32_t minReadNum = 1;
    uint32_t maxReadNum = 1;
};
36 
37 class StorageEngine {
38 public:
39     StorageEngine();
40     virtual ~StorageEngine();
41 
42     // Delete the copy and assign constructors
43     DISABLE_COPY_ASSIGN_MOVE(StorageEngine);
44 
45     int Init();
46 
47     StorageExecutor *FindExecutor(bool writable, OperatePerm perm, int &errCode, int waitTime = MAX_WAIT_TIME);
48 
49     void Recycle(StorageExecutor *&handle);
50 
51     void Release();
52 
53     int TryToDisable(bool isNeedCheckAll, OperatePerm disableType = OperatePerm::DISABLE_PERM);
54 
55     void Enable(OperatePerm enableType = OperatePerm::NORMAL_PERM);
56 
57     void Abort(OperatePerm enableType = OperatePerm::NORMAL_PERM);
58 
59     virtual bool IsNeedTobeReleased() const;
60 
61     virtual const std::string &GetIdentifier() const;
62 
63     virtual EngineState GetEngineState() const;
64 
65     virtual void SetEngineState(EngineState state);
66 
67     virtual bool IsNeedMigrate() const;
68 
69     virtual int ExecuteMigrate();
70 
71     virtual void SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback);
72 
73     void SetConnectionFlag(bool isExisted);
74 
75     bool IsExistConnection() const;
76 
77     virtual void ClearEnginePasswd();
78 
79     virtual int CheckEngineOption(const KvDBProperties &kvdbOption) const;
80 
81     virtual bool IsMigrating() const;
82 
83 protected:
84     virtual int CreateNewExecutor(bool isWrite, StorageExecutor *&handle) = 0;
85 
86     void CloseExecutor();
87 
88     virtual void AddStorageExecutor(StorageExecutor *handle);
89 
90     static bool CheckEngineAttr(const StorageEngineAttr &poolSize);
91 
92     int InitReadWriteExecutors();
93 
94     StorageEngineAttr engineAttr_;
95     bool isUpdated_;
96     std::atomic<bool> isMigrating_;
97     std::string identifier_;
98     EngineState engineState_;
99 
100     // Mutex for commitNotifyFunc_.
101     mutable std::shared_mutex notifyMutex_;
102 
103     // Callback function for commit notify.
104     std::function<void(int, KvDBCommitNotifyFilterAbleData *)> commitNotifyFunc_;
105 
106 private:
107     StorageExecutor *FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList,
108         std::list<StorageExecutor *> &usingList, int &errCode);
109 
110     StorageExecutor *FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime);
111     StorageExecutor *FindReadExecutor(OperatePerm perm, int &errCode, int waitTime);
112 
113     virtual void ClearCorruptedFlag();
114 
115     static const int MAX_WAIT_TIME;
116     static const int MAX_WRITE_SIZE;
117     static const int MAX_READ_SIZE;
118 
119     std::mutex initMutex_;
120     std::condition_variable initCondition_;
121     std::atomic<bool> isInitialized_;
122     OperatePerm perm_;
123     bool operateAbort_;
124 
125     std::mutex readMutex_;
126     std::mutex writeMutex_;
127     std::condition_variable writeCondition_;
128     std::condition_variable readCondition_;
129     std::list<StorageExecutor *> writeUsingList_;
130     std::list<StorageExecutor *> writeIdleList_;
131     std::list<StorageExecutor *> readUsingList_;
132     std::list<StorageExecutor *> readIdleList_;
133     std::atomic<bool> isExistConnection_;
134 };
135 } // namespace DistributedDB
136 #endif // STORAGE_ENGINE_H
137