• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_db_proxy.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "kv_store_errno.h"
21 #include "log_print.h"
22 
23 namespace DistributedDB {
CloudDBProxy()24 CloudDBProxy::CloudDBProxy()
25     : isDownloading_(false)
26 {
27 }
28 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)29 void CloudDBProxy::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
30 {
31     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
32     if (!iCloudDb_) {
33         iCloudDb_ = cloudDB;
34     }
35 }
36 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)37 int CloudDBProxy::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
38 {
39     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
40     auto it = std::find_if(cloudDBs.begin(), cloudDBs.end(), [](const auto &item) { return item.second == nullptr; });
41     if (it != cloudDBs.end()) {
42         LOGE("[CloudDBProxy] User %s setCloudDB with nullptr", it->first.c_str());
43         return -E_INVALID_ARGS;
44     }
45     cloudDbs_ = cloudDBs;
46     return E_OK;
47 }
48 
GetCloudDB() const49 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudDBProxy::GetCloudDB() const
50 {
51     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
52     return cloudDbs_;
53 }
54 
SwitchCloudDB(const std::string & user)55 void CloudDBProxy::SwitchCloudDB(const std::string &user)
56 {
57     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
58     if (cloudDbs_.find(user) == cloudDbs_.end()) {
59         return;
60     }
61     iCloudDb_ = cloudDbs_[user];
62 }
63 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)64 void CloudDBProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
65 {
66     std::unique_lock<std::shared_mutex> writeLock(assetLoaderMutex_);
67     iAssetLoader_ = loader;
68 }
69 
RecordSyncDataTimeStampLog(std::vector<VBucket> & data,InnerActionCode action)70 void CloudDBProxy::RecordSyncDataTimeStampLog(std::vector<VBucket> &data, InnerActionCode action)
71 {
72     if (data.empty()) {
73         LOGI("[CloudDBProxy] sync data is empty");
74         return;
75     }
76 
77     int64_t first = 0;
78     int errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, data[0], first);
79     if (errCode != E_OK) {
80         LOGE("get first modify time for bucket failed, %d", errCode);
81         return;
82     }
83 
84     int64_t last = 0;
85     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, data[data.size() - 1],
86         last);
87     if (errCode != E_OK) {
88         LOGE("get last modify time for bucket failed, %d", errCode);
89         return;
90     }
91 
92     LOGI("[CloudDBProxy] sync action is %d and size is %d, sync data: first timestamp %lld, last timestamp %lld",
93         static_cast<int>(action), data.size(), first, last);
94 }
95 
FillErrorToExtend(int error,std::vector<VBucket> & extend)96 void CloudDBProxy::FillErrorToExtend(int error, std::vector<VBucket> &extend)
97 {
98     for (auto &item : extend) {
99         if (item.find(CloudDbConstant::ERROR_FIELD) == item.end()) {
100             item[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(TransferDBErrno(error));
101         }
102     }
103 }
104 
BatchInsert(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)105 int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
106     std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount)
107 {
108     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
109     if (iCloudDb_ == nullptr) {
110         FillErrorToExtend(static_cast<int>(-E_CLOUD_ERROR), extend);
111         return -E_CLOUD_ERROR;
112     }
113     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
114     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
115     context->MoveInRecordAndExtend(record, extend);
116     context->SetTableName(tableName);
117     int errCode = InnerAction(context, cloudDb, InnerActionCode::INSERT);
118     uploadInfo = context->GetInfo();
119     retryCount = context->GetRetryCount();
120     context->MoveOutRecordAndExtend(record, extend);
121     return errCode;
122 }
123 
BatchUpdate(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)124 int CloudDBProxy::BatchUpdate(const std::string &tableName, std::vector<VBucket> &record,
125     std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount)
126 {
127     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
128     if (iCloudDb_ == nullptr) {
129         FillErrorToExtend(static_cast<int>(-E_CLOUD_ERROR), extend);
130         return -E_CLOUD_ERROR;
131     }
132     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
133     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
134     context->SetTableName(tableName);
135     context->MoveInRecordAndExtend(record, extend);
136     int errCode = InnerAction(context, cloudDb, InnerActionCode::UPDATE);
137     uploadInfo = context->GetInfo();
138     retryCount = context->GetRetryCount();
139     context->MoveOutRecordAndExtend(record, extend);
140     return errCode;
141 }
142 
BatchDelete(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo,uint32_t & retryCount)143 int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
144     Info &uploadInfo, uint32_t &retryCount)
145 {
146     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
147     if (iCloudDb_ == nullptr) {
148         FillErrorToExtend(static_cast<int>(-E_CLOUD_ERROR), extend);
149         return -E_CLOUD_ERROR;
150     }
151     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
152     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
153     context->MoveInRecordAndExtend(record, extend);
154     context->SetTableName(tableName);
155     int errCode = InnerAction(context, cloudDb, InnerActionCode::DELETE);
156     uploadInfo = context->GetInfo();
157     retryCount = context->GetRetryCount();
158     context->MoveOutRecordAndExtend(record, extend);
159     return errCode;
160 }
161 
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)162 int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
163 {
164     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
165     if (iCloudDb_ == nullptr) {
166         return -E_CLOUD_ERROR;
167     }
168     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
169     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
170     context->MoveInQueryExtendAndData(extend, data);
171     context->SetTableName(tableName);
172     int errCode = InnerAction(context, cloudDb, InnerActionCode::QUERY);
173     context->MoveOutQueryExtendAndData(extend, data);
174     for (auto &item : data) {
175         for (auto &row : item) {
176             auto assets = std::get_if<Assets>(&row.second);
177             if (assets == nullptr) {
178                 continue;
179             }
180             DBCommon::RemoveDuplicateAssetsData(*assets);
181         }
182     }
183     RecordSyncDataTimeStampLog(data, InnerActionCode::QUERY);
184     return errCode;
185 }
186 
Lock()187 std::pair<int, uint64_t> CloudDBProxy::Lock()
188 {
189     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
190     if (iCloudDb_ == nullptr) {
191         return { -E_CLOUD_ERROR, 0u };
192     }
193     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
194     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
195     std::pair<int, uint64_t> lockStatus;
196     int errCode = InnerAction(context, cloudDb, InnerActionCode::LOCK);
197     context->MoveOutLockStatus(lockStatus);
198     lockStatus.first = errCode;
199     return lockStatus;
200 }
201 
UnLock()202 int CloudDBProxy::UnLock()
203 {
204     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
205     if (iCloudDb_ == nullptr) {
206         return -E_CLOUD_ERROR;
207     }
208     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
209     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
210     return InnerAction(context, cloudDb, InnerActionCode::UNLOCK);
211 }
212 
Close()213 int CloudDBProxy::Close()
214 {
215     std::shared_ptr<ICloudDb> iCloudDb = nullptr;
216     std::vector<std::shared_ptr<ICloudDb>> waitForClose;
217     {
218         std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
219         if (iCloudDb_ != nullptr) {
220             iCloudDb = iCloudDb_;
221             iCloudDb_ = nullptr;
222         }
223         for (const auto &item : cloudDbs_) {
224             if (iCloudDb == item.second) {
225                 iCloudDb = nullptr;
226             }
227             waitForClose.push_back(item.second);
228         }
229         cloudDbs_.clear();
230     }
231     LOGD("[CloudDBProxy] call cloudDb close begin");
232     DBStatus status = OK;
233     if (iCloudDb != nullptr) {
234         status = iCloudDb->Close();
235     }
236     for (const auto &item : waitForClose) {
237         DBStatus ret = item->Close();
238         status = (status == OK ? ret : status);
239     }
240     if (status != OK) {
241         LOGW("[CloudDBProxy] cloud db close failed %d", static_cast<int>(status));
242     }
243     waitForClose.clear();
244     LOGD("[CloudDBProxy] call cloudDb close end");
245     return status == OK ? E_OK : -E_CLOUD_ERROR;
246 }
247 
HeartBeat()248 int CloudDBProxy::HeartBeat()
249 {
250     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
251     if (iCloudDb_ == nullptr) {
252         return -E_CLOUD_ERROR;
253     }
254 
255     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
256     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
257     return InnerAction(context, cloudDb, InnerActionCode::HEARTBEAT);
258 }
259 
IsNotExistCloudDB() const260 bool CloudDBProxy::IsNotExistCloudDB() const
261 {
262     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
263     return iCloudDb_ == nullptr && cloudDbs_.empty();
264 }
265 
Download(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)266 int CloudDBProxy::Download(const std::string &tableName, const std::string &gid, const Type &prefix,
267     std::map<std::string, Assets> &assets)
268 {
269     if (assets.empty()) {
270         return E_OK;
271     }
272     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
273     if (iAssetLoader_ == nullptr) {
274         LOGE("Asset loader has not been set %d", -E_NOT_SET);
275         return -E_NOT_SET;
276     }
277     isDownloading_ = true;
278     DBStatus status = iAssetLoader_->Download(tableName, gid, prefix, assets);
279     isDownloading_ = false;
280     if (status != OK) {
281         LOGW("[CloudDBProxy] download asset failed %d", static_cast<int>(status));
282         if (status == SKIP_ASSET) {
283             return status;
284         }
285     }
286     return GetInnerErrorCode(status);
287 }
288 
RemoveLocalAssets(const std::vector<Asset> & assets)289 int CloudDBProxy::RemoveLocalAssets(const std::vector<Asset> &assets)
290 {
291     if (assets.empty()) {
292         return E_OK;
293     }
294     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
295     if (iAssetLoader_ == nullptr) {
296         LOGW("Asset loader has not been set");
297         return E_OK;
298     }
299     DBStatus status = iAssetLoader_->RemoveLocalAssets(assets);
300     if (status != OK) {
301         LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
302         return -E_REMOVE_ASSETS_FAILED;
303     }
304     return E_OK;
305 }
306 
RemoveLocalAssets(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)307 int CloudDBProxy::RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix,
308     std::map<std::string, Assets> &assets)
309 {
310     if (assets.empty()) {
311         return E_OK;
312     }
313     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
314     if (iAssetLoader_ == nullptr) {
315         LOGE("Asset loader has not been set %d", -E_NOT_SET);
316         return -E_NOT_SET;
317     }
318     DBStatus status = iAssetLoader_->RemoveLocalAssets(tableName, gid, prefix, assets);
319     if (status != OK) {
320         LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
321         return -E_REMOVE_ASSETS_FAILED;
322     }
323     return E_OK;
324 }
325 
GetEmptyCursor(const std::string & tableName)326 std::pair<int, std::string> CloudDBProxy::GetEmptyCursor(const std::string &tableName)
327 {
328     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
329     if (iCloudDb_ == nullptr) {
330         return { -E_CLOUD_ERROR, "" };
331     }
332     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
333     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
334     context->SetTableName(tableName);
335     int errCode = InnerAction(context, cloudDb, InnerActionCode::GET_EMPTY_CURSOR);
336     std::pair<int, std::string> cursorStatus;
337     context->MoveOutCursorStatus(cursorStatus);
338     cursorStatus.first = errCode;
339     return cursorStatus;
340 }
341 
InnerAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)342 int CloudDBProxy::InnerAction(const std::shared_ptr<CloudActionContext> &context,
343     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
344 {
345     if (action >= InnerActionCode::INVALID_ACTION) {
346         return -E_INVALID_ARGS;
347     }
348     InnerActionTask(context, cloudDb, action);
349     return context->GetActionRes();
350 }
351 
DMLActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)352 DBStatus CloudDBProxy::DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
353     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
354 {
355     DBStatus status = OK;
356     std::vector<VBucket> record;
357     std::vector<VBucket> extend;
358     context->MoveOutRecordAndExtend(record, extend);
359     RecordSyncDataTimeStampLog(extend, action);
360     uint32_t recordSize = record.size();
361 
362     switch (action) {
363         case InnerActionCode::INSERT: {
364             status = cloudDb->BatchInsert(context->GetTableName(), std::move(record), extend);
365             context->MoveInExtend(extend);
366             context->SetInfo(CloudWaterType::INSERT, status, recordSize);
367             break;
368         }
369         case InnerActionCode::UPDATE: {
370             status = cloudDb->BatchUpdate(context->GetTableName(), std::move(record), extend);
371             context->MoveInExtend(extend);
372             context->SetInfo(CloudWaterType::UPDATE, status, recordSize);
373             break;
374         }
375         case InnerActionCode::DELETE: {
376             status = cloudDb->BatchDelete(context->GetTableName(), extend);
377             context->MoveInRecordAndExtend(record, extend);
378             context->SetInfo(CloudWaterType::DELETE, status, recordSize);
379             break;
380         }
381         default: {
382             LOGE("DMLActionTask can only be used on INSERT/UPDATE/DELETE.");
383             return INVALID_ARGS;
384         }
385     }
386     if (status == CLOUD_VERSION_CONFLICT) {
387         LOGI("[CloudSyncer] Version conflict during cloud batch upload.");
388     } else if (status != OK) {
389         LOGE("[CloudSyncer] Cloud BATCH UPLOAD failed.");
390     }
391     return status;
392 }
393 
InnerActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)394 void CloudDBProxy::InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
395     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
396 {
397     DBStatus status = OK;
398     bool setResAlready = false;
399     LOGD("[CloudDBProxy] action %" PRIu8 " begin", static_cast<uint8_t>(action));
400     switch (action) {
401         case InnerActionCode::INSERT:
402         case InnerActionCode::UPDATE:
403         case InnerActionCode::DELETE:
404             status = DMLActionTask(context, cloudDb, action);
405             break;
406         case InnerActionCode::QUERY: {
407             status = QueryAction(context, cloudDb);
408             if (status == QUERY_END) {
409                 setResAlready = true;
410             }
411             break;
412         }
413         case InnerActionCode::GET_EMPTY_CURSOR:
414             status = InnerActionGetEmptyCursor(context, cloudDb);
415             break;
416         case InnerActionCode::LOCK:
417             status = InnerActionLock(context, cloudDb);
418             break;
419         case InnerActionCode::UNLOCK:
420             status = cloudDb->UnLock();
421             if (status != OK) {
422                 LOGE("[CloudDBProxy] UnLock cloud DB failed: %d", static_cast<int>(status));
423             }
424             break;
425         case InnerActionCode::HEARTBEAT:
426             status = cloudDb->HeartBeat();
427             if (status != OK) {
428                 LOGE("[CloudDBProxy] Heart beat error: %d", static_cast<int>(status));
429             }
430             break;
431         default: // should not happen
432             status = DB_ERROR;
433     }
434     LOGD("[CloudDBProxy] action %" PRIu8 " end res:%d", static_cast<uint8_t>(action), static_cast<int>(status));
435 
436     if (!setResAlready) {
437         context->SetActionRes(GetInnerErrorCode(status));
438     }
439 
440     context->FinishAndNotify();
441 }
442 
InnerActionLock(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)443 DBStatus CloudDBProxy::InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
444     const std::shared_ptr<ICloudDb> &cloudDb)
445 {
446     DBStatus status = OK;
447     std::pair<int, uint64_t> lockRet;
448     std::pair<DBStatus, uint64_t> lockStatus = cloudDb->Lock();
449     if (lockStatus.first != OK) {
450         status = lockStatus.first;
451         LOGE("[CloudDBProxy] Lock cloud DB failed: %d", static_cast<int>(status));
452     } else if (lockStatus.second == 0) {
453         LOGE("[CloudDBProxy] Lock successfully but timeout is 0");
454         status = CLOUD_ERROR;
455     }
456     lockRet.second = lockStatus.second;
457     lockRet.first = GetInnerErrorCode(status);
458     context->MoveInLockStatus(lockRet);
459     return status;
460 }
461 
InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)462 DBStatus CloudDBProxy::InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context,
463     const std::shared_ptr<ICloudDb> &cloudDb)
464 {
465     std::string tableName = context->GetTableName();
466     std::pair<DBStatus, std::string> cursorStatus = cloudDb->GetEmptyCursor(tableName);
467     DBStatus status = OK;
468     if (cursorStatus.first != OK) {
469         status = cursorStatus.first;
470         LOGE("[CloudDBProxy] Get empty cursor failed: %d", static_cast<int>(status));
471     }
472     std::pair<int, std::string> cursorRet;
473     cursorRet.second = cursorStatus.second;
474     cursorRet.first = GetInnerErrorCode(status);
475     context->MoveInCursorStatus(cursorRet);
476     return status;
477 }
478 
GetInnerErrorCode(DBStatus status)479 int CloudDBProxy::GetInnerErrorCode(DBStatus status)
480 {
481     if (status < DB_ERROR || status >= BUTT_STATUS) {
482         return static_cast<int>(status);
483     }
484     switch (status) {
485         case OK:
486             return E_OK;
487         case CLOUD_NETWORK_ERROR:
488             return -E_CLOUD_NETWORK_ERROR;
489         case CLOUD_SYNC_UNSET:
490             return -E_CLOUD_SYNC_UNSET;
491         case CLOUD_FULL_RECORDS:
492             return -E_CLOUD_FULL_RECORDS;
493         case CLOUD_LOCK_ERROR:
494             return -E_CLOUD_LOCK_ERROR;
495         case CLOUD_ASSET_SPACE_INSUFFICIENT:
496             return -E_CLOUD_ASSET_SPACE_INSUFFICIENT;
497         case CLOUD_VERSION_CONFLICT:
498             return -E_CLOUD_VERSION_CONFLICT;
499         case CLOUD_RECORD_EXIST_CONFLICT:
500             return -E_CLOUD_RECORD_EXIST_CONFLICT;
501         case CLOUD_DISABLED:
502             return -E_CLOUD_DISABLED;
503         default:
504             return -E_CLOUD_ERROR;
505     }
506 }
507 
QueryAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)508 DBStatus CloudDBProxy::QueryAction(const std::shared_ptr<CloudActionContext> &context,
509     const std::shared_ptr<ICloudDb> &cloudDb)
510 {
511     VBucket queryExtend;
512     std::vector<VBucket> data;
513     context->MoveOutQueryExtendAndData(queryExtend, data);
514     DBStatus status = cloudDb->Query(context->GetTableName(), queryExtend, data);
515     context->MoveInQueryExtendAndData(queryExtend, data);
516     if (status == QUERY_END) {
517         context->SetActionRes(-E_QUERY_END);
518     }
519     return status;
520 }
521 
CloudActionContext()522 CloudDBProxy::CloudActionContext::CloudActionContext()
523     : actionFinished_(false),
524       actionRes_(OK),
525       totalCount_(0u),
526       successCount_(0u),
527       failedCount_(0u),
528       retryCount_(0u)
529 {
530 }
531 
MoveInRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)532 void CloudDBProxy::CloudActionContext::MoveInRecordAndExtend(std::vector<VBucket> &record,
533     std::vector<VBucket> &extend)
534 {
535     std::lock_guard<std::mutex> autoLock(actionMutex_);
536     record_ = std::move(record);
537     extend_ = std::move(extend);
538 }
539 
MoveInExtend(std::vector<VBucket> & extend)540 void CloudDBProxy::CloudActionContext::MoveInExtend(std::vector<VBucket> &extend)
541 {
542     std::lock_guard<std::mutex> autoLock(actionMutex_);
543     extend_ = std::move(extend);
544 }
545 
MoveOutRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)546 void CloudDBProxy::CloudActionContext::MoveOutRecordAndExtend(std::vector<VBucket> &record,
547     std::vector<VBucket> &extend)
548 {
549     std::lock_guard<std::mutex> autoLock(actionMutex_);
550     record = std::move(record_);
551     extend = std::move(extend_);
552 }
553 
MoveInQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)554 void CloudDBProxy::CloudActionContext::MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
555 {
556     std::lock_guard<std::mutex> autoLock(actionMutex_);
557     queryExtend_ = std::move(extend);
558     data_ = std::move(data);
559 }
560 
MoveOutQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)561 void CloudDBProxy::CloudActionContext::MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
562 {
563     std::lock_guard<std::mutex> autoLock(actionMutex_);
564     extend = std::move(queryExtend_);
565     data = std::move(data_);
566 }
567 
MoveInLockStatus(std::pair<int,uint64_t> & lockStatus)568 void CloudDBProxy::CloudActionContext::MoveInLockStatus(std::pair<int, uint64_t> &lockStatus)
569 {
570     std::lock_guard<std::mutex> autoLock(actionMutex_);
571     lockStatus_ = std::move(lockStatus);
572 }
573 
MoveOutLockStatus(std::pair<int,uint64_t> & lockStatus)574 void CloudDBProxy::CloudActionContext::MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus)
575 {
576     std::lock_guard<std::mutex> autoLock(actionMutex_);
577     lockStatus = std::move(lockStatus_);
578 }
579 
MoveInCursorStatus(std::pair<int,std::string> & cursorStatus)580 void CloudDBProxy::CloudActionContext::MoveInCursorStatus(std::pair<int, std::string> &cursorStatus)
581 {
582     std::lock_guard<std::mutex> autoLock(actionMutex_);
583     cursorStatus_ = std::move(cursorStatus);
584 }
585 
MoveOutCursorStatus(std::pair<int,std::string> & cursorStatus)586 void CloudDBProxy::CloudActionContext::MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus)
587 {
588     std::lock_guard<std::mutex> autoLock(actionMutex_);
589     cursorStatus = std::move(cursorStatus_);
590 }
591 
FinishAndNotify()592 void CloudDBProxy::CloudActionContext::FinishAndNotify()
593 {
594     {
595         std::lock_guard<std::mutex> autoLock(actionMutex_);
596         actionFinished_ = true;
597     }
598     actionCv_.notify_all();
599 }
600 
SetActionRes(int res)601 void CloudDBProxy::CloudActionContext::SetActionRes(int res)
602 {
603     std::lock_guard<std::mutex> autoLock(actionMutex_);
604     actionRes_ = res;
605 }
606 
GetActionRes()607 int CloudDBProxy::CloudActionContext::GetActionRes()
608 {
609     std::lock_guard<std::mutex> autoLock(actionMutex_);
610     return actionRes_;
611 }
612 
GetInfo()613 Info CloudDBProxy::CloudActionContext::GetInfo()
614 {
615     std::lock_guard<std::mutex> autoLock(actionMutex_);
616     Info info;
617     info.total = totalCount_;
618     info.successCount = successCount_;
619     info.failCount = failedCount_;
620     return info;
621 }
622 
IsEmptyAssetId(const Assets & assets)623 bool CloudDBProxy::CloudActionContext::IsEmptyAssetId(const Assets &assets)
624 {
625     for (auto &asset : assets) {
626         if (asset.assetId.empty()) {
627             return true;
628         }
629     }
630     return false;
631 }
632 
IsRecordActionFail(const VBucket & extend,const CloudWaterType & type,DBStatus status)633 bool CloudDBProxy::CloudActionContext::IsRecordActionFail(const VBucket &extend, const CloudWaterType &type,
634     DBStatus status)
635 {
636     if (DBCommon::IsRecordAssetsMissing(extend) || DBCommon::IsRecordIgnoredForReliability(extend, type) ||
637         DBCommon::IsRecordIgnored(extend)) {
638         return false;
639     }
640     if (extend.count(CloudDbConstant::GID_FIELD) == 0 || DBCommon::IsRecordFailed(extend, status)) {
641         return true;
642     }
643     bool isInsert = type == CloudWaterType::INSERT;
644     auto gid = std::get_if<std::string>(&extend.at(CloudDbConstant::GID_FIELD));
645     if (gid == nullptr || (isInsert && (*gid).empty())) {
646         return true;
647     }
648     for (auto &entry : extend) {
649         auto asset = std::get_if<Asset>(&entry.second);
650         if (asset != nullptr && (*asset).assetId.empty()) {
651             return true;
652         }
653         auto assets = std::get_if<Assets>(&entry.second);
654         if (assets != nullptr && IsEmptyAssetId(*assets)) {
655             return true;
656         }
657     }
658     return false;
659 }
660 
SetInfo(const CloudWaterType & type,DBStatus status,uint32_t size)661 void CloudDBProxy::CloudActionContext::SetInfo(const CloudWaterType &type, DBStatus status, uint32_t size)
662 {
663     totalCount_ = size;
664     retryCount_ = 0; // reset retryCount in each batch
665 
666     // totalCount_ should be equal to extend_ or batch data failed.
667     if (totalCount_ != extend_.size()) {
668         failedCount_ += totalCount_;
669         return;
670     }
671     for (auto &extend : extend_) {
672         if (DBCommon::IsRecordVersionConflict(extend)) {
673             retryCount_++;
674         } else if (IsRecordActionFail(extend, type, status)) {
675             failedCount_++;
676         } else {
677             successCount_++;
678         }
679     }
680 }
681 
SetTableName(const std::string & tableName)682 void CloudDBProxy::CloudActionContext::SetTableName(const std::string &tableName)
683 {
684     std::lock_guard<std::mutex> autoLock(actionMutex_);
685     tableName_ = tableName;
686 }
687 
GetTableName()688 std::string CloudDBProxy::CloudActionContext::GetTableName()
689 {
690     std::lock_guard<std::mutex> autoLock(actionMutex_);
691     return tableName_;
692 }
693 
GetRetryCount()694 uint32_t CloudDBProxy::CloudActionContext::GetRetryCount()
695 {
696     std::lock_guard<std::mutex> autoLock(actionMutex_);
697     return retryCount_;
698 }
699 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)700 void CloudDBProxy::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
701 {
702     std::lock_guard<std::mutex> autoLock(genVersionMutex_);
703     genVersionCallback_ = callback;
704     LOGI("[CloudDBProxy] Set generate cloud version callback ok");
705 }
706 
IsExistCloudVersionCallback() const707 bool CloudDBProxy::IsExistCloudVersionCallback() const
708 {
709     std::lock_guard<std::mutex> autoLock(genVersionMutex_);
710     return genVersionCallback_ != nullptr;
711 }
712 
GetCloudVersion(const std::string & originVersion) const713 std::pair<int, std::string> CloudDBProxy::GetCloudVersion(const std::string &originVersion) const
714 {
715     GenerateCloudVersionCallback genVersionCallback;
716     {
717         std::lock_guard<std::mutex> autoLock(genVersionMutex_);
718         if (genVersionCallback_ == nullptr) {
719             return {-E_NOT_SUPPORT, ""};
720         }
721         genVersionCallback = genVersionCallback_;
722     }
723     LOGI("[CloudDBProxy] Begin get cloud version");
724     std::string version = genVersionCallback(originVersion);
725     LOGI("[CloudDBProxy] End get cloud version");
726     return {E_OK, version};
727 }
728 
SetPrepareTraceId(const std::string & traceId) const729 void CloudDBProxy::SetPrepareTraceId(const std::string &traceId) const
730 {
731     std::shared_ptr<ICloudDb> iCloudDb = nullptr;
732     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
733     if (iCloudDb_ != nullptr) {
734         iCloudDb = iCloudDb_;
735         iCloudDb->SetPrepareTraceId(traceId);
736     }
737 }
738 
BatchDownload(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & downloadAssets)739 int CloudDBProxy::BatchDownload(const std::string &tableName, std::vector<IAssetLoader::AssetRecord> &downloadAssets)
740 {
741     return BatchOperateAssetsWithAllRecords(tableName, downloadAssets, CloudDBProxy::BATCH_DOWNLOAD);
742 }
743 
BatchRemoveLocalAssets(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & removeAssets)744 int CloudDBProxy::BatchRemoveLocalAssets(const std::string &tableName,
745     std::vector<IAssetLoader::AssetRecord> &removeAssets)
746 {
747     return BatchOperateAssetsWithAllRecords(tableName, removeAssets, CloudDBProxy::BATCH_REMOVE_LOCAL);
748 }
749 
BatchOperateAssetsWithAllRecords(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & allRecords,const InnerBatchOpType operationType)750 int CloudDBProxy::BatchOperateAssetsWithAllRecords(const std::string &tableName,
751     std::vector<IAssetLoader::AssetRecord> &allRecords, const InnerBatchOpType operationType)
752 {
753     std::vector<IAssetLoader::AssetRecord> nonEmptyRecords;
754     auto indexes = GetNotEmptyAssetRecords(allRecords, nonEmptyRecords);
755     if (nonEmptyRecords.empty()) {
756         return E_OK;
757     }
758 
759     int errCode = BatchOperateAssetsInner(tableName, nonEmptyRecords, operationType);
760 
761     CopyAssetsBack(allRecords, indexes, nonEmptyRecords);
762     return errCode;
763 }
764 
BatchOperateAssetsInner(const std::string & tableName,std::vector<IAssetLoader::AssetRecord> & necessaryRecords,const InnerBatchOpType operationType)765 int CloudDBProxy::BatchOperateAssetsInner(const std::string &tableName,
766     std::vector<IAssetLoader::AssetRecord> &necessaryRecords, const InnerBatchOpType operationType)
767 {
768     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
769     if (iAssetLoader_ == nullptr) {
770         LOGE("[CloudDBProxy] Asset loader has not been set %d", -E_NOT_SET);
771         return -E_NOT_SET;
772     }
773     if (operationType == CloudDBProxy::BATCH_DOWNLOAD) {
774         isDownloading_ = true;
775         iAssetLoader_->BatchDownload(tableName, necessaryRecords);
776         isDownloading_ = false;
777     } else if (operationType == CloudDBProxy::BATCH_REMOVE_LOCAL) {
778         iAssetLoader_->BatchRemoveLocalAssets(tableName, necessaryRecords);
779     } else {
780         LOGE("[CloudDBProxy][BatchOperateAssetsInner] Internal error! Operation type is invalid: %d", operationType);
781         return -E_NOT_SET;
782     }
783     return E_OK;
784 }
785 
GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> & originalRecords,std::vector<IAssetLoader::AssetRecord> & nonEmptyRecords)786 std::vector<int> CloudDBProxy::GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> &originalRecords,
787     std::vector<IAssetLoader::AssetRecord> &nonEmptyRecords)
788 {
789     std::vector<int> indexes;
790     if (originalRecords.empty()) {
791         return indexes;
792     }
793 
794     int index = 0;
795     for (auto &record : originalRecords) {
796         bool isEmpty = true;
797         for (const auto &recordAssets : record.assets) {
798             if (!recordAssets.second.empty()) {
799                 isEmpty = false;
800                 break;
801             }
802         }
803         if (!isEmpty) {
804             indexes.push_back(index);
805             IAssetLoader::AssetRecord newRecord = {
806                 record.gid,
807                 record.prefix,
808                 std::move(record.assets)
809             };
810             nonEmptyRecords.emplace_back(newRecord);
811         }
812         index++;
813     }
814     return indexes;
815 }
816 
CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> & originalRecords,const std::vector<int> & indexes,std::vector<IAssetLoader::AssetRecord> & newRecords)817 void CloudDBProxy::CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> &originalRecords,
818     const std::vector<int> &indexes, std::vector<IAssetLoader::AssetRecord> &newRecords)
819 {
820     int i = 0;
821     for (const auto index : indexes) {
822         originalRecords[index].status = newRecords[i].status;
823         originalRecords[index].assets = std::move(newRecords[i].assets);
824         i++;
825     }
826 }
827 
CancelDownload()828 void CloudDBProxy::CancelDownload()
829 {
830     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
831     if (!isDownloading_) {
832         return;
833     }
834     if (iAssetLoader_ == nullptr) {
835         LOGE("[CloudDBProxy] Asset loader has not been set %d when cancel", -E_NOT_SET);
836         return;
837     }
838     DBStatus status = iAssetLoader_->CancelDownload();
839     if (status != OK) {
840         LOGW("[CloudDBProxy] cancel download failed %d", static_cast<int>(status));
841     }
842 }
843 }
844