1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_db_proxy.h"
16 #include "db_errno.h"
17 #include "log_print.h"
18 #include "runtime_context.h"
19
20 namespace DistributedDB {
CloudDBProxy()21 CloudDBProxy::CloudDBProxy()
22 : timeout_(0),
23 asyncTaskCount_(0)
24 {
25 }
26
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)27 void CloudDBProxy::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
28 {
29 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
30 if (!iCloudDb_) {
31 iCloudDb_ = cloudDB;
32 }
33 }
34
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)35 void CloudDBProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
36 {
37 std::unique_lock<std::shared_mutex> writeLock(assetLoaderMutex_);
38 iAssetLoader_ = loader;
39 }
40
BatchInsert(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)41 int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
42 std::vector<VBucket> &extend, Info &uploadInfo)
43 {
44 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
45 if (iCloudDb_ == nullptr) {
46 return -E_CLOUD_ERROR;
47 }
48 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
49 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
50 context->MoveInRecordAndExtend(record, extend);
51 context->SetTableName(tableName);
52 int errCode = InnerAction(context, cloudDb, INSERT);
53 uploadInfo = context->GetInfo();
54 context->MoveOutRecordAndExtend(record, extend);
55 return errCode;
56 }
57
BatchUpdate(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)58 int CloudDBProxy::BatchUpdate(const std::string &tableName, std::vector<VBucket> &record,
59 std::vector<VBucket> &extend, Info &uploadInfo)
60 {
61 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
62 if (iCloudDb_ == nullptr) {
63 return -E_CLOUD_ERROR;
64 }
65 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
66 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
67 context->SetTableName(tableName);
68 context->MoveInRecordAndExtend(record, extend);
69 int errCode = InnerAction(context, cloudDb, UPDATE);
70 uploadInfo = context->GetInfo();
71 return errCode;
72 }
73
BatchDelete(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)74 int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
75 Info &uploadInfo)
76 {
77 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
78 if (iCloudDb_ == nullptr) {
79 return -E_CLOUD_ERROR;
80 }
81 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
82 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
83 context->MoveInRecordAndExtend(record, extend);
84 context->SetTableName(tableName);
85 int errCode = InnerAction(context, cloudDb, DELETE);
86 uploadInfo = context->GetInfo();
87 return errCode;
88 }
89
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)90 int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
91 {
92 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
93 if (iCloudDb_ == nullptr) {
94 return -E_CLOUD_ERROR;
95 }
96 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
97 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
98 context->MoveInQueryExtendAndData(extend, data);
99 context->SetTableName(tableName);
100 int errCode = InnerAction(context, cloudDb, QUERY);
101 context->MoveOutQueryExtendAndData(extend, data);
102 return errCode;
103 }
104
Lock()105 std::pair<int, uint64_t> CloudDBProxy::Lock()
106 {
107 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
108 if (iCloudDb_ == nullptr) {
109 return { -E_CLOUD_ERROR, 0u };
110 }
111 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
112 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
113 std::pair<int, uint64_t> lockStatus;
114 int errCode = InnerAction(context, cloudDb, LOCK);
115 context->MoveOutLockStatus(lockStatus);
116 lockStatus.first = errCode;
117 return lockStatus;
118 }
119
UnLock()120 int CloudDBProxy::UnLock()
121 {
122 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
123 if (iCloudDb_ == nullptr) {
124 return -E_CLOUD_ERROR;
125 }
126 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
127 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
128 return InnerAction(context, cloudDb, UNLOCK);
129 }
130
Close()131 int CloudDBProxy::Close()
132 {
133 std::shared_ptr<ICloudDb> iCloudDb = nullptr;
134 {
135 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
136 if (iCloudDb_ == nullptr) {
137 return E_OK;
138 }
139 iCloudDb = iCloudDb_;
140 iCloudDb_ = nullptr;
141 }
142 {
143 std::unique_lock<std::mutex> uniqueLock(asyncTaskMutex_);
144 LOGD("[CloudDBProxy] wait for all asyncTask begin");
145 asyncTaskCv_.wait(uniqueLock, [this]() {
146 return asyncTaskCount_ <= 0;
147 });
148 LOGD("[CloudDBProxy] wait for all asyncTask end");
149 }
150 LOGD("[CloudDBProxy] call cloudDb close begin");
151 DBStatus status = iCloudDb->Close();
152 LOGD("[CloudDBProxy] call cloudDb close end");
153 return status == OK ? E_OK : -E_CLOUD_ERROR;
154 }
155
HeartBeat()156 int CloudDBProxy::HeartBeat()
157 {
158 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
159 if (iCloudDb_ == nullptr) {
160 return -E_CLOUD_ERROR;
161 }
162
163 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
164 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
165 return InnerAction(context, cloudDb, HEARTBEAT);
166 }
167
IsNotExistCloudDB() const168 bool CloudDBProxy::IsNotExistCloudDB() const
169 {
170 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
171 return iCloudDb_ == nullptr;
172 }
173
Download(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)174 int CloudDBProxy::Download(const std::string &tableName, const std::string &gid, const Type &prefix,
175 std::map<std::string, Assets> &assets)
176 {
177 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
178 if (iAssetLoader_ == nullptr) {
179 LOGE("Asset loader has not been set %d", -E_NOT_SET);
180 return -E_NOT_SET;
181 }
182 DBStatus status = iAssetLoader_->Download(tableName, gid, prefix, assets);
183 if (status != OK) {
184 LOGE("[CloudDBProxy] download asset failed %d", static_cast<int>(status));
185 }
186 return GetInnerErrorCode(status);
187 }
188
RemoveLocalAssets(const std::vector<Asset> & assets)189 int CloudDBProxy::RemoveLocalAssets(const std::vector<Asset> &assets)
190 {
191 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
192 if (iAssetLoader_ == nullptr) {
193 LOGE("Asset loader has not been set %d", -E_NOT_SET);
194 return -E_NOT_SET;
195 }
196 DBStatus status = iAssetLoader_->RemoveLocalAssets(assets);
197 if (status != OK) {
198 LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
199 }
200 return GetInnerErrorCode(status);
201 }
202
InnerAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)203 int CloudDBProxy::InnerAction(const std::shared_ptr<CloudActionContext> &context,
204 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
205 {
206 if (action >= InnerActionCode::INVALID_ACTION) {
207 return -E_INVALID_ARGS;
208 }
209 {
210 std::lock_guard<std::mutex> uniqueLock(asyncTaskMutex_);
211 asyncTaskCount_++;
212 }
213 int errCode = RuntimeContext::GetInstance()->ScheduleTask([cloudDb, context, action, this]() {
214 InnerActionTask(context, cloudDb, action);
215 });
216 if (errCode != E_OK) {
217 {
218 std::lock_guard<std::mutex> uniqueLock(asyncTaskMutex_);
219 asyncTaskCount_--;
220 }
221 asyncTaskCv_.notify_all();
222 LOGW("[CloudDBProxy] Schedule async task error %d", errCode);
223 return errCode;
224 }
225 if (context->WaitForRes(timeout_)) {
226 errCode = context->GetActionRes();
227 } else {
228 errCode = -E_TIMEOUT;
229 }
230 return errCode;
231 }
232
DMLActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)233 DBStatus CloudDBProxy::DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
234 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
235 {
236 DBStatus status = OK;
237 std::vector<VBucket> record;
238 std::vector<VBucket> extend;
239 context->MoveOutRecordAndExtend(record, extend);
240 size_t dataSize = extend.size();
241
242 switch (action) {
243 case INSERT: {
244 status = cloudDb->BatchInsert(context->GetTableName(), std::move(record), extend);
245 context->MoveInExtend(extend);
246 break;
247 }
248 case UPDATE: {
249 status = cloudDb->BatchUpdate(context->GetTableName(), std::move(record), extend);
250 // no need to MoveIn, only insert need extend for insert gid
251 break;
252 }
253 case DELETE: {
254 status = cloudDb->BatchDelete(context->GetTableName(), extend);
255 // no need to MoveIn, only insert need extend for insert gid
256 break;
257 }
258 default: {
259 LOGE("DMLActionTask can only be used on INSERT/UPDATE/DELETE.");
260 return INVALID_ARGS;
261 }
262 }
263 if (status == OK) {
264 context->SetInfo(dataSize, dataSize, 0u);
265 } else {
266 LOGE("[CloudSyncer] Cloud BATCH UPLOAD failed.");
267 context->SetInfo(dataSize, 0u, dataSize);
268 }
269 return status;
270 }
271
InnerActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)272 void CloudDBProxy::InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
273 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
274 {
275 DBStatus status = OK;
276 bool setResAlready = false;
277 LOGD("[CloudDBProxy] action %" PRIu8 " begin", static_cast<uint8_t>(action));
278 switch (action) {
279 case INSERT:
280 case UPDATE:
281 case DELETE:
282 status = DMLActionTask(context, cloudDb, action);
283 break;
284 case QUERY: {
285 VBucket queryExtend;
286 std::vector<VBucket> data;
287 context->MoveOutQueryExtendAndData(queryExtend, data);
288 status = cloudDb->Query(context->GetTableName(), queryExtend, data);
289 context->MoveInQueryExtendAndData(queryExtend, data);
290
291 if (status == QUERY_END) {
292 setResAlready = true;
293 context->SetActionRes(-E_QUERY_END);
294 }
295 break;
296 }
297 case LOCK: {
298 status = InnerActionLock(context, cloudDb);
299 break;
300 }
301 case UNLOCK:
302 status = cloudDb->UnLock();
303 break;
304 case HEARTBEAT:
305 status = cloudDb->HeartBeat();
306 break;
307 default: // should not happen
308 status = DB_ERROR;
309 }
310 LOGD("[CloudDBProxy] action %" PRIu8 " end res:%d", static_cast<uint8_t>(action), static_cast<int>(status));
311
312 if (!setResAlready) {
313 context->SetActionRes(GetInnerErrorCode(status));
314 }
315
316 context->FinishAndNotify();
317 {
318 std::lock_guard<std::mutex> uniqueLock(asyncTaskMutex_);
319 asyncTaskCount_--;
320 }
321 asyncTaskCv_.notify_all();
322 }
323
InnerActionLock(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)324 DBStatus CloudDBProxy::InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
325 const std::shared_ptr<ICloudDb> &cloudDb)
326 {
327 DBStatus status = OK;
328 std::pair<int, uint64_t> lockRet;
329 std::pair<DBStatus, uint64_t> lockStatus = cloudDb->Lock();
330 if (lockStatus.first != OK) {
331 status = lockStatus.first;
332 } else if (lockStatus.second == 0) {
333 status = CLOUD_ERROR;
334 }
335 lockRet.second = lockStatus.second;
336 lockRet.first = GetInnerErrorCode(status);
337 context->MoveInLockStatus(lockRet);
338 return status;
339 }
340
GetInnerErrorCode(DBStatus status)341 int CloudDBProxy::GetInnerErrorCode(DBStatus status)
342 {
343 switch (status) {
344 case OK:
345 return E_OK;
346 case CLOUD_NETWORK_ERROR:
347 return -E_CLOUD_NETWORK_ERROR;
348 case CLOUD_SYNC_UNSET:
349 return -E_CLOUD_SYNC_UNSET;
350 case CLOUD_FULL_RECORDS:
351 return -E_CLOUD_FULL_RECORDS;
352 case CLOUD_LOCK_ERROR:
353 return -E_CLOUD_LOCK_ERROR;
354 case CLOUD_ASSET_SPACE_INSUFFICIENT:
355 return -E_CLOUD_ASSET_SPACE_INSUFFICIENT;
356 default:
357 return -E_CLOUD_ERROR;
358 }
359 }
360
CloudActionContext()361 CloudDBProxy::CloudActionContext::CloudActionContext()
362 : actionFinished_(false),
363 actionRes_(OK),
364 totalCount_(0u),
365 successCount_(0u),
366 failedCount_(0u)
367 {
368 }
369
MoveInRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)370 void CloudDBProxy::CloudActionContext::MoveInRecordAndExtend(std::vector<VBucket> &record,
371 std::vector<VBucket> &extend)
372 {
373 std::lock_guard<std::mutex> autoLock(actionMutex_);
374 record_ = std::move(record);
375 extend_ = std::move(extend);
376 }
377
MoveInExtend(std::vector<VBucket> & extend)378 void CloudDBProxy::CloudActionContext::MoveInExtend(std::vector<VBucket> &extend)
379 {
380 std::lock_guard<std::mutex> autoLock(actionMutex_);
381 extend_ = std::move(extend);
382 }
383
MoveOutRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)384 void CloudDBProxy::CloudActionContext::MoveOutRecordAndExtend(std::vector<VBucket> &record,
385 std::vector<VBucket> &extend)
386 {
387 std::lock_guard<std::mutex> autoLock(actionMutex_);
388 record = std::move(record_);
389 extend = std::move(extend_);
390 }
391
MoveInQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)392 void CloudDBProxy::CloudActionContext::MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
393 {
394 std::lock_guard<std::mutex> autoLock(actionMutex_);
395 queryExtend_ = std::move(extend);
396 data_ = std::move(data);
397 }
398
MoveOutQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)399 void CloudDBProxy::CloudActionContext::MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
400 {
401 std::lock_guard<std::mutex> autoLock(actionMutex_);
402 extend = std::move(queryExtend_);
403 data = std::move(data_);
404 }
405
MoveInLockStatus(std::pair<int,uint64_t> & lockStatus)406 void CloudDBProxy::CloudActionContext::MoveInLockStatus(std::pair<int, uint64_t> &lockStatus)
407 {
408 std::lock_guard<std::mutex> autoLock(actionMutex_);
409 lockStatus_ = std::move(lockStatus);
410 }
411
MoveOutLockStatus(std::pair<int,uint64_t> & lockStatus)412 void CloudDBProxy::CloudActionContext::MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus)
413 {
414 std::lock_guard<std::mutex> autoLock(actionMutex_);
415 lockStatus = std::move(lockStatus_);
416 }
417
WaitForRes(int64_t timeout)418 bool CloudDBProxy::CloudActionContext::WaitForRes(int64_t timeout)
419 {
420 std::unique_lock<std::mutex> uniqueLock(actionMutex_);
421 if (timeout == 0) {
422 actionCv_.wait(uniqueLock, [this]() {
423 return actionFinished_;
424 });
425 return true;
426 }
427 return actionCv_.wait_for(uniqueLock, std::chrono::milliseconds(timeout), [this]() {
428 return actionFinished_;
429 });
430 }
431
FinishAndNotify()432 void CloudDBProxy::CloudActionContext::FinishAndNotify()
433 {
434 {
435 std::lock_guard<std::mutex> autoLock(actionMutex_);
436 actionFinished_ = true;
437 }
438 actionCv_.notify_all();
439 }
440
SetActionRes(int res)441 void CloudDBProxy::CloudActionContext::SetActionRes(int res)
442 {
443 std::lock_guard<std::mutex> autoLock(actionMutex_);
444 actionRes_ = res;
445 }
446
GetActionRes()447 int CloudDBProxy::CloudActionContext::GetActionRes()
448 {
449 std::lock_guard<std::mutex> autoLock(actionMutex_);
450 return actionRes_;
451 }
452
GetInfo()453 Info CloudDBProxy::CloudActionContext::GetInfo()
454 {
455 std::lock_guard<std::mutex> autoLock(actionMutex_);
456 Info info;
457 info.total = totalCount_;
458 info.successCount = successCount_;
459 info.failCount = failedCount_;
460 return info;
461 }
462
SetInfo(const uint32_t & totalCount,const uint32_t & successCount,const uint32_t & failedCount)463 void CloudDBProxy::CloudActionContext::SetInfo(const uint32_t &totalCount,
464 const uint32_t &successCount, const uint32_t &failedCount)
465 {
466 totalCount_ = totalCount;
467 successCount_ = successCount;
468 failedCount_ = failedCount;
469 }
470
SetTableName(const std::string & tableName)471 void CloudDBProxy::CloudActionContext::SetTableName(const std::string &tableName)
472 {
473 std::lock_guard<std::mutex> autoLock(actionMutex_);
474 tableName_ = tableName;
475 }
476
GetTableName()477 std::string CloudDBProxy::CloudActionContext::GetTableName()
478 {
479 std::lock_guard<std::mutex> autoLock(actionMutex_);
480 return tableName_;
481 }
482 }