1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16
17 #include <utility>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_data_utils.h"
20 #include "cloud/cloud_db_proxy.h"
21 #include "cloud/cloud_db_types.h"
22 #include "cloud/cloud_sync_utils.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_errno.h"
25 #include "mock_icloud_sync_storage_interface.h"
26 #include "virtual_cloud_db.h"
27 #include "virtual_cloud_syncer.h"
28
29 using namespace std;
30 using namespace testing::ext;
31 using namespace DistributedDB;
32
33 namespace {
34 constexpr const char *TABLE_NAME = "Table";
GetFields()35 std::vector<Field> GetFields()
36 {
37 return {
38 {
39 .colName = "col1",
40 .type = TYPE_INDEX<int64_t>,
41 .primary = true,
42 .nullable = false
43 },
44 {
45 .colName = "col2",
46 .type = TYPE_INDEX<std::string>,
47 .primary = false
48 },
49 {
50 .colName = "col3",
51 .type = TYPE_INDEX<Bytes>,
52 .primary = false
53 }
54 };
55 }
56
ModifyRecords(std::vector<VBucket> & expectRecord)57 void ModifyRecords(std::vector<VBucket> &expectRecord)
58 {
59 std::vector<VBucket> tempRecord;
60 for (const auto &record: expectRecord) {
61 VBucket bucket;
62 for (auto &[field, val] : record) {
63 LOGD("modify field %s", field.c_str());
64 if (val.index() == TYPE_INDEX<int64_t>) {
65 int64_t v = std::get<int64_t>(val);
66 bucket.insert({ field, static_cast<int64_t>(v + 1) });
67 } else {
68 bucket.insert({ field, val });
69 }
70 }
71 tempRecord.push_back(bucket);
72 }
73 expectRecord = tempRecord;
74 }
75
Sync(CloudSyncer * cloudSyncer,int & callCount)76 DBStatus Sync(CloudSyncer *cloudSyncer, int &callCount)
77 {
78 std::mutex processMutex;
79 std::condition_variable cv;
80 SyncProcess syncProcess;
81 const auto callback = [&callCount, &syncProcess, &processMutex, &cv](
82 const std::map<std::string, SyncProcess> &process) {
83 {
84 std::lock_guard<std::mutex> autoLock(processMutex);
85 syncProcess = process.begin()->second;
86 if (!process.empty()) {
87 syncProcess = process.begin()->second;
88 } else {
89 SyncProcess tmpProcess;
90 syncProcess = tmpProcess;
91 }
92 callCount++;
93 }
94 cv.notify_all();
95 };
96 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
97 {
98 LOGI("begin to wait sync");
99 std::unique_lock<std::mutex> uniqueLock(processMutex);
100 cv.wait(uniqueLock, [&syncProcess]() {
101 return syncProcess.process == ProcessStatus::FINISHED;
102 });
103 LOGI("end to wait sync");
104 }
105 return syncProcess.errCode;
106 }
107
108 class DistributedDBCloudDBProxyTest : public testing::Test {
109 public:
110 static void SetUpTestCase();
111 static void TearDownTestCase();
112 void SetUp() override;
113 void TearDown() override;
114
115 protected:
116 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
117 };
118
SetUpTestCase()119 void DistributedDBCloudDBProxyTest::SetUpTestCase()
120 {
121 }
122
TearDownTestCase()123 void DistributedDBCloudDBProxyTest::TearDownTestCase()
124 {
125 }
126
SetUp()127 void DistributedDBCloudDBProxyTest::SetUp()
128 {
129 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
130 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
131 }
132
TearDown()133 void DistributedDBCloudDBProxyTest::TearDown()
134 {
135 virtualCloudDb_ = nullptr;
136 }
137
138 /**
139 * @tc.name: CloudDBProxyTest001
140 * @tc.desc: Verify cloud db init and close function.
141 * @tc.type: FUNC
142 * @tc.require:
143 * @tc.author: zhangqiquan
144 */
145 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest001, TestSize.Level0)
146 {
147 /**
148 * @tc.steps: step1. set cloud db to proxy
149 * @tc.expected: step1. E_OK
150 */
151 CloudDBProxy proxy;
152 proxy.SetCloudDB(virtualCloudDb_);
153 /**
154 * @tc.steps: step2. proxy close cloud db with cloud error
155 * @tc.expected: step2. -E_CLOUD_ERROR
156 */
157 virtualCloudDb_->SetCloudError(true);
158 EXPECT_EQ(proxy.Close(), -E_CLOUD_ERROR);
159 /**
160 * @tc.steps: step3. proxy close cloud db again
161 * @tc.expected: step3. E_OK because cloud db has been set nullptr
162 */
163 EXPECT_EQ(proxy.Close(), E_OK);
164 virtualCloudDb_->SetCloudError(false);
165 EXPECT_EQ(proxy.Close(), E_OK);
166 }
167
168 /**
169 * @tc.name: CloudDBProxyTest002
170 * @tc.desc: Verify cloud db insert function.
171 * @tc.type: FUNC
172 * @tc.require:
173 * @tc.author: zhangqiquan
174 */
175 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest002, TestSize.Level0)
176 {
177 /**
178 * @tc.steps: step1. set cloud db to proxy
179 * @tc.expected: step1. E_OK
180 */
181 CloudDBProxy proxy;
182 proxy.SetCloudDB(virtualCloudDb_);
183 /**
184 * @tc.steps: step2. insert data to cloud db
185 * @tc.expected: step2. OK
186 */
187 TableSchema schema = {
188 .name = TABLE_NAME,
189 .sharedTableName = "",
190 .fields = GetFields()
191 };
192 std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
193 std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
194 Info uploadInfo;
195 std::vector<VBucket> insert = expectRecords;
196 EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), E_OK);
197
198 VBucket extend;
199 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
200 std::vector<VBucket> actualRecords;
201 EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
202 /**
203 * @tc.steps: step3. proxy query data
204 * @tc.expected: step3. data is equal to expect
205 */
206 ASSERT_EQ(actualRecords.size(), expectRecords.size());
207 for (size_t i = 0; i < actualRecords.size(); ++i) {
208 for (const auto &field: schema.fields) {
209 Type expect = expectRecords[i][field.colName];
210 Type actual = actualRecords[i][field.colName];
211 EXPECT_EQ(expect.index(), actual.index());
212 }
213 }
214 /**
215 * @tc.steps: step4. proxy close cloud db
216 * @tc.expected: step4. E_OK
217 */
218 EXPECT_EQ(proxy.Close(), E_OK);
219 }
220
221 /**
222 * @tc.name: CloudDBProxyTest003
223 * @tc.desc: Verify cloud db update function.
224 * @tc.type: FUNC
225 * @tc.require:
226 * @tc.author: zhangqiquan
227 */
228 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest003, TestSize.Level0)
229 {
230 TableSchema schema = {
231 .name = TABLE_NAME,
232 .sharedTableName = "",
233 .fields = GetFields()
234 };
235 /**
236 * @tc.steps: step1. set cloud db to proxy
237 * @tc.expected: step1. E_OK
238 */
239 CloudDBProxy proxy;
240 proxy.SetCloudDB(virtualCloudDb_);
241 /**
242 * @tc.steps: step2. insert data to cloud db
243 * @tc.expected: step2. OK
244 */
245 std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
246 std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
247 Info uploadInfo;
248 std::vector<VBucket> insert = expectRecords;
249 EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), E_OK);
250 /**
251 * @tc.steps: step3. update data to cloud db
252 * @tc.expected: step3. E_OK
253 */
254 ModifyRecords(expectRecords);
255 std::vector<VBucket> update = expectRecords;
256 EXPECT_EQ(proxy.BatchUpdate(TABLE_NAME, update, expectExtends, uploadInfo), E_OK);
257 /**
258 * @tc.steps: step3. proxy close cloud db
259 * @tc.expected: step3. E_OK
260 */
261 VBucket extend;
262 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
263 std::vector<VBucket> actualRecords;
264 EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
265 ASSERT_EQ(actualRecords.size(), expectRecords.size());
266 for (size_t i = 0; i < actualRecords.size(); ++i) {
267 for (const auto &field: schema.fields) {
268 Type expect = expectRecords[i][field.colName];
269 Type actual = actualRecords[i][field.colName];
270 EXPECT_EQ(expect.index(), actual.index());
271 }
272 }
273 /**
274 * @tc.steps: step4. proxy close cloud db
275 * @tc.expected: step4. E_OK
276 */
277 EXPECT_EQ(proxy.Close(), E_OK);
278 }
279
280 /**
281 * @tc.name: CloudDBProxyTest005
282 * @tc.desc: Verify sync failed after cloud error.
283 * @tc.type: FUNC
284 * @tc.require:
285 * @tc.author: zhangqiquan
286 */
287 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest005, TestSize.Level0)
288 {
289 /**
290 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
291 * @tc.expected: step1. E_OK
292 */
293 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
294 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
295 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
296 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
297 ASSERT_NE(cloudSyncer, nullptr);
298 cloudSyncer->SetCloudDB(virtualCloudDb_);
299 cloudSyncer->SetSyncAction(true, false);
300 virtualCloudDb_->SetCloudError(true);
301 /**
302 * @tc.steps: step2. call sync and wait sync finish
303 * @tc.expected: step2. CLOUD_ERROR by lock error
304 */
305 int callCount = 0;
306 EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
307 /**
308 * @tc.steps: step3. get cloud lock status and heartbeat count
309 * @tc.expected: step3. cloud is unlock and no heartbeat
310 */
311 EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
312 EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 0);
313 virtualCloudDb_->ClearHeartbeatCount();
314 cloudSyncer->Close();
315 RefObject::KillAndDecObjRef(cloudSyncer);
316 }
317
318 /**
319 * @tc.name: CloudDBProxyTest008
320 * @tc.desc: Verify cloud db heartbeat with diff status.
321 * @tc.type: FUNC
322 * @tc.require:
323 * @tc.author: zhangqiquan
324 */
325 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest008, TestSize.Level0)
326 {
327 /**
328 * @tc.steps: step1. set cloud db to proxy
329 * @tc.expected: step1. E_OK
330 */
331 CloudDBProxy proxy;
332 proxy.SetCloudDB(virtualCloudDb_);
333 /**
334 * @tc.steps: step2. proxy heartbeat with diff status
335 */
336 virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
337 int errCode = proxy.HeartBeat();
338 EXPECT_EQ(errCode, -E_CLOUD_NETWORK_ERROR);
339 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_NETWORK_ERROR);
340
341 virtualCloudDb_->SetActionStatus(CLOUD_SYNC_UNSET);
342 errCode = proxy.HeartBeat();
343 EXPECT_EQ(errCode, -E_CLOUD_SYNC_UNSET);
344 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_SYNC_UNSET);
345
346 virtualCloudDb_->SetActionStatus(CLOUD_FULL_RECORDS);
347 errCode = proxy.HeartBeat();
348 EXPECT_EQ(errCode, -E_CLOUD_FULL_RECORDS);
349 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_FULL_RECORDS);
350
351 virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
352 errCode = proxy.HeartBeat();
353 EXPECT_EQ(errCode, -E_CLOUD_LOCK_ERROR);
354 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_LOCK_ERROR);
355
356 virtualCloudDb_->SetActionStatus(DB_ERROR);
357 errCode = proxy.HeartBeat();
358 EXPECT_EQ(errCode, -E_CLOUD_ERROR);
359 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_ERROR);
360
361 /**
362 * @tc.steps: step3. proxy close cloud db
363 * @tc.expected: step3. E_OK
364 */
365 EXPECT_EQ(proxy.Close(), E_OK);
366 }
367
368 /**
369 * @tc.name: CloudDBProxyTest009
370 * @tc.desc: Verify cloud db closed and current task exit .
371 * @tc.type: FUNC
372 * @tc.require:
373 * @tc.author: zhangqiquan
374 */
375 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest009, TestSize.Level3)
376 {
377 /**
378 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
379 * @tc.expected: step1. E_OK
380 */
381 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
382 ASSERT_NE(iCloud, nullptr);
383 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
384 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
385 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
386 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
387 ASSERT_NE(cloudSyncer, nullptr);
388 cloudSyncer->SetCloudDB(virtualCloudDb_);
389 cloudSyncer->SetSyncAction(true, false);
__anon130c91520402() 390 cloudSyncer->SetDownloadFunc([]() {
391 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
392 return -E_CLOUD_ERROR;
393 });
394 /**
395 * @tc.steps: step2. call sync and wait sync finish
396 * @tc.expected: step2. E_OK
397 */
398 std::mutex processMutex;
399 bool finished = false;
400 std::condition_variable cv;
401 LOGI("[CloudDBProxyTest009] Call cloud sync");
__anon130c91520502(const std::map<std::string, SyncProcess> &process) 402 const auto callback = [&finished, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
403 {
404 std::lock_guard<std::mutex> autoLock(processMutex);
405 for (const auto &item: process) {
406 if (item.second.process == DistributedDB::FINISHED) {
407 finished = true;
408 EXPECT_EQ(item.second.errCode, DB_CLOSED);
409 }
410 }
411 }
412 cv.notify_all();
413 };
414 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
415 std::this_thread::sleep_for(std::chrono::seconds(1));
416 cloudSyncer->Close();
417 {
418 LOGI("[CloudDBProxyTest009] begin to wait sync");
419 std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon130c91520602() 420 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finished]() {
421 return finished;
422 });
423 LOGI("[CloudDBProxyTest009] end to wait sync");
424 }
425 RefObject::KillAndDecObjRef(cloudSyncer);
426 }
427
428 /**
429 * @tc.name: CloudDBProxyTest010
430 * @tc.desc: Verify cloud db lock with diff status.
431 * @tc.type: FUNC
432 * @tc.require:
433 * @tc.author: zhangqiquan
434 */
435 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest010, TestSize.Level0)
436 {
437 /**
438 * @tc.steps: step1. set cloud db to proxy
439 * @tc.expected: step1. E_OK
440 */
441 CloudDBProxy proxy;
442 proxy.SetCloudDB(virtualCloudDb_);
443 /**
444 * @tc.steps: step2. proxy lock with diff status
445 */
446 virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
447 auto ret = proxy.Lock();
448 EXPECT_EQ(ret.first, -E_CLOUD_NETWORK_ERROR);
449 EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_NETWORK_ERROR);
450
451 virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
452 ret = proxy.Lock();
453 EXPECT_EQ(ret.first, -E_CLOUD_LOCK_ERROR);
454 EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_LOCK_ERROR);
455 /**
456 * @tc.steps: step3. proxy close cloud db
457 * @tc.expected: step3. E_OK
458 */
459 EXPECT_EQ(proxy.Close(), E_OK);
460 }
461
462 /**
463 * @tc.name: CloudDBProxyTest008
464 * @tc.desc: Verify cloud db heartbeat with diff status.
465 * @tc.type: FUNC
466 * @tc.require:
467 * @tc.author: zhangqiquan
468 */
469 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest011, TestSize.Level2)
470 {
471 /**
472 * @tc.steps: step1. set cloud db to proxy
473 * @tc.expected: step1. E_OK
474 */
475 CloudDBProxy proxy;
476 proxy.SetCloudDB(virtualCloudDb_);
477 virtualCloudDb_->SetHeartbeatBlockTime(100); // block 100 ms
478 std::mutex waitMutex;
479 std::condition_variable waitCv;
480 const int scheduleCount = 12;
481 int currentCount = 0;
482 for (int i = 0; i < scheduleCount; ++i) {
__anon130c91520702() 483 RuntimeContext::GetInstance()->ScheduleTask([&proxy, &waitMutex, &waitCv, ¤tCount]() {
484 proxy.HeartBeat();
485 {
486 std::lock_guard<std::mutex> autoLock(waitMutex);
487 currentCount++;
488 LOGI("[CloudDBProxyTest011] CurrentCount %d", currentCount);
489 }
490 waitCv.notify_all();
491 });
492 }
493 LOGI("[CloudDBProxyTest011] Begin wait all task finish");
494 std::unique_lock<std::mutex> uniqueLock(waitMutex);
__anon130c91520802() 495 waitCv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MAX_TIMEOUT), [¤tCount, scheduleCount]() {
496 return currentCount >= scheduleCount;
497 });
498 LOGI("[CloudDBProxyTest011] End wait all task finish");
499 EXPECT_EQ(currentCount, scheduleCount);
500 }
501
502 /**
503 * @tc.name: CloudDBProxyTest012
504 * @tc.desc: Asset data deduplication.
505 * @tc.type: FUNC
506 * @tc.require:
507 * @tc.author: tankaisheng
508 */
509 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest012, TestSize.Level2)
510 {
511 /**
512 * @tc.steps: step1. set cloud db to proxy
513 * @tc.expected: step1. E_OK
514 */
515 Assets assets;
516 Asset asset1;
517 asset1.name = "assetName1";
518 asset1.assetId = "";
519 asset1.modifyTime = "20240730";
520 assets.push_back(asset1);
521
522 Asset asset2;
523 asset2.name = "assetName1";
524 asset2.assetId = "123";
525 asset2.modifyTime = "20240730";
526 assets.push_back(asset2);
527
528 Asset asset3;
529 asset3.name = "assetName2";
530 asset3.assetId = "456";
531 asset3.modifyTime = "20240730";
532 assets.push_back(asset3);
533
534 Asset asset4;
535 asset4.name = "assetName2";
536 asset4.assetId = "789";
537 asset4.modifyTime = "20240731";
538 assets.push_back(asset4);
539
540 Asset asset5;
541 asset5.name = "assetName3";
542 asset5.assetId = "123";
543 asset5.modifyTime = "20240730";
544 assets.push_back(asset5);
545
546 Asset asset6;
547 asset6.name = "assetName3";
548 asset6.assetId = "789";
549 asset6.modifyTime = "20240730";
550 assets.push_back(asset6);
551
552 Asset asset7;
553 asset7.name = "assetName1";
554 asset7.assetId = "456";
555 asset7.modifyTime = "20240731";
556 assets.push_back(asset7);
557
558 DBCommon::RemoveDuplicateAssetsData(assets);
559
560 /**
561 * @tc.steps: step2. check data
562 * @tc.expected: step2. E_OK
563 */
564 std::string assetNameArr[] = {"assetName2", "assetName3", "assetName1"};
565 std::string assetIdArr[] = {"789", "123", "456"};
566 EXPECT_EQ(assets.size(), 3u);
567 for (std::vector<DistributedDB::Asset>::size_type i = 0; i < assets.size(); ++i) {
568 EXPECT_EQ(assets.at(i).name, assetNameArr[i]);
569 EXPECT_EQ(assets.at(i).assetId, assetIdArr[i]);
570 }
571 }
572
573 /**
574 * @tc.name: CloudSyncQueue001
575 * @tc.desc: Verify sync task count decrease after sync finished.
576 * @tc.type: FUNC
577 * @tc.require:
578 * @tc.author: zhangqiquan
579 */
580 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue001, TestSize.Level2)
581 {
582 /**
583 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
584 * @tc.expected: step1. E_OK
585 */
586 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
587 ASSERT_NE(iCloud, nullptr);
588 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
589 ASSERT_NE(cloudSyncer, nullptr);
590 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
591 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
592 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
593 cloudSyncer->SetCloudDB(virtualCloudDb_);
594 cloudSyncer->SetSyncAction(true, false);
__anon130c91520902() 595 cloudSyncer->SetDownloadFunc([cloudSyncer]() {
596 EXPECT_EQ(cloudSyncer->GetQueueCount(), 1u);
597 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
598 return E_OK;
599 });
600 /**
601 * @tc.steps: step2. call sync and wait sync finish
602 */
603 int callCount = 0;
604 EXPECT_EQ(Sync(cloudSyncer, callCount), OK);
605 RuntimeContext::GetInstance()->StopTaskPool();
606 EXPECT_EQ(callCount, 1);
607 RefObject::KillAndDecObjRef(cloudSyncer);
608 }
609
610 /**
611 * @tc.name: CloudSyncQueue002
612 * @tc.desc: Verify sync task abort after close.
613 * @tc.type: FUNC
614 * @tc.require:
615 * @tc.author: zhangqiquan
616 */
617 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue002, TestSize.Level2)
618 {
619 /**
620 * @tc.steps: step1. set cloud db to proxy and sleep 2s when download
621 * @tc.expected: step1. E_OK
622 */
623 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
624 ASSERT_NE(iCloud, nullptr);
625 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
626 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
627 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
628 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
629 ASSERT_NE(cloudSyncer, nullptr);
630 cloudSyncer->SetCloudDB(virtualCloudDb_);
631 cloudSyncer->SetSyncAction(true, false);
632 std::atomic<bool> close = false;
__anon130c91520a02() 633 cloudSyncer->SetDownloadFunc([cloudSyncer, &close]() {
634 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
635 cloudSyncer->PauseCurrentTask();
636 EXPECT_TRUE(close);
637 return -E_TASK_PAUSED;
638 });
639 /**
640 * @tc.steps: step2. call sync and wait sync finish
641 */
642 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, nullptr, 0), E_OK);
643 std::this_thread::sleep_for(std::chrono::seconds(1));
644 close = true;
645 cloudSyncer->Close();
646 RuntimeContext::GetInstance()->StopTaskPool();
647 RefObject::KillAndDecObjRef(cloudSyncer);
648 }
649
650 /**
651 * @tc.name: CloudSyncerTest001
652 * @tc.desc: Verify syncer notify by queue schedule.
653 * @tc.type: FUNC
654 * @tc.require:
655 * @tc.author: zhangqiquan
656 */
657 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncerTest001, TestSize.Level2)
658 {
659 /**
660 * @tc.steps: step1. set cloud db to proxy
661 * @tc.expected: step1. E_OK
662 */
663 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
664 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
665 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
666 EXPECT_CALL(*iCloud, GetIdentify).WillRepeatedly(testing::Return("CloudSyncerTest001"));
667 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
668 std::atomic<int> callCount = 0;
669 std::condition_variable cv;
__anon130c91520b02(const std::map<std::string, SyncProcess> &) 670 cloudSyncer->SetCurrentTaskInfo([&callCount, &cv](const std::map<std::string, SyncProcess> &) {
671 callCount++;
672 int before = callCount;
673 LOGD("on callback %d", before);
674 std::this_thread::sleep_for(std::chrono::seconds(1));
675 EXPECT_EQ(before, callCount);
676 cv.notify_all();
677 }, 1u);
678 const int notifyCount = 2;
679 for (int i = 0; i < notifyCount; ++i) {
680 cloudSyncer->Notify();
681 }
682 cloudSyncer->SetCurrentTaskInfo(nullptr, 0); // 0 is invalid task id
683 std::mutex processMutex;
684 std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon130c91520c02() 685 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&callCount]() {
686 return callCount == notifyCount;
687 });
688 cloudSyncer->Close();
689 RefObject::KillAndDecObjRef(cloudSyncer);
690 }
691
692 /**
693 * @tc.name: SameBatchTest001
694 * @tc.desc: Verify update cache in same batch.
695 * @tc.type: FUNC
696 * @tc.require:
697 * @tc.author: zhangqiquan
698 */
699 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest001, TestSize.Level0)
700 {
701 std::map<std::string, LogInfo> localLogInfoCache;
702 LogInfo cloudInfo;
703 LogInfo localInfo;
704 localInfo.hashKey = {'k'};
705 cloudInfo.cloudGid = "gid";
706 /**
707 * @tc.steps: step1. insert cloud into local
708 * @tc.expected: step1. local cache has gid
709 */
710 CloudSyncUtils::UpdateLocalCache(OpType::INSERT, cloudInfo, localInfo, localLogInfoCache);
711 std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
712 EXPECT_EQ(localLogInfoCache[hashKey].cloudGid, cloudInfo.cloudGid);
713 /**
714 * @tc.steps: step2. delete local
715 * @tc.expected: step2. local flag is delete
716 */
717 CloudSyncUtils::UpdateLocalCache(OpType::DELETE, cloudInfo, localInfo, localLogInfoCache);
718 EXPECT_EQ(localLogInfoCache[hashKey].flag, static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE));
719 }
720
721 /**
722 * @tc.name: SameBatchTest002
723 * @tc.desc: Verify cal opType in same batch.
724 * @tc.type: FUNC
725 * @tc.require:
726 * @tc.author: zhangqiquan
727 */
728 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest002, TestSize.Level0)
729 {
730 /**
731 * @tc.steps: step1. prepare two data with same pk
732 */
733 ICloudSyncer::SyncParam param;
734 param.downloadData.opType.push_back(OpType::INSERT);
735 param.downloadData.opType.push_back(OpType::UPDATE);
736 const std::string pkField = "pk";
737 param.changedData.field.push_back(pkField);
738 VBucket oneRow;
739 oneRow[pkField] = static_cast<int64_t>(1); // 1 is pk
740 param.downloadData.data.push_back(oneRow);
741 param.downloadData.data.push_back(oneRow);
742 /**
743 * @tc.steps: step2. cal opType by utils
744 * @tc.expected: step2. all type should be INSERT
745 */
746 for (size_t i = 0; i < param.downloadData.data.size(); ++i) {
747 EXPECT_EQ(CloudSyncUtils::CalOpType(param, i), OpType::INSERT);
748 }
749 /**
750 * @tc.steps: step3. cal opType by utils
751 * @tc.expected: step3. should be UPDATE because diff pk
752 */
753 oneRow[pkField] = static_cast<int64_t>(2); // 2 is pk
754 param.downloadData.data.push_back(oneRow);
755 param.downloadData.opType.push_back(OpType::UPDATE);
756 // index start with zero
757 EXPECT_EQ(CloudSyncUtils::CalOpType(param, param.downloadData.data.size() - 1), OpType::UPDATE);
758 }
759
760 /**
761 * @tc.name: CloudDBProxyTest013
762 * @tc.desc: Verify CloudDBProxy interfaces.
763 * @tc.type: FUNC
764 * @tc.require: DTS2024073106613
765 * @tc.author: suyue
766 */
767 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest013, TestSize.Level0)
768 {
769 /**
770 * @tc.steps: step1. call CloudDBProxy interfaces when ICloudDb is nullptr.
771 * @tc.expected: step1. return -E_CLOUD_ERROR.
772 */
773 CloudDBProxy proxy;
774 int ret = proxy.UnLock();
775 EXPECT_EQ(ret, -E_CLOUD_ERROR);
776 ret = proxy.HeartBeat();
777 EXPECT_EQ(ret, -E_CLOUD_ERROR);
778 VBucket extend;
779 const std::string tableName = "test";
780 std::vector<VBucket> record;
781 ret = proxy.Query(tableName, extend, record);
782 EXPECT_EQ(ret, -E_CLOUD_ERROR);
783 Info info;
784 ret = proxy.BatchInsert(tableName, record, record, info);
785 EXPECT_EQ(ret, -E_CLOUD_ERROR);
786 ret = proxy.BatchUpdate(tableName, record, record, info);
787 EXPECT_EQ(ret, -E_CLOUD_ERROR);
788 ret = proxy.BatchDelete(tableName, record, record, info);
789 EXPECT_EQ(ret, -E_CLOUD_ERROR);
790 std::pair<int, uint64_t> res = proxy.Lock();
791 EXPECT_EQ(res.first, -E_CLOUD_ERROR);
792 std::pair<int, std::string> cursor = proxy.GetEmptyCursor(tableName);
793 EXPECT_EQ(cursor.first, -E_CLOUD_ERROR);
794
795 /**
796 * @tc.steps: step2. call CloudDBProxy interfaces when para is err.
797 * @tc.expected: step2. return fail.
798 */
799 std::pair<int, std::string> ver = proxy.GetCloudVersion("test");
800 EXPECT_EQ(ver.first, -E_NOT_SUPPORT);
801 std::vector<Asset> assets;
802 ret = proxy.RemoveLocalAssets(assets);
803 EXPECT_EQ(ret, -E_OK);
804 assets = {{}};
805 ret = proxy.RemoveLocalAssets(assets);
806 EXPECT_EQ(ret, -E_OK);
807 }
808
809 /**
810 * @tc.name: CloudSyncUtilsTest
811 * @tc.desc: Verify CloudSyncUtils interfaces
812 * @tc.type: FUNC
813 * @tc.require: DTS2024073106613
814 * @tc.author: suyue
815 */
816 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncUtilsTest, TestSize.Level0)
817 {
818 /**
819 * @tc.steps: step1. Test type translation interfaces.
820 * @tc.expected: step1. success.
821 */
822 CloudSyncUtils utilsObj;
823 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::INSERT), AssetOpType::INSERT);
824 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DELETE), AssetOpType::DELETE);
825 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::UPDATE), AssetOpType::UPDATE);
826 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::NORMAL), AssetOpType::NO_CHANGE);
827 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DOWNLOADING), AssetOpType::NO_CHANGE);
828 EXPECT_EQ(utilsObj.OpTypeToChangeType(OpType::ONLY_UPDATE_GID), ChangeType::OP_BUTT);
829
830 /**
831 * @tc.steps: step2. call CloudSyncUtils interfaces when para is err.
832 * @tc.expected: step2. return false.
833 */
834 const std::vector<DeviceID> devices = {"test"};
835 int mode = 10; // set metaMode to 10 not in enum class MetaMode
836 int ret = utilsObj.CheckParamValid(devices, static_cast<SyncMode>(mode));
837 EXPECT_EQ(ret, -E_INVALID_ARGS);
838 VBucket record;
839 const std::vector<std::string> pkColNames;
840 std::vector<Type> cloudPkVals = {{}};
841 ret = utilsObj.GetCloudPkVals(record, pkColNames, 0, cloudPkVals);
842 EXPECT_EQ(ret, -E_INVALID_ARGS);
843 Assets assets = {{}};
844 utilsObj.StatusToFlagForAssets(assets);
845 std::vector<Field> fields = {{"test", TYPE_INDEX<Assets>, true, true}};
846 utilsObj.StatusToFlagForAssetsInRecord(fields, record);
847 Timestamp timestamp;
848 CloudSyncData uploadData;
849 const int64_t count = 0;
850 ret = utilsObj.UpdateExtendTime(uploadData, count, 0, timestamp);
851 EXPECT_EQ(ret, -E_INTERNAL_ERROR);
852 CloudSyncBatch data;
853 data.assets = {{}};
854 ret = utilsObj.FillAssetIdToAssets(data, 0, CloudWaterType::UPDATE);
855 EXPECT_EQ(ret, -E_CLOUD_ERROR);
856
857 /**
858 * @tc.steps: step3. call IsChangeDataEmpty interface when para is different.
859 * @tc.expected: step3. success.
860 */
861 ChangedData changedData;
862 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
863 changedData.primaryData[OP_INSERT] = {{}};
864 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
865 changedData.primaryData[OP_UPDATE] = {{}};
866 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
867 changedData.primaryData[OP_DELETE] = {{}};
868 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), false);
869 }
870 }