• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <gtest/gtest.h>
16 
17 #include <utility>
18 #include "cloud_db_constant.h"
19 #include "cloud_db_data_utils.h"
20 #include "cloud_db_types.h"
21 #include "cloud_db_proxy.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "kv_store_errno.h"
24 #include "mock_icloud_sync_storage_interface.h"
25 #include "virtual_cloud_db.h"
26 #include "virtual_cloud_syncer.h"
27 
28 using namespace std;
29 using namespace testing::ext;
30 using namespace DistributedDB;
31 
32 namespace {
// Name of the cloud table used by the test cases in this file.
constexpr const char *TABLE_NAME = "Table";
GetFields()34 std::vector<Field> GetFields()
35 {
36     return {
37         {
38             .colName = "col1",
39             .type = TYPE_INDEX<int64_t>,
40             .primary = true,
41             .nullable = false
42         },
43         {
44             .colName = "col2",
45             .type = TYPE_INDEX<std::string>,
46             .primary = false
47         },
48         {
49             .colName = "col3",
50             .type = TYPE_INDEX<Bytes>,
51             .primary = false
52         }
53     };
54 }
55 
ModifyRecords(std::vector<VBucket> & expectRecord)56 void ModifyRecords(std::vector<VBucket> &expectRecord)
57 {
58     std::vector<VBucket> tempRecord;
59     for (const auto &record: expectRecord) {
60         VBucket bucket;
61         for (auto &[field, val] : record) {
62             LOGD("modify field %s", field.c_str());
63             if (val.index() == TYPE_INDEX<int64_t>) {
64                 int64_t v = std::get<int64_t>(val);
65                 bucket.insert({ field, static_cast<int64_t>(v + 1) });
66             } else {
67                 bucket.insert({ field, val });
68             }
69         }
70         tempRecord.push_back(bucket);
71     }
72     expectRecord = tempRecord;
73 }
74 
Sync(CloudSyncer * cloudSyncer,int & callCount)75 DBStatus Sync(CloudSyncer *cloudSyncer, int &callCount)
76 {
77     std::mutex processMutex;
78     std::condition_variable cv;
79     SyncProcess syncProcess;
80     const auto callback = [&callCount, &syncProcess, &processMutex, &cv](
81         const std::map<std::string, SyncProcess> &process) {
82         {
83             std::lock_guard<std::mutex> autoLock(processMutex);
84             syncProcess = process.begin()->second;
85             if (!process.empty()) {
86                 syncProcess = process.begin()->second;
87             } else {
88                 SyncProcess tmpProcess;
89                 syncProcess = tmpProcess;
90             }
91             callCount++;
92         }
93         cv.notify_all();
94     };
95     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
96     {
97         LOGI("begin to wait sync");
98         std::unique_lock<std::mutex> uniqueLock(processMutex);
99         cv.wait(uniqueLock, [&syncProcess]() {
100             return syncProcess.process == ProcessStatus::FINISHED;
101         });
102         LOGI("end to wait sync");
103     }
104     return syncProcess.errCode;
105 }
106 
107 class DistributedDBCloudDBProxyTest : public testing::Test {
108 public:
109     static void SetUpTestCase();
110     static void TearDownTestCase();
111     void SetUp() override;
112     void TearDown() override;
113 
114 protected:
115     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
116 };
117 
SetUpTestCase()118 void DistributedDBCloudDBProxyTest::SetUpTestCase()
119 {
120 }
121 
TearDownTestCase()122 void DistributedDBCloudDBProxyTest::TearDownTestCase()
123 {
124 }
125 
SetUp()126 void DistributedDBCloudDBProxyTest::SetUp()
127 {
128     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
129     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
130 }
131 
TearDown()132 void DistributedDBCloudDBProxyTest::TearDown()
133 {
134     virtualCloudDb_ = nullptr;
135 }
136 
137 /**
138  * @tc.name: CloudDBProxyTest001
139  * @tc.desc: Verify cloud db init and close function.
140  * @tc.type: FUNC
141  * @tc.require:
142  * @tc.author: zhangqiquan
143  */
144 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest001, TestSize.Level0)
145 {
146     /**
147      * @tc.steps: step1. set cloud db to proxy
148      * @tc.expected: step1. E_OK
149      */
150     CloudDBProxy proxy;
151     proxy.SetCloudDB(virtualCloudDb_);
152     /**
153      * @tc.steps: step2. proxy close cloud db with cloud error
154      * @tc.expected: step2. -E_CLOUD_ERROR
155      */
156     virtualCloudDb_->SetCloudError(true);
157     EXPECT_EQ(proxy.Close(), -E_CLOUD_ERROR);
158     /**
159      * @tc.steps: step3. proxy close cloud db again
160      * @tc.expected: step3. E_OK because cloud db has been set nullptr
161      */
162     EXPECT_EQ(proxy.Close(), E_OK);
163     virtualCloudDb_->SetCloudError(false);
164     EXPECT_EQ(proxy.Close(), E_OK);
165 }
166 
167 /**
168  * @tc.name: CloudDBProxyTest002
169  * @tc.desc: Verify cloud db insert function.
170  * @tc.type: FUNC
171  * @tc.require:
172  * @tc.author: zhangqiquan
173  */
174 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest002, TestSize.Level0)
175 {
176     /**
177      * @tc.steps: step1. set cloud db to proxy
178      * @tc.expected: step1. E_OK
179      */
180     CloudDBProxy proxy;
181     proxy.SetCloudDB(virtualCloudDb_);
182     /**
183      * @tc.steps: step2. insert data to cloud db
184      * @tc.expected: step2. OK
185      */
186     TableSchema schema = {
187         .name = TABLE_NAME,
188         .fields = GetFields()
189     };
190     std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
191     std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
192     Info uploadInfo;
193     std::vector<VBucket> insert = expectRecords;
194     EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), OK);
195 
196     VBucket extend;
197     extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
198     std::vector<VBucket> actualRecords;
199     EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
200     /**
201      * @tc.steps: step3. proxy query data
202      * @tc.expected: step3. data is equal to expect
203      */
204     ASSERT_EQ(actualRecords.size(), expectRecords.size());
205     for (size_t i = 0; i < actualRecords.size(); ++i) {
206         for (const auto &field: schema.fields) {
207             Type expect = expectRecords[i][field.colName];
208             Type actual = actualRecords[i][field.colName];
209             EXPECT_EQ(expect.index(), actual.index());
210         }
211     }
212     /**
213      * @tc.steps: step4. proxy close cloud db
214      * @tc.expected: step4. E_OK
215      */
216     EXPECT_EQ(proxy.Close(), E_OK);
217 }
218 
219 /**
220  * @tc.name: CloudDBProxyTest003
221  * @tc.desc: Verify cloud db update function.
222  * @tc.type: FUNC
223  * @tc.require:
224  * @tc.author: zhangqiquan
225  */
226 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest003, TestSize.Level0)
227 {
228     TableSchema schema = {
229         .name = TABLE_NAME,
230         .fields = GetFields()
231     };
232     /**
233      * @tc.steps: step1. set cloud db to proxy
234      * @tc.expected: step1. E_OK
235      */
236     CloudDBProxy proxy;
237     proxy.SetCloudDB(virtualCloudDb_);
238     /**
239      * @tc.steps: step2. insert data to cloud db
240      * @tc.expected: step2. OK
241      */
242     std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
243     std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
244     Info uploadInfo;
245     std::vector<VBucket> insert = expectRecords;
246     EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), OK);
247     /**
248      * @tc.steps: step3. update data to cloud db
249      * @tc.expected: step3. E_OK
250      */
251     ModifyRecords(expectRecords);
252     std::vector<VBucket> update = expectRecords;
253     EXPECT_EQ(proxy.BatchUpdate(TABLE_NAME, update, expectExtends, uploadInfo), OK);
254     /**
255      * @tc.steps: step3. proxy close cloud db
256      * @tc.expected: step3. E_OK
257      */
258     VBucket extend;
259     extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
260     std::vector<VBucket> actualRecords;
261     EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
262     ASSERT_EQ(actualRecords.size(), expectRecords.size());
263     for (size_t i = 0; i < actualRecords.size(); ++i) {
264         for (const auto &field: schema.fields) {
265             Type expect = expectRecords[i][field.colName];
266             Type actual = actualRecords[i][field.colName];
267             EXPECT_EQ(expect.index(), actual.index());
268         }
269     }
270     /**
271      * @tc.steps: step4. proxy close cloud db
272      * @tc.expected: step4. E_OK
273      */
274     EXPECT_EQ(proxy.Close(), E_OK);
275 }
276 
277 /**
278  * @tc.name: CloudDBProxyTest004
279  * @tc.desc: Verify cloud db heartbeat and lock function.
280  * @tc.type: FUNC
281  * @tc.require:
282  * @tc.author: zhangqiquan
283  */
284 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest004, TestSize.Level3)
285 {
286     /**
287      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
288      * @tc.expected: step1. E_OK
289      */
290     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
291     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
292     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
293     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
294     ASSERT_NE(cloudSyncer, nullptr);
295     cloudSyncer->SetCloudDB(virtualCloudDb_);
296     cloudSyncer->SetSyncAction(true, false);
__anon6a98ce310402() 297     cloudSyncer->SetDownloadFunc([]() {
298         std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
299         return E_OK;
300     });
301     /**
302      * @tc.steps: step2. call sync and wait sync finish
303      * @tc.expected: step2. E_OK
304      */
305     std::mutex processMutex;
306     std::condition_variable cv;
307     SyncProcess syncProcess;
308     LOGI("[CloudDBProxyTest004] Call cloud sync");
__anon6a98ce310502(const std::map<std::string, SyncProcess> &process) 309     const auto callback = [&syncProcess, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
310         {
311             std::lock_guard<std::mutex> autoLock(processMutex);
312             if (process.size() >= 1u) {
313                 syncProcess = std::move(process.begin()->second);
314             } else {
315                 SyncProcess tmpProcess;
316                 syncProcess = tmpProcess;
317             }
318         }
319         cv.notify_all();
320     };
321     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
322     std::this_thread::sleep_for(std::chrono::seconds(1));
323     EXPECT_TRUE(virtualCloudDb_->GetLockStatus());
324     {
325         LOGI("[CloudDBProxyTest004] begin to wait sync");
326         std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon6a98ce310602() 327         cv.wait(uniqueLock, [&syncProcess]() {
328             return syncProcess.process == ProcessStatus::FINISHED;
329         });
330         LOGI("[CloudDBProxyTest004] end to wait sync");
331         EXPECT_EQ(syncProcess.errCode, OK);
332     }
333     /**
334      * @tc.steps: step3. get cloud lock status and heartbeat count
335      * @tc.expected: step3. cloud is unlock and more than twice heartbeat
336      */
337     EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
338     EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 2);
339     virtualCloudDb_->ClearHeartbeatCount();
340     cloudSyncer->Close();
341     RefObject::KillAndDecObjRef(cloudSyncer);
342 }
343 
344 /**
345  * @tc.name: CloudDBProxyTest005
346  * @tc.desc: Verify sync failed after cloud error.
347  * @tc.type: FUNC
348  * @tc.require:
349  * @tc.author: zhangqiquan
350  */
351 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest005, TestSize.Level0)
352 {
353     /**
354      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
355      * @tc.expected: step1. E_OK
356      */
357     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
358     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
359     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
360     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
361     ASSERT_NE(cloudSyncer, nullptr);
362     cloudSyncer->SetCloudDB(virtualCloudDb_);
363     cloudSyncer->SetSyncAction(false, false);
364     virtualCloudDb_->SetCloudError(true);
365     /**
366      * @tc.steps: step2. call sync and wait sync finish
367      * @tc.expected: step2. CLOUD_ERROR by lock error
368      */
369     int callCount = 0;
370     EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
371     /**
372      * @tc.steps: step3. get cloud lock status and heartbeat count
373      * @tc.expected: step3. cloud is unlock and no heartbeat
374      */
375     EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
376     EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 0);
377     virtualCloudDb_->ClearHeartbeatCount();
378     cloudSyncer->Close();
379     RefObject::KillAndDecObjRef(cloudSyncer);
380 }
381 
382 /**
383  * @tc.name: CloudDBProxyTest006
384  * @tc.desc: Verify sync failed by heartbeat failed.
385  * @tc.type: FUNC
386  * @tc.require:
387  * @tc.author: zhangqiquan
388  */
389 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest006, TestSize.Level3)
390 {
391     /**
392      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
393      * @tc.expected: step1. E_OK
394      */
395     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
396     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
397     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
398     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
399     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
400     ASSERT_NE(cloudSyncer, nullptr);
401     cloudSyncer->SetCloudDB(virtualCloudDb_);
402     cloudSyncer->SetSyncAction(true, false);
__anon6a98ce310702() 403     cloudSyncer->SetDownloadFunc([cloudSyncer]() {
404         std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
405         cloudSyncer->Notify(false);
406         return E_OK;
407     });
408     virtualCloudDb_->SetHeartbeatError(true);
409     /**
410      * @tc.steps: step2. call sync and wait sync finish
411      * @tc.expected: step2. sync failed by heartbeat error
412      */
413     int callCount = 0;
414     EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
415     RuntimeContext::GetInstance()->StopTaskPool();
416     EXPECT_EQ(callCount, 1);
417     /**
418      * @tc.steps: step3. get cloud lock status and heartbeat count
419      * @tc.expected: step3. cloud is unlock and twice heartbeat
420      */
421     EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
422     EXPECT_EQ(virtualCloudDb_->GetHeartbeatCount(), 2);
423     virtualCloudDb_->ClearHeartbeatCount();
424     cloudSyncer->Close();
425     RefObject::KillAndDecObjRef(cloudSyncer);
426 }
427 
428 /**
429  * @tc.name: CloudDBProxyTest007
430  * @tc.desc: Verify syncer close after notify finish.
431  * @tc.type: FUNC
432  * @tc.require:
433  * @tc.author: zhangqiquan
434  */
435 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest007, TestSize.Level4)
436 {
437     /**
438      * @tc.steps: step1. set cloud db to proxy
439      * @tc.expected: step1. E_OK
440      */
441     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
442     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
443     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
444     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
445     ASSERT_NE(cloudSyncer, nullptr);
446     cloudSyncer->SetCloudDB(virtualCloudDb_);
447     cloudSyncer->SetSyncAction(false, false);
448     /**
449      * @tc.steps: step2. call sync and wait sync finish
450      * @tc.expected: step2. notify before close finished
451      */
452     std::atomic<bool> close = false;
453     int callCount = 0;
454     std::mutex callMutex;
455     std::condition_variable cv;
456     const auto callback = [&close, &callCount, &callMutex, &cv](
__anon6a98ce310802( const std::map<std::string, SyncProcess> &) 457         const std::map<std::string, SyncProcess> &) {
458         std::this_thread::sleep_for(std::chrono::seconds(5)); // block notify 5s
459         {
460             std::lock_guard<std::mutex> autoLock(callMutex);
461             callCount++;
462         }
463         cv.notify_all();
464         EXPECT_EQ(close, false);
465     };
466     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
467     /**
468      * @tc.steps: step3. wait notify finished
469      */
470     std::this_thread::sleep_for(std::chrono::seconds(2)); // block 2s
471     cloudSyncer->Close();
472     close = true;
473     {
474         LOGI("begin to wait sync");
475         std::unique_lock<std::mutex> uniqueLock(callMutex);
__anon6a98ce310902() 476         cv.wait(uniqueLock, [&callCount]() {
477             return callCount > 0;
478         });
479         LOGI("end to wait sync");
480     }
481     RefObject::KillAndDecObjRef(cloudSyncer);
482 }
483 
484 /**
485  * @tc.name: CloudDBProxyTest008
486  * @tc.desc: Verify cloud db heartbeat with diff status.
487  * @tc.type: FUNC
488  * @tc.require:
489  * @tc.author: zhangqiquan
490  */
491 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest008, TestSize.Level0)
492 {
493     /**
494      * @tc.steps: step1. set cloud db to proxy
495      * @tc.expected: step1. E_OK
496      */
497     CloudDBProxy proxy;
498     proxy.SetCloudDB(virtualCloudDb_);
499     /**
500      * @tc.steps: step2. proxy heartbeat with diff status
501      */
502     virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
503     int errCode = proxy.HeartBeat();
504     EXPECT_EQ(errCode, -E_CLOUD_NETWORK_ERROR);
505     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_NETWORK_ERROR);
506 
507     virtualCloudDb_->SetActionStatus(CLOUD_SYNC_UNSET);
508     errCode = proxy.HeartBeat();
509     EXPECT_EQ(errCode, -E_CLOUD_SYNC_UNSET);
510     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_SYNC_UNSET);
511 
512     virtualCloudDb_->SetActionStatus(CLOUD_FULL_RECORDS);
513     errCode = proxy.HeartBeat();
514     EXPECT_EQ(errCode, -E_CLOUD_FULL_RECORDS);
515     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_FULL_RECORDS);
516 
517     virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
518     errCode = proxy.HeartBeat();
519     EXPECT_EQ(errCode, -E_CLOUD_LOCK_ERROR);
520     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_LOCK_ERROR);
521 
522     virtualCloudDb_->SetActionStatus(DB_ERROR);
523     errCode = proxy.HeartBeat();
524     EXPECT_EQ(errCode, -E_CLOUD_ERROR);
525     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_ERROR);
526 
527     /**
528      * @tc.steps: step3. proxy close cloud db
529      * @tc.expected: step3. E_OK
530      */
531     EXPECT_EQ(proxy.Close(), E_OK);
532 }
533 
534 /**
535  * @tc.name: CloudDBProxyTest009
536  * @tc.desc: Verify cloud db closed and current task exit .
537  * @tc.type: FUNC
538  * @tc.require:
539  * @tc.author: zhangqiquan
540  */
541 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest009, TestSize.Level3)
542 {
543     /**
544      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
545      * @tc.expected: step1. E_OK
546      */
547     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
548     ASSERT_NE(iCloud, nullptr);
549     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
550     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
551     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
552     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
553     ASSERT_NE(cloudSyncer, nullptr);
554     cloudSyncer->SetCloudDB(virtualCloudDb_);
555     cloudSyncer->SetSyncAction(true, false);
__anon6a98ce310a02() 556     cloudSyncer->SetDownloadFunc([]() {
557         std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
558         return -E_CLOUD_ERROR;
559     });
560     /**
561      * @tc.steps: step2. call sync and wait sync finish
562      * @tc.expected: step2. E_OK
563      */
564     std::mutex processMutex;
565     bool finished = false;
566     std::condition_variable cv;
567     LOGI("[CloudDBProxyTest009] Call cloud sync");
__anon6a98ce310b02(const std::map<std::string, SyncProcess> &process) 568     const auto callback = [&finished, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
569         {
570             std::lock_guard<std::mutex> autoLock(processMutex);
571             for (const auto &item: process) {
572                 if (item.second.process == DistributedDB::FINISHED) {
573                     finished = true;
574                     EXPECT_EQ(item.second.errCode, DB_CLOSED);
575                 }
576             }
577         }
578         cv.notify_all();
579     };
580     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
581     std::this_thread::sleep_for(std::chrono::seconds(1));
582     cloudSyncer->Close();
583     {
584         LOGI("[CloudDBProxyTest009] begin to wait sync");
585         std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon6a98ce310c02() 586         cv.wait(uniqueLock, [&finished]() {
587             return finished;
588         });
589         LOGI("[CloudDBProxyTest009] end to wait sync");
590     }
591     RefObject::KillAndDecObjRef(cloudSyncer);
592 }
593 
594 /**
595  * @tc.name: CloudDBProxyTest010
596  * @tc.desc: Verify cloud db lock with diff status.
597  * @tc.type: FUNC
598  * @tc.require:
599  * @tc.author: zhangqiquan
600  */
601 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest010, TestSize.Level0)
602 {
603     /**
604      * @tc.steps: step1. set cloud db to proxy
605      * @tc.expected: step1. E_OK
606      */
607     CloudDBProxy proxy;
608     proxy.SetCloudDB(virtualCloudDb_);
609     /**
610      * @tc.steps: step2. proxy lock with diff status
611      */
612     virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
613     auto ret = proxy.Lock();
614     EXPECT_EQ(ret.first, -E_CLOUD_NETWORK_ERROR);
615     EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_NETWORK_ERROR);
616 
617     virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
618     ret = proxy.Lock();
619     EXPECT_EQ(ret.first, -E_CLOUD_LOCK_ERROR);
620     EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_LOCK_ERROR);
621     /**
622      * @tc.steps: step3. proxy close cloud db
623      * @tc.expected: step3. E_OK
624      */
625     EXPECT_EQ(proxy.Close(), E_OK);
626 }
627 
628 /**
629  * @tc.name: CloudSyncQueue001
630  * @tc.desc: Verify sync task count decrease after sync finished.
631  * @tc.type: FUNC
632  * @tc.require:
633  * @tc.author: zhangqiquan
634  */
635 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue001, TestSize.Level2)
636 {
637     /**
638      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
639      * @tc.expected: step1. E_OK
640      */
641     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
642     ASSERT_NE(iCloud, nullptr);
643     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
644     ASSERT_NE(cloudSyncer, nullptr);
645     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
646     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
647     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
648     cloudSyncer->SetCloudDB(virtualCloudDb_);
649     cloudSyncer->SetSyncAction(true, false);
__anon6a98ce310d02() 650     cloudSyncer->SetDownloadFunc([cloudSyncer]() {
651         EXPECT_EQ(cloudSyncer->GetQueueCount(), 1u);
652         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
653         return E_OK;
654     });
655     /**
656      * @tc.steps: step2. call sync and wait sync finish
657      */
658     int callCount = 0;
659     EXPECT_EQ(Sync(cloudSyncer, callCount), OK);
660     RuntimeContext::GetInstance()->StopTaskPool();
661     EXPECT_EQ(callCount, 1);
662 }
663 
664 /**
665  * @tc.name: CloudSyncerTest001
666  * @tc.desc: Verify syncer notify by queue schedule.
667  * @tc.type: FUNC
668  * @tc.require:
669  * @tc.author: zhangqiquan
670  */
671 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncerTest001, TestSize.Level2)
672 {
673     /**
674      * @tc.steps: step1. set cloud db to proxy
675      * @tc.expected: step1. E_OK
676      */
677     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
678     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
679     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
680     EXPECT_CALL(*iCloud, GetIdentify).WillRepeatedly(testing::Return("CloudSyncerTest001"));
681     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
682     std::atomic<int> callCount = 0;
__anon6a98ce310e02(const std::map<std::string, SyncProcess> &) 683     cloudSyncer->SetCurrentTaskInfo([&callCount](const std::map<std::string, SyncProcess> &) {
684         callCount++;
685         int before = callCount;
686         LOGD("on callback %d", before);
687         std::this_thread::sleep_for(std::chrono::seconds(1));
688         EXPECT_EQ(before, callCount);
689     }, 1u);
690     const int notifyCount = 2;
691     for (int i = 0; i < notifyCount; ++i) {
692         cloudSyncer->Notify();
693     }
694     cloudSyncer->SetCurrentTaskInfo(nullptr, 0); // 0 is invalid task id
695     cloudSyncer->Close();
696     RefObject::KillAndDecObjRef(cloudSyncer);
697 }
698 }