• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "virtual_cloud_db.h"
16 
17 #include <thread>
18 #include "cloud_db_types.h"
19 #include "db_constant.h"
20 #include "cloud_db_constant.h"
21 #include "log_print.h"
22 #include "time_helper.h"
23 
24 namespace DistributedDB {
25 namespace {
26     const char *g_deleteField = CloudDbConstant::DELETE_FIELD;
27     const char *g_gidField = CloudDbConstant::GID_FIELD;
28     const char *g_cursorField = CloudDbConstant::CURSOR_FIELD;
29     const char *g_modifiedField = CloudDbConstant::MODIFY_FIELD;
30 }
31 
BatchInsert(const std::string & tableName,std::vector<VBucket> && record,std::vector<VBucket> & extend)32 DBStatus VirtualCloudDb::BatchInsert(const std::string &tableName, std::vector<VBucket> &&record,
33     std::vector<VBucket> &extend)
34 {
35     if (cloudError_) {
36         return DB_ERROR;
37     }
38     if (blockTimeMs_ != 0) {
39         std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
40     }
41     if (record.size() != extend.size()) {
42         LOGE("[VirtualCloudDb] not equal records");
43         return DB_ERROR;
44     }
45     std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
46     for (size_t i = 0; i < record.size(); ++i) {
47         if (extend[i].find(g_gidField) != extend[i].end()) {
48             LOGE("[VirtualCloudDb] Insert data should not have gid");
49             return DB_ERROR;
50         }
51         extend[i][g_gidField] = std::to_string(currentGid_++);
52         extend[i][g_cursorField] = std::to_string(currentCursor_++);
53         extend[i][g_deleteField] = false;
54         CloudData cloudData = {
55             .record = std::move(record[i]),
56             .extend = extend[i]
57         };
58         cloudData_[tableName].push_back(cloudData);
59         auto gid = std::get<std::string>(extend[i][g_gidField]);
60     }
61     return OK;
62 }
63 
BatchInsertWithGid(const std::string & tableName,std::vector<VBucket> && record,std::vector<VBucket> & extend)64 DBStatus VirtualCloudDb::BatchInsertWithGid(const std::string &tableName, std::vector<VBucket> &&record,
65     std::vector<VBucket> &extend)
66 {
67     if (cloudError_) {
68         return DB_ERROR;
69     }
70     if (blockTimeMs_ != 0) {
71         std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
72     }
73     if (record.size() != extend.size()) {
74         LOGE("[VirtualCloudDb] not equal records");
75         return DB_ERROR;
76     }
77     std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
78     for (size_t i = 0; i < record.size(); ++i) {
79         if (extend[i].find(g_gidField) == extend[i].end()) {
80             extend[i][g_gidField] = std::to_string(currentGid_++);
81         }
82         extend[i][g_cursorField] = std::to_string(currentCursor_++);
83         extend[i][g_deleteField] = false;
84         CloudData cloudData = {
85             .record = std::move(record[i]),
86             .extend = extend[i]
87         };
88         cloudData_[tableName].push_back(cloudData);
89     }
90 
91     LOGI("[VirtualCloudDb] BatchInsertWithGid records");
92     return OK;
93 }
94 
BatchUpdate(const std::string & tableName,std::vector<VBucket> && record,std::vector<VBucket> & extend)95 DBStatus VirtualCloudDb::BatchUpdate(const std::string &tableName, std::vector<VBucket> &&record,
96     std::vector<VBucket> &extend)
97 {
98     if (cloudError_) {
99         return DB_ERROR;
100     }
101     if (blockTimeMs_ != 0) {
102         std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
103     }
104     return InnerUpdate(tableName, std::move(record), extend, false);
105 }
106 
BatchDelete(const std::string & tableName,std::vector<VBucket> & extend)107 DBStatus VirtualCloudDb::BatchDelete(const std::string &tableName, std::vector<VBucket> &extend)
108 {
109     if (cloudError_) {
110         return DB_ERROR;
111     }
112     if (blockTimeMs_ != 0) {
113         std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
114     }
115     std::vector<VBucket> record;
116     for (size_t i = 0; i < extend.size(); ++i) {
117         record.emplace_back();
118     }
119     return InnerUpdate(tableName, std::move(record), extend, true);
120 }
121 
HeartBeat()122 DBStatus VirtualCloudDb::HeartBeat()
123 {
124     heartbeatCount_++;
125     if (actionStatus_ != OK) {
126         return actionStatus_;
127     }
128     if (cloudError_) {
129         return DB_ERROR;
130     }
131     if (heartbeatError_) {
132         return DB_ERROR;
133     }
134     if (blockTimeMs_ != 0) {
135         std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
136     }
137     lockStatus_ = true;
138     return OK;
139 }
140 
Lock()141 std::pair<DBStatus, uint32_t> VirtualCloudDb::Lock()
142 {
143     if (actionStatus_ != OK) {
144         return { actionStatus_, DBConstant::MIN_TIMEOUT };
145     }
146     if (cloudError_) {
147         return { DB_ERROR, DBConstant::MIN_TIMEOUT };
148     }
149     if (blockTimeMs_ != 0) {
150         std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
151     }
152     lockStatus_ = true;
153     return { OK, DBConstant::MIN_TIMEOUT };
154 }
155 
UnLock()156 DBStatus VirtualCloudDb::UnLock()
157 {
158     if (cloudError_) {
159         return DB_ERROR;
160     }
161     if (blockTimeMs_ != 0) {
162         std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
163     }
164     lockStatus_ = false;
165     return OK;
166 }
167 
Close()168 DBStatus VirtualCloudDb::Close()
169 {
170     if (cloudError_) {
171         return DB_ERROR;
172     }
173     return OK;
174 }
175 
DeleteByGid(const std::string & tableName,VBucket & extend)176 DBStatus VirtualCloudDb::DeleteByGid(const std::string &tableName, VBucket &extend)
177 {
178     for (auto &tableData : cloudData_[tableName]) {
179         if (std::get<std::string>(tableData.extend[g_gidField]) == std::get<std::string>(extend[g_gidField])) {
180             tableData.extend[g_modifiedField] = (int64_t)TimeHelper::GetSysCurrentTime() /
181                 CloudDbConstant::TEN_THOUSAND;
182             tableData.extend[g_deleteField] = true;
183             LOGD("[VirtualCloudDb] DeleteByGid, gid %s", std::get<std::string>(extend[g_gidField]).c_str());
184             tableData.record.clear();
185             break;
186         }
187     }
188     return OK;
189 }
190 
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)191 DBStatus VirtualCloudDb::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
192 {
193     if (actionStatus_ != OK) {
194         return actionStatus_;
195     }
196     if (cloudError_) {
197         return DB_ERROR;
198     }
199     if (blockTimeMs_ != 0) {
200         std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
201     }
202     if (queryTimes_.find(tableName) == queryTimes_.end()) {
203         queryTimes_.try_emplace(tableName, 0);
204     }
205     queryTimes_[tableName]++;
206     std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
207     if (cloudData_.find(tableName) == cloudData_.end()) {
208         return QUERY_END;
209     }
210     std::string cursor = std::get<std::string>(extend[g_cursorField]);
211     bool isIncreCursor = (cursor.substr(0, increPrefix_.size()) == increPrefix_);
212     LOGD("extend size: %zu type: %zu  expect: %zu, cursor: %s", extend.size(), extend[g_cursorField].index(),
213         TYPE_INDEX<std::string>, cursor.c_str());
214     if (isIncreCursor) {
215         GetCloudData(cursor, isIncreCursor, incrementCloudData_[tableName], data);
216     } else {
217         cursor = cursor.empty() ? "0" : cursor;
218         GetCloudData(cursor, isIncreCursor, cloudData_[tableName], data);
219     }
220     if (!isIncreCursor && data.empty() && isSetCrementCloudData_) {
221         extend[g_cursorField] = increPrefix_;
222         return OK;
223     }
224     return (data.empty() || data.size() < static_cast<size_t>(queryLimit_)) ? QUERY_END : OK;
225 }
226 
GetCloudData(const std::string & cursor,bool isIncreCursor,std::vector<CloudData> allData,std::vector<VBucket> & data)227 void VirtualCloudDb::GetCloudData(const std::string &cursor, bool isIncreCursor, std::vector<CloudData> allData,
228     std::vector<VBucket> &data)
229 {
230     for (auto &tableData : allData) {
231         std::string srcCursor = std::get<std::string>(tableData.extend[g_cursorField]);
232         if ((!isIncreCursor && std::stol(srcCursor) > std::stol(cursor)) || isIncreCursor) {
233             VBucket bucket = tableData.record;
234             for (const auto &ex: tableData.extend) {
235                 bucket.insert(ex);
236             }
237             data.push_back(std::move(bucket));
238         }
239         if (data.size() >= static_cast<size_t>(queryLimit_)) {
240             return;
241         }
242     }
243 }
244 
InnerUpdate(const std::string & tableName,std::vector<VBucket> && record,std::vector<VBucket> & extend,bool isDelete)245 DBStatus VirtualCloudDb::InnerUpdate(const std::string &tableName, std::vector<VBucket> &&record,
246     std::vector<VBucket> &extend, bool isDelete)
247 {
248     if (record.size() != extend.size()) {
249         return DB_ERROR;
250     }
251     std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
252     for (size_t i = 0; i < record.size(); ++i) {
253         if (extend[i].find(g_gidField) == extend[i].end()) {
254             LOGE("[VirtualCloudDb] Update data should have gid");
255             return DB_ERROR;
256         }
257         extend[i][g_cursorField] = std::to_string(currentCursor_++);
258         if (isDelete) {
259             extend[i][g_deleteField] = true;
260         } else {
261             extend[i][g_deleteField] = false;
262         }
263         CloudData cloudData = {
264             .record = std::move(record[i]),
265             .extend = extend[i]
266         };
267         if (UpdateCloudData(tableName, std::move(cloudData)) != OK) {
268             return DB_ERROR;
269         }
270     }
271     return OK;
272 }
273 
UpdateCloudData(const std::string & tableName,VirtualCloudDb::CloudData && cloudData)274 DBStatus VirtualCloudDb::UpdateCloudData(const std::string &tableName, VirtualCloudDb::CloudData &&cloudData)
275 {
276     if (cloudData_.find(tableName) == cloudData_.end()) {
277         LOGE("[VirtualCloudDb] update cloud data failed, not found tableName");
278         return DB_ERROR;
279     }
280     std::string paramGid = std::get<std::string>(cloudData.extend[g_gidField]);
281     bool paramDelete = std::get<bool>(cloudData.extend[g_deleteField]);
282     for (auto &data: cloudData_[tableName]) {
283         std::string srcGid = std::get<std::string>(data.extend[g_gidField]);
284         if (srcGid != paramGid) {
285             continue;
286         }
287         if (paramDelete) {
288             if (data.extend.find(g_deleteField) != data.extend.end() &&
289                 std::get<bool>(data.extend[g_deleteField])) {
290                 LOGE("[VirtualCloudDb] current data has been delete gid %s", paramGid.c_str());
291                 return DB_ERROR;
292             }
293             LOGD("[VirtualCloudDb] delete data, gid %s", paramGid.c_str());
294         }
295         data = std::move(cloudData);
296         return OK;
297     }
298     LOGE("[VirtualCloudDb] update cloud data failed, not found gid %s", paramGid.c_str());
299     return DB_ERROR;
300 }
301 
SetCloudError(bool cloudError)302 void VirtualCloudDb::SetCloudError(bool cloudError)
303 {
304     cloudError_ = cloudError;
305 }
306 
SetBlockTime(int32_t blockTime)307 void VirtualCloudDb::SetBlockTime(int32_t blockTime)
308 {
309     blockTimeMs_ = blockTime;
310 }
311 
ClearHeartbeatCount()312 void VirtualCloudDb::ClearHeartbeatCount()
313 {
314     heartbeatCount_ = 0;
315 }
316 
GetHeartbeatCount()317 int32_t VirtualCloudDb::GetHeartbeatCount()
318 {
319     return heartbeatCount_;
320 }
321 
GetLockStatus()322 bool VirtualCloudDb::GetLockStatus()
323 {
324     return lockStatus_;
325 }
326 
SetHeartbeatError(bool heartbeatError)327 void VirtualCloudDb::SetHeartbeatError(bool heartbeatError)
328 {
329     heartbeatError_ = heartbeatError;
330 }
331 
SetIncrementData(const std::string & tableName,const VBucket & record,const VBucket & extend)332 void VirtualCloudDb::SetIncrementData(const std::string &tableName, const VBucket &record, const VBucket &extend)
333 {
334     std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
335     isSetCrementCloudData_ = true;
336     auto iter = incrementCloudData_.find(tableName);
337     if (iter == incrementCloudData_.end()) {
338         return;
339     }
340     CloudData data = {record, extend};
341     iter->second.push_back(data);
342 }
343 
GetQueryTimes(const std::string & tableName)344 uint32_t VirtualCloudDb::GetQueryTimes(const std::string &tableName)
345 {
346     if (queryTimes_.find(tableName) == queryTimes_.end()) {
347         return 0;
348     }
349     return queryTimes_[tableName];
350 }
351 
SetActionStatus(DBStatus status)352 void VirtualCloudDb::SetActionStatus(DBStatus status)
353 {
354     actionStatus_ = status;
355 }
356 }