• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_CLOUD_DB_H
17 #define VIRTUAL_CLOUD_DB_H
18 #include <atomic>
19 #include <mutex>
20 #include "icloud_db.h"
21 
22 namespace DistributedDB {
23 class VirtualCloudDb : public ICloudDb {
24 public:
25     struct CloudData {
26         VBucket record;
27         VBucket extend;
28     };
29     VirtualCloudDb() = default;
30     ~VirtualCloudDb() override = default;
31     DBStatus BatchInsert(const std::string &tableName, std::vector<VBucket> &&record,
32         std::vector<VBucket> &extend) override;
33 
34     DBStatus BatchInsertWithGid(const std::string &tableName, std::vector<VBucket> &&record,
35         std::vector<VBucket> &extend);
36 
37     DBStatus BatchUpdate(const std::string &tableName, std::vector<VBucket> &&record,
38         std::vector<VBucket> &extend) override;
39 
40     DBStatus BatchDelete(const std::string &tableName, std::vector<VBucket> &extend) override;
41 
42     DBStatus Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data) override;
43 
44     DBStatus DeleteByGid(const std::string &tableName, VBucket &extend);
45 
46     std::pair<DBStatus, std::string> GetEmptyCursor(const std::string &tableName) override;
47 
48     std::pair<DBStatus, uint32_t> Lock() override;
49 
50     DBStatus UnLock() override;
51 
52     DBStatus HeartBeat() override;
53 
54     DBStatus Close() override;
55 
56     void SetCloudError(bool cloudError);
57 
58     void SetBlockTime(int32_t blockTime);
59 
60     void ClearHeartbeatCount();
61 
62     int32_t GetHeartbeatCount() const;
63 
64     bool GetLockStatus() const;
65 
66     void SetHeartbeatError(bool heartbeatError);
67 
68     void SetIncrementData(const std::string &tableName, const VBucket &record, const VBucket &extend);
69 
70     uint32_t GetQueryTimes(const std::string &tableName);
71 
72     void SetActionStatus(DBStatus status);
73 
74     DBStatus GetDataStatus(const std::string &gid, bool &deleteStatus);
75 
76     void ClearAllData();
77 
78     void ForkQuery(const std::function<void(const std::string &, VBucket &)> &forkFunc);
79 
80     void ForkUpload(const std::function<void(const std::string &, VBucket &)> &forkUploadFunc);
81 
82     void ForkInsertConflict(const std::function<DBStatus(const std::string &, VBucket &, VBucket &,
83         std::vector<CloudData> &)> &forkUploadFunc);
84 
85     int32_t GetLockCount() const;
86 
87     void Reset();
88 
89     void SetInsertFailed(int32_t count);
90 
91     void SetClearExtend(int32_t count);
92 
93     void SetCloudNetworkError(bool cloudNetworkError);
94 
95     void SetConflictInUpload(bool conflict);
96 
97     void SetHeartbeatBlockTime(int32_t blockTime);
98 
99     void SetInsertHook(const std::function<void(VBucket &)> &insertCheckFunc);
100 private:
101     DBStatus InnerBatchInsert(const std::string &tableName, std::vector<VBucket> &&record,
102         std::vector<VBucket> &extend);
103 
104     DBStatus InnerUpdate(const std::string &tableName, std::vector<VBucket> &&record,
105         std::vector<VBucket> &extend, bool isDelete);
106 
107     DBStatus InnerUpdateWithoutLock(const std::string &tableName, std::vector<VBucket> &&record,
108         std::vector<VBucket> &extend, bool isDelete);
109 
110     DBStatus UpdateCloudData(const std::string &tableName, CloudData &&cloudData);
111 
112     void GetCloudData(const std::string &cursor, bool isIncreCursor, std::vector<CloudData> allData,
113         std::vector<VBucket> &data, VBucket &extend);
114 
115     bool IsCloudGidMatching(const std::vector<QueryNode> &queryNodes, VBucket &extend);
116 
117     bool IsCloudGidMatchingInner(const QueryNode &queryNode, VBucket &extend);
118 
119     bool IsPrimaryKeyMatching(const std::vector<QueryNode> &queryNodes, VBucket &record);
120 
121     bool IsPrimaryKeyMatchingInner(const QueryNode &queryNode, VBucket &record);
122 
123     void AddAssetIdForExtend(VBucket &record, VBucket &extend);
124 
125     void AddAssetsIdInner(Assets &assets);
126 
127     std::atomic<bool> cloudError_ = false;
128     std::atomic<bool> cloudNetworkError_ = false;
129     std::atomic<bool> heartbeatError_ = false;
130     std::atomic<bool> lockStatus_ = false;
131     std::atomic<bool> conflictInUpload_ = false;
132     std::atomic<int32_t> blockTimeMs_ = 0;
133     std::atomic<int32_t> heartbeatBlockTimeMs_ = 0;
134     std::atomic<int64_t> currentGid_ = 0;
135     std::atomic<int64_t> currentCursor_ = 1;
136     std::atomic<int64_t> currentVersion_ = 0;
137     std::atomic<int32_t> queryLimit_ = 100;
138     std::atomic<int32_t> heartbeatCount_ = 0;
139     std::atomic<int32_t> lockCount_ = 0;
140     std::atomic<int32_t> insertFailedCount_ = 0;
141     std::atomic<int32_t> missingExtendCount_ = 0;
142     std::mutex cloudDataMutex_;
143     std::map<std::string, std::vector<CloudData>> cloudData_;
144     std::map<std::string, std::vector<CloudData>> incrementCloudData_;
145     bool isSetCrementCloudData_ = false;
146     std::string increPrefix_ = "increPrefix_";
147     std::map<std::string, uint32_t> queryTimes_;
148     DBStatus actionStatus_ = OK;
149     std::function<void(const std::string &, VBucket &)> forkFunc_;
150     std::function<void(const std::string &, VBucket &)> forkUploadFunc_;
151     std::function<DBStatus(const std::string &, VBucket &, VBucket &,
152         std::vector<CloudData> &)> forkUploadConflictFunc_;
153     std::function<void(VBucket &)> insertCheckFunc_;
154 };
155 }
156 #endif // VIRTUAL_CLOUD_DB_H
157