1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_db_proxy.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "kv_store_errno.h"
21 #include "log_print.h"
22
23 namespace DistributedDB {
CloudDBProxy()24 CloudDBProxy::CloudDBProxy()
25 : isDownloading_(false)
26 {
27 }
28
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)29 void CloudDBProxy::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
30 {
31 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
32 if (!iCloudDb_) {
33 iCloudDb_ = cloudDB;
34 }
35 }
36
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)37 int CloudDBProxy::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
38 {
39 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
40 auto it = std::find_if(cloudDBs.begin(), cloudDBs.end(), [](const auto &item) { return item.second == nullptr; });
41 if (it != cloudDBs.end()) {
42 LOGE("[CloudDBProxy] User %s setCloudDB with nullptr", it->first.c_str());
43 return -E_INVALID_ARGS;
44 }
45 cloudDbs_ = cloudDBs;
46 return E_OK;
47 }
48
GetCloudDB() const49 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudDBProxy::GetCloudDB() const
50 {
51 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
52 return cloudDbs_;
53 }
54
SwitchCloudDB(const std::string & user)55 void CloudDBProxy::SwitchCloudDB(const std::string &user)
56 {
57 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
58 if (cloudDbs_.find(user) == cloudDbs_.end()) {
59 return;
60 }
61 iCloudDb_ = cloudDbs_[user];
62 }
63
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)64 void CloudDBProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
65 {
66 std::unique_lock<std::shared_mutex> writeLock(assetLoaderMutex_);
67 iAssetLoader_ = loader;
68 }
69
RecordSyncDataTimeStampLog(std::vector<VBucket> & data,InnerActionCode action)70 void CloudDBProxy::RecordSyncDataTimeStampLog(std::vector<VBucket> &data, InnerActionCode action)
71 {
72 if (data.empty()) {
73 LOGI("[CloudDBProxy] sync data is empty");
74 return;
75 }
76
77 int64_t first = 0;
78 int errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, data[0], first);
79 if (errCode != E_OK) {
80 LOGE("get first modify time for bucket failed, %d", errCode);
81 return;
82 }
83
84 int64_t last = 0;
85 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, data[data.size() - 1],
86 last);
87 if (errCode != E_OK) {
88 LOGE("get last modify time for bucket failed, %d", errCode);
89 return;
90 }
91
92 LOGI("[CloudDBProxy] sync action is %d and size is %d, sync data: first timestamp %lld, last timestamp %lld",
93 static_cast<int>(action), data.size(), first, last);
94 }
95
FillErrorToExtend(int error,std::vector<VBucket> & extend)96 void CloudDBProxy::FillErrorToExtend(int error, std::vector<VBucket> &extend)
97 {
98 for (auto &item : extend) {
99 if (item.find(CloudDbConstant::ERROR_FIELD) == item.end()) {
100 item[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(TransferDBErrno(error));
101 }
102 }
103 }
104
BatchInsert(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)105 int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
106 std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount)
107 {
108 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
109 if (iCloudDb_ == nullptr) {
110 FillErrorToExtend(static_cast<int>(-E_CLOUD_ERROR), extend);
111 return -E_CLOUD_ERROR;
112 }
113 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
114 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
115 context->MoveInRecordAndExtend(record, extend);
116 context->SetTableName(tableName);
117 int errCode = InnerAction(context, cloudDb, InnerActionCode::INSERT);
118 uploadInfo = context->GetInfo();
119 retryCount = context->GetRetryCount();
120 context->MoveOutRecordAndExtend(record, extend);
121 return errCode;
122 }
123
BatchUpdate(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)124 int CloudDBProxy::BatchUpdate(const std::string &tableName, std::vector<VBucket> &record,
125 std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount)
126 {
127 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
128 if (iCloudDb_ == nullptr) {
129 FillErrorToExtend(static_cast<int>(-E_CLOUD_ERROR), extend);
130 return -E_CLOUD_ERROR;
131 }
132 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
133 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
134 context->SetTableName(tableName);
135 context->MoveInRecordAndExtend(record, extend);
136 int errCode = InnerAction(context, cloudDb, InnerActionCode::UPDATE);
137 uploadInfo = context->GetInfo();
138 retryCount = context->GetRetryCount();
139 context->MoveOutRecordAndExtend(record, extend);
140 return errCode;
141 }
142
BatchDelete(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)143 int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
144 Info &uploadInfo, uint32_t &retryCount)
145 {
146 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
147 if (iCloudDb_ == nullptr) {
148 FillErrorToExtend(static_cast<int>(-E_CLOUD_ERROR), extend);
149 return -E_CLOUD_ERROR;
150 }
151 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
152 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
153 context->MoveInRecordAndExtend(record, extend);
154 context->SetTableName(tableName);
155 int errCode = InnerAction(context, cloudDb, InnerActionCode::DELETE);
156 uploadInfo = context->GetInfo();
157 retryCount = context->GetRetryCount();
158 context->MoveOutRecordAndExtend(record, extend);
159 return errCode;
160 }
161
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)162 int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
163 {
164 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
165 if (iCloudDb_ == nullptr) {
166 return -E_CLOUD_ERROR;
167 }
168 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
169 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
170 context->MoveInQueryExtendAndData(extend, data);
171 context->SetTableName(tableName);
172 int errCode = InnerAction(context, cloudDb, InnerActionCode::QUERY);
173 context->MoveOutQueryExtendAndData(extend, data);
174 for (auto &item : data) {
175 for (auto &row : item) {
176 auto assets = std::get_if<Assets>(&row.second);
177 if (assets == nullptr) {
178 continue;
179 }
180 DBCommon::RemoveDuplicateAssetsData(*assets);
181 }
182 }
183 RecordSyncDataTimeStampLog(data, InnerActionCode::QUERY);
184 return errCode;
185 }
186
Lock()187 std::pair<int, uint64_t> CloudDBProxy::Lock()
188 {
189 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
190 if (iCloudDb_ == nullptr) {
191 return { -E_CLOUD_ERROR, 0u };
192 }
193 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
194 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
195 std::pair<int, uint64_t> lockStatus;
196 int errCode = InnerAction(context, cloudDb, InnerActionCode::LOCK);
197 context->MoveOutLockStatus(lockStatus);
198 lockStatus.first = errCode;
199 return lockStatus;
200 }
201
UnLock()202 int CloudDBProxy::UnLock()
203 {
204 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
205 if (iCloudDb_ == nullptr) {
206 return -E_CLOUD_ERROR;
207 }
208 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
209 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
210 return InnerAction(context, cloudDb, InnerActionCode::UNLOCK);
211 }
212
Close()213 int CloudDBProxy::Close()
214 {
215 std::shared_ptr<ICloudDb> iCloudDb = nullptr;
216 std::vector<std::shared_ptr<ICloudDb>> waitForClose;
217 {
218 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
219 if (iCloudDb_ != nullptr) {
220 iCloudDb = iCloudDb_;
221 iCloudDb_ = nullptr;
222 }
223 for (const auto &item : cloudDbs_) {
224 if (iCloudDb == item.second) {
225 iCloudDb = nullptr;
226 }
227 waitForClose.push_back(item.second);
228 }
229 cloudDbs_.clear();
230 }
231 LOGD("[CloudDBProxy] call cloudDb close begin");
232 DBStatus status = OK;
233 if (iCloudDb != nullptr) {
234 status = iCloudDb->Close();
235 }
236 for (const auto &item : waitForClose) {
237 DBStatus ret = item->Close();
238 status = (status == OK ? ret : status);
239 }
240 if (status != OK) {
241 LOGW("[CloudDBProxy] cloud db close failed %d", static_cast<int>(status));
242 }
243 waitForClose.clear();
244 LOGD("[CloudDBProxy] call cloudDb close end");
245 return status == OK ? E_OK : -E_CLOUD_ERROR;
246 }
247
HeartBeat()248 int CloudDBProxy::HeartBeat()
249 {
250 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
251 if (iCloudDb_ == nullptr) {
252 return -E_CLOUD_ERROR;
253 }
254
255 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
256 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
257 return InnerAction(context, cloudDb, InnerActionCode::HEARTBEAT);
258 }
259
IsNotExistCloudDB() const260 bool CloudDBProxy::IsNotExistCloudDB() const
261 {
262 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
263 return iCloudDb_ == nullptr && cloudDbs_.empty();
264 }
265
Download(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)266 int CloudDBProxy::Download(const std::string &tableName, const std::string &gid, const Type &prefix,
267 std::map<std::string, Assets> &assets)
268 {
269 if (assets.empty()) {
270 return E_OK;
271 }
272 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
273 if (iAssetLoader_ == nullptr) {
274 LOGE("Asset loader has not been set %d", -E_NOT_SET);
275 return -E_NOT_SET;
276 }
277 isDownloading_ = true;
278 DBStatus status = iAssetLoader_->Download(tableName, gid, prefix, assets);
279 isDownloading_ = false;
280 if (status != OK) {
281 LOGW("[CloudDBProxy] download asset failed %d", static_cast<int>(status));
282 if (status == SKIP_ASSET) {
283 return status;
284 }
285 }
286 return GetInnerErrorCode(status);
287 }
288
RemoveLocalAssets(const std::vector<Asset> & assets)289 int CloudDBProxy::RemoveLocalAssets(const std::vector<Asset> &assets)
290 {
291 if (assets.empty()) {
292 return E_OK;
293 }
294 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
295 if (iAssetLoader_ == nullptr) {
296 LOGW("Asset loader has not been set");
297 return E_OK;
298 }
299 DBStatus status = iAssetLoader_->RemoveLocalAssets(assets);
300 if (status != OK) {
301 LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
302 return -E_REMOVE_ASSETS_FAILED;
303 }
304 return E_OK;
305 }
306
RemoveLocalAssets(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)307 int CloudDBProxy::RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix,
308 std::map<std::string, Assets> &assets)
309 {
310 if (assets.empty()) {
311 return E_OK;
312 }
313 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
314 if (iAssetLoader_ == nullptr) {
315 LOGE("Asset loader has not been set %d", -E_NOT_SET);
316 return -E_NOT_SET;
317 }
318 DBStatus status = iAssetLoader_->RemoveLocalAssets(tableName, gid, prefix, assets);
319 if (status != OK) {
320 LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
321 return -E_REMOVE_ASSETS_FAILED;
322 }
323 return E_OK;
324 }
325
GetEmptyCursor(const std::string & tableName)326 std::pair<int, std::string> CloudDBProxy::GetEmptyCursor(const std::string &tableName)
327 {
328 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
329 if (iCloudDb_ == nullptr) {
330 return { -E_CLOUD_ERROR, "" };
331 }
332 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
333 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
334 context->SetTableName(tableName);
335 int errCode = InnerAction(context, cloudDb, InnerActionCode::GET_EMPTY_CURSOR);
336 std::pair<int, std::string> cursorStatus;
337 context->MoveOutCursorStatus(cursorStatus);
338 cursorStatus.first = errCode;
339 return cursorStatus;
340 }
341
InnerAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)342 int CloudDBProxy::InnerAction(const std::shared_ptr<CloudActionContext> &context,
343 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
344 {
345 if (action >= InnerActionCode::INVALID_ACTION) {
346 return -E_INVALID_ARGS;
347 }
348 InnerActionTask(context, cloudDb, action);
349 return context->GetActionRes();
350 }
351
DMLActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)352 DBStatus CloudDBProxy::DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
353 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
354 {
355 DBStatus status = OK;
356 std::vector<VBucket> record;
357 std::vector<VBucket> extend;
358 context->MoveOutRecordAndExtend(record, extend);
359 RecordSyncDataTimeStampLog(extend, action);
360 uint32_t recordSize = record.size();
361
362 switch (action) {
363 case InnerActionCode::INSERT: {
364 status = cloudDb->BatchInsert(context->GetTableName(), std::move(record), extend);
365 context->MoveInExtend(extend);
366 context->SetInfo(CloudWaterType::INSERT, status, recordSize);
367 break;
368 }
369 case InnerActionCode::UPDATE: {
370 status = cloudDb->BatchUpdate(context->GetTableName(), std::move(record), extend);
371 context->MoveInExtend(extend);
372 context->SetInfo(CloudWaterType::UPDATE, status, recordSize);
373 break;
374 }
375 case InnerActionCode::DELETE: {
376 status = cloudDb->BatchDelete(context->GetTableName(), extend);
377 context->MoveInRecordAndExtend(record, extend);
378 context->SetInfo(CloudWaterType::DELETE, status, recordSize);
379 break;
380 }
381 default: {
382 LOGE("DMLActionTask can only be used on INSERT/UPDATE/DELETE.");
383 return INVALID_ARGS;
384 }
385 }
386 if (status == CLOUD_VERSION_CONFLICT) {
387 LOGI("[CloudSyncer] Version conflict during cloud batch upload.");
388 } else if (status != OK) {
389 LOGE("[CloudSyncer] Cloud BATCH UPLOAD failed.");
390 }
391 return status;
392 }
393
InnerActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)394 void CloudDBProxy::InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
395 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
396 {
397 DBStatus status = OK;
398 bool setResAlready = false;
399 LOGD("[CloudDBProxy] action %" PRIu8 " begin", static_cast<uint8_t>(action));
400 switch (action) {
401 case InnerActionCode::INSERT:
402 case InnerActionCode::UPDATE:
403 case InnerActionCode::DELETE:
404 status = DMLActionTask(context, cloudDb, action);
405 break;
406 case InnerActionCode::QUERY: {
407 status = QueryAction(context, cloudDb);
408 if (status == QUERY_END) {
409 setResAlready = true;
410 }
411 break;
412 }
413 case InnerActionCode::GET_EMPTY_CURSOR:
414 status = InnerActionGetEmptyCursor(context, cloudDb);
415 break;
416 case InnerActionCode::LOCK:
417 status = InnerActionLock(context, cloudDb);
418 break;
419 case InnerActionCode::UNLOCK:
420 status = cloudDb->UnLock();
421 if (status != OK) {
422 LOGE("[CloudDBProxy] UnLock cloud DB failed: %d", static_cast<int>(status));
423 }
424 break;
425 case InnerActionCode::HEARTBEAT:
426 status = cloudDb->HeartBeat();
427 if (status != OK) {
428 LOGE("[CloudDBProxy] Heart beat error: %d", static_cast<int>(status));
429 }
430 break;
431 default: // should not happen
432 status = DB_ERROR;
433 }
434 LOGD("[CloudDBProxy] action %" PRIu8 " end res:%d", static_cast<uint8_t>(action), static_cast<int>(status));
435
436 if (!setResAlready) {
437 context->SetActionRes(GetInnerErrorCode(status));
438 }
439
440 context->FinishAndNotify();
441 }
442
InnerActionLock(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)443 DBStatus CloudDBProxy::InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
444 const std::shared_ptr<ICloudDb> &cloudDb)
445 {
446 DBStatus status = OK;
447 std::pair<int, uint64_t> lockRet;
448 std::pair<DBStatus, uint64_t> lockStatus = cloudDb->Lock();
449 if (lockStatus.first != OK) {
450 status = lockStatus.first;
451 LOGE("[CloudDBProxy] Lock cloud DB failed: %d", static_cast<int>(status));
452 } else if (lockStatus.second == 0) {
453 LOGE("[CloudDBProxy] Lock successfully but timeout is 0");
454 status = CLOUD_ERROR;
455 }
456 lockRet.second = lockStatus.second;
457 lockRet.first = GetInnerErrorCode(status);
458 context->MoveInLockStatus(lockRet);
459 return status;
460 }
461
InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)462 DBStatus CloudDBProxy::InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context,
463 const std::shared_ptr<ICloudDb> &cloudDb)
464 {
465 std::string tableName = context->GetTableName();
466 std::pair<DBStatus, std::string> cursorStatus = cloudDb->GetEmptyCursor(tableName);
467 DBStatus status = OK;
468 if (cursorStatus.first != OK) {
469 status = cursorStatus.first;
470 LOGE("[CloudDBProxy] Get empty cursor failed: %d", static_cast<int>(status));
471 }
472 std::pair<int, std::string> cursorRet;
473 cursorRet.second = cursorStatus.second;
474 cursorRet.first = GetInnerErrorCode(status);
475 context->MoveInCursorStatus(cursorRet);
476 return status;
477 }
478
GetInnerErrorCode(DBStatus status)479 int CloudDBProxy::GetInnerErrorCode(DBStatus status)
480 {
481 if (status < DB_ERROR || status >= BUTT_STATUS) {
482 return static_cast<int>(status);
483 }
484 switch (status) {
485 case OK:
486 return E_OK;
487 case CLOUD_NETWORK_ERROR:
488 return -E_CLOUD_NETWORK_ERROR;
489 case CLOUD_SYNC_UNSET:
490 return -E_CLOUD_SYNC_UNSET;
491 case CLOUD_FULL_RECORDS:
492 return -E_CLOUD_FULL_RECORDS;
493 case CLOUD_LOCK_ERROR:
494 return -E_CLOUD_LOCK_ERROR;
495 case CLOUD_ASSET_SPACE_INSUFFICIENT:
496 return -E_CLOUD_ASSET_SPACE_INSUFFICIENT;
497 case CLOUD_VERSION_CONFLICT:
498 return -E_CLOUD_VERSION_CONFLICT;
499 case CLOUD_RECORD_EXIST_CONFLICT:
500 return -E_CLOUD_RECORD_EXIST_CONFLICT;
501 case CLOUD_DISABLED:
502 return -E_CLOUD_DISABLED;
503 default:
504 return -E_CLOUD_ERROR;
505 }
506 }
507
QueryAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)508 DBStatus CloudDBProxy::QueryAction(const std::shared_ptr<CloudActionContext> &context,
509 const std::shared_ptr<ICloudDb> &cloudDb)
510 {
511 VBucket queryExtend;
512 std::vector<VBucket> data;
513 context->MoveOutQueryExtendAndData(queryExtend, data);
514 DBStatus status = cloudDb->Query(context->GetTableName(), queryExtend, data);
515 context->MoveInQueryExtendAndData(queryExtend, data);
516 if (status == QUERY_END) {
517 context->SetActionRes(-E_QUERY_END);
518 }
519 return status;
520 }
521
CloudActionContext()522 CloudDBProxy::CloudActionContext::CloudActionContext()
523 : actionFinished_(false),
524 actionRes_(OK),
525 totalCount_(0u),
526 successCount_(0u),
527 failedCount_(0u),
528 retryCount_(0u)
529 {
530 }
531
MoveInRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)532 void CloudDBProxy::CloudActionContext::MoveInRecordAndExtend(std::vector<VBucket> &record,
533 std::vector<VBucket> &extend)
534 {
535 std::lock_guard<std::mutex> autoLock(actionMutex_);
536 record_ = std::move(record);
537 extend_ = std::move(extend);
538 }
539
MoveInExtend(std::vector<VBucket> & extend)540 void CloudDBProxy::CloudActionContext::MoveInExtend(std::vector<VBucket> &extend)
541 {
542 std::lock_guard<std::mutex> autoLock(actionMutex_);
543 extend_ = std::move(extend);
544 }
545
MoveOutRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)546 void CloudDBProxy::CloudActionContext::MoveOutRecordAndExtend(std::vector<VBucket> &record,
547 std::vector<VBucket> &extend)
548 {
549 std::lock_guard<std::mutex> autoLock(actionMutex_);
550 record = std::move(record_);
551 extend = std::move(extend_);
552 }
553
MoveInQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)554 void CloudDBProxy::CloudActionContext::MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
555 {
556 std::lock_guard<std::mutex> autoLock(actionMutex_);
557 queryExtend_ = std::move(extend);
558 data_ = std::move(data);
559 }
560
MoveOutQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)561 void CloudDBProxy::CloudActionContext::MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
562 {
563 std::lock_guard<std::mutex> autoLock(actionMutex_);
564 extend = std::move(queryExtend_);
565 data = std::move(data_);
566 }
567
MoveInLockStatus(std::pair<int,uint64_t> & lockStatus)568 void CloudDBProxy::CloudActionContext::MoveInLockStatus(std::pair<int, uint64_t> &lockStatus)
569 {
570 std::lock_guard<std::mutex> autoLock(actionMutex_);
571 lockStatus_ = std::move(lockStatus);
572 }
573
MoveOutLockStatus(std::pair<int,uint64_t> & lockStatus)574 void CloudDBProxy::CloudActionContext::MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus)
575 {
576 std::lock_guard<std::mutex> autoLock(actionMutex_);
577 lockStatus = std::move(lockStatus_);
578 }
579
MoveInCursorStatus(std::pair<int,std::string> & cursorStatus)580 void CloudDBProxy::CloudActionContext::MoveInCursorStatus(std::pair<int, std::string> &cursorStatus)
581 {
582 std::lock_guard<std::mutex> autoLock(actionMutex_);
583 cursorStatus_ = std::move(cursorStatus);
584 }
585
MoveOutCursorStatus(std::pair<int,std::string> & cursorStatus)586 void CloudDBProxy::CloudActionContext::MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus)
587 {
588 std::lock_guard<std::mutex> autoLock(actionMutex_);
589 cursorStatus = std::move(cursorStatus_);
590 }
591
FinishAndNotify()592 void CloudDBProxy::CloudActionContext::FinishAndNotify()
593 {
594 {
595 std::lock_guard<std::mutex> autoLock(actionMutex_);
596 actionFinished_ = true;
597 }
598 actionCv_.notify_all();
599 }
600
SetActionRes(int res)601 void CloudDBProxy::CloudActionContext::SetActionRes(int res)
602 {
603 std::lock_guard<std::mutex> autoLock(actionMutex_);
604 actionRes_ = res;
605 }
606
GetActionRes()607 int CloudDBProxy::CloudActionContext::GetActionRes()
608 {
609 std::lock_guard<std::mutex> autoLock(actionMutex_);
610 return actionRes_;
611 }
612
GetInfo()613 Info CloudDBProxy::CloudActionContext::GetInfo()
614 {
615 std::lock_guard<std::mutex> autoLock(actionMutex_);
616 Info info;
617 info.total = totalCount_;
618 info.successCount = successCount_;
619 info.failCount = failedCount_;
620 return info;
621 }
622
IsEmptyAssetId(const Assets & assets)623 bool CloudDBProxy::CloudActionContext::IsEmptyAssetId(const Assets &assets)
624 {
625 for (auto &asset : assets) {
626 if (asset.assetId.empty()) {
627 return true;
628 }
629 }
630 return false;
631 }
632
IsRecordActionFail(const VBucket & extend,const CloudWaterType & type,DBStatus status)633 bool CloudDBProxy::CloudActionContext::IsRecordActionFail(const VBucket &extend, const CloudWaterType &type,
634 DBStatus status)
635 {
636 if (DBCommon::IsRecordAssetsMissing(extend) || DBCommon::IsRecordIgnoredForReliability(extend, type) ||
637 DBCommon::IsRecordIgnored(extend)) {
638 return false;
639 }
640 if (extend.count(CloudDbConstant::GID_FIELD) == 0 || DBCommon::IsRecordFailed(extend, status)) {
641 return true;
642 }
643 bool isInsert = type == CloudWaterType::INSERT;
644 auto gid = std::get_if<std::string>(&extend.at(CloudDbConstant::GID_FIELD));
645 if (gid == nullptr || (isInsert && (*gid).empty())) {
646 return true;
647 }
648 for (auto &entry : extend) {
649 auto asset = std::get_if<Asset>(&entry.second);
650 if (asset != nullptr && (*asset).assetId.empty()) {
651 return true;
652 }
653 auto assets = std::get_if<Assets>(&entry.second);
654 if (assets != nullptr && IsEmptyAssetId(*assets)) {
655 return true;
656 }
657 }
658 return false;
659 }
660
SetInfo(const CloudWaterType & type,DBStatus status,uint32_t size)661 void CloudDBProxy::CloudActionContext::SetInfo(const CloudWaterType &type, DBStatus status, uint32_t size)
662 {
663 totalCount_ = size;
664 retryCount_ = 0; // reset retryCount in each batch
665
666 // totalCount_ should be equal to extend_ or batch data failed.
667 if (totalCount_ != extend_.size()) {
668 failedCount_ += totalCount_;
669 return;
670 }
671 for (auto &extend : extend_) {
672 if (DBCommon::IsRecordVersionConflict(extend)) {
673 retryCount_++;
674 } else if (IsRecordActionFail(extend, type, status)) {
675 failedCount_++;
676 } else {
677 successCount_++;
678 }
679 }
680 }
681
SetTableName(const std::string & tableName)682 void CloudDBProxy::CloudActionContext::SetTableName(const std::string &tableName)
683 {
684 std::lock_guard<std::mutex> autoLock(actionMutex_);
685 tableName_ = tableName;
686 }
687
GetTableName()688 std::string CloudDBProxy::CloudActionContext::GetTableName()
689 {
690 std::lock_guard<std::mutex> autoLock(actionMutex_);
691 return tableName_;
692 }
693
GetRetryCount()694 uint32_t CloudDBProxy::CloudActionContext::GetRetryCount()
695 {
696 std::lock_guard<std::mutex> autoLock(actionMutex_);
697 return retryCount_;
698 }
699
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)700 void CloudDBProxy::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
701 {
702 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
703 genVersionCallback_ = callback;
704 LOGI("[CloudDBProxy] Set generate cloud version callback ok");
705 }
706
IsExistCloudVersionCallback() const707 bool CloudDBProxy::IsExistCloudVersionCallback() const
708 {
709 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
710 return genVersionCallback_ != nullptr;
711 }
712
GetCloudVersion(const std::string & originVersion) const713 std::pair<int, std::string> CloudDBProxy::GetCloudVersion(const std::string &originVersion) const
714 {
715 GenerateCloudVersionCallback genVersionCallback;
716 {
717 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
718 if (genVersionCallback_ == nullptr) {
719 return {-E_NOT_SUPPORT, ""};
720 }
721 genVersionCallback = genVersionCallback_;
722 }
723 LOGI("[CloudDBProxy] Begin get cloud version");
724 std::string version = genVersionCallback(originVersion);
725 LOGI("[CloudDBProxy] End get cloud version");
726 return {E_OK, version};
727 }
728
SetPrepareTraceId(const std::string & traceId) const729 void CloudDBProxy::SetPrepareTraceId(const std::string &traceId) const
730 {
731 std::shared_ptr<ICloudDb> iCloudDb = nullptr;
732 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
733 if (iCloudDb_ != nullptr) {
734 iCloudDb = iCloudDb_;
735 iCloudDb->SetPrepareTraceId(traceId);
736 }
737 }
738
BatchDownload(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & downloadAssets)739 int CloudDBProxy::BatchDownload(const std::string &tableName, std::vector<IAssetLoader::AssetRecord> &downloadAssets)
740 {
741 return BatchOperateAssetsWithAllRecords(tableName, downloadAssets, CloudDBProxy::BATCH_DOWNLOAD);
742 }
743
BatchRemoveLocalAssets(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & removeAssets)744 int CloudDBProxy::BatchRemoveLocalAssets(const std::string &tableName,
745 std::vector<IAssetLoader::AssetRecord> &removeAssets)
746 {
747 return BatchOperateAssetsWithAllRecords(tableName, removeAssets, CloudDBProxy::BATCH_REMOVE_LOCAL);
748 }
749
BatchOperateAssetsWithAllRecords(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & allRecords,const InnerBatchOpType operationType)750 int CloudDBProxy::BatchOperateAssetsWithAllRecords(const std::string &tableName,
751 std::vector<IAssetLoader::AssetRecord> &allRecords, const InnerBatchOpType operationType)
752 {
753 std::vector<IAssetLoader::AssetRecord> nonEmptyRecords;
754 auto indexes = GetNotEmptyAssetRecords(allRecords, nonEmptyRecords);
755 if (nonEmptyRecords.empty()) {
756 return E_OK;
757 }
758
759 int errCode = BatchOperateAssetsInner(tableName, nonEmptyRecords, operationType);
760
761 CopyAssetsBack(allRecords, indexes, nonEmptyRecords);
762 return errCode;
763 }
764
BatchOperateAssetsInner(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & necessaryRecords,const InnerBatchOpType operationType)765 int CloudDBProxy::BatchOperateAssetsInner(const std::string &tableName,
766 std::vector<IAssetLoader::AssetRecord> &necessaryRecords, const InnerBatchOpType operationType)
767 {
768 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
769 if (iAssetLoader_ == nullptr) {
770 LOGE("[CloudDBProxy] Asset loader has not been set %d", -E_NOT_SET);
771 return -E_NOT_SET;
772 }
773 if (operationType == CloudDBProxy::BATCH_DOWNLOAD) {
774 isDownloading_ = true;
775 iAssetLoader_->BatchDownload(tableName, necessaryRecords);
776 isDownloading_ = false;
777 } else if (operationType == CloudDBProxy::BATCH_REMOVE_LOCAL) {
778 iAssetLoader_->BatchRemoveLocalAssets(tableName, necessaryRecords);
779 } else {
780 LOGE("[CloudDBProxy][BatchOperateAssetsInner] Internal error! Operation type is invalid: %d", operationType);
781 return -E_NOT_SET;
782 }
783 return E_OK;
784 }
785
GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> & originalRecords,std::vector<IAssetLoader::AssetRecord> & nonEmptyRecords)786 std::vector<int> CloudDBProxy::GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> &originalRecords,
787 std::vector<IAssetLoader::AssetRecord> &nonEmptyRecords)
788 {
789 std::vector<int> indexes;
790 if (originalRecords.empty()) {
791 return indexes;
792 }
793
794 int index = 0;
795 for (auto &record : originalRecords) {
796 bool isEmpty = true;
797 for (const auto &recordAssets : record.assets) {
798 if (!recordAssets.second.empty()) {
799 isEmpty = false;
800 break;
801 }
802 }
803 if (!isEmpty) {
804 indexes.push_back(index);
805 IAssetLoader::AssetRecord newRecord = {
806 record.gid,
807 record.prefix,
808 std::move(record.assets)
809 };
810 nonEmptyRecords.emplace_back(newRecord);
811 }
812 index++;
813 }
814 return indexes;
815 }
816
CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> & originalRecords,const std::vector<int> & indexes,std::vector<IAssetLoader::AssetRecord> & newRecords)817 void CloudDBProxy::CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> &originalRecords,
818 const std::vector<int> &indexes, std::vector<IAssetLoader::AssetRecord> &newRecords)
819 {
820 int i = 0;
821 for (const auto index : indexes) {
822 originalRecords[index].status = newRecords[i].status;
823 originalRecords[index].assets = std::move(newRecords[i].assets);
824 i++;
825 }
826 }
827
CancelDownload()828 void CloudDBProxy::CancelDownload()
829 {
830 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
831 if (!isDownloading_) {
832 return;
833 }
834 if (iAssetLoader_ == nullptr) {
835 LOGE("[CloudDBProxy] Asset loader has not been set %d when cancel", -E_NOT_SET);
836 return;
837 }
838 DBStatus status = iAssetLoader_->CancelDownload();
839 if (status != OK) {
840 LOGW("[CloudDBProxy] cancel download failed %d", static_cast<int>(status));
841 }
842 }
843 }
844