• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_db_proxy.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 
22 namespace DistributedDB {
CloudDBProxy()23 CloudDBProxy::CloudDBProxy()
24     : isDownloading_(false)
25 {
26 }
27 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)28 void CloudDBProxy::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
29 {
30     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
31     if (!iCloudDb_) {
32         iCloudDb_ = cloudDB;
33     }
34 }
35 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)36 int CloudDBProxy::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
37 {
38     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
39     auto it = std::find_if(cloudDBs.begin(), cloudDBs.end(), [](const auto &item) { return item.second == nullptr; });
40     if (it != cloudDBs.end()) {
41         LOGE("[CloudDBProxy] User %s setCloudDB with nullptr", it->first.c_str());
42         return -E_INVALID_ARGS;
43     }
44     cloudDbs_ = cloudDBs;
45     return E_OK;
46 }
47 
GetCloudDB() const48 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudDBProxy::GetCloudDB() const
49 {
50     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
51     return cloudDbs_;
52 }
53 
SwitchCloudDB(const std::string & user)54 void CloudDBProxy::SwitchCloudDB(const std::string &user)
55 {
56     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
57     if (cloudDbs_.find(user) == cloudDbs_.end()) {
58         return;
59     }
60     iCloudDb_ = cloudDbs_[user];
61 }
62 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)63 void CloudDBProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
64 {
65     std::unique_lock<std::shared_mutex> writeLock(assetLoaderMutex_);
66     iAssetLoader_ = loader;
67 }
68 
RecordSyncDataTimeStampLog(std::vector<VBucket> & data,InnerActionCode action)69 void CloudDBProxy::RecordSyncDataTimeStampLog(std::vector<VBucket> &data, InnerActionCode action)
70 {
71     if (data.empty()) {
72         LOGI("[CloudDBProxy] sync data is empty");
73         return;
74     }
75 
76     int64_t first = 0;
77     int errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, data[0], first);
78     if (errCode != E_OK) {
79         LOGE("get first modify time for bucket failed, %d", errCode);
80         return;
81     }
82 
83     int64_t last = 0;
84     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, data[data.size() - 1],
85         last);
86     if (errCode != E_OK) {
87         LOGE("get last modify time for bucket failed, %d", errCode);
88         return;
89     }
90 
91     LOGI("[CloudDBProxy] sync action is %d and size is %d, sync data: first timestamp %lld, last timestamp %lld",
92         static_cast<int>(action), data.size(), first, last);
93 }
94 
BatchInsert(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)95 int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
96     std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount)
97 {
98     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
99     if (iCloudDb_ == nullptr) {
100         return -E_CLOUD_ERROR;
101     }
102     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
103     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
104     context->MoveInRecordAndExtend(record, extend);
105     context->SetTableName(tableName);
106     int errCode = InnerAction(context, cloudDb, InnerActionCode::INSERT);
107     uploadInfo = context->GetInfo();
108     retryCount = context->GetRetryCount();
109     context->MoveOutRecordAndExtend(record, extend);
110     return errCode;
111 }
112 
BatchUpdate(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)113 int CloudDBProxy::BatchUpdate(const std::string &tableName, std::vector<VBucket> &record,
114     std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount)
115 {
116     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
117     if (iCloudDb_ == nullptr) {
118         return -E_CLOUD_ERROR;
119     }
120     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
121     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
122     context->SetTableName(tableName);
123     context->MoveInRecordAndExtend(record, extend);
124     int errCode = InnerAction(context, cloudDb, InnerActionCode::UPDATE);
125     uploadInfo = context->GetInfo();
126     retryCount = context->GetRetryCount();
127     context->MoveOutRecordAndExtend(record, extend);
128     return errCode;
129 }
130 
BatchDelete(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)131 int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
132     Info &uploadInfo, uint32_t &retryCount)
133 {
134     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
135     if (iCloudDb_ == nullptr) {
136         return -E_CLOUD_ERROR;
137     }
138     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
139     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
140     context->MoveInRecordAndExtend(record, extend);
141     context->SetTableName(tableName);
142     int errCode = InnerAction(context, cloudDb, InnerActionCode::DELETE);
143     uploadInfo = context->GetInfo();
144     retryCount = context->GetRetryCount();
145     context->MoveOutRecordAndExtend(record, extend);
146     return errCode;
147 }
148 
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)149 int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
150 {
151     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
152     if (iCloudDb_ == nullptr) {
153         return -E_CLOUD_ERROR;
154     }
155     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
156     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
157     context->MoveInQueryExtendAndData(extend, data);
158     context->SetTableName(tableName);
159     int errCode = InnerAction(context, cloudDb, InnerActionCode::QUERY);
160     context->MoveOutQueryExtendAndData(extend, data);
161     for (auto &item : data) {
162         for (auto &row : item) {
163             auto assets = std::get_if<Assets>(&row.second);
164             if (assets == nullptr) {
165                 continue;
166             }
167             DBCommon::RemoveDuplicateAssetsData(*assets);
168         }
169     }
170     RecordSyncDataTimeStampLog(data, InnerActionCode::QUERY);
171     return errCode;
172 }
173 
Lock()174 std::pair<int, uint64_t> CloudDBProxy::Lock()
175 {
176     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
177     if (iCloudDb_ == nullptr) {
178         return { -E_CLOUD_ERROR, 0u };
179     }
180     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
181     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
182     std::pair<int, uint64_t> lockStatus;
183     int errCode = InnerAction(context, cloudDb, InnerActionCode::LOCK);
184     context->MoveOutLockStatus(lockStatus);
185     lockStatus.first = errCode;
186     return lockStatus;
187 }
188 
UnLock()189 int CloudDBProxy::UnLock()
190 {
191     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
192     if (iCloudDb_ == nullptr) {
193         return -E_CLOUD_ERROR;
194     }
195     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
196     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
197     return InnerAction(context, cloudDb, InnerActionCode::UNLOCK);
198 }
199 
Close()200 int CloudDBProxy::Close()
201 {
202     std::shared_ptr<ICloudDb> iCloudDb = nullptr;
203     std::vector<std::shared_ptr<ICloudDb>> waitForClose;
204     {
205         std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
206         if (iCloudDb_ != nullptr) {
207             iCloudDb = iCloudDb_;
208             iCloudDb_ = nullptr;
209         }
210         for (const auto &item : cloudDbs_) {
211             if (iCloudDb == item.second) {
212                 iCloudDb = nullptr;
213             }
214             waitForClose.push_back(item.second);
215         }
216         cloudDbs_.clear();
217     }
218     LOGD("[CloudDBProxy] call cloudDb close begin");
219     DBStatus status = OK;
220     if (iCloudDb != nullptr) {
221         status = iCloudDb->Close();
222     }
223     for (const auto &item : waitForClose) {
224         DBStatus ret = item->Close();
225         status = (status == OK ? ret : status);
226     }
227     if (status != OK) {
228         LOGW("[CloudDBProxy] cloud db close failed %d", static_cast<int>(status));
229     }
230     waitForClose.clear();
231     LOGD("[CloudDBProxy] call cloudDb close end");
232     return status == OK ? E_OK : -E_CLOUD_ERROR;
233 }
234 
HeartBeat()235 int CloudDBProxy::HeartBeat()
236 {
237     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
238     if (iCloudDb_ == nullptr) {
239         return -E_CLOUD_ERROR;
240     }
241 
242     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
243     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
244     return InnerAction(context, cloudDb, InnerActionCode::HEARTBEAT);
245 }
246 
IsNotExistCloudDB() const247 bool CloudDBProxy::IsNotExistCloudDB() const
248 {
249     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
250     return iCloudDb_ == nullptr && cloudDbs_.empty();
251 }
252 
Download(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)253 int CloudDBProxy::Download(const std::string &tableName, const std::string &gid, const Type &prefix,
254     std::map<std::string, Assets> &assets)
255 {
256     if (assets.empty()) {
257         return E_OK;
258     }
259     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
260     if (iAssetLoader_ == nullptr) {
261         LOGE("Asset loader has not been set %d", -E_NOT_SET);
262         return -E_NOT_SET;
263     }
264     isDownloading_ = true;
265     DBStatus status = iAssetLoader_->Download(tableName, gid, prefix, assets);
266     isDownloading_ = false;
267     if (status != OK) {
268         LOGW("[CloudDBProxy] download asset failed %d", static_cast<int>(status));
269         if (status == SKIP_ASSET) {
270             return status;
271         }
272     }
273     return GetInnerErrorCode(status);
274 }
275 
RemoveLocalAssets(const std::vector<Asset> & assets)276 int CloudDBProxy::RemoveLocalAssets(const std::vector<Asset> &assets)
277 {
278     if (assets.empty()) {
279         return E_OK;
280     }
281     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
282     if (iAssetLoader_ == nullptr) {
283         LOGW("Asset loader has not been set");
284         return E_OK;
285     }
286     DBStatus status = iAssetLoader_->RemoveLocalAssets(assets);
287     if (status != OK) {
288         LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
289         return -E_REMOVE_ASSETS_FAILED;
290     }
291     return E_OK;
292 }
293 
RemoveLocalAssets(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)294 int CloudDBProxy::RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix,
295     std::map<std::string, Assets> &assets)
296 {
297     if (assets.empty()) {
298         return E_OK;
299     }
300     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
301     if (iAssetLoader_ == nullptr) {
302         LOGE("Asset loader has not been set %d", -E_NOT_SET);
303         return -E_NOT_SET;
304     }
305     DBStatus status = iAssetLoader_->RemoveLocalAssets(tableName, gid, prefix, assets);
306     if (status != OK) {
307         LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
308         return -E_REMOVE_ASSETS_FAILED;
309     }
310     return E_OK;
311 }
312 
GetEmptyCursor(const std::string & tableName)313 std::pair<int, std::string> CloudDBProxy::GetEmptyCursor(const std::string &tableName)
314 {
315     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
316     if (iCloudDb_ == nullptr) {
317         return { -E_CLOUD_ERROR, "" };
318     }
319     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
320     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
321     context->SetTableName(tableName);
322     int errCode = InnerAction(context, cloudDb, InnerActionCode::GET_EMPTY_CURSOR);
323     std::pair<int, std::string> cursorStatus;
324     context->MoveOutCursorStatus(cursorStatus);
325     cursorStatus.first = errCode;
326     return cursorStatus;
327 }
328 
InnerAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)329 int CloudDBProxy::InnerAction(const std::shared_ptr<CloudActionContext> &context,
330     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
331 {
332     if (action >= InnerActionCode::INVALID_ACTION) {
333         return -E_INVALID_ARGS;
334     }
335     InnerActionTask(context, cloudDb, action);
336     return context->GetActionRes();
337 }
338 
DMLActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)339 DBStatus CloudDBProxy::DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
340     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
341 {
342     DBStatus status = OK;
343     std::vector<VBucket> record;
344     std::vector<VBucket> extend;
345     context->MoveOutRecordAndExtend(record, extend);
346     uint32_t recordSize = record.size();
347 
348     switch (action) {
349         case InnerActionCode::INSERT: {
350             status = cloudDb->BatchInsert(context->GetTableName(), std::move(record), extend);
351             context->MoveInExtend(extend);
352             context->SetInfo(CloudWaterType::INSERT, status, recordSize);
353             break;
354         }
355         case InnerActionCode::UPDATE: {
356             status = cloudDb->BatchUpdate(context->GetTableName(), std::move(record), extend);
357             context->MoveInExtend(extend);
358             context->SetInfo(CloudWaterType::UPDATE, status, recordSize);
359             break;
360         }
361         case InnerActionCode::DELETE: {
362             status = cloudDb->BatchDelete(context->GetTableName(), extend);
363             context->MoveInRecordAndExtend(record, extend);
364             context->SetInfo(CloudWaterType::DELETE, status, recordSize);
365             break;
366         }
367         default: {
368             LOGE("DMLActionTask can only be used on INSERT/UPDATE/DELETE.");
369             return INVALID_ARGS;
370         }
371     }
372     if (status == CLOUD_VERSION_CONFLICT) {
373         LOGI("[CloudSyncer] Version conflict during cloud batch upload.");
374     } else if (status != OK) {
375         LOGE("[CloudSyncer] Cloud BATCH UPLOAD failed.");
376     }
377     return status;
378 }
379 
InnerActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)380 void CloudDBProxy::InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
381     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
382 {
383     DBStatus status = OK;
384     bool setResAlready = false;
385     LOGD("[CloudDBProxy] action %" PRIu8 " begin", static_cast<uint8_t>(action));
386     switch (action) {
387         case InnerActionCode::INSERT:
388         case InnerActionCode::UPDATE:
389         case InnerActionCode::DELETE:
390             status = DMLActionTask(context, cloudDb, action);
391             break;
392         case InnerActionCode::QUERY: {
393             status = QueryAction(context, cloudDb);
394             if (status == QUERY_END) {
395                 setResAlready = true;
396             }
397             break;
398         }
399         case InnerActionCode::GET_EMPTY_CURSOR:
400             status = InnerActionGetEmptyCursor(context, cloudDb);
401             break;
402         case InnerActionCode::LOCK:
403             status = InnerActionLock(context, cloudDb);
404             break;
405         case InnerActionCode::UNLOCK:
406             status = cloudDb->UnLock();
407             if (status != OK) {
408                 LOGE("[CloudDBProxy] UnLock cloud DB failed: %d", static_cast<int>(status));
409             }
410             break;
411         case InnerActionCode::HEARTBEAT:
412             status = cloudDb->HeartBeat();
413             if (status != OK) {
414                 LOGE("[CloudDBProxy] Heart beat error: %d", static_cast<int>(status));
415             }
416             break;
417         default: // should not happen
418             status = DB_ERROR;
419     }
420     LOGD("[CloudDBProxy] action %" PRIu8 " end res:%d", static_cast<uint8_t>(action), static_cast<int>(status));
421 
422     if (!setResAlready) {
423         context->SetActionRes(GetInnerErrorCode(status));
424     }
425 
426     context->FinishAndNotify();
427 }
428 
InnerActionLock(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)429 DBStatus CloudDBProxy::InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
430     const std::shared_ptr<ICloudDb> &cloudDb)
431 {
432     DBStatus status = OK;
433     std::pair<int, uint64_t> lockRet;
434     std::pair<DBStatus, uint64_t> lockStatus = cloudDb->Lock();
435     if (lockStatus.first != OK) {
436         status = lockStatus.first;
437         LOGE("[CloudDBProxy] Lock cloud DB failed: %d", static_cast<int>(status));
438     } else if (lockStatus.second == 0) {
439         LOGE("[CloudDBProxy] Lock successfully but timeout is 0");
440         status = CLOUD_ERROR;
441     }
442     lockRet.second = lockStatus.second;
443     lockRet.first = GetInnerErrorCode(status);
444     context->MoveInLockStatus(lockRet);
445     return status;
446 }
447 
InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)448 DBStatus CloudDBProxy::InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context,
449     const std::shared_ptr<ICloudDb> &cloudDb)
450 {
451     std::string tableName = context->GetTableName();
452     std::pair<DBStatus, std::string> cursorStatus = cloudDb->GetEmptyCursor(tableName);
453     DBStatus status = OK;
454     if (cursorStatus.first != OK) {
455         status = cursorStatus.first;
456         LOGE("[CloudDBProxy] Get empty cursor failed: %d", static_cast<int>(status));
457     }
458     std::pair<int, std::string> cursorRet;
459     cursorRet.second = cursorStatus.second;
460     cursorRet.first = GetInnerErrorCode(status);
461     context->MoveInCursorStatus(cursorRet);
462     return status;
463 }
464 
GetInnerErrorCode(DBStatus status)465 int CloudDBProxy::GetInnerErrorCode(DBStatus status)
466 {
467     if (status < DB_ERROR || status >= BUTT_STATUS) {
468         return static_cast<int>(status);
469     }
470     switch (status) {
471         case OK:
472             return E_OK;
473         case CLOUD_NETWORK_ERROR:
474             return -E_CLOUD_NETWORK_ERROR;
475         case CLOUD_SYNC_UNSET:
476             return -E_CLOUD_SYNC_UNSET;
477         case CLOUD_FULL_RECORDS:
478             return -E_CLOUD_FULL_RECORDS;
479         case CLOUD_LOCK_ERROR:
480             return -E_CLOUD_LOCK_ERROR;
481         case CLOUD_ASSET_SPACE_INSUFFICIENT:
482             return -E_CLOUD_ASSET_SPACE_INSUFFICIENT;
483         case CLOUD_VERSION_CONFLICT:
484             return -E_CLOUD_VERSION_CONFLICT;
485         case CLOUD_RECORD_EXIST_CONFLICT:
486             return -E_CLOUD_RECORD_EXIST_CONFLICT;
487         case CLOUD_DISABLED:
488             return -E_CLOUD_DISABLED;
489         default:
490             return -E_CLOUD_ERROR;
491     }
492 }
493 
QueryAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)494 DBStatus CloudDBProxy::QueryAction(const std::shared_ptr<CloudActionContext> &context,
495     const std::shared_ptr<ICloudDb> &cloudDb)
496 {
497     VBucket queryExtend;
498     std::vector<VBucket> data;
499     context->MoveOutQueryExtendAndData(queryExtend, data);
500     DBStatus status = cloudDb->Query(context->GetTableName(), queryExtend, data);
501     context->MoveInQueryExtendAndData(queryExtend, data);
502     if (status == QUERY_END) {
503         context->SetActionRes(-E_QUERY_END);
504     }
505     return status;
506 }
507 
CloudActionContext()508 CloudDBProxy::CloudActionContext::CloudActionContext()
509     : actionFinished_(false),
510       actionRes_(OK),
511       totalCount_(0u),
512       successCount_(0u),
513       failedCount_(0u),
514       retryCount_(0u)
515 {
516 }
517 
MoveInRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)518 void CloudDBProxy::CloudActionContext::MoveInRecordAndExtend(std::vector<VBucket> &record,
519     std::vector<VBucket> &extend)
520 {
521     std::lock_guard<std::mutex> autoLock(actionMutex_);
522     record_ = std::move(record);
523     extend_ = std::move(extend);
524 }
525 
MoveInExtend(std::vector<VBucket> & extend)526 void CloudDBProxy::CloudActionContext::MoveInExtend(std::vector<VBucket> &extend)
527 {
528     std::lock_guard<std::mutex> autoLock(actionMutex_);
529     extend_ = std::move(extend);
530 }
531 
MoveOutRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)532 void CloudDBProxy::CloudActionContext::MoveOutRecordAndExtend(std::vector<VBucket> &record,
533     std::vector<VBucket> &extend)
534 {
535     std::lock_guard<std::mutex> autoLock(actionMutex_);
536     record = std::move(record_);
537     extend = std::move(extend_);
538 }
539 
MoveInQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)540 void CloudDBProxy::CloudActionContext::MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
541 {
542     std::lock_guard<std::mutex> autoLock(actionMutex_);
543     queryExtend_ = std::move(extend);
544     data_ = std::move(data);
545 }
546 
MoveOutQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)547 void CloudDBProxy::CloudActionContext::MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
548 {
549     std::lock_guard<std::mutex> autoLock(actionMutex_);
550     extend = std::move(queryExtend_);
551     data = std::move(data_);
552 }
553 
MoveInLockStatus(std::pair<int,uint64_t> & lockStatus)554 void CloudDBProxy::CloudActionContext::MoveInLockStatus(std::pair<int, uint64_t> &lockStatus)
555 {
556     std::lock_guard<std::mutex> autoLock(actionMutex_);
557     lockStatus_ = std::move(lockStatus);
558 }
559 
MoveOutLockStatus(std::pair<int,uint64_t> & lockStatus)560 void CloudDBProxy::CloudActionContext::MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus)
561 {
562     std::lock_guard<std::mutex> autoLock(actionMutex_);
563     lockStatus = std::move(lockStatus_);
564 }
565 
MoveInCursorStatus(std::pair<int,std::string> & cursorStatus)566 void CloudDBProxy::CloudActionContext::MoveInCursorStatus(std::pair<int, std::string> &cursorStatus)
567 {
568     std::lock_guard<std::mutex> autoLock(actionMutex_);
569     cursorStatus_ = std::move(cursorStatus);
570 }
571 
MoveOutCursorStatus(std::pair<int,std::string> & cursorStatus)572 void CloudDBProxy::CloudActionContext::MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus)
573 {
574     std::lock_guard<std::mutex> autoLock(actionMutex_);
575     cursorStatus = std::move(cursorStatus_);
576 }
577 
FinishAndNotify()578 void CloudDBProxy::CloudActionContext::FinishAndNotify()
579 {
580     {
581         std::lock_guard<std::mutex> autoLock(actionMutex_);
582         actionFinished_ = true;
583     }
584     actionCv_.notify_all();
585 }
586 
SetActionRes(int res)587 void CloudDBProxy::CloudActionContext::SetActionRes(int res)
588 {
589     std::lock_guard<std::mutex> autoLock(actionMutex_);
590     actionRes_ = res;
591 }
592 
GetActionRes()593 int CloudDBProxy::CloudActionContext::GetActionRes()
594 {
595     std::lock_guard<std::mutex> autoLock(actionMutex_);
596     return actionRes_;
597 }
598 
GetInfo()599 Info CloudDBProxy::CloudActionContext::GetInfo()
600 {
601     std::lock_guard<std::mutex> autoLock(actionMutex_);
602     Info info;
603     info.total = totalCount_;
604     info.successCount = successCount_;
605     info.failCount = failedCount_;
606     return info;
607 }
608 
IsEmptyAssetId(const Assets & assets)609 bool CloudDBProxy::CloudActionContext::IsEmptyAssetId(const Assets &assets)
610 {
611     for (auto &asset : assets) {
612         if (asset.assetId.empty()) {
613             return true;
614         }
615     }
616     return false;
617 }
618 
IsRecordActionFail(const VBucket & extend,const CloudWaterType & type,DBStatus status)619 bool CloudDBProxy::CloudActionContext::IsRecordActionFail(const VBucket &extend, const CloudWaterType &type,
620     DBStatus status)
621 {
622     if (DBCommon::IsRecordAssetsMissing(extend) || DBCommon::IsRecordIgnoredForReliability(extend, type) ||
623         DBCommon::IsRecordIgnored(extend)) {
624         return false;
625     }
626     if (extend.count(CloudDbConstant::GID_FIELD) == 0 || DBCommon::IsRecordFailed(extend, status)) {
627         return true;
628     }
629     bool isInsert = type == CloudWaterType::INSERT;
630     auto gid = std::get_if<std::string>(&extend.at(CloudDbConstant::GID_FIELD));
631     if (gid == nullptr || (isInsert && (*gid).empty())) {
632         return true;
633     }
634     for (auto &entry : extend) {
635         auto asset = std::get_if<Asset>(&entry.second);
636         if (asset != nullptr && (*asset).assetId.empty()) {
637             return true;
638         }
639         auto assets = std::get_if<Assets>(&entry.second);
640         if (assets != nullptr && IsEmptyAssetId(*assets)) {
641             return true;
642         }
643     }
644     return false;
645 }
646 
SetInfo(const CloudWaterType & type,DBStatus status,uint32_t size)647 void CloudDBProxy::CloudActionContext::SetInfo(const CloudWaterType &type, DBStatus status, uint32_t size)
648 {
649     totalCount_ = size;
650     retryCount_ = 0; // reset retryCount in each batch
651 
652     // totalCount_ should be equal to extend_ or batch data failed.
653     if (totalCount_ != extend_.size()) {
654         failedCount_ += totalCount_;
655         return;
656     }
657     for (auto &extend : extend_) {
658         if (DBCommon::IsRecordVersionConflict(extend)) {
659             retryCount_++;
660         } else if (IsRecordActionFail(extend, type, status)) {
661             failedCount_++;
662         } else {
663             successCount_++;
664         }
665     }
666 }
667 
SetTableName(const std::string & tableName)668 void CloudDBProxy::CloudActionContext::SetTableName(const std::string &tableName)
669 {
670     std::lock_guard<std::mutex> autoLock(actionMutex_);
671     tableName_ = tableName;
672 }
673 
GetTableName()674 std::string CloudDBProxy::CloudActionContext::GetTableName()
675 {
676     std::lock_guard<std::mutex> autoLock(actionMutex_);
677     return tableName_;
678 }
679 
GetRetryCount()680 uint32_t CloudDBProxy::CloudActionContext::GetRetryCount()
681 {
682     std::lock_guard<std::mutex> autoLock(actionMutex_);
683     return retryCount_;
684 }
685 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)686 void CloudDBProxy::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
687 {
688     std::lock_guard<std::mutex> autoLock(genVersionMutex_);
689     genVersionCallback_ = callback;
690     LOGI("[CloudDBProxy] Set generate cloud version callback ok");
691 }
692 
IsExistCloudVersionCallback() const693 bool CloudDBProxy::IsExistCloudVersionCallback() const
694 {
695     std::lock_guard<std::mutex> autoLock(genVersionMutex_);
696     return genVersionCallback_ != nullptr;
697 }
698 
GetCloudVersion(const std::string & originVersion) const699 std::pair<int, std::string> CloudDBProxy::GetCloudVersion(const std::string &originVersion) const
700 {
701     GenerateCloudVersionCallback genVersionCallback;
702     {
703         std::lock_guard<std::mutex> autoLock(genVersionMutex_);
704         if (genVersionCallback_ == nullptr) {
705             return {-E_NOT_SUPPORT, ""};
706         }
707         genVersionCallback = genVersionCallback_;
708     }
709     LOGI("[CloudDBProxy] Begin get cloud version");
710     std::string version = genVersionCallback(originVersion);
711     LOGI("[CloudDBProxy] End get cloud version");
712     return {E_OK, version};
713 }
714 
SetPrepareTraceId(const std::string & traceId) const715 void CloudDBProxy::SetPrepareTraceId(const std::string &traceId) const
716 {
717     std::shared_ptr<ICloudDb> iCloudDb = nullptr;
718     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
719     if (iCloudDb_ != nullptr) {
720         iCloudDb = iCloudDb_;
721         iCloudDb->SetPrepareTraceId(traceId);
722     }
723 }
724 
BatchDownload(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & downloadAssets)725 int CloudDBProxy::BatchDownload(const std::string &tableName, std::vector<IAssetLoader::AssetRecord> &downloadAssets)
726 {
727     return BatchOperateAssetsWithAllRecords(tableName, downloadAssets, CloudDBProxy::BATCH_DOWNLOAD);
728 }
729 
BatchRemoveLocalAssets(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & removeAssets)730 int CloudDBProxy::BatchRemoveLocalAssets(const std::string &tableName,
731     std::vector<IAssetLoader::AssetRecord> &removeAssets)
732 {
733     return BatchOperateAssetsWithAllRecords(tableName, removeAssets, CloudDBProxy::BATCH_REMOVE_LOCAL);
734 }
735 
BatchOperateAssetsWithAllRecords(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & allRecords,const InnerBatchOpType operationType)736 int CloudDBProxy::BatchOperateAssetsWithAllRecords(const std::string &tableName,
737     std::vector<IAssetLoader::AssetRecord> &allRecords, const InnerBatchOpType operationType)
738 {
739     std::vector<IAssetLoader::AssetRecord> nonEmptyRecords;
740     auto indexes = GetNotEmptyAssetRecords(allRecords, nonEmptyRecords);
741     if (nonEmptyRecords.empty()) {
742         return E_OK;
743     }
744 
745     int errCode = BatchOperateAssetsInner(tableName, nonEmptyRecords, operationType);
746 
747     CopyAssetsBack(allRecords, indexes, nonEmptyRecords);
748     return errCode;
749 }
750 
BatchOperateAssetsInner(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & necessaryRecords,const InnerBatchOpType operationType)751 int CloudDBProxy::BatchOperateAssetsInner(const std::string &tableName,
752     std::vector<IAssetLoader::AssetRecord> &necessaryRecords, const InnerBatchOpType operationType)
753 {
754     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
755     if (iAssetLoader_ == nullptr) {
756         LOGE("[CloudDBProxy] Asset loader has not been set %d", -E_NOT_SET);
757         return -E_NOT_SET;
758     }
759     if (operationType == CloudDBProxy::BATCH_DOWNLOAD) {
760         isDownloading_ = true;
761         iAssetLoader_->BatchDownload(tableName, necessaryRecords);
762         isDownloading_ = false;
763     } else if (operationType == CloudDBProxy::BATCH_REMOVE_LOCAL) {
764         iAssetLoader_->BatchRemoveLocalAssets(tableName, necessaryRecords);
765     } else {
766         LOGE("[CloudDBProxy][BatchOperateAssetsInner] Internal error! Operation type is invalid: %d", operationType);
767         return -E_NOT_SET;
768     }
769     return E_OK;
770 }
771 
GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> & originalRecords,std::vector<IAssetLoader::AssetRecord> & nonEmptyRecords)772 std::vector<int> CloudDBProxy::GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> &originalRecords,
773     std::vector<IAssetLoader::AssetRecord> &nonEmptyRecords)
774 {
775     std::vector<int> indexes;
776     if (originalRecords.empty()) {
777         return indexes;
778     }
779 
780     int index = 0;
781     for (auto &record : originalRecords) {
782         bool isEmpty = true;
783         for (const auto &recordAssets : record.assets) {
784             if (!recordAssets.second.empty()) {
785                 isEmpty = false;
786                 break;
787             }
788         }
789         if (!isEmpty) {
790             indexes.push_back(index);
791             IAssetLoader::AssetRecord newRecord = {
792                 record.gid,
793                 record.prefix,
794                 std::move(record.assets)
795             };
796             nonEmptyRecords.emplace_back(newRecord);
797         }
798         index++;
799     }
800     return indexes;
801 }
802 
CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> & originalRecords,const std::vector<int> & indexes,std::vector<IAssetLoader::AssetRecord> & newRecords)803 void CloudDBProxy::CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> &originalRecords,
804     const std::vector<int> &indexes, std::vector<IAssetLoader::AssetRecord> &newRecords)
805 {
806     int i = 0;
807     for (const auto index : indexes) {
808         originalRecords[index].status = newRecords[i].status;
809         originalRecords[index].assets = std::move(newRecords[i].assets);
810         i++;
811     }
812 }
813 
CancelDownload()814 void CloudDBProxy::CancelDownload()
815 {
816     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
817     if (!isDownloading_) {
818         return;
819     }
820     if (iAssetLoader_ == nullptr) {
821         LOGE("[CloudDBProxy] Asset loader has not been set %d when cancel", -E_NOT_SET);
822         return;
823     }
824     DBStatus status = iAssetLoader_->CancelDownload();
825     if (status != OK) {
826         LOGW("[CloudDBProxy] cancel download failed %d", static_cast<int>(status));
827     }
828 }
829 }
830