1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_db_proxy.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21
22 namespace DistributedDB {
CloudDBProxy()23 CloudDBProxy::CloudDBProxy()
24 : isDownloading_(false)
25 {
26 }
27
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)28 void CloudDBProxy::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
29 {
30 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
31 if (!iCloudDb_) {
32 iCloudDb_ = cloudDB;
33 }
34 }
35
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)36 int CloudDBProxy::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
37 {
38 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
39 auto it = std::find_if(cloudDBs.begin(), cloudDBs.end(), [](const auto &item) { return item.second == nullptr; });
40 if (it != cloudDBs.end()) {
41 LOGE("[CloudDBProxy] User %s setCloudDB with nullptr", it->first.c_str());
42 return -E_INVALID_ARGS;
43 }
44 cloudDbs_ = cloudDBs;
45 return E_OK;
46 }
47
GetCloudDB() const48 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudDBProxy::GetCloudDB() const
49 {
50 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
51 return cloudDbs_;
52 }
53
SwitchCloudDB(const std::string & user)54 void CloudDBProxy::SwitchCloudDB(const std::string &user)
55 {
56 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
57 if (cloudDbs_.find(user) == cloudDbs_.end()) {
58 return;
59 }
60 iCloudDb_ = cloudDbs_[user];
61 }
62
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)63 void CloudDBProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
64 {
65 std::unique_lock<std::shared_mutex> writeLock(assetLoaderMutex_);
66 iAssetLoader_ = loader;
67 }
68
RecordSyncDataTimeStampLog(std::vector<VBucket> & data,InnerActionCode action)69 void CloudDBProxy::RecordSyncDataTimeStampLog(std::vector<VBucket> &data, InnerActionCode action)
70 {
71 if (data.empty()) {
72 LOGI("[CloudDBProxy] sync data is empty");
73 return;
74 }
75
76 int64_t first = 0;
77 int errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, data[0], first);
78 if (errCode != E_OK) {
79 LOGE("get first modify time for bucket failed, %d", errCode);
80 return;
81 }
82
83 int64_t last = 0;
84 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, data[data.size() - 1],
85 last);
86 if (errCode != E_OK) {
87 LOGE("get last modify time for bucket failed, %d", errCode);
88 return;
89 }
90
91 LOGI("[CloudDBProxy] sync action is %d and size is %d, sync data: first timestamp %lld, last timestamp %lld",
92 static_cast<int>(action), data.size(), first, last);
93 }
94
BatchInsert(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)95 int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
96 std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount)
97 {
98 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
99 if (iCloudDb_ == nullptr) {
100 return -E_CLOUD_ERROR;
101 }
102 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
103 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
104 context->MoveInRecordAndExtend(record, extend);
105 context->SetTableName(tableName);
106 int errCode = InnerAction(context, cloudDb, InnerActionCode::INSERT);
107 uploadInfo = context->GetInfo();
108 retryCount = context->GetRetryCount();
109 context->MoveOutRecordAndExtend(record, extend);
110 return errCode;
111 }
112
BatchUpdate(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)113 int CloudDBProxy::BatchUpdate(const std::string &tableName, std::vector<VBucket> &record,
114 std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount)
115 {
116 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
117 if (iCloudDb_ == nullptr) {
118 return -E_CLOUD_ERROR;
119 }
120 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
121 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
122 context->SetTableName(tableName);
123 context->MoveInRecordAndExtend(record, extend);
124 int errCode = InnerAction(context, cloudDb, InnerActionCode::UPDATE);
125 uploadInfo = context->GetInfo();
126 retryCount = context->GetRetryCount();
127 context->MoveOutRecordAndExtend(record, extend);
128 return errCode;
129 }
130
BatchDelete(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)131 int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
132 Info &uploadInfo, uint32_t &retryCount)
133 {
134 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
135 if (iCloudDb_ == nullptr) {
136 return -E_CLOUD_ERROR;
137 }
138 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
139 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
140 context->MoveInRecordAndExtend(record, extend);
141 context->SetTableName(tableName);
142 int errCode = InnerAction(context, cloudDb, InnerActionCode::DELETE);
143 uploadInfo = context->GetInfo();
144 retryCount = context->GetRetryCount();
145 context->MoveOutRecordAndExtend(record, extend);
146 return errCode;
147 }
148
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)149 int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
150 {
151 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
152 if (iCloudDb_ == nullptr) {
153 return -E_CLOUD_ERROR;
154 }
155 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
156 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
157 context->MoveInQueryExtendAndData(extend, data);
158 context->SetTableName(tableName);
159 int errCode = InnerAction(context, cloudDb, InnerActionCode::QUERY);
160 context->MoveOutQueryExtendAndData(extend, data);
161 for (auto &item : data) {
162 for (auto &row : item) {
163 auto assets = std::get_if<Assets>(&row.second);
164 if (assets == nullptr) {
165 continue;
166 }
167 DBCommon::RemoveDuplicateAssetsData(*assets);
168 }
169 }
170 RecordSyncDataTimeStampLog(data, InnerActionCode::QUERY);
171 return errCode;
172 }
173
Lock()174 std::pair<int, uint64_t> CloudDBProxy::Lock()
175 {
176 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
177 if (iCloudDb_ == nullptr) {
178 return { -E_CLOUD_ERROR, 0u };
179 }
180 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
181 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
182 std::pair<int, uint64_t> lockStatus;
183 int errCode = InnerAction(context, cloudDb, InnerActionCode::LOCK);
184 context->MoveOutLockStatus(lockStatus);
185 lockStatus.first = errCode;
186 return lockStatus;
187 }
188
UnLock()189 int CloudDBProxy::UnLock()
190 {
191 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
192 if (iCloudDb_ == nullptr) {
193 return -E_CLOUD_ERROR;
194 }
195 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
196 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
197 return InnerAction(context, cloudDb, InnerActionCode::UNLOCK);
198 }
199
Close()200 int CloudDBProxy::Close()
201 {
202 std::shared_ptr<ICloudDb> iCloudDb = nullptr;
203 std::vector<std::shared_ptr<ICloudDb>> waitForClose;
204 {
205 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
206 if (iCloudDb_ != nullptr) {
207 iCloudDb = iCloudDb_;
208 iCloudDb_ = nullptr;
209 }
210 for (const auto &item : cloudDbs_) {
211 if (iCloudDb == item.second) {
212 iCloudDb = nullptr;
213 }
214 waitForClose.push_back(item.second);
215 }
216 cloudDbs_.clear();
217 }
218 LOGD("[CloudDBProxy] call cloudDb close begin");
219 DBStatus status = OK;
220 if (iCloudDb != nullptr) {
221 status = iCloudDb->Close();
222 }
223 for (const auto &item : waitForClose) {
224 DBStatus ret = item->Close();
225 status = (status == OK ? ret : status);
226 }
227 if (status != OK) {
228 LOGW("[CloudDBProxy] cloud db close failed %d", static_cast<int>(status));
229 }
230 waitForClose.clear();
231 LOGD("[CloudDBProxy] call cloudDb close end");
232 return status == OK ? E_OK : -E_CLOUD_ERROR;
233 }
234
HeartBeat()235 int CloudDBProxy::HeartBeat()
236 {
237 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
238 if (iCloudDb_ == nullptr) {
239 return -E_CLOUD_ERROR;
240 }
241
242 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
243 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
244 return InnerAction(context, cloudDb, InnerActionCode::HEARTBEAT);
245 }
246
IsNotExistCloudDB() const247 bool CloudDBProxy::IsNotExistCloudDB() const
248 {
249 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
250 return iCloudDb_ == nullptr && cloudDbs_.empty();
251 }
252
Download(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)253 int CloudDBProxy::Download(const std::string &tableName, const std::string &gid, const Type &prefix,
254 std::map<std::string, Assets> &assets)
255 {
256 if (assets.empty()) {
257 return E_OK;
258 }
259 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
260 if (iAssetLoader_ == nullptr) {
261 LOGE("Asset loader has not been set %d", -E_NOT_SET);
262 return -E_NOT_SET;
263 }
264 isDownloading_ = true;
265 DBStatus status = iAssetLoader_->Download(tableName, gid, prefix, assets);
266 isDownloading_ = false;
267 if (status != OK) {
268 LOGW("[CloudDBProxy] download asset failed %d", static_cast<int>(status));
269 if (status == SKIP_ASSET) {
270 return status;
271 }
272 }
273 return GetInnerErrorCode(status);
274 }
275
RemoveLocalAssets(const std::vector<Asset> & assets)276 int CloudDBProxy::RemoveLocalAssets(const std::vector<Asset> &assets)
277 {
278 if (assets.empty()) {
279 return E_OK;
280 }
281 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
282 if (iAssetLoader_ == nullptr) {
283 LOGW("Asset loader has not been set");
284 return E_OK;
285 }
286 DBStatus status = iAssetLoader_->RemoveLocalAssets(assets);
287 if (status != OK) {
288 LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
289 return -E_REMOVE_ASSETS_FAILED;
290 }
291 return E_OK;
292 }
293
RemoveLocalAssets(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)294 int CloudDBProxy::RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix,
295 std::map<std::string, Assets> &assets)
296 {
297 if (assets.empty()) {
298 return E_OK;
299 }
300 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
301 if (iAssetLoader_ == nullptr) {
302 LOGE("Asset loader has not been set %d", -E_NOT_SET);
303 return -E_NOT_SET;
304 }
305 DBStatus status = iAssetLoader_->RemoveLocalAssets(tableName, gid, prefix, assets);
306 if (status != OK) {
307 LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
308 return -E_REMOVE_ASSETS_FAILED;
309 }
310 return E_OK;
311 }
312
GetEmptyCursor(const std::string & tableName)313 std::pair<int, std::string> CloudDBProxy::GetEmptyCursor(const std::string &tableName)
314 {
315 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
316 if (iCloudDb_ == nullptr) {
317 return { -E_CLOUD_ERROR, "" };
318 }
319 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
320 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
321 context->SetTableName(tableName);
322 int errCode = InnerAction(context, cloudDb, InnerActionCode::GET_EMPTY_CURSOR);
323 std::pair<int, std::string> cursorStatus;
324 context->MoveOutCursorStatus(cursorStatus);
325 cursorStatus.first = errCode;
326 return cursorStatus;
327 }
328
InnerAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)329 int CloudDBProxy::InnerAction(const std::shared_ptr<CloudActionContext> &context,
330 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
331 {
332 if (action >= InnerActionCode::INVALID_ACTION) {
333 return -E_INVALID_ARGS;
334 }
335 InnerActionTask(context, cloudDb, action);
336 return context->GetActionRes();
337 }
338
DMLActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)339 DBStatus CloudDBProxy::DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
340 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
341 {
342 DBStatus status = OK;
343 std::vector<VBucket> record;
344 std::vector<VBucket> extend;
345 context->MoveOutRecordAndExtend(record, extend);
346 uint32_t recordSize = record.size();
347
348 switch (action) {
349 case InnerActionCode::INSERT: {
350 status = cloudDb->BatchInsert(context->GetTableName(), std::move(record), extend);
351 context->MoveInExtend(extend);
352 context->SetInfo(CloudWaterType::INSERT, status, recordSize);
353 break;
354 }
355 case InnerActionCode::UPDATE: {
356 status = cloudDb->BatchUpdate(context->GetTableName(), std::move(record), extend);
357 context->MoveInExtend(extend);
358 context->SetInfo(CloudWaterType::UPDATE, status, recordSize);
359 break;
360 }
361 case InnerActionCode::DELETE: {
362 status = cloudDb->BatchDelete(context->GetTableName(), extend);
363 context->MoveInRecordAndExtend(record, extend);
364 context->SetInfo(CloudWaterType::DELETE, status, recordSize);
365 break;
366 }
367 default: {
368 LOGE("DMLActionTask can only be used on INSERT/UPDATE/DELETE.");
369 return INVALID_ARGS;
370 }
371 }
372 if (status == CLOUD_VERSION_CONFLICT) {
373 LOGI("[CloudSyncer] Version conflict during cloud batch upload.");
374 } else if (status != OK) {
375 LOGE("[CloudSyncer] Cloud BATCH UPLOAD failed.");
376 }
377 return status;
378 }
379
InnerActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)380 void CloudDBProxy::InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
381 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
382 {
383 DBStatus status = OK;
384 bool setResAlready = false;
385 LOGD("[CloudDBProxy] action %" PRIu8 " begin", static_cast<uint8_t>(action));
386 switch (action) {
387 case InnerActionCode::INSERT:
388 case InnerActionCode::UPDATE:
389 case InnerActionCode::DELETE:
390 status = DMLActionTask(context, cloudDb, action);
391 break;
392 case InnerActionCode::QUERY: {
393 status = QueryAction(context, cloudDb);
394 if (status == QUERY_END) {
395 setResAlready = true;
396 }
397 break;
398 }
399 case InnerActionCode::GET_EMPTY_CURSOR:
400 status = InnerActionGetEmptyCursor(context, cloudDb);
401 break;
402 case InnerActionCode::LOCK:
403 status = InnerActionLock(context, cloudDb);
404 break;
405 case InnerActionCode::UNLOCK:
406 status = cloudDb->UnLock();
407 if (status != OK) {
408 LOGE("[CloudDBProxy] UnLock cloud DB failed: %d", static_cast<int>(status));
409 }
410 break;
411 case InnerActionCode::HEARTBEAT:
412 status = cloudDb->HeartBeat();
413 if (status != OK) {
414 LOGE("[CloudDBProxy] Heart beat error: %d", static_cast<int>(status));
415 }
416 break;
417 default: // should not happen
418 status = DB_ERROR;
419 }
420 LOGD("[CloudDBProxy] action %" PRIu8 " end res:%d", static_cast<uint8_t>(action), static_cast<int>(status));
421
422 if (!setResAlready) {
423 context->SetActionRes(GetInnerErrorCode(status));
424 }
425
426 context->FinishAndNotify();
427 }
428
InnerActionLock(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)429 DBStatus CloudDBProxy::InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
430 const std::shared_ptr<ICloudDb> &cloudDb)
431 {
432 DBStatus status = OK;
433 std::pair<int, uint64_t> lockRet;
434 std::pair<DBStatus, uint64_t> lockStatus = cloudDb->Lock();
435 if (lockStatus.first != OK) {
436 status = lockStatus.first;
437 LOGE("[CloudDBProxy] Lock cloud DB failed: %d", static_cast<int>(status));
438 } else if (lockStatus.second == 0) {
439 LOGE("[CloudDBProxy] Lock successfully but timeout is 0");
440 status = CLOUD_ERROR;
441 }
442 lockRet.second = lockStatus.second;
443 lockRet.first = GetInnerErrorCode(status);
444 context->MoveInLockStatus(lockRet);
445 return status;
446 }
447
InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)448 DBStatus CloudDBProxy::InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context,
449 const std::shared_ptr<ICloudDb> &cloudDb)
450 {
451 std::string tableName = context->GetTableName();
452 std::pair<DBStatus, std::string> cursorStatus = cloudDb->GetEmptyCursor(tableName);
453 DBStatus status = OK;
454 if (cursorStatus.first != OK) {
455 status = cursorStatus.first;
456 LOGE("[CloudDBProxy] Get empty cursor failed: %d", static_cast<int>(status));
457 }
458 std::pair<int, std::string> cursorRet;
459 cursorRet.second = cursorStatus.second;
460 cursorRet.first = GetInnerErrorCode(status);
461 context->MoveInCursorStatus(cursorRet);
462 return status;
463 }
464
GetInnerErrorCode(DBStatus status)465 int CloudDBProxy::GetInnerErrorCode(DBStatus status)
466 {
467 if (status < DB_ERROR || status >= BUTT_STATUS) {
468 return static_cast<int>(status);
469 }
470 switch (status) {
471 case OK:
472 return E_OK;
473 case CLOUD_NETWORK_ERROR:
474 return -E_CLOUD_NETWORK_ERROR;
475 case CLOUD_SYNC_UNSET:
476 return -E_CLOUD_SYNC_UNSET;
477 case CLOUD_FULL_RECORDS:
478 return -E_CLOUD_FULL_RECORDS;
479 case CLOUD_LOCK_ERROR:
480 return -E_CLOUD_LOCK_ERROR;
481 case CLOUD_ASSET_SPACE_INSUFFICIENT:
482 return -E_CLOUD_ASSET_SPACE_INSUFFICIENT;
483 case CLOUD_VERSION_CONFLICT:
484 return -E_CLOUD_VERSION_CONFLICT;
485 case CLOUD_RECORD_EXIST_CONFLICT:
486 return -E_CLOUD_RECORD_EXIST_CONFLICT;
487 case CLOUD_DISABLED:
488 return -E_CLOUD_DISABLED;
489 default:
490 return -E_CLOUD_ERROR;
491 }
492 }
493
QueryAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)494 DBStatus CloudDBProxy::QueryAction(const std::shared_ptr<CloudActionContext> &context,
495 const std::shared_ptr<ICloudDb> &cloudDb)
496 {
497 VBucket queryExtend;
498 std::vector<VBucket> data;
499 context->MoveOutQueryExtendAndData(queryExtend, data);
500 DBStatus status = cloudDb->Query(context->GetTableName(), queryExtend, data);
501 context->MoveInQueryExtendAndData(queryExtend, data);
502 if (status == QUERY_END) {
503 context->SetActionRes(-E_QUERY_END);
504 }
505 return status;
506 }
507
CloudActionContext()508 CloudDBProxy::CloudActionContext::CloudActionContext()
509 : actionFinished_(false),
510 actionRes_(OK),
511 totalCount_(0u),
512 successCount_(0u),
513 failedCount_(0u),
514 retryCount_(0u)
515 {
516 }
517
MoveInRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)518 void CloudDBProxy::CloudActionContext::MoveInRecordAndExtend(std::vector<VBucket> &record,
519 std::vector<VBucket> &extend)
520 {
521 std::lock_guard<std::mutex> autoLock(actionMutex_);
522 record_ = std::move(record);
523 extend_ = std::move(extend);
524 }
525
MoveInExtend(std::vector<VBucket> & extend)526 void CloudDBProxy::CloudActionContext::MoveInExtend(std::vector<VBucket> &extend)
527 {
528 std::lock_guard<std::mutex> autoLock(actionMutex_);
529 extend_ = std::move(extend);
530 }
531
MoveOutRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)532 void CloudDBProxy::CloudActionContext::MoveOutRecordAndExtend(std::vector<VBucket> &record,
533 std::vector<VBucket> &extend)
534 {
535 std::lock_guard<std::mutex> autoLock(actionMutex_);
536 record = std::move(record_);
537 extend = std::move(extend_);
538 }
539
MoveInQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)540 void CloudDBProxy::CloudActionContext::MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
541 {
542 std::lock_guard<std::mutex> autoLock(actionMutex_);
543 queryExtend_ = std::move(extend);
544 data_ = std::move(data);
545 }
546
MoveOutQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)547 void CloudDBProxy::CloudActionContext::MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
548 {
549 std::lock_guard<std::mutex> autoLock(actionMutex_);
550 extend = std::move(queryExtend_);
551 data = std::move(data_);
552 }
553
MoveInLockStatus(std::pair<int,uint64_t> & lockStatus)554 void CloudDBProxy::CloudActionContext::MoveInLockStatus(std::pair<int, uint64_t> &lockStatus)
555 {
556 std::lock_guard<std::mutex> autoLock(actionMutex_);
557 lockStatus_ = std::move(lockStatus);
558 }
559
MoveOutLockStatus(std::pair<int,uint64_t> & lockStatus)560 void CloudDBProxy::CloudActionContext::MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus)
561 {
562 std::lock_guard<std::mutex> autoLock(actionMutex_);
563 lockStatus = std::move(lockStatus_);
564 }
565
MoveInCursorStatus(std::pair<int,std::string> & cursorStatus)566 void CloudDBProxy::CloudActionContext::MoveInCursorStatus(std::pair<int, std::string> &cursorStatus)
567 {
568 std::lock_guard<std::mutex> autoLock(actionMutex_);
569 cursorStatus_ = std::move(cursorStatus);
570 }
571
MoveOutCursorStatus(std::pair<int,std::string> & cursorStatus)572 void CloudDBProxy::CloudActionContext::MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus)
573 {
574 std::lock_guard<std::mutex> autoLock(actionMutex_);
575 cursorStatus = std::move(cursorStatus_);
576 }
577
FinishAndNotify()578 void CloudDBProxy::CloudActionContext::FinishAndNotify()
579 {
580 {
581 std::lock_guard<std::mutex> autoLock(actionMutex_);
582 actionFinished_ = true;
583 }
584 actionCv_.notify_all();
585 }
586
SetActionRes(int res)587 void CloudDBProxy::CloudActionContext::SetActionRes(int res)
588 {
589 std::lock_guard<std::mutex> autoLock(actionMutex_);
590 actionRes_ = res;
591 }
592
GetActionRes()593 int CloudDBProxy::CloudActionContext::GetActionRes()
594 {
595 std::lock_guard<std::mutex> autoLock(actionMutex_);
596 return actionRes_;
597 }
598
GetInfo()599 Info CloudDBProxy::CloudActionContext::GetInfo()
600 {
601 std::lock_guard<std::mutex> autoLock(actionMutex_);
602 Info info;
603 info.total = totalCount_;
604 info.successCount = successCount_;
605 info.failCount = failedCount_;
606 return info;
607 }
608
IsEmptyAssetId(const Assets & assets)609 bool CloudDBProxy::CloudActionContext::IsEmptyAssetId(const Assets &assets)
610 {
611 for (auto &asset : assets) {
612 if (asset.assetId.empty()) {
613 return true;
614 }
615 }
616 return false;
617 }
618
IsRecordActionFail(const VBucket & extend,const CloudWaterType & type,DBStatus status)619 bool CloudDBProxy::CloudActionContext::IsRecordActionFail(const VBucket &extend, const CloudWaterType &type,
620 DBStatus status)
621 {
622 if (DBCommon::IsRecordAssetsMissing(extend) || DBCommon::IsRecordIgnoredForReliability(extend, type) ||
623 DBCommon::IsRecordIgnored(extend)) {
624 return false;
625 }
626 if (extend.count(CloudDbConstant::GID_FIELD) == 0 || DBCommon::IsRecordFailed(extend, status)) {
627 return true;
628 }
629 bool isInsert = type == CloudWaterType::INSERT;
630 auto gid = std::get_if<std::string>(&extend.at(CloudDbConstant::GID_FIELD));
631 if (gid == nullptr || (isInsert && (*gid).empty())) {
632 return true;
633 }
634 for (auto &entry : extend) {
635 auto asset = std::get_if<Asset>(&entry.second);
636 if (asset != nullptr && (*asset).assetId.empty()) {
637 return true;
638 }
639 auto assets = std::get_if<Assets>(&entry.second);
640 if (assets != nullptr && IsEmptyAssetId(*assets)) {
641 return true;
642 }
643 }
644 return false;
645 }
646
SetInfo(const CloudWaterType & type,DBStatus status,uint32_t size)647 void CloudDBProxy::CloudActionContext::SetInfo(const CloudWaterType &type, DBStatus status, uint32_t size)
648 {
649 totalCount_ = size;
650 retryCount_ = 0; // reset retryCount in each batch
651
652 // totalCount_ should be equal to extend_ or batch data failed.
653 if (totalCount_ != extend_.size()) {
654 failedCount_ += totalCount_;
655 return;
656 }
657 for (auto &extend : extend_) {
658 if (DBCommon::IsRecordVersionConflict(extend)) {
659 retryCount_++;
660 } else if (IsRecordActionFail(extend, type, status)) {
661 failedCount_++;
662 } else {
663 successCount_++;
664 }
665 }
666 }
667
SetTableName(const std::string & tableName)668 void CloudDBProxy::CloudActionContext::SetTableName(const std::string &tableName)
669 {
670 std::lock_guard<std::mutex> autoLock(actionMutex_);
671 tableName_ = tableName;
672 }
673
GetTableName()674 std::string CloudDBProxy::CloudActionContext::GetTableName()
675 {
676 std::lock_guard<std::mutex> autoLock(actionMutex_);
677 return tableName_;
678 }
679
GetRetryCount()680 uint32_t CloudDBProxy::CloudActionContext::GetRetryCount()
681 {
682 std::lock_guard<std::mutex> autoLock(actionMutex_);
683 return retryCount_;
684 }
685
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)686 void CloudDBProxy::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
687 {
688 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
689 genVersionCallback_ = callback;
690 LOGI("[CloudDBProxy] Set generate cloud version callback ok");
691 }
692
IsExistCloudVersionCallback() const693 bool CloudDBProxy::IsExistCloudVersionCallback() const
694 {
695 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
696 return genVersionCallback_ != nullptr;
697 }
698
GetCloudVersion(const std::string & originVersion) const699 std::pair<int, std::string> CloudDBProxy::GetCloudVersion(const std::string &originVersion) const
700 {
701 GenerateCloudVersionCallback genVersionCallback;
702 {
703 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
704 if (genVersionCallback_ == nullptr) {
705 return {-E_NOT_SUPPORT, ""};
706 }
707 genVersionCallback = genVersionCallback_;
708 }
709 LOGI("[CloudDBProxy] Begin get cloud version");
710 std::string version = genVersionCallback(originVersion);
711 LOGI("[CloudDBProxy] End get cloud version");
712 return {E_OK, version};
713 }
714
SetPrepareTraceId(const std::string & traceId) const715 void CloudDBProxy::SetPrepareTraceId(const std::string &traceId) const
716 {
717 std::shared_ptr<ICloudDb> iCloudDb = nullptr;
718 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
719 if (iCloudDb_ != nullptr) {
720 iCloudDb = iCloudDb_;
721 iCloudDb->SetPrepareTraceId(traceId);
722 }
723 }
724
BatchDownload(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & downloadAssets)725 int CloudDBProxy::BatchDownload(const std::string &tableName, std::vector<IAssetLoader::AssetRecord> &downloadAssets)
726 {
727 return BatchOperateAssetsWithAllRecords(tableName, downloadAssets, CloudDBProxy::BATCH_DOWNLOAD);
728 }
729
BatchRemoveLocalAssets(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & removeAssets)730 int CloudDBProxy::BatchRemoveLocalAssets(const std::string &tableName,
731 std::vector<IAssetLoader::AssetRecord> &removeAssets)
732 {
733 return BatchOperateAssetsWithAllRecords(tableName, removeAssets, CloudDBProxy::BATCH_REMOVE_LOCAL);
734 }
735
BatchOperateAssetsWithAllRecords(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & allRecords,const InnerBatchOpType operationType)736 int CloudDBProxy::BatchOperateAssetsWithAllRecords(const std::string &tableName,
737 std::vector<IAssetLoader::AssetRecord> &allRecords, const InnerBatchOpType operationType)
738 {
739 std::vector<IAssetLoader::AssetRecord> nonEmptyRecords;
740 auto indexes = GetNotEmptyAssetRecords(allRecords, nonEmptyRecords);
741 if (nonEmptyRecords.empty()) {
742 return E_OK;
743 }
744
745 int errCode = BatchOperateAssetsInner(tableName, nonEmptyRecords, operationType);
746
747 CopyAssetsBack(allRecords, indexes, nonEmptyRecords);
748 return errCode;
749 }
750
BatchOperateAssetsInner(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & necessaryRecords,const InnerBatchOpType operationType)751 int CloudDBProxy::BatchOperateAssetsInner(const std::string &tableName,
752 std::vector<IAssetLoader::AssetRecord> &necessaryRecords, const InnerBatchOpType operationType)
753 {
754 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
755 if (iAssetLoader_ == nullptr) {
756 LOGE("[CloudDBProxy] Asset loader has not been set %d", -E_NOT_SET);
757 return -E_NOT_SET;
758 }
759 if (operationType == CloudDBProxy::BATCH_DOWNLOAD) {
760 isDownloading_ = true;
761 iAssetLoader_->BatchDownload(tableName, necessaryRecords);
762 isDownloading_ = false;
763 } else if (operationType == CloudDBProxy::BATCH_REMOVE_LOCAL) {
764 iAssetLoader_->BatchRemoveLocalAssets(tableName, necessaryRecords);
765 } else {
766 LOGE("[CloudDBProxy][BatchOperateAssetsInner] Internal error! Operation type is invalid: %d", operationType);
767 return -E_NOT_SET;
768 }
769 return E_OK;
770 }
771
GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> & originalRecords,std::vector<IAssetLoader::AssetRecord> & nonEmptyRecords)772 std::vector<int> CloudDBProxy::GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> &originalRecords,
773 std::vector<IAssetLoader::AssetRecord> &nonEmptyRecords)
774 {
775 std::vector<int> indexes;
776 if (originalRecords.empty()) {
777 return indexes;
778 }
779
780 int index = 0;
781 for (auto &record : originalRecords) {
782 bool isEmpty = true;
783 for (const auto &recordAssets : record.assets) {
784 if (!recordAssets.second.empty()) {
785 isEmpty = false;
786 break;
787 }
788 }
789 if (!isEmpty) {
790 indexes.push_back(index);
791 IAssetLoader::AssetRecord newRecord = {
792 record.gid,
793 record.prefix,
794 std::move(record.assets)
795 };
796 nonEmptyRecords.emplace_back(newRecord);
797 }
798 index++;
799 }
800 return indexes;
801 }
802
CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> & originalRecords,const std::vector<int> & indexes,std::vector<IAssetLoader::AssetRecord> & newRecords)803 void CloudDBProxy::CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> &originalRecords,
804 const std::vector<int> &indexes, std::vector<IAssetLoader::AssetRecord> &newRecords)
805 {
806 int i = 0;
807 for (const auto index : indexes) {
808 originalRecords[index].status = newRecords[i].status;
809 originalRecords[index].assets = std::move(newRecords[i].assets);
810 i++;
811 }
812 }
813
CancelDownload()814 void CloudDBProxy::CancelDownload()
815 {
816 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
817 if (!isDownloading_) {
818 return;
819 }
820 if (iAssetLoader_ == nullptr) {
821 LOGE("[CloudDBProxy] Asset loader has not been set %d when cancel", -E_NOT_SET);
822 return;
823 }
824 DBStatus status = iAssetLoader_->CancelDownload();
825 if (status != OK) {
826 LOGW("[CloudDBProxy] cancel download failed %d", static_cast<int>(status));
827 }
828 }
829 }
830