• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "cloud_db_proxy.h"
16 #include "db_errno.h"
17 #include "log_print.h"
18 #include "runtime_context.h"
19 
20 namespace DistributedDB {
CloudDBProxy()21 CloudDBProxy::CloudDBProxy()
22     : timeout_(0),
23       asyncTaskCount_(0)
24 {
25 }
26 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)27 void CloudDBProxy::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
28 {
29     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
30     if (!iCloudDb_) {
31         iCloudDb_ = cloudDB;
32     }
33 }
34 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)35 void CloudDBProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
36 {
37     std::unique_lock<std::shared_mutex> writeLock(assetLoaderMutex_);
38     iAssetLoader_ = loader;
39 }
40 
BatchInsert(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)41 int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
42     std::vector<VBucket> &extend, Info &uploadInfo)
43 {
44     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
45     if (iCloudDb_ == nullptr) {
46         return -E_CLOUD_ERROR;
47     }
48     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
49     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
50     context->MoveInRecordAndExtend(record, extend);
51     context->SetTableName(tableName);
52     int errCode = InnerAction(context, cloudDb, INSERT);
53     uploadInfo = context->GetInfo();
54     context->MoveOutRecordAndExtend(record, extend);
55     return errCode;
56 }
57 
BatchUpdate(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)58 int CloudDBProxy::BatchUpdate(const std::string &tableName, std::vector<VBucket> &record,
59     std::vector<VBucket> &extend, Info &uploadInfo)
60 {
61     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
62     if (iCloudDb_ == nullptr) {
63         return -E_CLOUD_ERROR;
64     }
65     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
66     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
67     context->SetTableName(tableName);
68     context->MoveInRecordAndExtend(record, extend);
69     int errCode = InnerAction(context, cloudDb, UPDATE);
70     uploadInfo = context->GetInfo();
71     return errCode;
72 }
73 
BatchDelete(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)74 int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
75     Info &uploadInfo)
76 {
77     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
78     if (iCloudDb_ == nullptr) {
79         return -E_CLOUD_ERROR;
80     }
81     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
82     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
83     context->MoveInRecordAndExtend(record, extend);
84     context->SetTableName(tableName);
85     int errCode = InnerAction(context, cloudDb, DELETE);
86     uploadInfo = context->GetInfo();
87     return errCode;
88 }
89 
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)90 int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
91 {
92     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
93     if (iCloudDb_ == nullptr) {
94         return -E_CLOUD_ERROR;
95     }
96     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
97     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
98     context->MoveInQueryExtendAndData(extend, data);
99     context->SetTableName(tableName);
100     int errCode = InnerAction(context, cloudDb, QUERY);
101     context->MoveOutQueryExtendAndData(extend, data);
102     return errCode;
103 }
104 
Lock()105 std::pair<int, uint64_t> CloudDBProxy::Lock()
106 {
107     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
108     if (iCloudDb_ == nullptr) {
109         return { -E_CLOUD_ERROR, 0u };
110     }
111     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
112     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
113     std::pair<int, uint64_t> lockStatus;
114     int errCode = InnerAction(context, cloudDb, LOCK);
115     context->MoveOutLockStatus(lockStatus);
116     lockStatus.first = errCode;
117     return lockStatus;
118 }
119 
UnLock()120 int CloudDBProxy::UnLock()
121 {
122     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
123     if (iCloudDb_ == nullptr) {
124         return -E_CLOUD_ERROR;
125     }
126     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
127     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
128     return InnerAction(context, cloudDb, UNLOCK);
129 }
130 
Close()131 int CloudDBProxy::Close()
132 {
133     std::shared_ptr<ICloudDb> iCloudDb = nullptr;
134     {
135         std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
136         if (iCloudDb_ == nullptr) {
137             return E_OK;
138         }
139         iCloudDb = iCloudDb_;
140         iCloudDb_ = nullptr;
141     }
142     {
143         std::unique_lock<std::mutex> uniqueLock(asyncTaskMutex_);
144         LOGD("[CloudDBProxy] wait for all asyncTask  begin");
145         asyncTaskCv_.wait(uniqueLock, [this]() {
146             return asyncTaskCount_ <= 0;
147         });
148         LOGD("[CloudDBProxy] wait for all asyncTask end");
149     }
150     LOGD("[CloudDBProxy] call cloudDb close begin");
151     DBStatus status = iCloudDb->Close();
152     LOGD("[CloudDBProxy] call cloudDb close end");
153     return status == OK ? E_OK : -E_CLOUD_ERROR;
154 }
155 
HeartBeat()156 int CloudDBProxy::HeartBeat()
157 {
158     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
159     if (iCloudDb_ == nullptr) {
160         return -E_CLOUD_ERROR;
161     }
162 
163     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
164     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
165     return InnerAction(context, cloudDb, HEARTBEAT);
166 }
167 
IsNotExistCloudDB() const168 bool CloudDBProxy::IsNotExistCloudDB() const
169 {
170     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
171     return iCloudDb_ == nullptr;
172 }
173 
Download(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)174 int CloudDBProxy::Download(const std::string &tableName, const std::string &gid, const Type &prefix,
175     std::map<std::string, Assets> &assets)
176 {
177     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
178     if (iAssetLoader_ == nullptr) {
179         LOGE("Asset loader has not been set %d", -E_NOT_SET);
180         return -E_NOT_SET;
181     }
182     DBStatus status = iAssetLoader_->Download(tableName, gid, prefix, assets);
183     if (status != OK) {
184         LOGE("[CloudDBProxy] download asset failed %d", static_cast<int>(status));
185     }
186     return GetInnerErrorCode(status);
187 }
188 
RemoveLocalAssets(const std::vector<Asset> & assets)189 int CloudDBProxy::RemoveLocalAssets(const std::vector<Asset> &assets)
190 {
191     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
192     if (iAssetLoader_ == nullptr) {
193         LOGE("Asset loader has not been set %d", -E_NOT_SET);
194         return -E_NOT_SET;
195     }
196     DBStatus status = iAssetLoader_->RemoveLocalAssets(assets);
197     if (status != OK) {
198         LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
199     }
200     return GetInnerErrorCode(status);
201 }
202 
InnerAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)203 int CloudDBProxy::InnerAction(const std::shared_ptr<CloudActionContext> &context,
204     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
205 {
206     if (action >= InnerActionCode::INVALID_ACTION) {
207         return -E_INVALID_ARGS;
208     }
209     {
210         std::lock_guard<std::mutex> uniqueLock(asyncTaskMutex_);
211         asyncTaskCount_++;
212     }
213     int errCode = RuntimeContext::GetInstance()->ScheduleTask([cloudDb, context, action, this]() {
214         InnerActionTask(context, cloudDb, action);
215     });
216     if (errCode != E_OK) {
217         {
218             std::lock_guard<std::mutex> uniqueLock(asyncTaskMutex_);
219             asyncTaskCount_--;
220         }
221         asyncTaskCv_.notify_all();
222         LOGW("[CloudDBProxy] Schedule async task error %d", errCode);
223         return errCode;
224     }
225     if (context->WaitForRes(timeout_)) {
226         errCode = context->GetActionRes();
227     } else {
228         errCode = -E_TIMEOUT;
229     }
230     return errCode;
231 }
232 
DMLActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)233 DBStatus CloudDBProxy::DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
234     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
235 {
236     DBStatus status = OK;
237     std::vector<VBucket> record;
238     std::vector<VBucket> extend;
239     context->MoveOutRecordAndExtend(record, extend);
240     size_t dataSize = extend.size();
241 
242     switch (action) {
243         case INSERT: {
244             status = cloudDb->BatchInsert(context->GetTableName(), std::move(record), extend);
245             context->MoveInExtend(extend);
246             break;
247         }
248         case UPDATE: {
249             status = cloudDb->BatchUpdate(context->GetTableName(), std::move(record), extend);
250             // no need to MoveIn, only insert need extend for insert gid
251             break;
252         }
253         case DELETE: {
254             status = cloudDb->BatchDelete(context->GetTableName(), extend);
255             // no need to MoveIn, only insert need extend for insert gid
256             break;
257         }
258         default: {
259             LOGE("DMLActionTask can only be used on INSERT/UPDATE/DELETE.");
260             return INVALID_ARGS;
261         }
262     }
263     if (status == OK) {
264         context->SetInfo(dataSize, dataSize, 0u);
265     } else {
266         LOGE("[CloudSyncer] Cloud BATCH UPLOAD failed.");
267         context->SetInfo(dataSize, 0u, dataSize);
268     }
269     return status;
270 }
271 
InnerActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)272 void CloudDBProxy::InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
273     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
274 {
275     DBStatus status = OK;
276     bool setResAlready = false;
277     LOGD("[CloudDBProxy] action %" PRIu8 " begin", static_cast<uint8_t>(action));
278     switch (action) {
279         case INSERT:
280         case UPDATE:
281         case DELETE:
282             status = DMLActionTask(context, cloudDb, action);
283             break;
284         case QUERY: {
285             VBucket queryExtend;
286             std::vector<VBucket> data;
287             context->MoveOutQueryExtendAndData(queryExtend, data);
288             status = cloudDb->Query(context->GetTableName(), queryExtend, data);
289             context->MoveInQueryExtendAndData(queryExtend, data);
290 
291             if (status == QUERY_END) {
292                 setResAlready = true;
293                 context->SetActionRes(-E_QUERY_END);
294             }
295             break;
296         }
297         case LOCK: {
298             status = InnerActionLock(context, cloudDb);
299             break;
300         }
301         case UNLOCK:
302             status = cloudDb->UnLock();
303             break;
304         case HEARTBEAT:
305             status = cloudDb->HeartBeat();
306             break;
307         default: // should not happen
308             status = DB_ERROR;
309     }
310     LOGD("[CloudDBProxy] action %" PRIu8 " end res:%d", static_cast<uint8_t>(action), static_cast<int>(status));
311 
312     if (!setResAlready) {
313         context->SetActionRes(GetInnerErrorCode(status));
314     }
315 
316     context->FinishAndNotify();
317     {
318         std::lock_guard<std::mutex> uniqueLock(asyncTaskMutex_);
319         asyncTaskCount_--;
320     }
321     asyncTaskCv_.notify_all();
322 }
323 
InnerActionLock(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)324 DBStatus CloudDBProxy::InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
325     const std::shared_ptr<ICloudDb> &cloudDb)
326 {
327     DBStatus status = OK;
328     std::pair<int, uint64_t> lockRet;
329     std::pair<DBStatus, uint64_t> lockStatus = cloudDb->Lock();
330     if (lockStatus.first != OK) {
331         status = lockStatus.first;
332     } else if (lockStatus.second == 0) {
333         status = CLOUD_ERROR;
334     }
335     lockRet.second = lockStatus.second;
336     lockRet.first = GetInnerErrorCode(status);
337     context->MoveInLockStatus(lockRet);
338     return status;
339 }
340 
GetInnerErrorCode(DBStatus status)341 int CloudDBProxy::GetInnerErrorCode(DBStatus status)
342 {
343     switch (status) {
344         case OK:
345             return E_OK;
346         case CLOUD_NETWORK_ERROR:
347             return -E_CLOUD_NETWORK_ERROR;
348         case CLOUD_SYNC_UNSET:
349             return -E_CLOUD_SYNC_UNSET;
350         case CLOUD_FULL_RECORDS:
351             return -E_CLOUD_FULL_RECORDS;
352         case CLOUD_LOCK_ERROR:
353             return -E_CLOUD_LOCK_ERROR;
354         case CLOUD_ASSET_SPACE_INSUFFICIENT:
355             return -E_CLOUD_ASSET_SPACE_INSUFFICIENT;
356         default:
357             return -E_CLOUD_ERROR;
358     }
359 }
360 
CloudActionContext()361 CloudDBProxy::CloudActionContext::CloudActionContext()
362     : actionFinished_(false),
363       actionRes_(OK),
364       totalCount_(0u),
365       successCount_(0u),
366       failedCount_(0u)
367 {
368 }
369 
MoveInRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)370 void CloudDBProxy::CloudActionContext::MoveInRecordAndExtend(std::vector<VBucket> &record,
371     std::vector<VBucket> &extend)
372 {
373     std::lock_guard<std::mutex> autoLock(actionMutex_);
374     record_ = std::move(record);
375     extend_ = std::move(extend);
376 }
377 
MoveInExtend(std::vector<VBucket> & extend)378 void CloudDBProxy::CloudActionContext::MoveInExtend(std::vector<VBucket> &extend)
379 {
380     std::lock_guard<std::mutex> autoLock(actionMutex_);
381     extend_ = std::move(extend);
382 }
383 
MoveOutRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)384 void CloudDBProxy::CloudActionContext::MoveOutRecordAndExtend(std::vector<VBucket> &record,
385     std::vector<VBucket> &extend)
386 {
387     std::lock_guard<std::mutex> autoLock(actionMutex_);
388     record = std::move(record_);
389     extend = std::move(extend_);
390 }
391 
MoveInQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)392 void CloudDBProxy::CloudActionContext::MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
393 {
394     std::lock_guard<std::mutex> autoLock(actionMutex_);
395     queryExtend_ = std::move(extend);
396     data_ = std::move(data);
397 }
398 
MoveOutQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)399 void CloudDBProxy::CloudActionContext::MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
400 {
401     std::lock_guard<std::mutex> autoLock(actionMutex_);
402     extend = std::move(queryExtend_);
403     data = std::move(data_);
404 }
405 
MoveInLockStatus(std::pair<int,uint64_t> & lockStatus)406 void CloudDBProxy::CloudActionContext::MoveInLockStatus(std::pair<int, uint64_t> &lockStatus)
407 {
408     std::lock_guard<std::mutex> autoLock(actionMutex_);
409     lockStatus_ = std::move(lockStatus);
410 }
411 
MoveOutLockStatus(std::pair<int,uint64_t> & lockStatus)412 void CloudDBProxy::CloudActionContext::MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus)
413 {
414     std::lock_guard<std::mutex> autoLock(actionMutex_);
415     lockStatus = std::move(lockStatus_);
416 }
417 
WaitForRes(int64_t timeout)418 bool CloudDBProxy::CloudActionContext::WaitForRes(int64_t timeout)
419 {
420     std::unique_lock<std::mutex> uniqueLock(actionMutex_);
421     if (timeout == 0) {
422         actionCv_.wait(uniqueLock, [this]() {
423             return actionFinished_;
424         });
425         return true;
426     }
427     return actionCv_.wait_for(uniqueLock, std::chrono::milliseconds(timeout), [this]() {
428         return actionFinished_;
429     });
430 }
431 
FinishAndNotify()432 void CloudDBProxy::CloudActionContext::FinishAndNotify()
433 {
434     {
435         std::lock_guard<std::mutex> autoLock(actionMutex_);
436         actionFinished_ = true;
437     }
438     actionCv_.notify_all();
439 }
440 
SetActionRes(int res)441 void CloudDBProxy::CloudActionContext::SetActionRes(int res)
442 {
443     std::lock_guard<std::mutex> autoLock(actionMutex_);
444     actionRes_ = res;
445 }
446 
GetActionRes()447 int CloudDBProxy::CloudActionContext::GetActionRes()
448 {
449     std::lock_guard<std::mutex> autoLock(actionMutex_);
450     return actionRes_;
451 }
452 
GetInfo()453 Info CloudDBProxy::CloudActionContext::GetInfo()
454 {
455     std::lock_guard<std::mutex> autoLock(actionMutex_);
456     Info info;
457     info.total = totalCount_;
458     info.successCount = successCount_;
459     info.failCount = failedCount_;
460     return info;
461 }
462 
SetInfo(const uint32_t & totalCount,const uint32_t & successCount,const uint32_t & failedCount)463 void CloudDBProxy::CloudActionContext::SetInfo(const uint32_t &totalCount,
464     const uint32_t &successCount, const uint32_t &failedCount)
465 {
466     totalCount_ = totalCount;
467     successCount_ = successCount;
468     failedCount_ = failedCount;
469 }
470 
SetTableName(const std::string & tableName)471 void CloudDBProxy::CloudActionContext::SetTableName(const std::string &tableName)
472 {
473     std::lock_guard<std::mutex> autoLock(actionMutex_);
474     tableName_ = tableName;
475 }
476 
GetTableName()477 std::string CloudDBProxy::CloudActionContext::GetTableName()
478 {
479     std::lock_guard<std::mutex> autoLock(actionMutex_);
480     return tableName_;
481 }
482 }