1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "virtual_cloud_db.h"
16
17 #include <thread>
18 #include "cloud_db_types.h"
19 #include "db_constant.h"
20 #include "cloud_db_constant.h"
21 #include "log_print.h"
22 #include "time_helper.h"
23
24 namespace DistributedDB {
25 namespace {
26 const char *g_deleteField = CloudDbConstant::DELETE_FIELD;
27 const char *g_gidField = CloudDbConstant::GID_FIELD;
28 const char *g_cursorField = CloudDbConstant::CURSOR_FIELD;
29 const char *g_modifiedField = CloudDbConstant::MODIFY_FIELD;
30 }
31
BatchInsert(const std::string & tableName,std::vector<VBucket> && record,std::vector<VBucket> & extend)32 DBStatus VirtualCloudDb::BatchInsert(const std::string &tableName, std::vector<VBucket> &&record,
33 std::vector<VBucket> &extend)
34 {
35 if (cloudError_) {
36 return DB_ERROR;
37 }
38 if (blockTimeMs_ != 0) {
39 std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
40 }
41 if (record.size() != extend.size()) {
42 LOGE("[VirtualCloudDb] not equal records");
43 return DB_ERROR;
44 }
45 std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
46 for (size_t i = 0; i < record.size(); ++i) {
47 if (extend[i].find(g_gidField) != extend[i].end()) {
48 LOGE("[VirtualCloudDb] Insert data should not have gid");
49 return DB_ERROR;
50 }
51 extend[i][g_gidField] = std::to_string(currentGid_++);
52 extend[i][g_cursorField] = std::to_string(currentCursor_++);
53 extend[i][g_deleteField] = false;
54 CloudData cloudData = {
55 .record = std::move(record[i]),
56 .extend = extend[i]
57 };
58 cloudData_[tableName].push_back(cloudData);
59 auto gid = std::get<std::string>(extend[i][g_gidField]);
60 }
61 return OK;
62 }
63
BatchInsertWithGid(const std::string & tableName,std::vector<VBucket> && record,std::vector<VBucket> & extend)64 DBStatus VirtualCloudDb::BatchInsertWithGid(const std::string &tableName, std::vector<VBucket> &&record,
65 std::vector<VBucket> &extend)
66 {
67 if (cloudError_) {
68 return DB_ERROR;
69 }
70 if (blockTimeMs_ != 0) {
71 std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
72 }
73 if (record.size() != extend.size()) {
74 LOGE("[VirtualCloudDb] not equal records");
75 return DB_ERROR;
76 }
77 std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
78 for (size_t i = 0; i < record.size(); ++i) {
79 if (extend[i].find(g_gidField) == extend[i].end()) {
80 extend[i][g_gidField] = std::to_string(currentGid_++);
81 }
82 extend[i][g_cursorField] = std::to_string(currentCursor_++);
83 extend[i][g_deleteField] = false;
84 CloudData cloudData = {
85 .record = std::move(record[i]),
86 .extend = extend[i]
87 };
88 cloudData_[tableName].push_back(cloudData);
89 }
90
91 LOGI("[VirtualCloudDb] BatchInsertWithGid records");
92 return OK;
93 }
94
BatchUpdate(const std::string & tableName,std::vector<VBucket> && record,std::vector<VBucket> & extend)95 DBStatus VirtualCloudDb::BatchUpdate(const std::string &tableName, std::vector<VBucket> &&record,
96 std::vector<VBucket> &extend)
97 {
98 if (cloudError_) {
99 return DB_ERROR;
100 }
101 if (blockTimeMs_ != 0) {
102 std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
103 }
104 return InnerUpdate(tableName, std::move(record), extend, false);
105 }
106
BatchDelete(const std::string & tableName,std::vector<VBucket> & extend)107 DBStatus VirtualCloudDb::BatchDelete(const std::string &tableName, std::vector<VBucket> &extend)
108 {
109 if (cloudError_) {
110 return DB_ERROR;
111 }
112 if (blockTimeMs_ != 0) {
113 std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
114 }
115 std::vector<VBucket> record;
116 for (size_t i = 0; i < extend.size(); ++i) {
117 record.emplace_back();
118 }
119 return InnerUpdate(tableName, std::move(record), extend, true);
120 }
121
HeartBeat()122 DBStatus VirtualCloudDb::HeartBeat()
123 {
124 heartbeatCount_++;
125 if (actionStatus_ != OK) {
126 return actionStatus_;
127 }
128 if (cloudError_) {
129 return DB_ERROR;
130 }
131 if (heartbeatError_) {
132 return DB_ERROR;
133 }
134 if (blockTimeMs_ != 0) {
135 std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
136 }
137 lockStatus_ = true;
138 return OK;
139 }
140
Lock()141 std::pair<DBStatus, uint32_t> VirtualCloudDb::Lock()
142 {
143 if (actionStatus_ != OK) {
144 return { actionStatus_, DBConstant::MIN_TIMEOUT };
145 }
146 if (cloudError_) {
147 return { DB_ERROR, DBConstant::MIN_TIMEOUT };
148 }
149 if (blockTimeMs_ != 0) {
150 std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
151 }
152 lockStatus_ = true;
153 return { OK, DBConstant::MIN_TIMEOUT };
154 }
155
UnLock()156 DBStatus VirtualCloudDb::UnLock()
157 {
158 if (cloudError_) {
159 return DB_ERROR;
160 }
161 if (blockTimeMs_ != 0) {
162 std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
163 }
164 lockStatus_ = false;
165 return OK;
166 }
167
Close()168 DBStatus VirtualCloudDb::Close()
169 {
170 if (cloudError_) {
171 return DB_ERROR;
172 }
173 return OK;
174 }
175
DeleteByGid(const std::string & tableName,VBucket & extend)176 DBStatus VirtualCloudDb::DeleteByGid(const std::string &tableName, VBucket &extend)
177 {
178 for (auto &tableData : cloudData_[tableName]) {
179 if (std::get<std::string>(tableData.extend[g_gidField]) == std::get<std::string>(extend[g_gidField])) {
180 tableData.extend[g_modifiedField] = (int64_t)TimeHelper::GetSysCurrentTime() /
181 CloudDbConstant::TEN_THOUSAND;
182 tableData.extend[g_deleteField] = true;
183 LOGD("[VirtualCloudDb] DeleteByGid, gid %s", std::get<std::string>(extend[g_gidField]).c_str());
184 tableData.record.clear();
185 break;
186 }
187 }
188 return OK;
189 }
190
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)191 DBStatus VirtualCloudDb::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
192 {
193 if (actionStatus_ != OK) {
194 return actionStatus_;
195 }
196 if (cloudError_) {
197 return DB_ERROR;
198 }
199 if (blockTimeMs_ != 0) {
200 std::this_thread::sleep_for(std::chrono::milliseconds(blockTimeMs_));
201 }
202 if (queryTimes_.find(tableName) == queryTimes_.end()) {
203 queryTimes_.try_emplace(tableName, 0);
204 }
205 queryTimes_[tableName]++;
206 std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
207 if (cloudData_.find(tableName) == cloudData_.end()) {
208 return QUERY_END;
209 }
210 std::string cursor = std::get<std::string>(extend[g_cursorField]);
211 bool isIncreCursor = (cursor.substr(0, increPrefix_.size()) == increPrefix_);
212 LOGD("extend size: %zu type: %zu expect: %zu, cursor: %s", extend.size(), extend[g_cursorField].index(),
213 TYPE_INDEX<std::string>, cursor.c_str());
214 if (isIncreCursor) {
215 GetCloudData(cursor, isIncreCursor, incrementCloudData_[tableName], data);
216 } else {
217 cursor = cursor.empty() ? "0" : cursor;
218 GetCloudData(cursor, isIncreCursor, cloudData_[tableName], data);
219 }
220 if (!isIncreCursor && data.empty() && isSetCrementCloudData_) {
221 extend[g_cursorField] = increPrefix_;
222 return OK;
223 }
224 return (data.empty() || data.size() < static_cast<size_t>(queryLimit_)) ? QUERY_END : OK;
225 }
226
GetCloudData(const std::string & cursor,bool isIncreCursor,std::vector<CloudData> allData,std::vector<VBucket> & data)227 void VirtualCloudDb::GetCloudData(const std::string &cursor, bool isIncreCursor, std::vector<CloudData> allData,
228 std::vector<VBucket> &data)
229 {
230 for (auto &tableData : allData) {
231 std::string srcCursor = std::get<std::string>(tableData.extend[g_cursorField]);
232 if ((!isIncreCursor && std::stol(srcCursor) > std::stol(cursor)) || isIncreCursor) {
233 VBucket bucket = tableData.record;
234 for (const auto &ex: tableData.extend) {
235 bucket.insert(ex);
236 }
237 data.push_back(std::move(bucket));
238 }
239 if (data.size() >= static_cast<size_t>(queryLimit_)) {
240 return;
241 }
242 }
243 }
244
InnerUpdate(const std::string & tableName,std::vector<VBucket> && record,std::vector<VBucket> & extend,bool isDelete)245 DBStatus VirtualCloudDb::InnerUpdate(const std::string &tableName, std::vector<VBucket> &&record,
246 std::vector<VBucket> &extend, bool isDelete)
247 {
248 if (record.size() != extend.size()) {
249 return DB_ERROR;
250 }
251 std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
252 for (size_t i = 0; i < record.size(); ++i) {
253 if (extend[i].find(g_gidField) == extend[i].end()) {
254 LOGE("[VirtualCloudDb] Update data should have gid");
255 return DB_ERROR;
256 }
257 extend[i][g_cursorField] = std::to_string(currentCursor_++);
258 if (isDelete) {
259 extend[i][g_deleteField] = true;
260 } else {
261 extend[i][g_deleteField] = false;
262 }
263 CloudData cloudData = {
264 .record = std::move(record[i]),
265 .extend = extend[i]
266 };
267 if (UpdateCloudData(tableName, std::move(cloudData)) != OK) {
268 return DB_ERROR;
269 }
270 }
271 return OK;
272 }
273
UpdateCloudData(const std::string & tableName,VirtualCloudDb::CloudData && cloudData)274 DBStatus VirtualCloudDb::UpdateCloudData(const std::string &tableName, VirtualCloudDb::CloudData &&cloudData)
275 {
276 if (cloudData_.find(tableName) == cloudData_.end()) {
277 LOGE("[VirtualCloudDb] update cloud data failed, not found tableName");
278 return DB_ERROR;
279 }
280 std::string paramGid = std::get<std::string>(cloudData.extend[g_gidField]);
281 bool paramDelete = std::get<bool>(cloudData.extend[g_deleteField]);
282 for (auto &data: cloudData_[tableName]) {
283 std::string srcGid = std::get<std::string>(data.extend[g_gidField]);
284 if (srcGid != paramGid) {
285 continue;
286 }
287 if (paramDelete) {
288 if (data.extend.find(g_deleteField) != data.extend.end() &&
289 std::get<bool>(data.extend[g_deleteField])) {
290 LOGE("[VirtualCloudDb] current data has been delete gid %s", paramGid.c_str());
291 return DB_ERROR;
292 }
293 LOGD("[VirtualCloudDb] delete data, gid %s", paramGid.c_str());
294 }
295 data = std::move(cloudData);
296 return OK;
297 }
298 LOGE("[VirtualCloudDb] update cloud data failed, not found gid %s", paramGid.c_str());
299 return DB_ERROR;
300 }
301
SetCloudError(bool cloudError)302 void VirtualCloudDb::SetCloudError(bool cloudError)
303 {
304 cloudError_ = cloudError;
305 }
306
SetBlockTime(int32_t blockTime)307 void VirtualCloudDb::SetBlockTime(int32_t blockTime)
308 {
309 blockTimeMs_ = blockTime;
310 }
311
ClearHeartbeatCount()312 void VirtualCloudDb::ClearHeartbeatCount()
313 {
314 heartbeatCount_ = 0;
315 }
316
GetHeartbeatCount()317 int32_t VirtualCloudDb::GetHeartbeatCount()
318 {
319 return heartbeatCount_;
320 }
321
GetLockStatus()322 bool VirtualCloudDb::GetLockStatus()
323 {
324 return lockStatus_;
325 }
326
SetHeartbeatError(bool heartbeatError)327 void VirtualCloudDb::SetHeartbeatError(bool heartbeatError)
328 {
329 heartbeatError_ = heartbeatError;
330 }
331
SetIncrementData(const std::string & tableName,const VBucket & record,const VBucket & extend)332 void VirtualCloudDb::SetIncrementData(const std::string &tableName, const VBucket &record, const VBucket &extend)
333 {
334 std::lock_guard<std::mutex> autoLock(cloudDataMutex_);
335 isSetCrementCloudData_ = true;
336 auto iter = incrementCloudData_.find(tableName);
337 if (iter == incrementCloudData_.end()) {
338 return;
339 }
340 CloudData data = {record, extend};
341 iter->second.push_back(data);
342 }
343
GetQueryTimes(const std::string & tableName)344 uint32_t VirtualCloudDb::GetQueryTimes(const std::string &tableName)
345 {
346 if (queryTimes_.find(tableName) == queryTimes_.end()) {
347 return 0;
348 }
349 return queryTimes_[tableName];
350 }
351
SetActionStatus(DBStatus status)352 void VirtualCloudDb::SetActionStatus(DBStatus status)
353 {
354 actionStatus_ = status;
355 }
356 }