1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16
17 #include <utility>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_data_utils.h"
20 #include "cloud/cloud_db_proxy.h"
21 #include "cloud/cloud_db_types.h"
22 #include "cloud/cloud_sync_utils.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_errno.h"
25 #include "mock_icloud_sync_storage_interface.h"
26 #include "virtual_asset_loader.h"
27 #include "virtual_cloud_db.h"
28 #include "virtual_cloud_syncer.h"
29 #include "virtual_communicator_aggregator.h"
30
31 using namespace std;
32 using namespace testing::ext;
33 using namespace DistributedDB;
34
35 namespace {
36 constexpr const char *TABLE_NAME = "Table";
GetFields()37 std::vector<Field> GetFields()
38 {
39 return {
40 {
41 .colName = "col1",
42 .type = TYPE_INDEX<int64_t>,
43 .primary = true,
44 .nullable = false
45 },
46 {
47 .colName = "col2",
48 .type = TYPE_INDEX<std::string>,
49 .primary = false
50 },
51 {
52 .colName = "col3",
53 .type = TYPE_INDEX<Bytes>,
54 .primary = false
55 }
56 };
57 }
58
ModifyRecords(std::vector<VBucket> & expectRecord)59 void ModifyRecords(std::vector<VBucket> &expectRecord)
60 {
61 std::vector<VBucket> tempRecord;
62 for (const auto &record: expectRecord) {
63 VBucket bucket;
64 for (auto &[field, val] : record) {
65 LOGD("modify field %s", field.c_str());
66 if (val.index() == TYPE_INDEX<int64_t>) {
67 int64_t v = std::get<int64_t>(val);
68 bucket.insert({ field, static_cast<int64_t>(v + 1) });
69 } else {
70 bucket.insert({ field, val });
71 }
72 }
73 tempRecord.push_back(bucket);
74 }
75 expectRecord = tempRecord;
76 }
77
Sync(CloudSyncer * cloudSyncer,int & callCount)78 DBStatus Sync(CloudSyncer *cloudSyncer, int &callCount)
79 {
80 std::mutex processMutex;
81 std::condition_variable cv;
82 SyncProcess syncProcess;
83 const auto callback = [&callCount, &syncProcess, &processMutex, &cv](
84 const std::map<std::string, SyncProcess> &process) {
85 {
86 std::lock_guard<std::mutex> autoLock(processMutex);
87 syncProcess = process.begin()->second;
88 if (!process.empty()) {
89 syncProcess = process.begin()->second;
90 } else {
91 SyncProcess tmpProcess;
92 syncProcess = tmpProcess;
93 }
94 callCount++;
95 }
96 cv.notify_all();
97 };
98 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
99 {
100 LOGI("begin to wait sync");
101 std::unique_lock<std::mutex> uniqueLock(processMutex);
102 cv.wait(uniqueLock, [&syncProcess]() {
103 return syncProcess.process == ProcessStatus::FINISHED;
104 });
105 LOGI("end to wait sync");
106 }
107 return syncProcess.errCode;
108 }
109
110 class DistributedDBCloudDBProxyTest : public testing::Test {
111 public:
112 static void SetUpTestCase();
113 static void TearDownTestCase();
114 void SetUp() override;
115 void TearDown() override;
116
117 protected:
118 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
119 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
120 };
121
SetUpTestCase()122 void DistributedDBCloudDBProxyTest::SetUpTestCase()
123 {
124 }
125
TearDownTestCase()126 void DistributedDBCloudDBProxyTest::TearDownTestCase()
127 {
128 }
129
SetUp()130 void DistributedDBCloudDBProxyTest::SetUp()
131 {
132 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
133 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
134 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
135 ASSERT_TRUE(communicatorAggregator_ != nullptr);
136 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
137 }
138
TearDown()139 void DistributedDBCloudDBProxyTest::TearDown()
140 {
141 virtualCloudDb_ = nullptr;
142 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
143 communicatorAggregator_ = nullptr;
144 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
145 }
146
147 /**
148 * @tc.name: CloudDBProxyTest001
149 * @tc.desc: Verify cloud db init and close function.
150 * @tc.type: FUNC
151 * @tc.require:
152 * @tc.author: zhangqiquan
153 */
154 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest001, TestSize.Level0)
155 {
156 /**
157 * @tc.steps: step1. set cloud db to proxy
158 * @tc.expected: step1. E_OK
159 */
160 CloudDBProxy proxy;
161 proxy.SetCloudDB(virtualCloudDb_);
162 /**
163 * @tc.steps: step2. proxy close cloud db with cloud error
164 * @tc.expected: step2. -E_CLOUD_ERROR
165 */
166 virtualCloudDb_->SetCloudError(true);
167 EXPECT_EQ(proxy.Close(), -E_CLOUD_ERROR);
168 /**
169 * @tc.steps: step3. proxy close cloud db again
170 * @tc.expected: step3. E_OK because cloud db has been set nullptr
171 */
172 EXPECT_EQ(proxy.Close(), E_OK);
173 virtualCloudDb_->SetCloudError(false);
174 EXPECT_EQ(proxy.Close(), E_OK);
175 }
176
177 /**
178 * @tc.name: CloudDBProxyTest002
179 * @tc.desc: Verify cloud db insert function.
180 * @tc.type: FUNC
181 * @tc.require:
182 * @tc.author: zhangqiquan
183 */
184 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest002, TestSize.Level0)
185 {
186 /**
187 * @tc.steps: step1. set cloud db to proxy
188 * @tc.expected: step1. E_OK
189 */
190 CloudDBProxy proxy;
191 proxy.SetCloudDB(virtualCloudDb_);
192 /**
193 * @tc.steps: step2. insert data to cloud db
194 * @tc.expected: step2. OK
195 */
196 TableSchema schema = {
197 .name = TABLE_NAME,
198 .sharedTableName = "",
199 .fields = GetFields()
200 };
201 std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
202 std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
203 Info uploadInfo;
204 std::vector<VBucket> insert = expectRecords;
205 uint32_t retryCount = 0;
206 EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo, retryCount), E_OK);
207
208 VBucket extend;
209 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
210 std::vector<VBucket> actualRecords;
211 EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
212 /**
213 * @tc.steps: step3. proxy query data
214 * @tc.expected: step3. data is equal to expect
215 */
216 ASSERT_EQ(actualRecords.size(), expectRecords.size());
217 for (size_t i = 0; i < actualRecords.size(); ++i) {
218 for (const auto &field: schema.fields) {
219 Type expect = expectRecords[i][field.colName];
220 Type actual = actualRecords[i][field.colName];
221 EXPECT_EQ(expect.index(), actual.index());
222 }
223 }
224 /**
225 * @tc.steps: step4. proxy close cloud db
226 * @tc.expected: step4. E_OK
227 */
228 EXPECT_EQ(proxy.Close(), E_OK);
229 }
230
231 /**
232 * @tc.name: CloudDBProxyTest003
233 * @tc.desc: Verify cloud db update function.
234 * @tc.type: FUNC
235 * @tc.require:
236 * @tc.author: zhangqiquan
237 */
238 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest003, TestSize.Level0)
239 {
240 TableSchema schema = {
241 .name = TABLE_NAME,
242 .sharedTableName = "",
243 .fields = GetFields()
244 };
245 /**
246 * @tc.steps: step1. set cloud db to proxy
247 * @tc.expected: step1. E_OK
248 */
249 CloudDBProxy proxy;
250 proxy.SetCloudDB(virtualCloudDb_);
251 /**
252 * @tc.steps: step2. insert data to cloud db
253 * @tc.expected: step2. OK
254 */
255 std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
256 std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
257 Info uploadInfo;
258 std::vector<VBucket> insert = expectRecords;
259 uint32_t retryCount = 0;
260 EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo, retryCount), E_OK);
261 /**
262 * @tc.steps: step3. update data to cloud db
263 * @tc.expected: step3. E_OK
264 */
265 ModifyRecords(expectRecords);
266 std::vector<VBucket> update = expectRecords;
267 EXPECT_EQ(proxy.BatchUpdate(TABLE_NAME, update, expectExtends, uploadInfo, retryCount), E_OK);
268 /**
269 * @tc.steps: step3. proxy close cloud db
270 * @tc.expected: step3. E_OK
271 */
272 VBucket extend;
273 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
274 std::vector<VBucket> actualRecords;
275 EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
276 ASSERT_EQ(actualRecords.size(), expectRecords.size());
277 for (size_t i = 0; i < actualRecords.size(); ++i) {
278 for (const auto &field: schema.fields) {
279 Type expect = expectRecords[i][field.colName];
280 Type actual = actualRecords[i][field.colName];
281 EXPECT_EQ(expect.index(), actual.index());
282 }
283 }
284 /**
285 * @tc.steps: step4. proxy close cloud db
286 * @tc.expected: step4. E_OK
287 */
288 EXPECT_EQ(proxy.Close(), E_OK);
289 }
290
291 /**
292 * @tc.name: CloudDBProxyTest004
293 * @tc.desc: Verify cloud db init and close function with multiple CloudDbs.
294 * @tc.type: FUNC
295 * @tc.require:
296 * @tc.author: zhangtao
297 */
298 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest004, TestSize.Level0)
299 {
300 /**
301 * @tc.steps: step1. set cloud db to proxy
302 * @tc.expected: step1. E_OK
303 */
304 CloudDBProxy proxy;
305 std::string syncUserA = "SyncUserA";
306 std::string syncUserB = "SyncUserB";
307 std::string syncUserC = "SyncUserC";
308 std::shared_ptr<VirtualCloudDb> virtualCloudDbB = std::make_shared<VirtualCloudDb>();
309 std::shared_ptr<VirtualCloudDb> virtualCloudDbC = std::make_shared<VirtualCloudDb>();
310 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDBs = {
311 {syncUserA, virtualCloudDb_}, {syncUserB, virtualCloudDbB}, {syncUserC, virtualCloudDbC}
312 };
313 proxy.SetCloudDB(cloudDBs);
314 /**
315 * @tc.steps: step2. proxy close cloud db with cloud error
316 * @tc.expected: step2. -E_CLOUD_ERROR
317 */
318 for (const auto &pair : cloudDBs) {
319 std::shared_ptr<ICloudDb> basePtr = pair.second;
320 auto vtrPtr = static_cast<VirtualCloudDb*>(basePtr.get());
321 vtrPtr->SetCloudError(true);
322 }
323 EXPECT_EQ(proxy.Close(), -E_CLOUD_ERROR);
324 /**
325 * @tc.steps: step3. proxy close cloud db again
326 * @tc.expected: step3. E_OK because cloud db has been set nullptr
327 */
328 EXPECT_EQ(proxy.Close(), E_OK);
329 for (const auto &pair : cloudDBs) {
330 std::shared_ptr<ICloudDb> basePtr = pair.second;
331 auto vtrPtr = static_cast<VirtualCloudDb*>(basePtr.get());
332 vtrPtr->SetCloudError(false);
333 }
334 EXPECT_EQ(proxy.Close(), E_OK);
335 }
336
337 /**
338 * @tc.name: CloudDBProxyTest005
339 * @tc.desc: Verify sync failed after cloud error.
340 * @tc.type: FUNC
341 * @tc.require:
342 * @tc.author: zhangqiquan
343 */
344 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest005, TestSize.Level0)
345 {
346 /**
347 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
348 * @tc.expected: step1. E_OK
349 */
350 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
351 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
352 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
353 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
354 ASSERT_NE(cloudSyncer, nullptr);
355 cloudSyncer->SetCloudDB(virtualCloudDb_);
356 cloudSyncer->SetSyncAction(true, false);
357 virtualCloudDb_->SetCloudError(true);
358 /**
359 * @tc.steps: step2. call sync and wait sync finish
360 * @tc.expected: step2. CLOUD_ERROR by lock error
361 */
362 int callCount = 0;
363 EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
364 /**
365 * @tc.steps: step3. get cloud lock status and heartbeat count
366 * @tc.expected: step3. cloud is unlock and no heartbeat
367 */
368 EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
369 EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 0);
370 virtualCloudDb_->ClearHeartbeatCount();
371 cloudSyncer->Close();
372 RefObject::KillAndDecObjRef(cloudSyncer);
373 }
374
375 /**
376 * @tc.name: CloudDBProxyTest008
377 * @tc.desc: Verify cloud db heartbeat with diff status.
378 * @tc.type: FUNC
379 * @tc.require:
380 * @tc.author: zhangqiquan
381 */
382 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest008, TestSize.Level0)
383 {
384 /**
385 * @tc.steps: step1. set cloud db to proxy
386 * @tc.expected: step1. E_OK
387 */
388 CloudDBProxy proxy;
389 proxy.SetCloudDB(virtualCloudDb_);
390 /**
391 * @tc.steps: step2. proxy heartbeat with diff status
392 */
393 virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
394 int errCode = proxy.HeartBeat();
395 EXPECT_EQ(errCode, -E_CLOUD_NETWORK_ERROR);
396 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_NETWORK_ERROR);
397
398 virtualCloudDb_->SetActionStatus(CLOUD_SYNC_UNSET);
399 errCode = proxy.HeartBeat();
400 EXPECT_EQ(errCode, -E_CLOUD_SYNC_UNSET);
401 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_SYNC_UNSET);
402
403 virtualCloudDb_->SetActionStatus(CLOUD_FULL_RECORDS);
404 errCode = proxy.HeartBeat();
405 EXPECT_EQ(errCode, -E_CLOUD_FULL_RECORDS);
406 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_FULL_RECORDS);
407
408 virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
409 errCode = proxy.HeartBeat();
410 EXPECT_EQ(errCode, -E_CLOUD_LOCK_ERROR);
411 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_LOCK_ERROR);
412
413 virtualCloudDb_->SetActionStatus(DB_ERROR);
414 errCode = proxy.HeartBeat();
415 EXPECT_EQ(errCode, -E_CLOUD_ERROR);
416 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_ERROR);
417
418 /**
419 * @tc.steps: step3. proxy close cloud db
420 * @tc.expected: step3. E_OK
421 */
422 EXPECT_EQ(proxy.Close(), E_OK);
423 }
424
425 /**
426 * @tc.name: CloudDBProxyTest009
427 * @tc.desc: Verify cloud db closed and current task exit .
428 * @tc.type: FUNC
429 * @tc.require:
430 * @tc.author: zhangqiquan
431 */
432 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest009, TestSize.Level3)
433 {
434 /**
435 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
436 * @tc.expected: step1. E_OK
437 */
438 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
439 ASSERT_NE(iCloud, nullptr);
440 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
441 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
442 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
443 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
444 ASSERT_NE(cloudSyncer, nullptr);
445 cloudSyncer->SetCloudDB(virtualCloudDb_);
446 cloudSyncer->SetSyncAction(true, false);
__anon8a9b21740402() 447 cloudSyncer->SetDownloadFunc([]() {
448 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
449 return -E_CLOUD_ERROR;
450 });
451 /**
452 * @tc.steps: step2. call sync and wait sync finish
453 * @tc.expected: step2. E_OK
454 */
455 std::mutex processMutex;
456 bool finished = false;
457 std::condition_variable cv;
458 LOGI("[CloudDBProxyTest009] Call cloud sync");
__anon8a9b21740502(const std::map<std::string, SyncProcess> &process) 459 const auto callback = [&finished, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
460 {
461 std::lock_guard<std::mutex> autoLock(processMutex);
462 for (const auto &item: process) {
463 if (item.second.process == DistributedDB::FINISHED) {
464 finished = true;
465 EXPECT_EQ(item.second.errCode, DB_CLOSED);
466 }
467 }
468 }
469 cv.notify_all();
470 };
471 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
472 std::this_thread::sleep_for(std::chrono::seconds(1));
473 cloudSyncer->Close();
474 {
475 LOGI("[CloudDBProxyTest009] begin to wait sync");
476 std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon8a9b21740602() 477 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finished]() {
478 return finished;
479 });
480 LOGI("[CloudDBProxyTest009] end to wait sync");
481 }
482 RefObject::KillAndDecObjRef(cloudSyncer);
483 }
484
485 /**
486 * @tc.name: CloudDBProxyTest010
487 * @tc.desc: Verify cloud db lock with diff status.
488 * @tc.type: FUNC
489 * @tc.require:
490 * @tc.author: zhangqiquan
491 */
492 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest010, TestSize.Level0)
493 {
494 /**
495 * @tc.steps: step1. set cloud db to proxy
496 * @tc.expected: step1. E_OK
497 */
498 CloudDBProxy proxy;
499 proxy.SetCloudDB(virtualCloudDb_);
500 /**
501 * @tc.steps: step2. proxy lock with diff status
502 */
503 virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
504 auto ret = proxy.Lock();
505 EXPECT_EQ(ret.first, -E_CLOUD_NETWORK_ERROR);
506 EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_NETWORK_ERROR);
507
508 virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
509 ret = proxy.Lock();
510 EXPECT_EQ(ret.first, -E_CLOUD_LOCK_ERROR);
511 EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_LOCK_ERROR);
512 /**
513 * @tc.steps: step3. proxy close cloud db
514 * @tc.expected: step3. E_OK
515 */
516 EXPECT_EQ(proxy.Close(), E_OK);
517 }
518
519 /**
520 * @tc.name: CloudDBProxyTest008
521 * @tc.desc: Verify cloud db heartbeat with diff status.
522 * @tc.type: FUNC
523 * @tc.require:
524 * @tc.author: zhangqiquan
525 */
526 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest011, TestSize.Level2)
527 {
528 /**
529 * @tc.steps: step1. set cloud db to proxy
530 * @tc.expected: step1. E_OK
531 */
532 CloudDBProxy proxy;
533 proxy.SetCloudDB(virtualCloudDb_);
534 virtualCloudDb_->SetHeartbeatBlockTime(100); // block 100 ms
535 std::mutex waitMutex;
536 std::condition_variable waitCv;
537 const int scheduleCount = 12;
538 int currentCount = 0;
539 for (int i = 0; i < scheduleCount; ++i) {
__anon8a9b21740702() 540 RuntimeContext::GetInstance()->ScheduleTask([&proxy, &waitMutex, &waitCv, ¤tCount]() {
541 proxy.HeartBeat();
542 {
543 std::lock_guard<std::mutex> autoLock(waitMutex);
544 currentCount++;
545 LOGI("[CloudDBProxyTest011] CurrentCount %d", currentCount);
546 }
547 waitCv.notify_all();
548 });
549 }
550 LOGI("[CloudDBProxyTest011] Begin wait all task finish");
551 std::unique_lock<std::mutex> uniqueLock(waitMutex);
__anon8a9b21740802() 552 waitCv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MAX_TIMEOUT), [¤tCount, scheduleCount]() {
553 return currentCount >= scheduleCount;
554 });
555 LOGI("[CloudDBProxyTest011] End wait all task finish");
556 EXPECT_EQ(currentCount, scheduleCount);
557 }
558
559 /**
560 * @tc.name: CloudDBProxyTest012
561 * @tc.desc: Asset data deduplication.
562 * @tc.type: FUNC
563 * @tc.require:
564 * @tc.author: tankaisheng
565 */
566 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest012, TestSize.Level2)
567 {
568 /**
569 * @tc.steps: step1. construct data
570 * @tc.expected: step1. E_OK
571 */
572 Assets assets;
573 Asset asset1;
574 asset1.name = "assetName1";
575 asset1.assetId = "";
576 asset1.modifyTime = "20240730";
577 assets.push_back(asset1);
578
579 Asset asset2;
580 asset2.name = "assetName1";
581 asset2.assetId = "1";
582 asset2.modifyTime = "20240730";
583 assets.push_back(asset2);
584
585 Asset asset3;
586 asset3.name = "assetName2";
587 asset3.assetId = "2";
588 asset3.modifyTime = "20240730";
589 assets.push_back(asset3);
590
591 Asset asset4;
592 asset4.name = "assetName2";
593 asset4.assetId = "3";
594 asset4.modifyTime = "20240731";
595 assets.push_back(asset4);
596
597 Asset asset5;
598 asset5.name = "assetName3";
599 asset5.assetId = "4";
600 asset5.modifyTime = "20240730";
601 assets.push_back(asset5);
602
603 Asset asset6;
604 asset6.name = "assetName3";
605 asset6.assetId = "5";
606 asset6.modifyTime = "20240730";
607 assets.push_back(asset6);
608
609 Asset asset7;
610 asset7.name = "assetName1";
611 asset7.assetId = "6";
612 asset7.modifyTime = "20240731";
613 assets.push_back(asset7);
614
615 DBCommon::RemoveDuplicateAssetsData(assets);
616
617 /**
618 * @tc.steps: step2. check data
619 * @tc.expected: step2. E_OK
620 */
621 std::string assetNameArr[] = {"assetName2", "assetName3", "assetName1"};
622 std::string assetIdArr[] = {"3", "5", "6"};
623 EXPECT_EQ(assets.size(), 3u);
624 for (std::vector<DistributedDB::Asset>::size_type i = 0; i < assets.size(); ++i) {
625 EXPECT_EQ(assets.at(i).name, assetNameArr[i]);
626 EXPECT_EQ(assets.at(i).assetId, assetIdArr[i]);
627 }
628 }
629
630 /**
631 * @tc.name: CloudDBProxyTest014
632 * @tc.desc: Test asset deduplication with empty assetId.
633 * @tc.type: FUNC
634 * @tc.require:
635 * @tc.author: liaoyonghuang
636 */
637 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest014, TestSize.Level0)
638 {
639 /**
640 * @tc.steps: step1. set cloud db to proxy
641 * @tc.expected: step1. E_OK
642 */
643 Assets assets;
644 Asset asset1;
645 asset1.name = "assetName";
646 asset1.assetId = "";
647 asset1.modifyTime = "1";
648 assets.push_back(asset1);
649
650 Asset asset2;
651 asset2.name = "assetName";
652 asset2.assetId = "";
653 asset2.modifyTime = "3";
654 assets.push_back(asset2);
655
656 Asset asset3;
657 asset3.name = "assetName";
658 asset3.assetId = "";
659 asset3.modifyTime = "2";
660 assets.push_back(asset3);
661
662 /**
663 * @tc.steps: step2. Remove duplicate assets and check data
664 * @tc.expected: step2. E_OK
665 */
666 DBCommon::RemoveDuplicateAssetsData(assets);
667 ASSERT_EQ(assets.size(), 1u);
668 EXPECT_EQ(assets[0].modifyTime, "3");
669 }
670
671 /**
672 * @tc.name: CloudSyncQueue001
673 * @tc.desc: Verify sync task count decrease after sync finished.
674 * @tc.type: FUNC
675 * @tc.require:
676 * @tc.author: zhangqiquan
677 */
678 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue001, TestSize.Level2)
679 {
680 /**
681 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
682 * @tc.expected: step1. E_OK
683 */
684 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
685 ASSERT_NE(iCloud, nullptr);
686 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
687 ASSERT_NE(cloudSyncer, nullptr);
688 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
689 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
690 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
691 cloudSyncer->SetCloudDB(virtualCloudDb_);
692 cloudSyncer->SetSyncAction(true, false);
__anon8a9b21740902() 693 cloudSyncer->SetDownloadFunc([cloudSyncer]() {
694 EXPECT_EQ(cloudSyncer->GetQueueCount(), 1u);
695 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
696 return E_OK;
697 });
698 /**
699 * @tc.steps: step2. call sync and wait sync finish
700 */
701 int callCount = 0;
702 EXPECT_EQ(Sync(cloudSyncer, callCount), OK);
703 RuntimeContext::GetInstance()->StopTaskPool();
704 EXPECT_EQ(callCount, 1);
705 RefObject::KillAndDecObjRef(cloudSyncer);
706 }
707
708 /**
709 * @tc.name: CloudSyncQueue002
710 * @tc.desc: Verify sync task abort after close.
711 * @tc.type: FUNC
712 * @tc.require:
713 * @tc.author: zhangqiquan
714 */
715 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue002, TestSize.Level2)
716 {
717 /**
718 * @tc.steps: step1. set cloud db to proxy and sleep 2s when download
719 * @tc.expected: step1. E_OK
720 */
721 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
722 ASSERT_NE(iCloud, nullptr);
723 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
724 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
725 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
726 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
727 ASSERT_NE(cloudSyncer, nullptr);
728 cloudSyncer->SetCloudDB(virtualCloudDb_);
729 cloudSyncer->SetSyncAction(true, false);
730 std::atomic<bool> close = false;
__anon8a9b21740a02() 731 cloudSyncer->SetDownloadFunc([cloudSyncer, &close]() {
732 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
733 cloudSyncer->PauseCurrentTask();
734 EXPECT_TRUE(close);
735 return -E_TASK_PAUSED;
736 });
737 /**
738 * @tc.steps: step2. call sync and wait sync finish
739 */
740 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, nullptr, 0), E_OK);
741 std::this_thread::sleep_for(std::chrono::seconds(1));
742 close = true;
743 cloudSyncer->Close();
744 RuntimeContext::GetInstance()->StopTaskPool();
745 RefObject::KillAndDecObjRef(cloudSyncer);
746 }
747
748 /**
749 * @tc.name: CloudSyncerTest001
750 * @tc.desc: Verify syncer notify by queue schedule.
751 * @tc.type: FUNC
752 * @tc.require:
753 * @tc.author: zhangqiquan
754 */
755 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncerTest001, TestSize.Level2)
756 {
757 /**
758 * @tc.steps: step1. set cloud db to proxy
759 * @tc.expected: step1. E_OK
760 */
761 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
762 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
763 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
764 EXPECT_CALL(*iCloud, GetIdentify).WillRepeatedly(testing::Return("CloudSyncerTest001"));
765 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
766 std::atomic<int> callCount = 0;
767 std::condition_variable cv;
__anon8a9b21740b02(const std::map<std::string, SyncProcess> &) 768 cloudSyncer->SetCurrentTaskInfo([&callCount, &cv](const std::map<std::string, SyncProcess> &) {
769 callCount++;
770 int before = callCount;
771 LOGD("on callback %d", before);
772 std::this_thread::sleep_for(std::chrono::seconds(1));
773 EXPECT_EQ(before, callCount);
774 cv.notify_all();
775 }, 1u);
776 const int notifyCount = 2;
777 for (int i = 0; i < notifyCount; ++i) {
778 cloudSyncer->Notify();
779 }
780 cloudSyncer->SetCurrentTaskInfo(nullptr, 0); // 0 is invalid task id
781 std::mutex processMutex;
782 std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon8a9b21740c02() 783 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&callCount]() {
784 return callCount == notifyCount;
785 });
786 cloudSyncer->Close();
787 RefObject::KillAndDecObjRef(cloudSyncer);
788 }
789
790 /**
791 * @tc.name: SameBatchTest001
792 * @tc.desc: Verify update cache in same batch.
793 * @tc.type: FUNC
794 * @tc.require:
795 * @tc.author: zhangqiquan
796 */
797 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest001, TestSize.Level0)
798 {
799 std::map<std::string, LogInfo> localLogInfoCache;
800 LogInfo cloudInfo;
801 LogInfo localInfo;
802 localInfo.hashKey = {'k'};
803 cloudInfo.cloudGid = "gid";
804 /**
805 * @tc.steps: step1. insert cloud into local
806 * @tc.expected: step1. local cache has gid
807 */
808 CloudSyncUtils::UpdateLocalCache(OpType::INSERT, cloudInfo, localInfo, localLogInfoCache);
809 std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
810 EXPECT_EQ(localLogInfoCache[hashKey].cloudGid, cloudInfo.cloudGid);
811 /**
812 * @tc.steps: step2. delete local
813 * @tc.expected: step2. local flag is delete
814 */
815 CloudSyncUtils::UpdateLocalCache(OpType::DELETE, cloudInfo, localInfo, localLogInfoCache);
816 EXPECT_EQ(localLogInfoCache[hashKey].flag, static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE));
817 }
818
819 /**
820 * @tc.name: SameBatchTest002
821 * @tc.desc: Verify cal opType in same batch.
822 * @tc.type: FUNC
823 * @tc.require:
824 * @tc.author: zhangqiquan
825 */
826 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest002, TestSize.Level0)
827 {
828 /**
829 * @tc.steps: step1. prepare two data with same pk
830 */
831 ICloudSyncer::SyncParam param;
832 param.downloadData.opType.push_back(OpType::INSERT);
833 param.downloadData.opType.push_back(OpType::UPDATE);
834 const std::string pkField = "pk";
835 param.changedData.field.push_back(pkField);
836 VBucket oneRow;
837 oneRow[pkField] = static_cast<int64_t>(1); // 1 is pk
838 param.downloadData.data.push_back(oneRow);
839 param.downloadData.data.push_back(oneRow);
840 /**
841 * @tc.steps: step2. cal opType by utils
842 * @tc.expected: step2. all type should be INSERT
843 */
844 for (size_t i = 0; i < param.downloadData.data.size(); ++i) {
845 EXPECT_EQ(CloudSyncUtils::CalOpType(param, i), OpType::INSERT);
846 }
847 /**
848 * @tc.steps: step3. cal opType by utils
849 * @tc.expected: step3. should be UPDATE because diff pk
850 */
851 oneRow[pkField] = static_cast<int64_t>(2); // 2 is pk
852 param.downloadData.data.push_back(oneRow);
853 param.downloadData.opType.push_back(OpType::UPDATE);
854 // index start with zero
855 EXPECT_EQ(CloudSyncUtils::CalOpType(param, param.downloadData.data.size() - 1), OpType::UPDATE);
856 }
857
858 /**
859 * @tc.name: CloudDBProxyTest013
860 * @tc.desc: Verify CloudDBProxy interfaces.
861 * @tc.type: FUNC
862 * @tc.require:
863 * @tc.author: suyue
864 */
865 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest013, TestSize.Level0)
866 {
867 /**
868 * @tc.steps: step1. call CloudDBProxy interfaces when ICloudDb is nullptr.
869 * @tc.expected: step1. return -E_CLOUD_ERROR.
870 */
871 CloudDBProxy proxy;
872 int ret = proxy.UnLock();
873 EXPECT_EQ(ret, -E_CLOUD_ERROR);
874 ret = proxy.HeartBeat();
875 EXPECT_EQ(ret, -E_CLOUD_ERROR);
876 VBucket extend;
877 const std::string tableName = "test";
878 std::vector<VBucket> record;
879 ret = proxy.Query(tableName, extend, record);
880 EXPECT_EQ(ret, -E_CLOUD_ERROR);
881 Info info;
882 uint32_t retryCount = 0;
883 ret = proxy.BatchInsert(tableName, record, record, info, retryCount);
884 EXPECT_EQ(ret, -E_CLOUD_ERROR);
885 ret = proxy.BatchUpdate(tableName, record, record, info, retryCount);
886 EXPECT_EQ(ret, -E_CLOUD_ERROR);
887 ret = proxy.BatchDelete(tableName, record, record, info, retryCount);
888 EXPECT_EQ(ret, -E_CLOUD_ERROR);
889 std::pair<int, uint64_t> res = proxy.Lock();
890 EXPECT_EQ(res.first, -E_CLOUD_ERROR);
891 std::pair<int, std::string> cursor = proxy.GetEmptyCursor(tableName);
892 EXPECT_EQ(cursor.first, -E_CLOUD_ERROR);
893
894 /**
895 * @tc.steps: step2. call CloudDBProxy interfaces when para is err.
896 * @tc.expected: step2. return fail.
897 */
898 std::pair<int, std::string> ver = proxy.GetCloudVersion("test");
899 EXPECT_EQ(ver.first, -E_NOT_SUPPORT);
900 std::vector<Asset> assets;
901 ret = proxy.RemoveLocalAssets(assets);
902 EXPECT_EQ(ret, -E_OK);
903 assets = {{}};
904 ret = proxy.RemoveLocalAssets(assets);
905 EXPECT_EQ(ret, -E_OK);
906 }
907
908 /**
909 * @tc.name: CloudDBProxyTest015
910 * @tc.desc: Verify CloudDBProxy BatchDownload interfaces.
911 * @tc.type: FUNC
912 * @tc.require:
913 * @tc.author: liuhongyang
914 */
915 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest015, TestSize.Level0)
916 {
917 CloudDBProxy proxy;
918
919 const Asset a1 = {
920 .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
921 .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
922 };
923 const Asset a2 = {
924 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
925 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
926 };
927 Assets assets1;
928 Assets assets2;
929 assets1.push_back(a1);
930 assets2.push_back(a2);
931
932 IAssetLoader::AssetRecord emptyR1 = {"r1", "pre"};
933 IAssetLoader::AssetRecord nonEmptyR2 = {"r2", "pre", {{"a1", assets1}}};
934 IAssetLoader::AssetRecord emptyR3 = {"r3", "pre"};
935 IAssetLoader::AssetRecord nonEmptyR4 = {"r4", "pre", {{"a2", assets2}}};
936 size_t uintExpected = 0;
937 /**
938 * @tc.steps: step1. call CloudDBProxy BatchDownload when iAssetLoader_ is nullptr and no records has assets
939 * @tc.expected: step1. return E_OK.
940 */
941 std::vector<IAssetLoader::AssetRecord> downloadAssets;
942 int ret = proxy.BatchDownload(TABLE_NAME, downloadAssets);
943 EXPECT_EQ(downloadAssets.size(), uintExpected);
944 EXPECT_EQ(ret, E_OK);
945
946 downloadAssets.push_back(emptyR1);
947 ret = proxy.BatchDownload(TABLE_NAME, downloadAssets);
948 uintExpected = 1;
949 EXPECT_EQ(downloadAssets.size(), uintExpected);
950 EXPECT_EQ(ret, E_OK);
951
952 /**
953 * @tc.steps: step2. call CloudDBProxy BatchDownload when iAssetLoader_ is nullptr and some records has assets
954 * @tc.expected: step2. return -E_NOT_SET.
955 */
956 downloadAssets.push_back(nonEmptyR2);
957 ret = proxy.BatchDownload(TABLE_NAME, downloadAssets);
958 uintExpected = 2;
959 EXPECT_EQ(downloadAssets.size(), uintExpected);
960 EXPECT_EQ(ret, -E_NOT_SET);
961
962 /**
963 * @tc.steps: step3. call CloudDBProxy BatchDownload and make iAssetLoader_ change the assets and status
964 * @tc.expected: step3. return E_OK, and status are changed in original vectors
965 */
966 int totalRecordsUsed = 0; // number of records passed to loader
967 std::shared_ptr<VirtualAssetLoader> virtialAssetLoader = make_shared<VirtualAssetLoader>();
__anon8a9b21740d02(int rowIndex, std::map<std::string, Assets> &assets) 968 virtialAssetLoader->ForkBatchDownload([&totalRecordsUsed](int rowIndex, std::map<std::string, Assets> &assets) {
969 totalRecordsUsed++;
970 for (auto &asset : assets) {
971 if (asset.first == "a1") {
972 return DB_ERROR;
973 }
974 if (asset.first == "a2") {
975 asset.second[0].version = 3;
976 return NOT_FOUND;
977 }
978 }
979 return OK;
980 });
981 proxy.SetIAssetLoader(virtialAssetLoader);
982 downloadAssets.push_back(emptyR3);
983 downloadAssets.push_back(nonEmptyR4);
984 uintExpected = 4;
985 EXPECT_EQ(downloadAssets.size(), uintExpected);
986
987 ret = proxy.BatchDownload(TABLE_NAME, downloadAssets);
988 EXPECT_EQ(ret, E_OK);
989 EXPECT_EQ(virtialAssetLoader->GetBatchDownloadCount(), 1u);
990 EXPECT_EQ(totalRecordsUsed, 2);
991 EXPECT_EQ(downloadAssets[0].status, OK);
992 EXPECT_EQ(downloadAssets[1].status, DB_ERROR);
993 EXPECT_EQ(downloadAssets[2].status, OK);
994 EXPECT_EQ(downloadAssets[3].status, NOT_FOUND);
995 uintExpected = 3;
996 EXPECT_EQ(downloadAssets[3].assets["a2"][0].version, uintExpected);
997 }
998
999 /**
1000 * @tc.name: CloudDBProxyTest016
1001 * @tc.desc: Verify cancel download
1002 * @tc.type: FUNC
1003 * @tc.require:
1004 * @tc.author: zqq
1005 */
1006 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest016, TestSize.Level0)
1007 {
1008 auto proxy = std::make_shared<CloudDBProxy>();
1009 auto loader = make_shared<VirtualAssetLoader>();
1010 proxy->SetIAssetLoader(loader);
1011 proxy->CancelDownload();
1012 EXPECT_EQ(loader->GetCancelCount(), 0u);
1013 }
1014
1015 /**
1016 * @tc.name: CloudSyncUtilsTest
1017 * @tc.desc: Verify CloudSyncUtils interfaces
1018 * @tc.type: FUNC
1019 * @tc.require:
1020 * @tc.author: suyue
1021 */
1022 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncUtilsTest, TestSize.Level0)
1023 {
1024 /**
1025 * @tc.steps: step1. Test type translation interfaces.
1026 * @tc.expected: step1. success.
1027 */
1028 CloudSyncUtils utilsObj;
1029 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::INSERT), AssetOpType::INSERT);
1030 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DELETE), AssetOpType::DELETE);
1031 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::UPDATE), AssetOpType::UPDATE);
1032 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::NORMAL), AssetOpType::NO_CHANGE);
1033 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DOWNLOADING), AssetOpType::NO_CHANGE);
1034 EXPECT_EQ(utilsObj.OpTypeToChangeType(OpType::ONLY_UPDATE_GID), ChangeType::OP_BUTT);
1035
1036 /**
1037 * @tc.steps: step2. call CloudSyncUtils interfaces when para is err.
1038 * @tc.expected: step2. return false.
1039 */
1040 const std::vector<DeviceID> devices = {"test"};
1041 int mode = 10; // set metaMode to 10 not in enum class MetaMode
1042 int ret = utilsObj.CheckParamValid(devices, static_cast<SyncMode>(mode));
1043 EXPECT_EQ(ret, -E_INVALID_ARGS);
1044 VBucket record;
1045 const std::vector<std::string> pkColNames;
1046 std::vector<Type> cloudPkVals = {{}};
1047 ret = utilsObj.GetCloudPkVals(record, pkColNames, 0, cloudPkVals);
1048 EXPECT_EQ(ret, -E_INVALID_ARGS);
1049 Assets assets = {{}};
1050 utilsObj.StatusToFlagForAssets(assets);
1051 std::vector<Field> fields = {{"test", TYPE_INDEX<Assets>, true, true}};
1052 utilsObj.StatusToFlagForAssetsInRecord(fields, record);
1053 Timestamp timestamp;
1054 CloudSyncData uploadData;
1055 const int64_t count = 0;
1056 ret = utilsObj.UpdateExtendTime(uploadData, count, 0, timestamp);
1057 EXPECT_EQ(ret, -E_INTERNAL_ERROR);
1058 CloudSyncBatch data;
1059 data.assets = {{}};
1060 ret = utilsObj.FillAssetIdToAssets(data, 0, CloudWaterType::UPDATE);
1061 EXPECT_EQ(ret, -E_CLOUD_ERROR);
1062
1063 /**
1064 * @tc.steps: step3. call IsChangeDataEmpty interface when para is different.
1065 * @tc.expected: step3. success.
1066 */
1067 ChangedData changedData;
1068 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
1069 changedData.primaryData[OP_INSERT] = {{}};
1070 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
1071 changedData.primaryData[OP_UPDATE] = {{}};
1072 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
1073 changedData.primaryData[OP_DELETE] = {{}};
1074 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), false);
1075 }
1076 }