• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "db_base64_utils.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "kv_virtual_device.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kvdb_manager.h"
26 #include "platform_specific.h"
27 #include "process_system_api_adapter_impl.h"
28 #include "sqlite_cloud_kv_executor_utils.h"
29 #include "virtual_communicator_aggregator.h"
30 #include "virtual_cloud_db.h"
31 
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 using namespace std;
36 
37 namespace {
38 static std::string HWM_HEAD = "naturalbase_cloud_meta_sync_data_";
39 string g_testDir;
40 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
41 CloudSyncOption g_CloudSyncoption;
42 const std::string USER_ID_2 = "user2";
43 const std::string USER_ID_3 = "user3";
44 const int64_t WAIT_TIME = 5;
45 DistributedDBToolsUnitTest g_tool;
46 class DistributedDBCloudKvSyncerTest : public testing::Test {
47 public:
48     static void SetUpTestCase();
49     static void TearDownTestCase();
50     void SetUp();
51     void TearDown();
52 protected:
53     DBStatus GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId, KvStoreNbDelegate::Option option,
54         bool invalidSchema = false);
55     void CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId);
56     void BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
57         DBStatus expectSyncResult = OK);
58     static DataBaseSchema GetDataBaseSchema(bool invalidSchema);
59     void PutKvBatchDataAndSyncCloud(CloudSyncOption &syncOption);
60     void GetSingleStore();
61     void ReleaseSingleStore();
62     void BlockCompensatedSync(int &actSyncCnt, int expSyncCnt);
63     void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
64     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
65     std::shared_ptr<VirtualCloudDb> virtualCloudDb2_ = nullptr;
66     KvStoreConfig config_;
67     KvStoreNbDelegate* kvDelegatePtrS1_ = nullptr;
68     KvStoreNbDelegate* kvDelegatePtrS2_ = nullptr;
69     SyncProcess lastProcess_;
70     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
71     KvVirtualDevice *deviceB_ = nullptr;
72     SQLiteSingleVerNaturalStore *singleStore_ = nullptr;
73     std::mutex comSyncMutex;
74     std::condition_variable comSyncCv;
75 };
76 
SetUpTestCase()77 void DistributedDBCloudKvSyncerTest::SetUpTestCase()
78 {
79     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
80     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
81         LOGE("rm test db files error!");
82     }
83     g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
84     g_CloudSyncoption.users.push_back(USER_ID);
85     g_CloudSyncoption.devices.push_back("cloud");
86 
87     string dir = g_testDir + "/single_ver";
88     DIR* dirTmp = opendir(dir.c_str());
89     if (dirTmp == nullptr) {
90         OS::MakeDBDirectory(dir);
91     } else {
92         closedir(dirTmp);
93     }
94 }
95 
TearDownTestCase()96 void DistributedDBCloudKvSyncerTest::TearDownTestCase()
97 {
98     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
99         LOGE("rm test db files error!");
100     }
101 }
102 
SetUp()103 void DistributedDBCloudKvSyncerTest::SetUp()
104 {
105     DistributedDBToolsUnitTest::PrintTestCaseInfo();
106     config_.dataDir = g_testDir;
107     /**
108      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
109      */
110     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
111     virtualCloudDb2_ = std::make_shared<VirtualCloudDb>();
112     g_mgr.SetKvStoreConfig(config_);
113     KvStoreNbDelegate::Option option1;
114     ASSERT_EQ(GetKvStore(kvDelegatePtrS1_, STORE_ID_1, option1), OK);
115     // set aggregator after get store1, only store2 can sync with p2p
116     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
117     ASSERT_TRUE(communicatorAggregator_ != nullptr);
118     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
119     KvStoreNbDelegate::Option option2;
120     ASSERT_EQ(GetKvStore(kvDelegatePtrS2_, STORE_ID_2, option2), OK);
121 
122     deviceB_ = new (std::nothrow) KvVirtualDevice("DEVICE_B");
123     ASSERT_TRUE(deviceB_ != nullptr);
124     auto syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
125     ASSERT_TRUE(syncInterfaceB != nullptr);
126     ASSERT_EQ(deviceB_->Initialize(communicatorAggregator_, syncInterfaceB), E_OK);
127     GetSingleStore();
128 }
129 
TearDown()130 void DistributedDBCloudKvSyncerTest::TearDown()
131 {
132     ReleaseSingleStore();
133     CloseKvStore(kvDelegatePtrS1_, STORE_ID_1);
134     CloseKvStore(kvDelegatePtrS2_, STORE_ID_2);
135     virtualCloudDb_ = nullptr;
136     virtualCloudDb2_ = nullptr;
137     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138         LOGE("rm test db files error!");
139     }
140 
141     if (deviceB_ != nullptr) {
142         delete deviceB_;
143         deviceB_ = nullptr;
144     }
145 
146     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
147     communicatorAggregator_ = nullptr;
148     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
149 }
150 
BlockSync(KvStoreNbDelegate * delegate,DBStatus expectDBStatus,CloudSyncOption option,DBStatus expectSyncResult)151 void DistributedDBCloudKvSyncerTest::BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus,
152     CloudSyncOption option, DBStatus expectSyncResult)
153 {
154     if (delegate == nullptr) {
155         return;
156     }
157     std::mutex dataMutex;
158     std::condition_variable cv;
159     bool finish = false;
160     SyncProcess last;
161     auto callback = [expectDBStatus, &last, &cv, &dataMutex, &finish, &option](const std::map<std::string,
162         SyncProcess> &process) {
163         size_t notifyCnt = 0;
164         for (const auto &item: process) {
165             LOGD("user = %s, status = %d, errCode = %d", item.first.c_str(), item.second.process, item.second.errCode);
166             if (item.second.process != DistributedDB::FINISHED) {
167                 continue;
168             }
169             EXPECT_EQ(item.second.errCode, expectDBStatus);
170             {
171                 std::lock_guard<std::mutex> autoLock(dataMutex);
172                 notifyCnt++;
173                 std::set<std::string> userSet(option.users.begin(), option.users.end());
174                 if (notifyCnt == userSet.size()) {
175                     finish = true;
176                     last = item.second;
177                     cv.notify_one();
178                 }
179             }
180         }
181     };
182     auto actualRet = delegate->Sync(option, callback);
183     EXPECT_EQ(actualRet, expectSyncResult);
184     if (actualRet == OK) {
185         std::unique_lock<std::mutex> uniqueLock(dataMutex);
186         cv.wait(uniqueLock, [&finish]() {
187             return finish;
188         });
189     }
190     lastProcess_ = last;
191 }
192 
GetDataBaseSchema(bool invalidSchema)193 DataBaseSchema DistributedDBCloudKvSyncerTest::GetDataBaseSchema(bool invalidSchema)
194 {
195     DataBaseSchema schema;
196     TableSchema tableSchema;
197     tableSchema.name = invalidSchema ? "invalid_schema_name" : CloudDbConstant::CLOUD_KV_TABLE_NAME;
198     Field field;
199     field.colName = CloudDbConstant::CLOUD_KV_FIELD_KEY;
200     field.type = TYPE_INDEX<std::string>;
201     field.primary = true;
202     tableSchema.fields.push_back(field);
203     field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE;
204     field.primary = false;
205     tableSchema.fields.push_back(field);
206     field.colName = CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE;
207     tableSchema.fields.push_back(field);
208     field.colName = CloudDbConstant::CLOUD_KV_FIELD_VALUE;
209     tableSchema.fields.push_back(field);
210     field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME;
211     field.type = TYPE_INDEX<int64_t>;
212     tableSchema.fields.push_back(field);
213     schema.tables.push_back(tableSchema);
214     return schema;
215 }
216 
GetSingleStore()217 void DistributedDBCloudKvSyncerTest::GetSingleStore()
218 {
219     KvDBProperties prop;
220     prop.SetStringProp(KvDBProperties::USER_ID, USER_ID);
221     prop.SetStringProp(KvDBProperties::APP_ID, APP_ID);
222     prop.SetStringProp(KvDBProperties::STORE_ID, STORE_ID_1);
223 
224     std::string hashIdentifier = DBCommon::TransferHashString(
225         DBCommon::GenerateIdentifierId(STORE_ID_1, APP_ID, USER_ID, "", 0));
226     prop.SetStringProp(DBProperties::IDENTIFIER_DATA, hashIdentifier);
227     prop.SetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::SINGLE_VER_TYPE_SQLITE);
228     int errCode = E_OK;
229     singleStore_ = static_cast<SQLiteSingleVerNaturalStore *>(KvDBManager::OpenDatabase(prop, errCode));
230     ASSERT_NE(singleStore_, nullptr);
231 }
232 
ReleaseSingleStore()233 void DistributedDBCloudKvSyncerTest::ReleaseSingleStore()
234 {
235     RefObject::DecObjRef(singleStore_);
236     singleStore_ = nullptr;
237 }
238 
BlockCompensatedSync(int & actSyncCnt,int expSyncCnt)239 void DistributedDBCloudKvSyncerTest::BlockCompensatedSync(int &actSyncCnt, int expSyncCnt)
240 {
241     {
242         std::unique_lock<std::mutex> lock(comSyncMutex);
243         bool result = comSyncCv.wait_for(lock, std::chrono::seconds(WAIT_TIME),
244             [&actSyncCnt, expSyncCnt]() { return actSyncCnt == expSyncCnt; });
245         ASSERT_EQ(result, true);
246     }
247 }
248 
249 
GetKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId,KvStoreNbDelegate::Option option,bool invalidSchema)250 DBStatus DistributedDBCloudKvSyncerTest::GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId,
251     KvStoreNbDelegate::Option option, bool invalidSchema)
252 {
253     DBStatus openRet = OK;
254     g_mgr.GetKvStore(storeId, option, [&openRet, &delegate](DBStatus status, KvStoreNbDelegate *openDelegate) {
255         openRet = status;
256         delegate = openDelegate;
257     });
258     EXPECT_EQ(openRet, OK);
259     EXPECT_NE(delegate, nullptr);
260 
261     std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
262     cloudDbs[USER_ID] = virtualCloudDb_;
263     cloudDbs[USER_ID_2] = virtualCloudDb2_;
264     delegate->SetCloudDB(cloudDbs);
265     std::map<std::string, DataBaseSchema> schemas;
266     schemas[USER_ID] = GetDataBaseSchema(invalidSchema);
267     schemas[USER_ID_2] = GetDataBaseSchema(invalidSchema);
268     return delegate->SetCloudDbSchema(schemas);
269 }
270 
CloseKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId)271 void DistributedDBCloudKvSyncerTest::CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId)
272 {
273     if (delegate != nullptr) {
274         ASSERT_EQ(g_mgr.CloseKvStore(delegate), OK);
275         delegate = nullptr;
276         DBStatus status = g_mgr.DeleteKvStore(storeId);
277         LOGD("delete kv store status %d store %s", status, storeId.c_str());
278         ASSERT_EQ(status, OK);
279     }
280 }
281 
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)282 void DistributedDBCloudKvSyncerTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
283 {
284     sqlite3 *db_;
285     uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
286     std::string fileUrl = g_testDir + "/" \
287         "2d23c8a0ffadafcaa03507a4ec2290c83babddcab07c0e2945fbba93efc7eec0/single_ver/main/gen_natural_store.db";
288     EXPECT_EQ(sqlite3_open_v2(fileUrl.c_str(), &db_, flag, nullptr), SQLITE_OK);
289 
290     std::string sql = "SELECT count(*) FROM naturalbase_kv_aux_sync_data_log WHERE ";
291     switch (opType) {
292         case OpType::INSERT:
293             sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
294                 " cloud_gid != '' AND version !='' AND cloud_flag=cloud_flag|0x10";
295             break;
296         case OpType::UPDATE:
297             sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
298                 " cloud_gid == '' AND version =='' AND cloud_flag=cloud_flag|0x10";
299             break;
300         case OpType::DELETE:
301             sql += " cloud_gid == '' AND version ==''";
302             break;
303         default:
304             break;
305     }
306     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
307         reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
308     sqlite3_close_v2(db_);
309 }
310 
311 /**
312  * @tc.name: UploadAbnormalSync001
313  * @tc.desc: Test upload update record, cloud returned record not found.
314  * @tc.type: FUNC
315  * @tc.require:
316  * @tc.author: bty
317  */
318 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync001, TestSize.Level1)
319 {
320     auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
321     ASSERT_NE(cloudHook, nullptr);
322 
323     /**
324      * @tc.steps:step1. Device A inserts data and synchronizes
325      * @tc.expected: step1 OK.
326      */
327     Key key = {'k'};
328     Value value = {'v'};
329     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
330     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
331 
332     /**
333      * @tc.steps:step2. Device A update data and synchronizes, cloud returned record not found
334      * @tc.expected: step2 OK.
335      */
336     Value value2 = {'x'};
337     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
338     int upIdx = 0;
__anon06e3d2c10602(const std::string &tableName, VBucket &extend) 339     virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
340         LOGD("cloud db upload index:%d", ++upIdx);
341         if (upIdx == 1) { // 1 is index
342             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
343         }
344     });
345     int syncCnt = 0;
__anon06e3d2c10702null346     cloudHook->SetSyncFinishHook([&syncCnt, this] {
347         LOGD("sync finish times:%d", ++syncCnt);
348         if (syncCnt == 1) { // 1 is the first sync
349             CheckUploadAbnormal(OpType::UPDATE, 1L, true); // 1 is expected count
350         } else {
351             CheckUploadAbnormal(OpType::UPDATE, 1L, true); // 1 is expected count
352         }
353         comSyncCv.notify_all();
354     });
355     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
356     BlockCompensatedSync(syncCnt, 1); // 2 is sync times
357     virtualCloudDb_->ForkUpload(nullptr);
358     cloudHook->SetSyncFinishHook(nullptr);
359 }
360 
361 /**
362  * @tc.name: UploadAbnormalSync002
363  * @tc.desc: Test upload insert record, cloud returned record already existed.
364  * @tc.type: FUNC
365  * @tc.require:
366  * @tc.author: bty
367  */
368 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync002, TestSize.Level1)
369 {
370     auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
371     ASSERT_NE(cloudHook, nullptr);
372 
373     /**
374      * @tc.steps:step1. Device A inserts k-v and synchronizes
375      * @tc.expected: step1 OK.
376      */
377     Key key = {'k'};
378     Value value = {'v'};
379     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
380     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
381 
382     /**
383      * @tc.steps:step2. Device A insert k2-v2 and synchronizes, Device B insert k2-v2 and sync before A upload
384      * @tc.expected: step2 OK.
385      */
386     Key key2 = {'x'};
387     Value value2 = {'y'};
388     ASSERT_EQ(kvDelegatePtrS1_->Put(key2, value2), OK);
389     int upIdx = 0;
__anon06e3d2c10802(const std::string &tableName, VBucket &extend) 390     virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
391         LOGD("cloud db upload index:%d", ++upIdx);
392         if (upIdx == 2) { // 2 is index
393             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
394         }
395     });
396     int doUpIdx = 0;
__anon06e3d2c10902null397     cloudHook->SetDoUploadHook([&doUpIdx, key2, value2, this] {
398         LOGD("begin upload index:%d", ++doUpIdx);
399         ASSERT_EQ(kvDelegatePtrS2_->Put(key2, value2), OK);
400         BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
401     });
402     int syncCnt = 0;
__anon06e3d2c10a02null403     cloudHook->SetSyncFinishHook([&syncCnt, this] {
404         LOGD("sync finish times:%d", ++syncCnt);
405         if (syncCnt == 1) { // 1 is the normal sync
406             CheckUploadAbnormal(OpType::INSERT, 1L); // 1 is expected count
407         } else {
408             CheckUploadAbnormal(OpType::INSERT, 2L, true); // 2 is expected count
409         }
410         comSyncCv.notify_all();
411     });
412     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
413     BlockCompensatedSync(syncCnt, 2); // 2 is sync times
414     virtualCloudDb_->ForkUpload(nullptr);
415     cloudHook->SetSyncFinishHook(nullptr);
416     cloudHook->SetDoUploadHook(nullptr);
417 }
418 
419 /**
420  * @tc.name: UploadAbnormalSync003
421  * @tc.desc: Test upload delete record, cloud returned record not found.
422  * @tc.type: FUNC
423  * @tc.require:
424  * @tc.author: bty
425  */
426 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync003, TestSize.Level1)
427 {
428     auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
429     ASSERT_NE(cloudHook, nullptr);
430 
431     /**
432      * @tc.steps:step1. Device A inserts data and synchronizes
433      * @tc.expected: step1 OK.
434      */
435     Key key = {'k'};
436     Value value = {'v'};
437     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
438     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
439     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
440 
441     /**
442      * @tc.steps:step2. Device A delete data and synchronizes, Device B delete data and sync before A upload
443      * @tc.expected: step2 OK.
444      */
445     ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
446     int upIdx = 0;
__anon06e3d2c10b02(const std::string &tableName, VBucket &extend) 447     virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
448         LOGD("cloud db upload index:%d", ++upIdx);
449         if (upIdx == 2) { // 2 is index
450             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
451         }
452     });
453     int doUpIdx = 0;
__anon06e3d2c10c02null454     cloudHook->SetDoUploadHook([&doUpIdx, key, this] {
455         LOGD("begin upload index:%d", ++doUpIdx);
456         ASSERT_EQ(kvDelegatePtrS2_->Delete(key), OK);
457         BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
458     });
459     int syncCnt = 0;
__anon06e3d2c10d02null460     cloudHook->SetSyncFinishHook([&syncCnt, this] {
461         LOGD("sync finish times:%d", ++syncCnt);
462         if (syncCnt == 1) { // 1 is the normal sync
463             CheckUploadAbnormal(OpType::DELETE, 1L); // 1 is expected count
464         } else {
465             CheckUploadAbnormal(OpType::DELETE, 1L, true); // 1 is expected count
466         }
467         comSyncCv.notify_all();
468     });
469     BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
470     BlockCompensatedSync(syncCnt, 1); // 1 is sync times
471     virtualCloudDb_->ForkUpload(nullptr);
472     cloudHook->SetSyncFinishHook(nullptr);
473     cloudHook->SetDoUploadHook(nullptr);
474 }
475 
476 /**
477  * @tc.name: UploadAbnormalSync004
478  * @tc.desc: Test sync errCode is not in [27328512, 27394048).
479  * @tc.type: FUNC
480  * @tc.require:
481  * @tc.author: chenghuitao
482  */
483 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync004, TestSize.Level1)
484 {
485     /**
486      * @tc.steps:step1. Device A inserts data and synchronizes
487      * @tc.expected: step1 errCode outside DBStatus should be kept.
488      */
489     int errCode = 27394048; // an error not in [27328512, 27394048)
490     virtualCloudDb_->SetActionStatus(static_cast<DBStatus>(errCode));
491     Key key = {'k'};
492     Value value = {'v'};
493     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
494     BlockSync(kvDelegatePtrS1_, static_cast<DBStatus>(errCode), g_CloudSyncoption);
495     virtualCloudDb_->SetActionStatus(OK);
496 }
497 
498 /**
499  * @tc.name: UploadAbnormalSync005
500  * @tc.desc: Test sync return CLOUD_DISABLED
501  * @tc.type: FUNC
502  * @tc.require:
503  * @tc.author: liaoyonghuang
504  */
505 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync005, TestSize.Level1)
506 {
507     /**
508      * @tc.steps:step1. Put 1 record to cloud
509      * @tc.expected: step1 OK
510      */
511     EXPECT_EQ(kvDelegatePtrS1_->Put(KEY_1, VALUE_1), OK);
512     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
513     /**
514      * @tc.steps:step2. Set error code CLOUD_DISABLED when query cloud data, and sync
515      * @tc.expected: step2 return CLOUD_DISABLED
516      */
517     virtualCloudDb_->SetActionStatus(CLOUD_DISABLED);
518     BlockSync(kvDelegatePtrS2_, CLOUD_DISABLED, g_CloudSyncoption);
519 }
520 
521 /**
522  * @tc.name: QueryParsingProcessTest001
523  * @tc.desc: Test Query parsing process.
524  * @tc.type: FUNC
525  * @tc.require:
526  * @tc.author: luoguo
527  */
528 HWTEST_F(DistributedDBCloudKvSyncerTest, QueryParsingProcessTest001, TestSize.Level0)
529 {
530     auto cloudHook = (ICloudSyncStorageHook *)singleStore_->GetCloudKvStore();
531     ASSERT_NE(cloudHook, nullptr);
532 
533     /**
534      * @tc.steps:step1. Device A inserts data and synchronizes
535      * @tc.expected: step1 OK.
536      */
537     Key key = {'k'};
538     Value value = {'v'};
539     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
540 
541     /**
542      * @tc.steps:step2. Test Query parsing Process
543      * @tc.expected: step2 OK.
544      */
545     std::vector<QuerySyncObject> syncObject;
546     std::vector<VBucket> syncDataPk;
547     VBucket bucket;
548     bucket.insert_or_assign(std::string("k"), std::string("k"));
549     syncDataPk.push_back(bucket);
550     std::string tableName = "sync_data";
551     ASSERT_EQ(CloudStorageUtils::GetSyncQueryByPk(tableName, syncDataPk, true, syncObject), E_OK);
552 
553     Bytes bytes;
554     bytes.resize(syncObject[0].CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
555     Parcel parcel(bytes.data(), bytes.size());
556     ASSERT_EQ(syncObject[0].SerializeData(parcel, SOFTWARE_VERSION_CURRENT), E_OK);
557 
558     /**
559      * @tc.steps:step3. Check Node's type is QueryNodeType::IN.
560      * @tc.expected: step3 OK.
561      */
562     std::vector<QueryNode> queryNodes;
563     syncObject[0].ParserQueryNodes(bytes, queryNodes);
564     ASSERT_EQ(queryNodes[0].type, QueryNodeType::IN);
565 }
566 
567 /**
568  * @tc.name: UploadFinished001
569  * @tc.desc: Test upload update record when do update.
570  * @tc.type: FUNC
571  * @tc.require:
572  * @tc.author: zhangqiquan
573  */
574 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadFinished001, TestSize.Level1)
575 {
576     Key key = {'k'};
577     Value value = {'v'};
578     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
579     Value newValue = {'v', '1'};
580     // update [k,v] to [k,v1] when upload
__anon06e3d2c10e02(const std::string &, VBucket &) 581     virtualCloudDb_->ForkUpload([kvDelegatePtrS1 = kvDelegatePtrS1_, key, newValue](const std::string &, VBucket &) {
582         EXPECT_EQ(kvDelegatePtrS1->Put(key, newValue), OK);
583     });
584     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
585     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
586     Value actualValue;
587     // cloud download [k,v]
588     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
589     EXPECT_EQ(actualValue, value);
590     // sync again and get [k,v1]
591     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
592     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
593     EXPECT_EQ(kvDelegatePtrS1_->Get(key, actualValue), OK);
594     EXPECT_EQ(actualValue, newValue);
595     virtualCloudDb_->ForkUpload(nullptr);
596 }
597 
598 /**
599  * @tc.name: SyncWithMultipleUsers001.
600  * @tc.desc: Test sync data with multiple users.
601  * @tc.type: FUNC
602  * @tc.require:
603  * @tc.author: liufuchenxing
604  */
605 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers001, TestSize.Level1)
606 {
607     Key key = {'k'};
608     Value value = {'v'};
609     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
610     CloudSyncOption syncOption;
611     syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
612     syncOption.users.push_back(USER_ID);
613     syncOption.users.push_back(USER_ID_2);
614     syncOption.devices.push_back("cloud");
615     BlockSync(kvDelegatePtrS1_, OK, syncOption);
616     BlockSync(kvDelegatePtrS2_, OK, syncOption);
617     Value actualValue;
618     // cloud download [k,v]
619     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
620     EXPECT_EQ(actualValue, value);
621 }
622 
PutKvBatchDataAndSyncCloud(CloudSyncOption & syncOption)623 void DistributedDBCloudKvSyncerTest::PutKvBatchDataAndSyncCloud(CloudSyncOption &syncOption)
624 {
625     std::vector<Entry> entries;
626     for (int i = 0; i < 200; i++) { // 200 is number of data
627         std::string keyStr = "k_" + std::to_string(i);
628         std::string valueStr = "v_" + std::to_string(i);
629         Key key(keyStr.begin(), keyStr.end());
630         Value value(valueStr.begin(), valueStr.end());
631         Entry entry;
632         entry.key = key;
633         entry.value = value;
634         entries.push_back(entry);
635     }
636 
637     ASSERT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
638     syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
639     syncOption.users.push_back(USER_ID);
640     syncOption.users.push_back(USER_ID_2);
641     syncOption.devices.push_back("cloud");
642     BlockSync(kvDelegatePtrS1_, OK, syncOption);
643 }
644 
645 /**
646  * @tc.name: SyncWithMultipleUsers002.
647  * @tc.desc: test whether upload to the cloud after delete local data that does not have a gid.
648  * @tc.type: FUNC
649  * @tc.require:
650  * @tc.author: luoguo
651  */
652 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers002, TestSize.Level1)
653 {
654     /**
655      * @tc.steps: step1. kvDelegatePtrS1_ put 200 data and sync to cloud.
656      * @tc.expected: step1. return ok.
657      */
658     CloudSyncOption syncOption;
659     PutKvBatchDataAndSyncCloud(syncOption);
660 
661     /**
662      * @tc.steps: step2. kvDelegatePtrS2_ only sync user0 from cloud.
663      * @tc.expected: step2. return ok.
664      */
665     syncOption.users.clear();
666     syncOption.users.push_back(USER_ID);
667     BlockSync(kvDelegatePtrS2_, OK, syncOption);
668 
669     /**
670      * @tc.steps: step3. kvDelegatePtrS2_ delete 100 data.
671      * @tc.expected: step3. return ok.
672      */
673     std::vector<Key> keys;
674     for (int i = 0; i < 100; i++) {
675         std::string keyStr = "k_" + std::to_string(i);
676         Key key(keyStr.begin(), keyStr.end());
677         keys.push_back(key);
678     }
679     ASSERT_EQ(kvDelegatePtrS2_->DeleteBatch(keys), OK);
680 
681     /**
682      * @tc.steps: step4. kvDelegatePtrS2_ sync to cloud with user0 user2.
683      * @tc.expected: step4. return ok.
684      */
685     syncOption.users.clear();
686     syncOption.users.push_back(USER_ID);
687     syncOption.users.push_back(USER_ID_2);
688 
689     std::mutex dataMutex;
690     std::condition_variable cv;
691     bool finish = false;
692     uint32_t insertCount = 0;
693     auto callback = [&dataMutex, &syncOption, &finish, &insertCount, &cv](const std::map<std::string,
__anon06e3d2c10f02(const std::map<std::string, SyncProcess> &process) 694         SyncProcess> &process) {
695         size_t notifyCnt = 0;
696         for (const auto &item : process) {
697             LOGD("user = %s, status = %d, errCode=%d", item.first.c_str(), item.second.process, item.second.errCode);
698             if (item.second.process != DistributedDB::FINISHED) {
699                 continue;
700             }
701             EXPECT_EQ(item.second.errCode, OK);
702             {
703                 std::lock_guard<std::mutex> autoLock(dataMutex);
704                 notifyCnt++;
705                 std::set<std::string> userSet(syncOption.users.begin(), syncOption.users.end());
706                 if (notifyCnt == userSet.size()) {
707                     finish = true;
708                     std::map<std::string, TableProcessInfo> tableProcess(item.second.tableProcess);
709                     insertCount = tableProcess["sync_data"].downLoadInfo.insertCount;
710                     cv.notify_one();
711                 }
712             }
713         }
714     };
715     auto actualRet = kvDelegatePtrS2_->Sync(syncOption, callback);
716     EXPECT_EQ(actualRet, OK);
717     if (actualRet == OK) {
718         std::unique_lock<std::mutex> uniqueLock(dataMutex);
__anon06e3d2c11002() 719         cv.wait(uniqueLock, [&finish]() { return finish; });
720     }
721     /**
722      * @tc.steps: step5. check process info, user2's insertCount should be 0.
723      * @tc.expected: step5. return ok.
724      */
725     EXPECT_EQ(insertCount, 0u);
726 }
727 
728 /**
729  * @tc.name: SyncWithMultipleUsers003.
730  * @tc.desc: Test sync data with multiple users same key.
731  * @tc.type: FUNC
732  * @tc.require:
733  * @tc.author: wangxiangdong
734  */
735 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers003, TestSize.Level1)
736 {
737     /**
738      * @tc.steps: step1. put k v by user1.
739      * @tc.expected: step1. return ok.
740      */
741     Key key = {'k', '1'};
742     Value value = {'v', '1'};
743     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
744     CloudSyncOption syncOption;
745     syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
746     syncOption.users.push_back(USER_ID);
747     syncOption.devices.push_back("cloud");
748     BlockSync(kvDelegatePtrS1_, OK, syncOption);
749     /**
750      * @tc.steps: step2. put k v2 by user1.
751      * @tc.expected: step2. return ok.
752      */
753     Value value2 = {'v', '2'};
754     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
755     syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
756     syncOption.users.clear();
757     syncOption.users.push_back(USER_ID_2);
758     BlockSync(kvDelegatePtrS1_, OK, syncOption);
759     /**
760      * @tc.steps: step3. sync by user1.
761      * @tc.expected: step3. return ok.
762      */
763     syncOption.users.clear();
764     syncOption.users.push_back(USER_ID_2);
765     BlockSync(kvDelegatePtrS2_, OK, syncOption);
766     Value actualValue1;
767     /**
768      * @tc.steps: step4. sync by user2.
769      * @tc.expected: step4. return ok.
770      */
771     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue1), OK);
772     syncOption.users.clear();
773     syncOption.users.push_back(USER_ID);
774     BlockSync(kvDelegatePtrS2_, OK, syncOption);
775     Value actualValue2;
776     /**
777      * @tc.steps: step5. get k1.
778      * @tc.expected: step5. get v2.
779      */
780     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue2), OK);
781     EXPECT_EQ(actualValue2, value2);
782 }
783 
784 /**
785  * @tc.name: SyncWithMultipleUsers004.
786  * @tc.desc: Test sync data with same users.
787  * @tc.type: FUNC
788  * @tc.require:
789  * @tc.author: wangxiangdong
790  */
791 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers004, TestSize.Level1)
792 {
793     /**
794      * @tc.steps: step1. put k v by user1 and sync to cloud.
795      * @tc.expected: step1. return ok.
796      */
797     Key key = {'k', '1'};
798     Value value = {'v', '1'};
799     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
800     CloudSyncOption syncOption;
801     syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
802     syncOption.users.push_back(USER_ID);
803     syncOption.devices.push_back("cloud");
804     BlockSync(kvDelegatePtrS1_, OK, syncOption);
805     /**
806      * @tc.steps: step2. data p2p to device B.
807      * @tc.expected: step2. return ok.
808      */
809     std::vector<std::string> devices;
810     devices.push_back(deviceB_->GetDeviceId());
811     Query query = Query::Select().PrefixKey(key);
812     std::map<std::string, DBStatus> result;
813     DBStatus status = g_tool.SyncTest(kvDelegatePtrS2_, devices, SYNC_MODE_PUSH_ONLY, result, query);
814     ASSERT_TRUE(status == OK);
815     /**
816      * @tc.steps: step3. sync by user1.
817      * @tc.expected: step3. return ok.
818      */
819     syncOption.users.clear();
820     syncOption.users.push_back(USER_ID);
821     BlockSync(kvDelegatePtrS2_, OK, syncOption);
822     /**
823      * @tc.steps: step4. get k1.
824      * @tc.expected: step4. get v2.
825      */
826     Value actualValue;
827     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
828     EXPECT_EQ(actualValue, value);
829 }
830 
831 /**
832  * @tc.name: AbnormalCloudKvExecutorTest001
833  * @tc.desc: Check SqliteCloudKvExecutorUtils interfaces abnormal scene.
834  * @tc.type: FUNC
835  * @tc.require:
836  * @tc.author: suyue
837  */
838 HWTEST_F(DistributedDBCloudKvSyncerTest, AbnormalCloudKvExecutorTest001, TestSize.Level1)
839 {
840     /**
841      * @tc.steps: step1. Call PutCloudData interface with different opType when para is invalid.
842      * @tc.expected: step1. return errCode.
843      */
844     SqliteCloudKvExecutorUtils cloudKvObj;
845     DownloadData downloadData;
846     downloadData.data = {{}};
847     int ret = cloudKvObj.PutCloudData(nullptr, true, downloadData);
848     EXPECT_EQ(ret, -E_CLOUD_ERROR);
849     downloadData.opType = {OpType::UPDATE_VERSION};
850     ret = cloudKvObj.PutCloudData(nullptr, true, downloadData);
851     EXPECT_EQ(ret, -E_CLOUD_ERROR);
852 
853     downloadData.opType = {OpType::UPDATE_TIMESTAMP};
854     ret = cloudKvObj.PutCloudData(nullptr, true, downloadData);
855     EXPECT_EQ(ret, -E_INVALID_DB);
856     downloadData.opType = {OpType::DELETE};
857     ret = cloudKvObj.PutCloudData(nullptr, true, downloadData);
858     EXPECT_EQ(ret, -E_INVALID_DB);
859 
860     /**
861      * @tc.steps: step2. Call CountAllCloudData interface when para is invalid.
862      * @tc.expected: step2. return -E_INVALID_ARGS.
863      */
864     QuerySyncObject querySyncObject;
865     std::pair<int, int64_t> res = cloudKvObj.CountAllCloudData({nullptr, true}, {}, "", true, querySyncObject);
866     EXPECT_EQ(res.first, -E_INVALID_ARGS);
867 
868     /**
869      * @tc.steps: step3. Call SqliteCloudKvExecutorUtils interfaces when db is nullptr.
870      * @tc.expected: step3. return -E_INVALID_DB.
871      */
872     res = cloudKvObj.CountCloudData(nullptr, true, 0, "", true);
873     EXPECT_EQ(res.first, -E_INVALID_DB);
874     std::pair<int, CloudSyncData> ver = cloudKvObj.GetLocalCloudVersion(nullptr, true, "");
875     EXPECT_EQ(ver.first, -E_INVALID_DB);
876     std::vector<VBucket> dataVector;
877     ret = cloudKvObj.GetCloudVersionFromCloud(nullptr, true, "", dataVector);
878     EXPECT_EQ(ret, -E_INVALID_DB);
879 }
880 
881 /**
882  * @tc.name: AbnormalCloudKvExecutorTest002
883  * @tc.desc: Check FillCloudLog interface
884  * @tc.type: FUNC
885  * @tc.require:
886  * @tc.author: suyue
887  */
888 HWTEST_F(DistributedDBCloudKvSyncerTest, AbnormalCloudKvExecutorTest002, TestSize.Level0)
889 {
890     /**
891      * @tc.steps: step1. Call FillCloudLog interface when db and para is nullptr.
892      * @tc.expected: step1. return -E_INVALID_ARGS.
893      */
894     SqliteCloudKvExecutorUtils cloudKvObj;
895     sqlite3 *db = nullptr;
896     CloudSyncData data;
897     CloudUploadRecorder recorder;
898     int ret = cloudKvObj.FillCloudLog({db, true}, OpType::INSERT, data, "", recorder);
899     EXPECT_EQ(ret, -E_INVALID_ARGS);
900 
901     /**
902      * @tc.steps: step2. open db and Call FillCloudLog.
903      * @tc.expected: step2. return -SQLITE_ERROR, because the table is not exist.
904      */
905     uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
906     std::string fileUrl = g_testDir + "/test.db";
907     EXPECT_EQ(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr), SQLITE_OK);
908     ASSERT_NE(db, nullptr);
909 
910     data.isCloudVersionRecord = true;
911     ret = cloudKvObj.FillCloudLog({db, true}, OpType::INSERT, data, "", recorder);
912     EXPECT_EQ(ret, -SQLITE_ERROR);
913     ret = cloudKvObj.FillCloudLog({db, true}, OpType::DELETE, data, "", recorder);
914     EXPECT_EQ(ret, -SQLITE_ERROR);
915     EXPECT_EQ(sqlite3_close_v2(db), E_OK);
916 }
917 
918 /**
919  * @tc.name: DeviceCollaborationTest001
920  * @tc.desc: Check force override data
921  * @tc.type: FUNC
922  * @tc.require:
923  * @tc.author: zqq
924  */
925 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest001, TestSize.Level1)
926 {
927     /**
928      * @tc.steps: step1. open db with DEVICE_COLLABORATION.
929      * @tc.expected: step1. return E_OK.
930      */
931     KvStoreNbDelegate* kvDelegatePtrS3 = nullptr;
932     KvStoreNbDelegate::Option option;
933     option.conflictResolvePolicy = ConflictResolvePolicy::DEVICE_COLLABORATION;
934     ASSERT_EQ(GetKvStore(kvDelegatePtrS3, STORE_ID_3, option), OK);
935     ASSERT_NE(kvDelegatePtrS3, nullptr);
936     KvStoreNbDelegate* kvDelegatePtrS4 = nullptr;
937     ASSERT_EQ(GetKvStore(kvDelegatePtrS4, STORE_ID_4, option), OK);
938     ASSERT_NE(kvDelegatePtrS4, nullptr);
939     /**
940      * @tc.steps: step2. db3 put (k1,v1) sync to db4.
941      * @tc.expected: step2. db4 get (k1,v1).
942      */
943     Key key = {'k'};
944     Value value = {'v'};
945     EXPECT_EQ(kvDelegatePtrS3->Put(key, value), OK);
946     communicatorAggregator_->SetLocalDeviceId("DB3");
947     BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
948     communicatorAggregator_->SetLocalDeviceId("DB4");
949     BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
950     Value actualValue;
951     EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
952     EXPECT_EQ(actualValue, value);
953     /**
954      * @tc.steps: step3. db4 delete (k1,v1) db3 sync again to db4.
955      * @tc.expected: step3. db4 get (k1,v1).
956      */
957     EXPECT_EQ(kvDelegatePtrS4->Delete(key), OK);
958     communicatorAggregator_->SetLocalDeviceId("DB3");
959     EXPECT_EQ(kvDelegatePtrS3->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
960     BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
961     communicatorAggregator_->SetLocalDeviceId("DB4");
962     BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
963     EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
964     EXPECT_EQ(actualValue, value);
965     CloseKvStore(kvDelegatePtrS3, STORE_ID_3);
966     CloseKvStore(kvDelegatePtrS4, STORE_ID_4);
967 }
968 
969 /**
970  * @tc.name: DeviceCollaborationTest002
971  * @tc.desc: Check concurrent removeDeviceData and sync
972  * @tc.type: FUNC
973  * @tc.require:
974  * @tc.author: wangxiangdong
975  */
976 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest002, TestSize.Level1)
977 {
978     /**
979      * @tc.steps: step1. open db with DEVICE_COLLABORATION.
980      * @tc.expected: step1. return E_OK.
981      */
982     KvStoreNbDelegate* kvDelegatePtrS3 = nullptr;
983     KvStoreNbDelegate::Option option;
984     option.conflictResolvePolicy = ConflictResolvePolicy::DEVICE_COLLABORATION;
985     ASSERT_EQ(GetKvStore(kvDelegatePtrS3, STORE_ID_3, option), OK);
986     ASSERT_NE(kvDelegatePtrS3, nullptr);
987     KvStoreNbDelegate* kvDelegatePtrS4 = nullptr;
988     ASSERT_EQ(GetKvStore(kvDelegatePtrS4, STORE_ID_4, option), OK);
989     ASSERT_NE(kvDelegatePtrS4, nullptr);
990     /**
991      * @tc.steps: step2. db3 put 200 k-v data.
992      * @tc.expected: step2. OK.
993      */
994     std::vector<Entry> entries;
995     for (int i = 0; i < 200; i++) {
996         std::string keyStr = "k_" + std::to_string(i);
997         std::string valueStr = "v_" + std::to_string(i);
998         Key key(keyStr.begin(), keyStr.end());
999         Value value(valueStr.begin(), valueStr.end());
1000         Entry entry;
1001         entry.key = key;
1002         entry.value = value;
1003         entries.push_back(entry);
1004     }
1005 
1006     ASSERT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
1007     communicatorAggregator_->SetLocalDeviceId("DB3");
1008     /**
1009      * @tc.steps: step3. when sync, try to removeDeviceData.
1010      * @tc.expected: step3. sync stop.
1011      */
__anon06e3d2c11102(const std::map<std::string, SyncProcess> &process) 1012     auto callback = [](const std::map<std::string, SyncProcess> &process) {
1013         LOGD("process finished");
1014     };
1015     auto actualRet = kvDelegatePtrS3->Sync(g_CloudSyncoption, callback);
1016     EXPECT_EQ(actualRet, OK);
1017     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait for 100ms
1018     EXPECT_EQ(kvDelegatePtrS3->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
1019     communicatorAggregator_->SetLocalDeviceId("DB4");
1020     BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
1021     std::string keyStr = "k_1";
1022     Key key(keyStr.begin(), keyStr.end());
1023     Value actualValue;
1024     EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), NOT_FOUND);
1025     CloseKvStore(kvDelegatePtrS3, STORE_ID_3);
1026     CloseKvStore(kvDelegatePtrS4, STORE_ID_4);
1027 }
1028 
1029 /**
1030  * @tc.name: DeviceCollaborationTest003
1031  * @tc.desc: Check concurrent removeDeviceData and sync
1032  * @tc.type: FUNC
1033  * @tc.require:
1034  * @tc.author: liaoyonghuang
1035  */
1036 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest003, TestSize.Level1)
1037 {
1038     /**
1039      * @tc.steps: step1. Block in sync
1040      * @tc.expected: step1. return E_OK.
1041      */
1042     ASSERT_EQ(kvDelegatePtrS1_->Put(KEY_1, VALUE_1), OK);
__anon06e3d2c11202(const std::string &, VBucket &) 1043     virtualCloudDb_->ForkUpload([](const std::string &, VBucket &) {
1044         std::this_thread::sleep_for(std::chrono::milliseconds(500));
1045     });
__anon06e3d2c11302() 1046     std::thread syncThread([&]() {
1047         BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1048     });
1049     /**
1050      * @tc.steps: step2. RemoveDeviceData concurrently
1051      * @tc.expected: step2. return E_OK.
1052      */
1053     std::this_thread::sleep_for(std::chrono::milliseconds(100));
__anon06e3d2c11402() 1054     std::thread removeThread1([&]() {
1055         EXPECT_EQ(kvDelegatePtrS1_->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
1056     });
__anon06e3d2c11502() 1057     std::thread removeThread2([&]() {
1058         EXPECT_EQ(kvDelegatePtrS1_->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
1059     });
1060     syncThread.join();
1061     removeThread1.join();
1062     removeThread2.join();
1063 }
1064 
1065 /**
1066  * @tc.name: SyncWithKvAndCloud001.
1067  * @tc.desc: Test sync data with same key and different sync way.
1068  * @tc.type: FUNC
1069  * @tc.require:
1070  * @tc.author: wangxiangdong
1071  */
1072 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithKvAndCloud001, TestSize.Level1)
1073 {
1074     /**
1075      * @tc.steps: step1. put k1 v1 by user1 and sync between devices.
1076      * @tc.expected: step1. return ok.
1077      */
1078     Key key = {'k', '1'};
1079     Value value = {'v', '1'};
1080     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
1081     std::vector<std::string> devices;
1082     devices.push_back(deviceB_->GetDeviceId());
1083     Query query = Query::Select().PrefixKey(key);
1084     std::map<std::string, DBStatus> result;
1085     DBStatus status = g_tool.SyncTest(kvDelegatePtrS2_, devices, SYNC_MODE_PUSH_PULL, result, query);
1086     ASSERT_TRUE(status == OK);
1087 
1088     /**
1089      * @tc.steps: step2. insert k1 v2 and sync data to cloud.
1090      * @tc.expected: step2. return ok.
1091      */
1092     Value value2 = {'v', '2'};
1093     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
1094     CloudSyncOption syncOption;
1095     syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
1096     syncOption.users.push_back(USER_ID);
1097     syncOption.devices.push_back("cloud");
1098     BlockSync(kvDelegatePtrS1_, OK, syncOption);
1099     /**
1100      * @tc.steps: step3. sync by device2.
1101      * @tc.expected: step3. return ok.
1102      */
1103     syncOption.users.clear();
1104     syncOption.users.push_back(USER_ID);
1105     BlockSync(kvDelegatePtrS2_, OK, syncOption);
1106     /**
1107      * @tc.steps: step4. get k1.
1108      * @tc.expected: step4. get v2.
1109      */
1110     Value actualValue;
1111     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
1112     EXPECT_EQ(actualValue, value2);
1113 }
1114 
1115 /**
1116  * @tc.name: ClearCloudWatermarkTest001.
1117  * @tc.desc: Test clear kv cloud watermark
1118  * @tc.type: FUNC
1119  * @tc.require:
1120  * @tc.author: liaoyonghuang
1121  */
1122 HWTEST_F(DistributedDBCloudKvSyncerTest, ClearCloudWatermarkTest001, TestSize.Level0)
1123 {
1124     /**
1125      * @tc.steps: step1. put (k1, v1)(k2, v2) and sync
1126      * @tc.expected: step1. return ok.
1127      */
1128     EXPECT_EQ(kvDelegatePtrS1_->Put(KEY_1, VALUE_1), OK);
1129     EXPECT_EQ(kvDelegatePtrS2_->Put(KEY_2, VALUE_2), OK);
1130     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1131     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1132     /**
1133      * @tc.steps: step1. clear kv cloud watermark.
1134      * @tc.expected: step1. return ok.
1135      */
1136     ClearKvMetaDataOption option;
1137     option.type = ClearKvMetaOpType::CLEAN_CLOUD_WATERMARK;
1138     EXPECT_EQ(kvDelegatePtrS1_->ClearMetaData(option), OK);
1139     EXPECT_EQ(kvDelegatePtrS2_->ClearMetaData(option), OK);
1140 }
1141 
1142 /**
1143  * @tc.name: UploadBlockTest001.
1144  * @tc.desc: Test upload block and put get data
1145  * @tc.type: FUNC
1146  * @tc.require:
1147  * @tc.author: zqq
1148  */
1149 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadBlockTest001, TestSize.Level0)
1150 {
1151     /**
1152      * @tc.steps: step1. put (k1, v1)
1153      * @tc.expected: step1. return ok.
1154      */
1155     EXPECT_EQ(kvDelegatePtrS1_->Put(KEY_1, VALUE_1), OK);
1156     /**
1157      * @tc.steps: step2. fork upload and sync
1158      * @tc.expected: step2. return ok.
1159      */
__anon06e3d2c11602(const std::string &, VBucket &) 1160     virtualCloudDb_->ForkUpload([this](const std::string &, VBucket &) {
1161         EXPECT_EQ(kvDelegatePtrS1_->Put(KEY_2, VALUE_2), OK);
1162         Value value;
1163         EXPECT_EQ(kvDelegatePtrS1_->Get(KEY_2, value), OK);
1164         EXPECT_EQ(value, VALUE_2);
1165     });
1166     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1167 }
1168 }