• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <gtest/gtest.h>
16 
17 #include <utility>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_data_utils.h"
20 #include "cloud/cloud_db_proxy.h"
21 #include "cloud/cloud_db_types.h"
22 #include "cloud/cloud_sync_utils.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_errno.h"
25 #include "mock_icloud_sync_storage_interface.h"
26 #include "virtual_cloud_db.h"
27 #include "virtual_cloud_syncer.h"
28 
29 using namespace std;
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 
33 namespace {
// Table name used for the schema, the proxy BatchInsert/BatchUpdate/Query calls and the Sync calls in this file.
constexpr const char *TABLE_NAME = "Table";
GetFields()35 std::vector<Field> GetFields()
36 {
37     return {
38         {
39             .colName = "col1",
40             .type = TYPE_INDEX<int64_t>,
41             .primary = true,
42             .nullable = false
43         },
44         {
45             .colName = "col2",
46             .type = TYPE_INDEX<std::string>,
47             .primary = false
48         },
49         {
50             .colName = "col3",
51             .type = TYPE_INDEX<Bytes>,
52             .primary = false
53         }
54     };
55 }
56 
ModifyRecords(std::vector<VBucket> & expectRecord)57 void ModifyRecords(std::vector<VBucket> &expectRecord)
58 {
59     std::vector<VBucket> tempRecord;
60     for (const auto &record: expectRecord) {
61         VBucket bucket;
62         for (auto &[field, val] : record) {
63             LOGD("modify field %s", field.c_str());
64             if (val.index() == TYPE_INDEX<int64_t>) {
65                 int64_t v = std::get<int64_t>(val);
66                 bucket.insert({ field, static_cast<int64_t>(v + 1) });
67             } else {
68                 bucket.insert({ field, val });
69             }
70         }
71         tempRecord.push_back(bucket);
72     }
73     expectRecord = tempRecord;
74 }
75 
Sync(CloudSyncer * cloudSyncer,int & callCount)76 DBStatus Sync(CloudSyncer *cloudSyncer, int &callCount)
77 {
78     std::mutex processMutex;
79     std::condition_variable cv;
80     SyncProcess syncProcess;
81     const auto callback = [&callCount, &syncProcess, &processMutex, &cv](
82         const std::map<std::string, SyncProcess> &process) {
83         {
84             std::lock_guard<std::mutex> autoLock(processMutex);
85             syncProcess = process.begin()->second;
86             if (!process.empty()) {
87                 syncProcess = process.begin()->second;
88             } else {
89                 SyncProcess tmpProcess;
90                 syncProcess = tmpProcess;
91             }
92             callCount++;
93         }
94         cv.notify_all();
95     };
96     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
97     {
98         LOGI("begin to wait sync");
99         std::unique_lock<std::mutex> uniqueLock(processMutex);
100         cv.wait(uniqueLock, [&syncProcess]() {
101             return syncProcess.process == ProcessStatus::FINISHED;
102         });
103         LOGI("end to wait sync");
104     }
105     return syncProcess.errCode;
106 }
107 
108 class DistributedDBCloudDBProxyTest : public testing::Test {
109 public:
110     static void SetUpTestCase();
111     static void TearDownTestCase();
112     void SetUp() override;
113     void TearDown() override;
114 
115 protected:
116     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
117 };
118 
// Suite-level setup hook; currently empty, no suite-wide state is created here.
void DistributedDBCloudDBProxyTest::SetUpTestCase()
{
}
122 
// Suite-level teardown hook; currently empty, there is no suite-wide state to release.
void DistributedDBCloudDBProxyTest::TearDownTestCase()
{
}
126 
SetUp()127 void DistributedDBCloudDBProxyTest::SetUp()
128 {
129     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
130     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
131 }
132 
TearDown()133 void DistributedDBCloudDBProxyTest::TearDown()
134 {
135     virtualCloudDb_ = nullptr;
136 }
137 
138 /**
139  * @tc.name: CloudDBProxyTest001
140  * @tc.desc: Verify cloud db init and close function.
141  * @tc.type: FUNC
142  * @tc.require:
143  * @tc.author: zhangqiquan
144  */
145 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest001, TestSize.Level0)
146 {
147     /**
148      * @tc.steps: step1. set cloud db to proxy
149      * @tc.expected: step1. E_OK
150      */
151     CloudDBProxy proxy;
152     proxy.SetCloudDB(virtualCloudDb_);
153     /**
154      * @tc.steps: step2. proxy close cloud db with cloud error
155      * @tc.expected: step2. -E_CLOUD_ERROR
156      */
157     virtualCloudDb_->SetCloudError(true);
158     EXPECT_EQ(proxy.Close(), -E_CLOUD_ERROR);
159     /**
160      * @tc.steps: step3. proxy close cloud db again
161      * @tc.expected: step3. E_OK because cloud db has been set nullptr
162      */
163     EXPECT_EQ(proxy.Close(), E_OK);
164     virtualCloudDb_->SetCloudError(false);
165     EXPECT_EQ(proxy.Close(), E_OK);
166 }
167 
168 /**
169  * @tc.name: CloudDBProxyTest002
170  * @tc.desc: Verify cloud db insert function.
171  * @tc.type: FUNC
172  * @tc.require:
173  * @tc.author: zhangqiquan
174  */
175 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest002, TestSize.Level0)
176 {
177     /**
178      * @tc.steps: step1. set cloud db to proxy
179      * @tc.expected: step1. E_OK
180      */
181     CloudDBProxy proxy;
182     proxy.SetCloudDB(virtualCloudDb_);
183     /**
184      * @tc.steps: step2. insert data to cloud db
185      * @tc.expected: step2. OK
186      */
187     TableSchema schema = {
188         .name = TABLE_NAME,
189         .sharedTableName = "",
190         .fields = GetFields()
191     };
192     std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
193     std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
194     Info uploadInfo;
195     std::vector<VBucket> insert = expectRecords;
196     EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), E_OK);
197 
198     VBucket extend;
199     extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
200     std::vector<VBucket> actualRecords;
201     EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
202     /**
203      * @tc.steps: step3. proxy query data
204      * @tc.expected: step3. data is equal to expect
205      */
206     ASSERT_EQ(actualRecords.size(), expectRecords.size());
207     for (size_t i = 0; i < actualRecords.size(); ++i) {
208         for (const auto &field: schema.fields) {
209             Type expect = expectRecords[i][field.colName];
210             Type actual = actualRecords[i][field.colName];
211             EXPECT_EQ(expect.index(), actual.index());
212         }
213     }
214     /**
215      * @tc.steps: step4. proxy close cloud db
216      * @tc.expected: step4. E_OK
217      */
218     EXPECT_EQ(proxy.Close(), E_OK);
219 }
220 
221 /**
222  * @tc.name: CloudDBProxyTest003
223  * @tc.desc: Verify cloud db update function.
224  * @tc.type: FUNC
225  * @tc.require:
226  * @tc.author: zhangqiquan
227  */
228 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest003, TestSize.Level0)
229 {
230     TableSchema schema = {
231         .name = TABLE_NAME,
232         .sharedTableName = "",
233         .fields = GetFields()
234     };
235     /**
236      * @tc.steps: step1. set cloud db to proxy
237      * @tc.expected: step1. E_OK
238      */
239     CloudDBProxy proxy;
240     proxy.SetCloudDB(virtualCloudDb_);
241     /**
242      * @tc.steps: step2. insert data to cloud db
243      * @tc.expected: step2. OK
244      */
245     std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
246     std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
247     Info uploadInfo;
248     std::vector<VBucket> insert = expectRecords;
249     EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), E_OK);
250     /**
251      * @tc.steps: step3. update data to cloud db
252      * @tc.expected: step3. E_OK
253      */
254     ModifyRecords(expectRecords);
255     std::vector<VBucket> update = expectRecords;
256     EXPECT_EQ(proxy.BatchUpdate(TABLE_NAME, update, expectExtends, uploadInfo), E_OK);
257     /**
258      * @tc.steps: step3. proxy close cloud db
259      * @tc.expected: step3. E_OK
260      */
261     VBucket extend;
262     extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
263     std::vector<VBucket> actualRecords;
264     EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
265     ASSERT_EQ(actualRecords.size(), expectRecords.size());
266     for (size_t i = 0; i < actualRecords.size(); ++i) {
267         for (const auto &field: schema.fields) {
268             Type expect = expectRecords[i][field.colName];
269             Type actual = actualRecords[i][field.colName];
270             EXPECT_EQ(expect.index(), actual.index());
271         }
272     }
273     /**
274      * @tc.steps: step4. proxy close cloud db
275      * @tc.expected: step4. E_OK
276      */
277     EXPECT_EQ(proxy.Close(), E_OK);
278 }
279 
280 /**
281  * @tc.name: CloudDBProxyTest005
282  * @tc.desc: Verify sync failed after cloud error.
283  * @tc.type: FUNC
284  * @tc.require:
285  * @tc.author: zhangqiquan
286  */
287 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest005, TestSize.Level0)
288 {
289     /**
290      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
291      * @tc.expected: step1. E_OK
292      */
293     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
294     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
295     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
296     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
297     ASSERT_NE(cloudSyncer, nullptr);
298     cloudSyncer->SetCloudDB(virtualCloudDb_);
299     cloudSyncer->SetSyncAction(true, false);
300     virtualCloudDb_->SetCloudError(true);
301     /**
302      * @tc.steps: step2. call sync and wait sync finish
303      * @tc.expected: step2. CLOUD_ERROR by lock error
304      */
305     int callCount = 0;
306     EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
307     /**
308      * @tc.steps: step3. get cloud lock status and heartbeat count
309      * @tc.expected: step3. cloud is unlock and no heartbeat
310      */
311     EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
312     EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 0);
313     virtualCloudDb_->ClearHeartbeatCount();
314     cloudSyncer->Close();
315     RefObject::KillAndDecObjRef(cloudSyncer);
316 }
317 
318 /**
319  * @tc.name: CloudDBProxyTest008
320  * @tc.desc: Verify cloud db heartbeat with diff status.
321  * @tc.type: FUNC
322  * @tc.require:
323  * @tc.author: zhangqiquan
324  */
325 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest008, TestSize.Level0)
326 {
327     /**
328      * @tc.steps: step1. set cloud db to proxy
329      * @tc.expected: step1. E_OK
330      */
331     CloudDBProxy proxy;
332     proxy.SetCloudDB(virtualCloudDb_);
333     /**
334      * @tc.steps: step2. proxy heartbeat with diff status
335      */
336     virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
337     int errCode = proxy.HeartBeat();
338     EXPECT_EQ(errCode, -E_CLOUD_NETWORK_ERROR);
339     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_NETWORK_ERROR);
340 
341     virtualCloudDb_->SetActionStatus(CLOUD_SYNC_UNSET);
342     errCode = proxy.HeartBeat();
343     EXPECT_EQ(errCode, -E_CLOUD_SYNC_UNSET);
344     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_SYNC_UNSET);
345 
346     virtualCloudDb_->SetActionStatus(CLOUD_FULL_RECORDS);
347     errCode = proxy.HeartBeat();
348     EXPECT_EQ(errCode, -E_CLOUD_FULL_RECORDS);
349     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_FULL_RECORDS);
350 
351     virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
352     errCode = proxy.HeartBeat();
353     EXPECT_EQ(errCode, -E_CLOUD_LOCK_ERROR);
354     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_LOCK_ERROR);
355 
356     virtualCloudDb_->SetActionStatus(DB_ERROR);
357     errCode = proxy.HeartBeat();
358     EXPECT_EQ(errCode, -E_CLOUD_ERROR);
359     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_ERROR);
360 
361     /**
362      * @tc.steps: step3. proxy close cloud db
363      * @tc.expected: step3. E_OK
364      */
365     EXPECT_EQ(proxy.Close(), E_OK);
366 }
367 
368 /**
369  * @tc.name: CloudDBProxyTest009
 * @tc.desc: Verify the current task exits after the cloud db is closed.
371  * @tc.type: FUNC
372  * @tc.require:
373  * @tc.author: zhangqiquan
374  */
375 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest009, TestSize.Level3)
376 {
377     /**
378      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
379      * @tc.expected: step1. E_OK
380      */
381     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
382     ASSERT_NE(iCloud, nullptr);
383     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
384     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
385     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
386     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
387     ASSERT_NE(cloudSyncer, nullptr);
388     cloudSyncer->SetCloudDB(virtualCloudDb_);
389     cloudSyncer->SetSyncAction(true, false);
__anon130c91520402() 390     cloudSyncer->SetDownloadFunc([]() {
391         std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
392         return -E_CLOUD_ERROR;
393     });
394     /**
395      * @tc.steps: step2. call sync and wait sync finish
396      * @tc.expected: step2. E_OK
397      */
398     std::mutex processMutex;
399     bool finished = false;
400     std::condition_variable cv;
401     LOGI("[CloudDBProxyTest009] Call cloud sync");
__anon130c91520502(const std::map<std::string, SyncProcess> &process) 402     const auto callback = [&finished, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
403         {
404             std::lock_guard<std::mutex> autoLock(processMutex);
405             for (const auto &item: process) {
406                 if (item.second.process == DistributedDB::FINISHED) {
407                     finished = true;
408                     EXPECT_EQ(item.second.errCode, DB_CLOSED);
409                 }
410             }
411         }
412         cv.notify_all();
413     };
414     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
415     std::this_thread::sleep_for(std::chrono::seconds(1));
416     cloudSyncer->Close();
417     {
418         LOGI("[CloudDBProxyTest009] begin to wait sync");
419         std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon130c91520602() 420         cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finished]() {
421             return finished;
422         });
423         LOGI("[CloudDBProxyTest009] end to wait sync");
424     }
425     RefObject::KillAndDecObjRef(cloudSyncer);
426 }
427 
428 /**
429  * @tc.name: CloudDBProxyTest010
430  * @tc.desc: Verify cloud db lock with diff status.
431  * @tc.type: FUNC
432  * @tc.require:
433  * @tc.author: zhangqiquan
434  */
435 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest010, TestSize.Level0)
436 {
437     /**
438      * @tc.steps: step1. set cloud db to proxy
439      * @tc.expected: step1. E_OK
440      */
441     CloudDBProxy proxy;
442     proxy.SetCloudDB(virtualCloudDb_);
443     /**
444      * @tc.steps: step2. proxy lock with diff status
445      */
446     virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
447     auto ret = proxy.Lock();
448     EXPECT_EQ(ret.first, -E_CLOUD_NETWORK_ERROR);
449     EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_NETWORK_ERROR);
450 
451     virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
452     ret = proxy.Lock();
453     EXPECT_EQ(ret.first, -E_CLOUD_LOCK_ERROR);
454     EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_LOCK_ERROR);
455     /**
456      * @tc.steps: step3. proxy close cloud db
457      * @tc.expected: step3. E_OK
458      */
459     EXPECT_EQ(proxy.Close(), E_OK);
460 }
461 
462 /**
 * @tc.name: CloudDBProxyTest011
 * @tc.desc: Verify all scheduled heartbeat tasks finish when the cloud db heartbeat is blocked.
465  * @tc.type: FUNC
466  * @tc.require:
467  * @tc.author: zhangqiquan
468  */
469 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest011, TestSize.Level2)
470 {
471     /**
472      * @tc.steps: step1. set cloud db to proxy
473      * @tc.expected: step1. E_OK
474      */
475     CloudDBProxy proxy;
476     proxy.SetCloudDB(virtualCloudDb_);
477     virtualCloudDb_->SetHeartbeatBlockTime(100); // block 100 ms
478     std::mutex waitMutex;
479     std::condition_variable waitCv;
480     const int scheduleCount = 12;
481     int currentCount = 0;
482     for (int i = 0; i < scheduleCount; ++i) {
__anon130c91520702() 483         RuntimeContext::GetInstance()->ScheduleTask([&proxy, &waitMutex, &waitCv, &currentCount]() {
484             proxy.HeartBeat();
485             {
486                 std::lock_guard<std::mutex> autoLock(waitMutex);
487                 currentCount++;
488                 LOGI("[CloudDBProxyTest011] CurrentCount %d", currentCount);
489             }
490             waitCv.notify_all();
491         });
492     }
493     LOGI("[CloudDBProxyTest011] Begin wait all task finish");
494     std::unique_lock<std::mutex> uniqueLock(waitMutex);
__anon130c91520802() 495     waitCv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MAX_TIMEOUT), [&currentCount, scheduleCount]() {
496         return currentCount >= scheduleCount;
497     });
498     LOGI("[CloudDBProxyTest011] End wait all task finish");
499     EXPECT_EQ(currentCount, scheduleCount);
500 }
501 
502 /**
503  * @tc.name: CloudDBProxyTest012
504  * @tc.desc: Asset data deduplication.
505  * @tc.type: FUNC
506  * @tc.require:
507  * @tc.author: tankaisheng
508  */
509 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest012, TestSize.Level2)
510 {
511     /**
512      * @tc.steps: step1. set cloud db to proxy
513      * @tc.expected: step1. E_OK
514      */
515     Assets assets;
516     Asset asset1;
517     asset1.name = "assetName1";
518     asset1.assetId = "";
519     asset1.modifyTime = "20240730";
520     assets.push_back(asset1);
521 
522     Asset asset2;
523     asset2.name = "assetName1";
524     asset2.assetId = "123";
525     asset2.modifyTime = "20240730";
526     assets.push_back(asset2);
527 
528     Asset asset3;
529     asset3.name = "assetName2";
530     asset3.assetId = "456";
531     asset3.modifyTime = "20240730";
532     assets.push_back(asset3);
533 
534     Asset asset4;
535     asset4.name = "assetName2";
536     asset4.assetId = "789";
537     asset4.modifyTime = "20240731";
538     assets.push_back(asset4);
539 
540     Asset asset5;
541     asset5.name = "assetName3";
542     asset5.assetId = "123";
543     asset5.modifyTime = "20240730";
544     assets.push_back(asset5);
545 
546     Asset asset6;
547     asset6.name = "assetName3";
548     asset6.assetId = "789";
549     asset6.modifyTime = "20240730";
550     assets.push_back(asset6);
551 
552     Asset asset7;
553     asset7.name = "assetName1";
554     asset7.assetId = "456";
555     asset7.modifyTime = "20240731";
556     assets.push_back(asset7);
557 
558     DBCommon::RemoveDuplicateAssetsData(assets);
559 
560     /**
561      * @tc.steps: step2. check data
562      * @tc.expected: step2. E_OK
563      */
564     std::string assetNameArr[] = {"assetName2", "assetName3", "assetName1"};
565     std::string assetIdArr[] = {"789", "123", "456"};
566     EXPECT_EQ(assets.size(), 3u);
567     for (std::vector<DistributedDB::Asset>::size_type i = 0; i < assets.size(); ++i) {
568         EXPECT_EQ(assets.at(i).name, assetNameArr[i]);
569         EXPECT_EQ(assets.at(i).assetId, assetIdArr[i]);
570     }
571 }
572 
573 /**
574  * @tc.name: CloudSyncQueue001
575  * @tc.desc: Verify sync task count decrease after sync finished.
576  * @tc.type: FUNC
577  * @tc.require:
578  * @tc.author: zhangqiquan
579  */
580 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue001, TestSize.Level2)
581 {
582     /**
583      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
584      * @tc.expected: step1. E_OK
585      */
586     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
587     ASSERT_NE(iCloud, nullptr);
588     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
589     ASSERT_NE(cloudSyncer, nullptr);
590     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
591     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
592     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
593     cloudSyncer->SetCloudDB(virtualCloudDb_);
594     cloudSyncer->SetSyncAction(true, false);
__anon130c91520902() 595     cloudSyncer->SetDownloadFunc([cloudSyncer]() {
596         EXPECT_EQ(cloudSyncer->GetQueueCount(), 1u);
597         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
598         return E_OK;
599     });
600     /**
601      * @tc.steps: step2. call sync and wait sync finish
602      */
603     int callCount = 0;
604     EXPECT_EQ(Sync(cloudSyncer, callCount), OK);
605     RuntimeContext::GetInstance()->StopTaskPool();
606     EXPECT_EQ(callCount, 1);
607     RefObject::KillAndDecObjRef(cloudSyncer);
608 }
609 
610 /**
611  * @tc.name: CloudSyncQueue002
612  * @tc.desc: Verify sync task abort after close.
613  * @tc.type: FUNC
614  * @tc.require:
615  * @tc.author: zhangqiquan
616  */
617 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue002, TestSize.Level2)
618 {
619     /**
620      * @tc.steps: step1. set cloud db to proxy and sleep 2s when download
621      * @tc.expected: step1. E_OK
622      */
623     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
624     ASSERT_NE(iCloud, nullptr);
625     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
626     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
627     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
628     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
629     ASSERT_NE(cloudSyncer, nullptr);
630     cloudSyncer->SetCloudDB(virtualCloudDb_);
631     cloudSyncer->SetSyncAction(true, false);
632     std::atomic<bool> close = false;
__anon130c91520a02() 633     cloudSyncer->SetDownloadFunc([cloudSyncer, &close]() {
634         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
635         cloudSyncer->PauseCurrentTask();
636         EXPECT_TRUE(close);
637         return -E_TASK_PAUSED;
638     });
639     /**
640      * @tc.steps: step2. call sync and wait sync finish
641      */
642     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, nullptr, 0), E_OK);
643     std::this_thread::sleep_for(std::chrono::seconds(1));
644     close = true;
645     cloudSyncer->Close();
646     RuntimeContext::GetInstance()->StopTaskPool();
647     RefObject::KillAndDecObjRef(cloudSyncer);
648 }
649 
650 /**
651  * @tc.name: CloudSyncerTest001
652  * @tc.desc: Verify syncer notify by queue schedule.
653  * @tc.type: FUNC
654  * @tc.require:
655  * @tc.author: zhangqiquan
656  */
657 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncerTest001, TestSize.Level2)
658 {
659     /**
660      * @tc.steps: step1. set cloud db to proxy
661      * @tc.expected: step1. E_OK
662      */
663     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
664     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
665     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
666     EXPECT_CALL(*iCloud, GetIdentify).WillRepeatedly(testing::Return("CloudSyncerTest001"));
667     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
668     std::atomic<int> callCount = 0;
669     std::condition_variable cv;
__anon130c91520b02(const std::map<std::string, SyncProcess> &) 670     cloudSyncer->SetCurrentTaskInfo([&callCount, &cv](const std::map<std::string, SyncProcess> &) {
671         callCount++;
672         int before = callCount;
673         LOGD("on callback %d", before);
674         std::this_thread::sleep_for(std::chrono::seconds(1));
675         EXPECT_EQ(before, callCount);
676         cv.notify_all();
677     }, 1u);
678     const int notifyCount = 2;
679     for (int i = 0; i < notifyCount; ++i) {
680         cloudSyncer->Notify();
681     }
682     cloudSyncer->SetCurrentTaskInfo(nullptr, 0); // 0 is invalid task id
683     std::mutex processMutex;
684     std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon130c91520c02() 685     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&callCount]() {
686         return callCount == notifyCount;
687     });
688     cloudSyncer->Close();
689     RefObject::KillAndDecObjRef(cloudSyncer);
690 }
691 
692 /**
693  * @tc.name: SameBatchTest001
694  * @tc.desc: Verify update cache in same batch.
695  * @tc.type: FUNC
696  * @tc.require:
697  * @tc.author: zhangqiquan
698  */
699 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest001, TestSize.Level0)
700 {
701     std::map<std::string, LogInfo> localLogInfoCache;
702     LogInfo cloudInfo;
703     LogInfo localInfo;
704     localInfo.hashKey = {'k'};
705     cloudInfo.cloudGid = "gid";
706     /**
707      * @tc.steps: step1. insert cloud into local
708      * @tc.expected: step1. local cache has gid
709      */
710     CloudSyncUtils::UpdateLocalCache(OpType::INSERT, cloudInfo, localInfo, localLogInfoCache);
711     std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
712     EXPECT_EQ(localLogInfoCache[hashKey].cloudGid, cloudInfo.cloudGid);
713     /**
714      * @tc.steps: step2. delete local
715      * @tc.expected: step2. local flag is delete
716      */
717     CloudSyncUtils::UpdateLocalCache(OpType::DELETE, cloudInfo, localInfo, localLogInfoCache);
718     EXPECT_EQ(localLogInfoCache[hashKey].flag, static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE));
719 }
720 
721 /**
722  * @tc.name: SameBatchTest002
723  * @tc.desc: Verify cal opType in same batch.
724  * @tc.type: FUNC
725  * @tc.require:
726  * @tc.author: zhangqiquan
727  */
728 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest002, TestSize.Level0)
729 {
730     /**
731      * @tc.steps: step1. prepare two data with same pk
732      */
733     ICloudSyncer::SyncParam param;
734     param.downloadData.opType.push_back(OpType::INSERT);
735     param.downloadData.opType.push_back(OpType::UPDATE);
736     const std::string pkField = "pk";
737     param.changedData.field.push_back(pkField);
738     VBucket oneRow;
739     oneRow[pkField] = static_cast<int64_t>(1); // 1 is pk
740     param.downloadData.data.push_back(oneRow);
741     param.downloadData.data.push_back(oneRow);
742     /**
743      * @tc.steps: step2. cal opType by utils
744      * @tc.expected: step2. all type should be INSERT
745      */
746     for (size_t i = 0; i < param.downloadData.data.size(); ++i) {
747         EXPECT_EQ(CloudSyncUtils::CalOpType(param, i), OpType::INSERT);
748     }
749     /**
750      * @tc.steps: step3. cal opType by utils
751      * @tc.expected: step3. should be UPDATE because diff pk
752      */
753     oneRow[pkField] = static_cast<int64_t>(2); // 2 is pk
754     param.downloadData.data.push_back(oneRow);
755     param.downloadData.opType.push_back(OpType::UPDATE);
756     // index start with zero
757     EXPECT_EQ(CloudSyncUtils::CalOpType(param, param.downloadData.data.size() - 1), OpType::UPDATE);
758 }
759 
760 /**
761  * @tc.name: CloudDBProxyTest013
762  * @tc.desc: Verify CloudDBProxy interfaces.
763  * @tc.type: FUNC
764  * @tc.require: DTS2024073106613
765  * @tc.author: suyue
766  */
767 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest013, TestSize.Level0)
768 {
769     /**
770      * @tc.steps: step1. call CloudDBProxy interfaces when ICloudDb is nullptr.
771      * @tc.expected: step1. return -E_CLOUD_ERROR.
772      */
773     CloudDBProxy proxy;
774     int ret = proxy.UnLock();
775     EXPECT_EQ(ret, -E_CLOUD_ERROR);
776     ret = proxy.HeartBeat();
777     EXPECT_EQ(ret, -E_CLOUD_ERROR);
778     VBucket extend;
779     const std::string tableName = "test";
780     std::vector<VBucket> record;
781     ret = proxy.Query(tableName, extend, record);
782     EXPECT_EQ(ret, -E_CLOUD_ERROR);
783     Info info;
784     ret = proxy.BatchInsert(tableName, record, record, info);
785     EXPECT_EQ(ret, -E_CLOUD_ERROR);
786     ret = proxy.BatchUpdate(tableName, record, record, info);
787     EXPECT_EQ(ret, -E_CLOUD_ERROR);
788     ret = proxy.BatchDelete(tableName, record, record, info);
789     EXPECT_EQ(ret, -E_CLOUD_ERROR);
790     std::pair<int, uint64_t> res = proxy.Lock();
791     EXPECT_EQ(res.first, -E_CLOUD_ERROR);
792     std::pair<int, std::string> cursor = proxy.GetEmptyCursor(tableName);
793     EXPECT_EQ(cursor.first, -E_CLOUD_ERROR);
794 
795     /**
796      * @tc.steps: step2. call CloudDBProxy interfaces when para is err.
797      * @tc.expected: step2. return fail.
798      */
799     std::pair<int, std::string> ver = proxy.GetCloudVersion("test");
800     EXPECT_EQ(ver.first, -E_NOT_SUPPORT);
801     std::vector<Asset> assets;
802     ret = proxy.RemoveLocalAssets(assets);
803     EXPECT_EQ(ret, -E_OK);
804     assets = {{}};
805     ret = proxy.RemoveLocalAssets(assets);
806     EXPECT_EQ(ret, -E_OK);
807 }
808 
809 /**
810  * @tc.name: CloudSyncUtilsTest
811  * @tc.desc: Verify CloudSyncUtils interfaces
812  * @tc.type: FUNC
813  * @tc.require: DTS2024073106613
814  * @tc.author: suyue
815  */
816 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncUtilsTest, TestSize.Level0)
817 {
818     /**
819      * @tc.steps: step1. Test type translation interfaces.
820      * @tc.expected: step1. success.
821      */
822     CloudSyncUtils utilsObj;
823     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::INSERT), AssetOpType::INSERT);
824     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DELETE), AssetOpType::DELETE);
825     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::UPDATE), AssetOpType::UPDATE);
826     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::NORMAL), AssetOpType::NO_CHANGE);
827     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DOWNLOADING), AssetOpType::NO_CHANGE);
828     EXPECT_EQ(utilsObj.OpTypeToChangeType(OpType::ONLY_UPDATE_GID), ChangeType::OP_BUTT);
829 
830     /**
831      * @tc.steps: step2. call CloudSyncUtils interfaces when para is err.
832      * @tc.expected: step2. return false.
833      */
834     const std::vector<DeviceID> devices = {"test"};
835     int mode = 10; // set metaMode to 10 not in enum class MetaMode
836     int ret = utilsObj.CheckParamValid(devices, static_cast<SyncMode>(mode));
837     EXPECT_EQ(ret, -E_INVALID_ARGS);
838     VBucket record;
839     const std::vector<std::string> pkColNames;
840     std::vector<Type> cloudPkVals = {{}};
841     ret = utilsObj.GetCloudPkVals(record, pkColNames, 0, cloudPkVals);
842     EXPECT_EQ(ret, -E_INVALID_ARGS);
843     Assets assets = {{}};
844     utilsObj.StatusToFlagForAssets(assets);
845     std::vector<Field> fields = {{"test", TYPE_INDEX<Assets>, true, true}};
846     utilsObj.StatusToFlagForAssetsInRecord(fields, record);
847     Timestamp timestamp;
848     CloudSyncData uploadData;
849     const int64_t count = 0;
850     ret = utilsObj.UpdateExtendTime(uploadData, count, 0, timestamp);
851     EXPECT_EQ(ret, -E_INTERNAL_ERROR);
852     CloudSyncBatch data;
853     data.assets = {{}};
854     ret = utilsObj.FillAssetIdToAssets(data, 0, CloudWaterType::UPDATE);
855     EXPECT_EQ(ret, -E_CLOUD_ERROR);
856 
857     /**
858      * @tc.steps: step3. call IsChangeDataEmpty interface when para is different.
859      * @tc.expected: step3. success.
860      */
861     ChangedData changedData;
862     EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
863     changedData.primaryData[OP_INSERT] = {{}};
864     EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
865     changedData.primaryData[OP_UPDATE] = {{}};
866     EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
867     changedData.primaryData[OP_DELETE] = {{}};
868     EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), false);
869 }
870 }