• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "cloud/cloud_db_constant.h"
19 #include "db_base64_utils.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "kv_virtual_device.h"
23 #include "kv_store_nb_delegate.h"
24 #include "kv_store_nb_delegate_impl.h"
25 #include "platform_specific.h"
26 #include "process_system_api_adapter_impl.h"
27 #include "virtual_communicator_aggregator.h"
28 #include "virtual_cloud_db.h"
29 #include "sqlite_utils.h"
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 using namespace DistributedDBUnitTest;
33 using namespace std;
34 
35 namespace {
36 static std::string HWM_HEAD = "naturalbase_cloud_meta_sync_data_";
37 string g_testDir;
38 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
39 CloudSyncOption g_CloudSyncoption;
40 const std::string USER_ID_2 = "user2";
41 const std::string USER_ID_3 = "user3";
42 const Key KEY_1 = {'k', '1'};
43 const Key KEY_2 = {'k', '2'};
44 const Key KEY_3 = {'k', '3'};
45 const Value VALUE_1 = {'v', '1'};
46 const Value VALUE_2 = {'v', '2'};
47 const Value VALUE_3 = {'v', '3'};
48 class DistributedDBCloudKvTest : public testing::Test {
49 public:
50     static void SetUpTestCase();
51     static void TearDownTestCase();
52     void SetUp();
53     void TearDown();
54     void InsertRecord(int num);
55     void SetDeviceId(const Key &key, const std::string &deviceId);
56     void SetFlag(const Key &key, LogInfoFlag flag);
57     int CheckFlag(const Key &key, LogInfoFlag flag);
58     int CheckLogTable(const std::string &deviceId);
59     int CheckWaterMark(const std::string &key);
60     int ChangeUserId(const std::string &deviceId, const std::string &wantUserId);
61     int ChangeHashKey(const std::string &deviceId);
62 protected:
63     DBStatus GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId, KvStoreNbDelegate::Option option,
64         bool invalidSchema = false);
65     void CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId);
66     void BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
67         DBStatus expectSyncResult = OK);
68     void SyncAndGetProcessInfo(KvStoreNbDelegate *delegate, CloudSyncOption option);
69     bool CheckUserSyncInfo(const vector<std::string> users, const vector<DBStatus> userStatus,
70         const vector<Info> userExpectInfo);
71     static DataBaseSchema GetDataBaseSchema(bool invalidSchema);
72     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
73     std::shared_ptr<VirtualCloudDb> virtualCloudDb2_ = nullptr;
74     KvStoreConfig config_;
75     KvStoreNbDelegate* kvDelegatePtrS1_ = nullptr;
76     KvStoreNbDelegate* kvDelegatePtrS2_ = nullptr;
77     SyncProcess lastProcess_;
78     std::map<std::string, SyncProcess> lastSyncProcess_;
79     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
80     KvVirtualDevice *deviceB_ = nullptr;
81 };
82 
SetUpTestCase()83 void DistributedDBCloudKvTest::SetUpTestCase()
84 {
85     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
86     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
87         LOGE("rm test db files error!");
88     }
89     g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
90     g_CloudSyncoption.users.push_back(USER_ID);
91     g_CloudSyncoption.devices.push_back("cloud");
92 
93     string dir = g_testDir + "/single_ver";
94     DIR* dirTmp = opendir(dir.c_str());
95     if (dirTmp == nullptr) {
96         OS::MakeDBDirectory(dir);
97     } else {
98         closedir(dirTmp);
99     }
100 }
101 
TearDownTestCase()102 void DistributedDBCloudKvTest::TearDownTestCase()
103 {
104     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
105         LOGE("rm test db files error!");
106     }
107 }
108 
SetUp()109 void DistributedDBCloudKvTest::SetUp()
110 {
111     DistributedDBToolsUnitTest::PrintTestCaseInfo();
112     config_.dataDir = g_testDir;
113     /**
114      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
115      */
116     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
117     virtualCloudDb2_ = std::make_shared<VirtualCloudDb>();
118     g_mgr.SetKvStoreConfig(config_);
119     KvStoreNbDelegate::Option option1;
120     ASSERT_EQ(GetKvStore(kvDelegatePtrS1_, STORE_ID_1, option1), OK);
121     // set aggregator after get store1, only store2 can sync with p2p
122     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
123     ASSERT_TRUE(communicatorAggregator_ != nullptr);
124     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
125     KvStoreNbDelegate::Option option2;
126     ASSERT_EQ(GetKvStore(kvDelegatePtrS2_, STORE_ID_2, option2), OK);
127 
128     deviceB_ = new (std::nothrow) KvVirtualDevice("DEVICE_B");
129     ASSERT_TRUE(deviceB_ != nullptr);
130     auto syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
131     ASSERT_TRUE(syncInterfaceB != nullptr);
132     ASSERT_EQ(deviceB_->Initialize(communicatorAggregator_, syncInterfaceB), E_OK);
133 }
134 
TearDown()135 void DistributedDBCloudKvTest::TearDown()
136 {
137     CloseKvStore(kvDelegatePtrS1_, STORE_ID_1);
138     CloseKvStore(kvDelegatePtrS2_, STORE_ID_2);
139     virtualCloudDb_ = nullptr;
140     virtualCloudDb2_ = nullptr;
141     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
142         LOGE("rm test db files error!");
143     }
144 
145     if (deviceB_ != nullptr) {
146         delete deviceB_;
147         deviceB_ = nullptr;
148     }
149 
150     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
151     communicatorAggregator_ = nullptr;
152     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
153 }
154 
BlockSync(KvStoreNbDelegate * delegate,DBStatus expectDBStatus,CloudSyncOption option,DBStatus expectSyncResult)155 void DistributedDBCloudKvTest::BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
156     DBStatus expectSyncResult)
157 {
158     if (delegate == nullptr) {
159         return;
160     }
161     std::mutex dataMutex;
162     std::condition_variable cv;
163     bool finish = false;
164     SyncProcess last;
165     auto callback = [expectDBStatus, &last, &cv, &dataMutex, &finish, &option](const std::map<std::string,
166         SyncProcess> &process) {
167         size_t notifyCnt = 0;
168         for (const auto &item: process) {
169             LOGD("user = %s, status = %d", item.first.c_str(), item.second.process);
170             if (item.second.process != DistributedDB::FINISHED) {
171                 continue;
172             }
173             EXPECT_EQ(item.second.errCode, expectDBStatus);
174             {
175                 std::lock_guard<std::mutex> autoLock(dataMutex);
176                 notifyCnt++;
177                 std::set<std::string> userSet(option.users.begin(), option.users.end());
178                 if (notifyCnt == userSet.size()) {
179                     finish = true;
180                     last = item.second;
181                     cv.notify_one();
182                 }
183             }
184         }
185     };
186     auto actualRet = delegate->Sync(option, callback);
187     EXPECT_EQ(actualRet, expectSyncResult);
188     if (actualRet == OK) {
189         std::unique_lock<std::mutex> uniqueLock(dataMutex);
190         cv.wait(uniqueLock, [&finish]() {
191             return finish;
192         });
193     }
194     lastProcess_ = last;
195 }
196 
GetDataBaseSchema(bool invalidSchema)197 DataBaseSchema DistributedDBCloudKvTest::GetDataBaseSchema(bool invalidSchema)
198 {
199     DataBaseSchema schema;
200     TableSchema tableSchema;
201     tableSchema.name = invalidSchema ? "invalid_schema_name" : CloudDbConstant::CLOUD_KV_TABLE_NAME;
202     Field field;
203     field.colName = CloudDbConstant::CLOUD_KV_FIELD_KEY;
204     field.type = TYPE_INDEX<std::string>;
205     field.primary = true;
206     tableSchema.fields.push_back(field);
207     field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE;
208     field.primary = false;
209     tableSchema.fields.push_back(field);
210     field.colName = CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE;
211     tableSchema.fields.push_back(field);
212     field.colName = CloudDbConstant::CLOUD_KV_FIELD_VALUE;
213     tableSchema.fields.push_back(field);
214     field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME;
215     field.type = TYPE_INDEX<int64_t>;
216     tableSchema.fields.push_back(field);
217     schema.tables.push_back(tableSchema);
218     return schema;
219 }
220 
GetKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId,KvStoreNbDelegate::Option option,bool invalidSchema)221 DBStatus DistributedDBCloudKvTest::GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId,
222     KvStoreNbDelegate::Option option, bool invalidSchema)
223 {
224     DBStatus openRet = OK;
225     g_mgr.GetKvStore(storeId, option, [&openRet, &delegate](DBStatus status, KvStoreNbDelegate *openDelegate) {
226         openRet = status;
227         delegate = openDelegate;
228     });
229     EXPECT_EQ(openRet, OK);
230     EXPECT_NE(delegate, nullptr);
231 
232     std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
233     cloudDbs[USER_ID] = virtualCloudDb_;
234     cloudDbs[USER_ID_2] = virtualCloudDb2_;
235     delegate->SetCloudDB(cloudDbs);
236     std::map<std::string, DataBaseSchema> schemas;
237     schemas[USER_ID] = GetDataBaseSchema(invalidSchema);
238     schemas[USER_ID_2] = GetDataBaseSchema(invalidSchema);
239     return delegate->SetCloudDbSchema(schemas);
240 }
241 
CloseKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId)242 void DistributedDBCloudKvTest::CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId)
243 {
244     if (delegate != nullptr) {
245         ASSERT_EQ(g_mgr.CloseKvStore(delegate), OK);
246         delegate = nullptr;
247         DBStatus status = g_mgr.DeleteKvStore(storeId);
248         LOGD("delete kv store status %d store %s", status, storeId.c_str());
249         ASSERT_EQ(status, OK);
250     }
251 }
252 
SyncAndGetProcessInfo(KvStoreNbDelegate * delegate,CloudSyncOption option)253 void DistributedDBCloudKvTest::SyncAndGetProcessInfo(KvStoreNbDelegate *delegate, CloudSyncOption option)
254 {
255     if (delegate == nullptr) {
256         return;
257     }
258     std::mutex dataMutex;
259     std::condition_variable cv;
260     bool isFinish = false;
261     vector<std::map<std::string, SyncProcess>> lists;
262     auto callback = [&cv, &dataMutex, &isFinish, &option, &lists](const std::map<std::string, SyncProcess> &process) {
263         size_t notifyCnt = 0;
264         for (const auto &item: process) {
265             LOGD("user = %s, status = %d", item.first.c_str(), item.second.process);
266             if (item.second.process != DistributedDB::FINISHED) {
267                 continue;
268             }
269             {
270                 std::lock_guard<std::mutex> autoLock(dataMutex);
271                 notifyCnt++;
272                 std::set<std::string> userSet(option.users.begin(), option.users.end());
273                 if (notifyCnt == userSet.size()) {
274                     isFinish = true;
275                     cv.notify_one();
276                 }
277                 lists.push_back(process);
278             }
279         }
280     };
281     auto ret = delegate->Sync(option, callback);
282     EXPECT_EQ(ret, OK);
283     if (ret == OK) {
284         std::unique_lock<std::mutex> uniqueLock(dataMutex);
285         cv.wait(uniqueLock, [&isFinish]() {
286             return isFinish;
287         });
288     }
289     lastSyncProcess_ = lists.back();
290 }
291 
CheckUserSyncInfo(const vector<std::string> users,const vector<DBStatus> userStatus,const vector<Info> userExpectInfo)292 bool DistributedDBCloudKvTest::CheckUserSyncInfo(const vector<std::string> users, const vector<DBStatus> userStatus,
293     const vector<Info> userExpectInfo)
294 {
295     uint32_t idx = 0;
296     for (auto &it: lastSyncProcess_) {
297         if ((idx >= users.size()) || (idx >= userStatus.size()) || (idx >= userExpectInfo.size())) {
298             return false;
299         }
300         string user = it.first;
301         if (user.compare(0, user.length(), users[idx]) != 0) {
302             return false;
303         }
304         SyncProcess actualSyncProcess = it.second;
305         EXPECT_EQ(actualSyncProcess.process, FINISHED);
306         EXPECT_EQ(actualSyncProcess.errCode, userStatus[idx]);
307         for (const auto &table : actualSyncProcess.tableProcess) {
308             EXPECT_EQ(table.second.upLoadInfo.total, userExpectInfo[idx].total);
309             EXPECT_EQ(table.second.upLoadInfo.successCount, userExpectInfo[idx].successCount);
310             EXPECT_EQ(table.second.upLoadInfo.insertCount, userExpectInfo[idx].insertCount);
311             EXPECT_EQ(table.second.upLoadInfo.failCount, userExpectInfo[idx].failCount);
312         }
313         idx++;
314     }
315     return true;
316 }
317 
318 /**
319  * @tc.name: SubUser001
320  * @tc.desc: Test get and delete db with sub user.
321  * @tc.type: FUNC
322  * @tc.require:
323  * @tc.author: liaoyonghuang
324  */
325 HWTEST_F(DistributedDBCloudKvTest, SubUser001, TestSize.Level0)
326 {
327     /**
328      * @tc.steps: step1. Get db with subUser
329      * @tc.expected: step1.ok
330      */
331     KvStoreDelegateManager manager(APP_ID, USER_ID, "subUser");
332     KvStoreConfig subUserConfig = config_;
333     subUserConfig.dataDir = config_.dataDir + "/subUser";
334     OS::MakeDBDirectory(subUserConfig.dataDir);
335     manager.SetKvStoreConfig(subUserConfig);
336     KvStoreNbDelegate::Option option;
337     DBStatus openRet = OK;
338     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
339     manager.GetKvStore(STORE_ID_1, option,
__anon6df255ee0702(DBStatus status, KvStoreNbDelegate *openDelegate) 340         [&openRet, &kvDelegatePtrS3_](DBStatus status, KvStoreNbDelegate *openDelegate) {
341         openRet = status;
342         kvDelegatePtrS3_ = openDelegate;
343     });
344     EXPECT_EQ(openRet, OK);
345     ASSERT_NE(kvDelegatePtrS3_, nullptr);
346     /**
347      * @tc.steps: step2. close and delete db
348      * @tc.expected: step2.ok
349      */
350     ASSERT_EQ(manager.CloseKvStore(kvDelegatePtrS3_), OK);
351     DBStatus status = manager.DeleteKvStore(STORE_ID_1);
352     EXPECT_EQ(status, OK);
353 }
354 
355 /**
356  * @tc.name: NormalSync001
357  * @tc.desc: Test normal push sync for add data.
358  * @tc.type: FUNC
359  * @tc.require:
360  * @tc.author: zhangqiquan
361  */
362 HWTEST_F(DistributedDBCloudKvTest, NormalSync001, TestSize.Level0)
363 {
364     Key key = {'k'};
365     Value expectValue = {'v'};
366     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
__anon6df255ee0802(const std::string &origin) 367     kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
368         LOGW("origin is %s", origin.c_str());
369         return origin + "1";
370     });
371     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
372     EXPECT_EQ(virtualCloudDb_->GetPrepareTraceId(), std::string(""));
373     for (const auto &table : lastProcess_.tableProcess) {
374         EXPECT_EQ(table.second.upLoadInfo.total, 1u);
375         EXPECT_EQ(table.second.upLoadInfo.insertCount, 1u);
376     }
377     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
378     for (const auto &table : lastProcess_.tableProcess) {
379         EXPECT_EQ(table.second.downLoadInfo.total, 2u); // download 2 records
380         EXPECT_EQ(table.second.downLoadInfo.insertCount, 2u); // download 2 records
381     }
382     Value actualValue;
383     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
384     EXPECT_EQ(actualValue, expectValue);
385     kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
386     auto result = kvDelegatePtrS2_->GetCloudVersion("");
387     EXPECT_EQ(result.first, OK);
388     for (const auto &item : result.second) {
389         EXPECT_EQ(item.second, "1");
390     }
391 }
392 
393 /**
394  * @tc.name: NormalSync002
395  * @tc.desc: Test normal push pull sync for add data.
396  * @tc.type: FUNC
397  * @tc.require:
398  * @tc.author: zhangqiquan
399  */
400 HWTEST_F(DistributedDBCloudKvTest, NormalSync002, TestSize.Level1)
401 {
402     /**
403      * @tc.steps: step1. store1 put (k1,v1) store2 put (k2,v2)
404      * @tc.expected: step1. both put ok
405      */
406     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
__anon6df255ee0902(const std::string &origin) 407     kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
408         LOGW("origin is %s", origin.c_str());
409         return origin + "1";
410     });
__anon6df255ee0a02(const std::string &origin) 411     kvDelegatePtrS2_->SetGenCloudVersionCallback([](const std::string &origin) {
412         LOGW("origin is %s", origin.c_str());
413         return origin + "1";
414     });
415     Key key1 = {'k', '1'};
416     Value expectValue1 = {'v', '1'};
417     Key key2 = {'k', '2'};
418     Value expectValue2 = {'v', '2'};
419     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
420     ASSERT_EQ(kvDelegatePtrS2_->Put(key2, expectValue2), OK);
421     /**
422      * @tc.steps: step2. both store1 and store2 sync
423      * @tc.expected: step2. both sync ok, and store2 got (k1,v1) store1 not exist (k2,v2)
424      */
425     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
426     LOGW("Store1 sync end");
427     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
428     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
429     LOGW("Store2 sync end");
430     Value actualValue;
431     EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue), OK);
432     std::vector<Entry> entries;
433     EXPECT_EQ(kvDelegatePtrS2_->GetDeviceEntries(std::string("DEVICES_A"), entries), OK);
434     EXPECT_EQ(entries.size(), 1u); // 1 record
435     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
436     EXPECT_EQ(actualValue, expectValue1);
437     EXPECT_EQ(kvDelegatePtrS1_->Get(key2, actualValue), NOT_FOUND);
438     /**
439      * @tc.steps: step3. store1 sync again
440      * @tc.expected: step3. sync ok store1 got (k2,v2)
441      */
442     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
443     LOGW("Store1 sync end");
444     EXPECT_EQ(kvDelegatePtrS1_->Get(key2, actualValue), OK);
445     EXPECT_EQ(actualValue, expectValue2);
446     auto result = kvDelegatePtrS2_->GetCloudVersion("");
447     EXPECT_EQ(result.first, OK);
448     for (const auto &item : result.second) {
449         EXPECT_EQ(DBBase64Utils::DecodeIfNeed(item.first), item.first);
450     }
451     kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
452     kvDelegatePtrS2_->SetGenCloudVersionCallback(nullptr);
453 }
454 
455 /**
456  * @tc.name: NormalSync003
457  * @tc.desc: Test normal pull sync for update data.
458  * @tc.type: FUNC
459  * @tc.require:
460  * @tc.author: zhangqiquan
461  */
462 HWTEST_F(DistributedDBCloudKvTest, NormalSync003, TestSize.Level1)
463 {
464     /**
465      * @tc.steps: step1. store1 put (k1,v1) store2 put (k1,v2)
466      * @tc.expected: step1. both put ok
467      */
468     Key key = {'k', '1'};
469     Value expectValue1 = {'v', '1'};
470     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue1), OK);
471     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep for 100ms
472     Value expectValue2 = {'v', '2'};
473     ASSERT_EQ(kvDelegatePtrS2_->Put(key, expectValue2), OK);
474     /**
475      * @tc.steps: step2. both store1 and store2 sync
476      * @tc.expected: step2. both sync ok and store2 got (k1,v2)
477      */
478     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
479     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
480     Value actualValue;
481     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
482     EXPECT_EQ(actualValue, expectValue2);
483     /**
484      * @tc.steps: step2. store1 sync again
485      * @tc.expected: step2. sync ok and store1 got (k1,v2)
486      */
487     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
488     EXPECT_EQ(kvDelegatePtrS1_->Get(key, actualValue), OK);
489     EXPECT_EQ(actualValue, expectValue2);
490 }
491 
492 /**
493  * @tc.name: NormalSync004
494  * @tc.desc: Test normal push sync for delete data.
495  * @tc.type: FUNC
496  * @tc.require:
497  * @tc.author: zhangqiquan
498  */
499 HWTEST_F(DistributedDBCloudKvTest, NormalSync004, TestSize.Level1)
500 {
501     /**
502      * @tc.steps: step1. store1 put (k1,v1) and both sync
503      * @tc.expected: step1. put ok and both sync ok
504      */
505     Key key = {'k'};
506     Value expectValue = {'v'};
507     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
508     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
509     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
510     Value actualValue;
511     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
512     EXPECT_EQ(actualValue, expectValue);
513     /**
514      * @tc.steps: step2. store1 delete (k1,v1) and both sync
515      * @tc.expected: step2. both put ok
516      */
517     ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
518     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
519     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
520     actualValue.clear();
521     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), NOT_FOUND);
522     EXPECT_NE(actualValue, expectValue);
523 }
524 
525 /**
526  * @tc.name: NormalSync005
527  * @tc.desc: Test normal push sync for add data.
528  * @tc.type: FUNC
529  * @tc.require:
530  * @tc.author: zhangqiquan
531  */
532 HWTEST_F(DistributedDBCloudKvTest, NormalSync005, TestSize.Level1)
533 {
534     for (int i = 0; i < 60; ++i) { // sync 60 records
535         Key key = {'k'};
536         Value expectValue = {'v'};
537         key.push_back(static_cast<uint8_t>(i));
538         expectValue.push_back(static_cast<uint8_t>(i));
539         ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
540     }
541     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
542     for (const auto &process : lastProcess_.tableProcess) {
543         EXPECT_EQ(process.second.upLoadInfo.insertCount, 60u); // sync 60 records
544     }
545     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
546     for (const auto &process : lastProcess_.tableProcess) {
547         EXPECT_EQ(process.second.downLoadInfo.insertCount, 60u); // sync 60 records
548     }
549 }
550 
551 /**
552  * @tc.name: NormalSync006
553  * @tc.desc: Test normal push sync with insert delete update.
554  * @tc.type: FUNC
555  * @tc.require:
556  * @tc.author: zhangqiquan
557  */
558 HWTEST_F(DistributedDBCloudKvTest, NormalSync006, TestSize.Level1)
559 {
560     Key k1 = {'k', '1'};
561     Key k2 = {'k', '2'};
562     Value v1 = {'v', '1'};
563     Value v2 = {'v', '2'};
564     Value v3 = {'v', '3'};
565     ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v1), OK);
566     ASSERT_EQ(kvDelegatePtrS1_->Put(k2, v2), OK);
567     ASSERT_EQ(kvDelegatePtrS1_->Put(k2, v3), OK);
568     ASSERT_EQ(kvDelegatePtrS1_->Delete(k1), OK);
569     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
570     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
571     Value actualValue;
572     EXPECT_EQ(kvDelegatePtrS2_->Get(k1, actualValue), NOT_FOUND);
573     EXPECT_EQ(kvDelegatePtrS2_->Get(k2, actualValue), OK);
574     EXPECT_EQ(actualValue, v3);
575 }
576 
577 /**
578  * @tc.name: NormalSync007
579  * @tc.desc: Test normal push sync with download and upload.
580  * @tc.type: FUNC
581  * @tc.require:
582  * @tc.author: zhangqiquan
583  */
584 HWTEST_F(DistributedDBCloudKvTest, NormalSync007, TestSize.Level1)
585 {
586     Key k1 = {'k', '1'};
587     Key k2 = {'k', '2'};
588     Key k3 = {'k', '3'};
589     Key k4 = {'k', '4'};
590     Value v1 = {'v', '1'};
591     Value v2 = {'v', '2'};
592     Value v3 = {'v', '3'};
593     ASSERT_EQ(kvDelegatePtrS2_->Put(k1, v1), OK);
594     ASSERT_EQ(kvDelegatePtrS2_->Put(k2, v1), OK);
595     ASSERT_EQ(kvDelegatePtrS2_->Put(k3, v1), OK);
596     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
597     ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v2), OK);
598     ASSERT_EQ(kvDelegatePtrS1_->Put(k2, v2), OK);
599     ASSERT_EQ(kvDelegatePtrS1_->Put(k4, v2), OK);
600     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
601     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
602     ASSERT_EQ(kvDelegatePtrS2_->Put(k4, v3), OK);
603     ASSERT_EQ(kvDelegatePtrS1_->Delete(k2), OK);
604     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
605 }
606 
607 /**
608  * @tc.name: NormalSync008
609  * @tc.desc: Test complex sync.
610  * @tc.type: FUNC
611  * @tc.require:
612  * @tc.author: zhangqiquan
613  */
614 HWTEST_F(DistributedDBCloudKvTest, NormalSync008, TestSize.Level1)
615 {
616     Key k1 = {'k', '1'};
617     Value v1 = {'v', '1'};
618     deviceB_->PutData(k1, v1, 1u, 0); // 1 is current timestamp
619     deviceB_->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true);
620     Value actualValue;
621     EXPECT_EQ(kvDelegatePtrS2_->Get(k1, actualValue), OK);
622     EXPECT_EQ(actualValue, v1);
623     CloudSyncOption option;
624     option.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
625     option.users.push_back(USER_ID);
626     option.devices.push_back("cloud");
627     BlockSync(kvDelegatePtrS2_, OK, option);
628     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
629     EXPECT_EQ(kvDelegatePtrS1_->Get(k1, actualValue), NOT_FOUND);
630 }
631 
632 /**
633  * @tc.name: NormalSync009
634  * @tc.desc: Test normal push sync with download and upload.
635  * @tc.type: FUNC
636  * @tc.require:
637  * @tc.author: zhangqiquan
638  */
639 HWTEST_F(DistributedDBCloudKvTest, NormalSync009, TestSize.Level1)
640 {
641     Key k1 = {'k', '1'};
642     Key k2 = {'k', '2'};
643     Key k3 = {'k', '3'};
644     Value v1 = {'v', '1'};
645     Value v2 = {'v', '2'};
646     Value v3 = {'v', '3'};
647     ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v1), OK);
648     ASSERT_EQ(kvDelegatePtrS1_->Put(k2, v1), OK);
649     ASSERT_EQ(kvDelegatePtrS1_->Delete(k1), OK);
650     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
651     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
652     ASSERT_EQ(kvDelegatePtrS2_->Put(k1, v2), OK);
653     ASSERT_EQ(kvDelegatePtrS2_->Put(k3, v2), OK);
654     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
655     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
656     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
657 }
658 
659 /**
660  * @tc.name: NormalSync010
661  * @tc.desc: Test normal push sync for add data with different user.
662  * @tc.type: FUNC
663  * @tc.require:
664  * @tc.author: zhangshijie
665  */
666 HWTEST_F(DistributedDBCloudKvTest, NormalSync010, TestSize.Level1)
667 {
668     // add <k1, v1>, sync to cloud with user1
669     Key key1 = {'k', '1'};
670     Value expectValue1 = {'v', '1'};
671     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
672     CloudSyncOption option;
673     option.users.push_back(USER_ID);
674     option.devices.push_back("cloud");
675     BlockSync(kvDelegatePtrS1_, OK, option);
676     for (const auto &table : lastProcess_.tableProcess) {
677         EXPECT_EQ(table.second.upLoadInfo.total, 1u);
678     }
679 
680     // add <k2, v2>, sync to cloud with user2
681     Key key2 = {'k', '2'};
682     Value expectValue2 = {'v', '2'};
683     ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
684     option.users.clear();
685     option.users.push_back(USER_ID_2);
686     BlockSync(kvDelegatePtrS1_, OK, option);
687     for (const auto &table : lastProcess_.tableProcess) {
688         EXPECT_EQ(table.second.upLoadInfo.total, 2u);
689     }
690 
691     option.users.clear();
692     option.users.push_back(USER_ID);
693     option.users.push_back(USER_ID_2);
694     BlockSync(kvDelegatePtrS2_, OK, option);
695     for (const auto &table : lastProcess_.tableProcess) {
696         EXPECT_EQ(table.second.downLoadInfo.total, 2u);
697     }
698     Value actualValue;
699     EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue), OK);
700     EXPECT_EQ(actualValue, expectValue1);
701     Value actualValue2;
702     EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), OK);
703     EXPECT_EQ(actualValue2, expectValue2);
704 }
705 
706 /**
707  * @tc.name: NormalSync011
708  * @tc.desc: Do not synchronize when security label is S4.
709  * @tc.type: FUNC
710  * @tc.require:
711  * @tc.author: liaoyonghuang
712  */
713 HWTEST_F(DistributedDBCloudKvTest, NormalSync011, TestSize.Level1)
714 {
715     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
716     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
717     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
718 
719     KvStoreNbDelegate::Option option;
720     option.secOption.securityLabel = S4;
721     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
722     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
723     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
724     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, SECURITY_OPTION_CHECK_ERROR);
725     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
726 }
727 
728 /**
729  * @tc.name: NormalSync012
730  * @tc.desc: Test normal push sync with memory db.
731  * @tc.type: FUNC
732  * @tc.require:
733  * @tc.author: zhangqiquan
734  */
735 HWTEST_F(DistributedDBCloudKvTest, NormalSync012, TestSize.Level1)
736 {
737     KvStoreNbDelegate *memoryDB1 = nullptr;
738     KvStoreNbDelegate::Option option1;
739     option1.isMemoryDb = true;
740     GetKvStore(memoryDB1, STORE_ID_3, option1);
741     ASSERT_NE(memoryDB1, nullptr);
742     KvStoreNbDelegate *memoryDB2 = nullptr;
743     KvStoreNbDelegate::Option option2;
744     option2.isMemoryDb = true;
745     GetKvStore(memoryDB2, STORE_ID_4, option2);
746     EXPECT_NE(memoryDB2, nullptr);
747     Key key1 = {'k', '1'};
748     Value expectValue1 = {'v', '1'};
749     EXPECT_EQ(memoryDB1->Put(key1, expectValue1), OK);
750     BlockSync(memoryDB1, OK, g_CloudSyncoption);
751     BlockSync(memoryDB2, OK, g_CloudSyncoption);
752     EXPECT_EQ(g_mgr.CloseKvStore(memoryDB1), OK);
753     EXPECT_EQ(g_mgr.CloseKvStore(memoryDB2), OK);
754 }
755 
756 /**
757  * @tc.name: NormalSync013
758  * @tc.desc: Test the wrong schema.
759  * @tc.type: FUNC
760  * @tc.require:
761  * @tc.author: liaoyonghuang
762  */
763 HWTEST_F(DistributedDBCloudKvTest, NormalSync013, TestSize.Level1)
764 {
765     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
766     KvStoreNbDelegate::Option option;
767     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option, true), INVALID_SCHEMA);
768     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
769 }
770 
771 /**
772  * @tc.name: NormalSync014
773  * @tc.desc: Test sync after user change.
774  * @tc.type: FUNC
775  * @tc.require:
776  * @tc.author: liaoyonghuang
777  */
778 HWTEST_F(DistributedDBCloudKvTest, NormalSync014, TestSize.Level1)
779 {
780     /**
781      * @tc.steps: step1. kvDelegatePtrS1_ put and sync data (k1, v1)
782      * @tc.expected: step1.ok
783      */
784     g_mgr.SetSyncActivationCheckCallback([] (const std::string &userId, const std::string &appId,
__anon6df255ee0b02(const std::string &userId, const std::string &appId, const std::string &storeId)785         const std::string &storeId)-> bool {
786         return true;
787     });
788     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
789     KvStoreNbDelegate::Option option;
790     option.syncDualTupleMode = true;
791     GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option);
792     Key key = {'k', '1'};
793     Value value = {'v', '1'};
794     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
795     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
796     /**
797      * @tc.steps: step2. Set sync block time 2s, and change user in sync block time
798      * @tc.expected: step2. Sync return  USER_CHANGED.
799      */
800     virtualCloudDb_->SetBlockTime(2000); // 2000ms
__anon6df255ee0c02() 801     std::thread thread([&]() {
802         std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // sleep for 1000ms
803         g_mgr.SetSyncActivationCheckCallback([] (const std::string &userId, const std::string &appId,
804             const std::string &storeId)-> bool {
805             return false;
806         });
807         RuntimeContext::GetInstance()->NotifyUserChanged();
808     });
809     BlockSync(kvDelegatePtrS3_, USER_CHANGED, g_CloudSyncoption);
810     thread.join();
811     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
812     g_mgr.SetSyncActivationCheckCallback(nullptr);
813 }
814 
815 /**
816  * @tc.name: NormalSync015
817  * @tc.desc: Test sync in all process.
818  * @tc.type: FUNC
819  * @tc.require:
820  * @tc.author: zhangqiquan
821  */
822 HWTEST_F(DistributedDBCloudKvTest, NormalSync015, TestSize.Level1)
823 {
824     Key key = {'k'};
825     Value expectValue = {'v'};
826     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
827     auto option = g_CloudSyncoption;
828     auto action = static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE)
829         | static_cast<uint32_t>(LockAction::DELETE) | static_cast<uint32_t>(LockAction::DOWNLOAD);
830     option.lockAction = static_cast<LockAction>(action);
831     BlockSync(kvDelegatePtrS1_, OK, option);
832     for (const auto &table : lastProcess_.tableProcess) {
833         EXPECT_EQ(table.second.upLoadInfo.total, 1u);
834     }
835     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
836     Value actualValue;
837     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
838     EXPECT_EQ(actualValue, expectValue);
839 }
840 
841 /**
842  * @tc.name: NormalSync016
843  * @tc.desc: Device A and device B have the same key data,
844  *           and then devices B and A perform cloud synchronization sequentially.
845  *           Finally, device A updates the data and performs cloud synchronization.
846  *           Test if there is new data inserted into the cloud database.
847  * @tc.type: FUNC
848  * @tc.require:
849  * @tc.author: liaoyonghuang
850  */
851 HWTEST_F(DistributedDBCloudKvTest, NormalSync016, TestSize.Level1)
852 {
853     Key key = {'k', '1'};
854     Value value1 = {'v', '1'};
855     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value1), OK);
856     Value value2 = {'v', '2'};
857     ASSERT_EQ(kvDelegatePtrS2_->Put(key, value2), OK);
858     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
859     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
860 
861     Value value3 = {'v', '3'};
862     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value3), OK);
__anon6df255ee0e02(VBucket &record) 863     virtualCloudDb_->SetInsertHook([](VBucket &record) {
864         for (auto &recordData : record) {
865             std::string insertKey = "key";
866             Type insertValue = "k1";
867             EXPECT_FALSE(recordData.first == insertKey && recordData.second == insertValue);
868         }
869     });
870     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
871     virtualCloudDb_->SetInsertHook(nullptr);
872 }
873 
874 /**
875  * @tc.name: NormalSync017
876  * @tc.desc: Test duplicate addition, deletion, and sync.
877  * @tc.type: FUNC
878  * @tc.require:
879  * @tc.author: liaoyonghuang
880  */
881 HWTEST_F(DistributedDBCloudKvTest, NormalSync017, TestSize.Level1)
882 {
883     Key key = {'k'};
884     Value value = {'v'};
885     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
886     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
887     ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
888     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
889     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
890     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
891 }
892 
893 /**
894  * @tc.name: NormalSync018
895  * @tc.desc: Test putBatch and sync with memory db.
896  * @tc.type: FUNC
897  * @tc.require:
898  * @tc.author: liaoyonghuang
899  */
900 HWTEST_F(DistributedDBCloudKvTest, NormalSync018, TestSize.Level1)
901 {
902     /**
903      * @tc.steps:step1. Get two Memory DB.
904      * @tc.expected: step1 OK.
905      */
906     KvStoreNbDelegate *memoryDB1 = nullptr;
907     KvStoreNbDelegate::Option option1;
908     option1.isMemoryDb = true;
909     GetKvStore(memoryDB1, STORE_ID_3, option1);
910     ASSERT_NE(memoryDB1, nullptr);
911     KvStoreNbDelegate *memoryDB2 = nullptr;
912     KvStoreNbDelegate::Option option2;
913     option2.isMemoryDb = true;
914     GetKvStore(memoryDB2, STORE_ID_4, option2);
915     EXPECT_NE(memoryDB2, nullptr);
916 
917     /**
918      * @tc.steps:step2. put 301 records and sync to cloud.
919      * @tc.expected: step2 OK.
920      */
921     vector<Entry> entries;
922     int count = 301; // put 301 records.
923     for (int i = 0; i < count; i++) {
924         std::string keyStr = "k_" + std::to_string(i);
925         std::string valueStr = "v_" + std::to_string(i);
926         Key key(keyStr.begin(), keyStr.end());
927         Value value(valueStr.begin(), valueStr.end());
928         entries.push_back({key, value});
929     }
930     EXPECT_EQ(memoryDB1->PutBatch(entries), OK);
931     BlockSync(memoryDB1, OK, g_CloudSyncoption);
932 
933     /**
934      * @tc.steps:step3. Sync from cloud and check values.
935      * @tc.expected: step3 OK.
936      */
937     BlockSync(memoryDB2, OK, g_CloudSyncoption);
938     for (int i = 0; i < count; i++) {
939         std::string keyStr = "k_" + std::to_string(i);
940         std::string valueStr = "v_" + std::to_string(i);
941         Key key(keyStr.begin(), keyStr.end());
942         Value expectValue(valueStr.begin(), valueStr.end());
943         Value actualValue;
944         EXPECT_EQ(memoryDB2->Get(key, actualValue), OK);
945         EXPECT_EQ(actualValue, expectValue);
946     }
947     EXPECT_EQ(g_mgr.CloseKvStore(memoryDB1), OK);
948     EXPECT_EQ(g_mgr.CloseKvStore(memoryDB2), OK);
949 }
950 
951 /**
952  * @tc.name: NormalSync019
953  * @tc.desc: Test dataItem has same time.
954  * @tc.type: FUNC
955  * @tc.require:
956  * @tc.author: zhangqiquan
957  */
958 HWTEST_F(DistributedDBCloudKvTest, NormalSync019, TestSize.Level1)
959 {
960     Key k1 = {'k', '1'};
961     Value v1 = {'v', '1'};
962     ASSERT_EQ(kvDelegatePtrS2_->Put(k1, v1), OK);
963     deviceB_->Sync(SyncMode::SYNC_MODE_PULL_ONLY, true);
964 
965     VirtualDataItem dataItem;
966     deviceB_->GetData(k1, dataItem);
967     EXPECT_EQ(dataItem.timestamp, dataItem.writeTimestamp);
968 }
969 
970 /**
971  * @tc.name: NormalSync020
972  * @tc.desc: Test sync with two users.
973  * @tc.type: FUNC
974  * @tc.require:
975  * @tc.author: liaoyonghuang
976  */
977 HWTEST_F(DistributedDBCloudKvTest, NormalSync020, TestSize.Level1)
978 {
979     /**
980      * @tc.steps:step1. Inserts a piece of data.
981      * @tc.expected: step1 OK.
982      */
983     Key k1 = {'k', '1'};
984     Value v1 = {'v', '1'};
985     ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v1), OK);
986     /**
987      * @tc.steps:step2. sync with two users.
988      * @tc.expected: step2 OK.
989      */
990     CloudSyncOption option;
991     option.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
992     option.users.push_back(USER_ID);
993     option.users.push_back(USER_ID_2);
994     option.devices.push_back("cloud");
995     BlockSync(kvDelegatePtrS1_, OK, option);
996     /**
997      * @tc.steps:step3. Check upLoadInfo.batchIndex of two users.
998      * @tc.expected: Both users have a upLoadInfo.batchIndex of 1.
999      */
1000     for (const auto &table : lastProcess_.tableProcess) {
1001         EXPECT_EQ(table.second.upLoadInfo.batchIndex, 1u);
1002     }
1003 }
1004 
1005 /**
1006  * @tc.name: NormalSync021
1007  * @tc.desc: Test Get Func to get cloudVersion.
1008  * @tc.type: FUNC
1009  * @tc.require:
1010  * @tc.author: caihaoting
1011  */
1012 HWTEST_F(DistributedDBCloudKvTest, NormalSync021, TestSize.Level1)
1013 {
1014     /**
1015      * @tc.steps:step1. store2 GetCloudVersion.
1016      * @tc.expected: step1 OK.
1017      */
1018     Key key = {'k'};
1019     Value expectValue = {'v'};
1020     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
__anon6df255ee0f02(const std::string &origin) 1021     kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1022         LOGW("origin is %s", origin.c_str());
1023         return origin + "1";
1024     });
1025     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1026     for (const auto &table : lastProcess_.tableProcess) {
1027         EXPECT_EQ(table.second.upLoadInfo.total, 1u);
1028     }
1029     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1030     Value actualValue;
1031     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
1032     EXPECT_EQ(actualValue, expectValue);
1033     kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
1034     auto result = kvDelegatePtrS2_->GetCloudVersion("");
1035     EXPECT_EQ(result.first, OK);
1036     for (auto item : result.second) {
1037         EXPECT_EQ(item.second, "1");
1038     }
1039     /**
1040      * @tc.steps:step2. store2 GetCloudVersion.
1041      * @tc.expected: step2 NOT_FOUND.
1042      */
1043     Key keyB;
1044     Value actualValueB;
1045     std::string deviceB = DBCommon::TransferStringToHex(DBCommon::TransferHashString("DEVICE_B"));
1046     std::string versionDeviceBStr = "naturalbase_cloud_version_" + deviceB;
1047     const char *buffer = versionDeviceBStr.c_str();
1048     for (uint32_t i = 0; i < versionDeviceBStr.size(); i++) {
1049         keyB.emplace_back(buffer[i]);
1050     }
1051     EXPECT_EQ(kvDelegatePtrS2_->Get(keyB, actualValueB), NOT_FOUND);
1052 }
1053 
1054 /**
1055  * @tc.name: NormalSync022
1056  * @tc.desc: Test Cloud sync without schema.
1057  * @tc.type: FUNC
1058  * @tc.require:
1059  * @tc.author: zhangqiquan
1060  */
1061 HWTEST_F(DistributedDBCloudKvTest, NormalSync022, TestSize.Level0)
1062 {
1063     /**
1064      * @tc.steps:step1. Get Memory DB.
1065      * @tc.expected: step1 OK.
1066      */
1067     KvStoreNbDelegate *memoryDB1 = nullptr;
1068     KvStoreNbDelegate::Option option;
1069     option.isMemoryDb = true;
1070     DBStatus openRet = OK;
__anon6df255ee1002(DBStatus status, KvStoreNbDelegate *openDelegate) 1071     g_mgr.GetKvStore(STORE_ID_4, option, [&openRet, &memoryDB1](DBStatus status, KvStoreNbDelegate *openDelegate) {
1072         openRet = status;
1073         memoryDB1 = openDelegate;
1074     });
1075     EXPECT_EQ(openRet, OK);
1076     ASSERT_NE(memoryDB1, nullptr);
1077     /**
1078      * @tc.steps:step2. Sync without cloud schema.
1079      * @tc.expected: step2 SCHEMA_MISMATCH.
1080      */
1081     BlockSync(memoryDB1, OK, g_CloudSyncoption, CLOUD_ERROR);
1082     std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
1083     cloudDbs[USER_ID] = virtualCloudDb_;
1084     cloudDbs[USER_ID_2] = virtualCloudDb2_;
1085     memoryDB1->SetCloudDB(cloudDbs);
1086     BlockSync(memoryDB1, OK, g_CloudSyncoption, SCHEMA_MISMATCH);
1087     EXPECT_EQ(g_mgr.CloseKvStore(memoryDB1), OK);
1088 }
1089 
1090 /**
1091  * @tc.name: NormalSync023
1092  * @tc.desc: Test normal local delete before cloud delete.
1093  * @tc.type: FUNC
1094  * @tc.require:
1095  * @tc.author: zhangqiquan
1096  */
1097 HWTEST_F(DistributedDBCloudKvTest, NormalSync023, TestSize.Level1)
1098 {
1099     Key k1 = {'k', '1'};
1100     Value v1 = {'v', '1'};
1101     ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v1), OK);
1102     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
1103     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1104     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1105     ASSERT_EQ(kvDelegatePtrS2_->Delete(k1), OK);
1106     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
1107     ASSERT_EQ(kvDelegatePtrS1_->Delete(k1), OK);
1108     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1109     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1110 }
1111 
1112 /**
1113  * @tc.name: NormalSync024
1114  * @tc.desc: Test duplicate addition, deletion, and sync.
1115  * @tc.type: FUNC
1116  * @tc.require:
1117  * @tc.author: liaoyonghuang
1118  */
1119 HWTEST_F(DistributedDBCloudKvTest, NormalSync024, TestSize.Level1)
1120 {
1121     /**
1122      * @tc.steps:step1. Device A inserts data and synchronizes, then Device B synchronizes.
1123      * @tc.expected: step1 OK.
1124      */
1125     Key key = {'k'};
1126     Value value = {'v'};
1127     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
1128     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1129     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1130     /**
1131      * @tc.steps:step2. Device A deletes data and synchronizes, then Device B synchronizes.
1132      * @tc.expected: step2 OK.
1133      */
1134     ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
1135     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1136     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1137     /**
1138      * @tc.steps:step3. Device B inserts data and synchronizes it.
1139      * @tc.expected: step3 OK.
1140      */
1141     int insertNum = 0;
__anon6df255ee1102(VBucket &record) 1142     virtualCloudDb_->SetInsertHook([&insertNum](VBucket &record) {
1143         insertNum++;
1144     });
1145     ASSERT_EQ(kvDelegatePtrS2_->Put(key, value), OK);
1146     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1147     EXPECT_TRUE(insertNum > 0);
1148     virtualCloudDb_->SetInsertHook(nullptr);
1149 }
1150 
1151 /**
1152  * @tc.name: NormalSync025
1153  * @tc.desc: Test merge sync.
1154  * @tc.type: FUNC
1155  * @tc.require:
1156  * @tc.author: zhangqiquan
1157  */
1158 HWTEST_F(DistributedDBCloudKvTest, NormalSync025, TestSize.Level1)
1159 {
1160     virtualCloudDb_->SetBlockTime(1000); // block query 1000ms
1161     auto option = g_CloudSyncoption;
1162     option.merge = false;
1163     EXPECT_EQ(kvDelegatePtrS1_->Sync(option, nullptr), OK);
1164     option.merge = true;
1165     EXPECT_EQ(kvDelegatePtrS1_->Sync(option, nullptr), OK);
1166     BlockSync(kvDelegatePtrS1_, CLOUD_SYNC_TASK_MERGED, option);
1167 }
1168 
1169 /**
1170  * @tc.name: NormalSync026
1171  * @tc.desc: Test delete when sync mode DEVICE_COLLABORATION.
1172  * @tc.type: FUNC
1173  * @tc.require:
1174  * @tc.author: liaoyonghuang
1175  */
1176 HWTEST_F(DistributedDBCloudKvTest, NormalSync026, TestSize.Level1)
1177 {
1178     /**
1179      * @tc.steps:step1. Create a database with the DEVICE_COLLABORATION mode on device1.
1180      * @tc.expected: step1 OK.
1181      */
1182     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
1183     KvStoreNbDelegate::Option option;
1184     option.conflictResolvePolicy = DEVICE_COLLABORATION;
1185     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
1186     /**
1187      * @tc.steps:step2. put 1 record and sync.
1188      * @tc.expected: step2 OK.
1189      */
1190     Key key = {'k'};
1191     Value expectValue1 = {'v', '1'};
1192     ASSERT_EQ(kvDelegatePtrS3_->Put(key, expectValue1), OK);
1193     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption);
1194     /**
1195      * @tc.steps:step3. Update this record on device2.
1196      * @tc.expected: step3 OK.
1197      */
1198     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1199     ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
1200     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1201     /**
1202      * @tc.steps:step4. device1 sync.
1203      * @tc.expected: step4. The record was not covered by the cloud and cloud was covered.
1204      */
1205     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption);
1206     Value actualValue1;
1207     EXPECT_EQ(kvDelegatePtrS3_->Get(key, actualValue1), OK);
1208     EXPECT_EQ(actualValue1, expectValue1);
1209     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
1210 }
1211 
1212 /**
1213  * @tc.name: NormalSync027
1214  * @tc.desc: Test lock failed during data insert.
1215  * @tc.type: FUNC
1216  * @tc.require:
1217  * @tc.author: liaoyonghuang
1218  */
1219 HWTEST_F(DistributedDBCloudKvTest, NormalSync027, TestSize.Level1)
1220 {
1221     /**
1222      * @tc.steps:step1. put 1 record and sync.
1223      * @tc.expected: step1 OK.
1224      */
1225     Key key = {'k'};
1226     Value value = {'v'};
1227     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
1228     /**
1229      * @tc.steps:step2. Lock cloud DB, and do sync.
1230      * @tc.expected: step2 sync failed due to lock fail, and failCount is 1.
1231      */
1232     virtualCloudDb_->Lock();
1233     BlockSync(kvDelegatePtrS1_, CLOUD_LOCK_ERROR, g_CloudSyncoption);
1234     for (const auto &table : lastProcess_.tableProcess) {
1235         EXPECT_EQ(table.second.upLoadInfo.total, 1u);
1236         EXPECT_EQ(table.second.upLoadInfo.successCount, 0u);
1237         EXPECT_EQ(table.second.upLoadInfo.failCount, 1u);
1238     }
1239     virtualCloudDb_->UnLock();
1240 }
1241 
1242 /**
1243  * @tc.name: NormalSync028
1244  * @tc.desc: Test multi user sync.
1245  * @tc.type: FUNC
1246  * @tc.require:
1247  * @tc.author: caihaoting
1248  */
1249 HWTEST_F(DistributedDBCloudKvTest, NormalSync028, TestSize.Level1)
1250 {
1251     /**
1252      * @tc.steps:step1. put 1 record and sync.
1253      * @tc.expected: step1 OK.
1254      */
1255     Key key = {'k'};
1256     Value value = {'v'};
1257     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
1258     auto option = g_CloudSyncoption;
1259     option.users = {USER_ID, USER_ID_2};
1260     BlockSync(kvDelegatePtrS1_, OK, option);
1261     option.users = {USER_ID_2};
1262     BlockSync(kvDelegatePtrS2_, OK, option);
1263     option.users = {USER_ID, USER_ID_2};
1264     BlockSync(kvDelegatePtrS2_, OK, option);
1265     EXPECT_EQ(lastProcess_.tableProcess[USER_ID_2].downLoadInfo.total, 0u);
1266 }
1267 
1268 /**
1269  * @tc.name: NormalSync029
1270  * @tc.desc: Test lock failed during data insert.
1271  * @tc.type: FUNC
1272  * @tc.require:
1273  * @tc.author: zhangqiquan
1274  */
1275 HWTEST_F(DistributedDBCloudKvTest, NormalSync029, TestSize.Level3)
1276 {
1277     /**
1278      * @tc.steps:step1. block virtual cloud.
1279      */
1280     virtualCloudDb_->SetBlockTime(1000); // block 1s
1281     /**
1282      * @tc.steps:step2. Call sync and let it run with block cloud.
1283      * @tc.expected: step2 sync ok.
1284      */
1285     auto option = g_CloudSyncoption;
1286     std::mutex finishMutex;
1287     std::condition_variable cv;
1288     std::vector<std::map<std::string, SyncProcess>> processRes;
__anon6df255ee1202(const std::map<std::string, SyncProcess> &process) 1289     auto callback = [&processRes, &finishMutex, &cv](const std::map<std::string, SyncProcess> &process) {
1290         for (const auto &item : process) {
1291             if (item.second.process != ProcessStatus::FINISHED) {
1292                 return;
1293             }
1294         }
1295         {
1296             std::lock_guard<std::mutex> autoLock(finishMutex);
1297             processRes.push_back(process);
1298         }
1299         cv.notify_all();
1300     };
1301     EXPECT_EQ(kvDelegatePtrS1_->Sync(option, callback), OK);
1302     /**
1303      * @tc.steps:step3. USER0 sync first and USER2 sync second.
1304      * @tc.expected: step3 sync ok and USER2 task has been merged.
1305      */
1306     option.merge = true;
1307     option.users = {USER_ID};
1308     EXPECT_EQ(kvDelegatePtrS1_->Sync(option, callback), OK);
1309     option.users = {USER_ID_2};
1310     EXPECT_EQ(kvDelegatePtrS1_->Sync(option, callback), OK);
1311     /**
1312      * @tc.steps:step4. Wait all task finished.
1313      * @tc.expected: step4 Second sync task has USER0 and USER2.
1314      */
1315     virtualCloudDb_->SetBlockTime(0); // cancel block for test case run quickly
1316     std::unique_lock<std::mutex> uniqueLock(finishMutex);
__anon6df255ee1302() 1317     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MAX_TIMEOUT), [&processRes]() {
1318         return processRes.size() >= 3; // call sync 3 times
1319     });
1320     EXPECT_EQ(processRes[processRes.size() - 1].size(), 2u); // check the last process has 2 user
1321 }
1322 
1323 /**
1324  * @tc.name: NormalSync030
1325  * @tc.desc: Test sync while set null cloudDB
1326  * @tc.type: FUNC
1327  * @tc.require:
1328  * @tc.author: caihaoting
1329  */
1330 HWTEST_F(DistributedDBCloudKvTest, NormalSync030, TestSize.Level1)
1331 {
1332     /**
1333      * @tc.steps:step1. Create a database.
1334      * @tc.expected: step1 OK.
1335      */
1336     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
1337     KvStoreNbDelegate::Option option;
1338     DBStatus ret = OK;
__anon6df255ee1402(DBStatus status, KvStoreNbDelegate *openDelegate) 1339     g_mgr.GetKvStore(STORE_ID_3, option, [&ret, &kvDelegatePtrS3_](DBStatus status, KvStoreNbDelegate *openDelegate) {
1340         ret = status;
1341         kvDelegatePtrS3_ = openDelegate;
1342     });
1343     EXPECT_EQ(ret, OK);
1344     ASSERT_NE(kvDelegatePtrS3_, nullptr);
1345     /**
1346      * @tc.steps:step2. Put {k, v}.
1347      * @tc.expected: step2 OK.
1348      */
1349     Key key = {'k'};
1350     Value value = {'v'};
1351     ASSERT_EQ(kvDelegatePtrS3_->Put(key, value), OK);
1352     /**
1353      * @tc.steps:step3. Set null cloudDB.
1354      * @tc.expected: step3 INVALID_ARGS.
1355      */
1356     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, CLOUD_ERROR);
1357     std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
1358     cloudDbs[USER_ID] = nullptr;
1359     cloudDbs[USER_ID_2] = virtualCloudDb2_;
1360     EXPECT_EQ(kvDelegatePtrS3_->SetCloudDB(cloudDbs), INVALID_ARGS);
1361     /**
1362      * @tc.steps:step4. Sync while set null cloudDB.
1363      * @tc.expected: step4 CLOUD_ERROR.
1364      */
1365     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, CLOUD_ERROR);
1366     EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrS3_), OK);
1367 }
1368 
1369 /**
1370  * @tc.name: NormalSync031
1371  * @tc.desc: Test sync with error local device
1372  * @tc.type: FUNC
1373  * @tc.require:
1374  * @tc.author: zhangqiquan
1375  */
1376 HWTEST_F(DistributedDBCloudKvTest, NormalSync031, TestSize.Level1)
1377 {
1378     /**
1379      * @tc.steps:step1. put 1 record and sync.
1380      * @tc.expected: step1 OK.
1381      */
1382     Key key = {'k'};
1383     Value value = {'v'};
1384     ASSERT_EQ(kvDelegatePtrS2_->Put(key, value), OK);
1385     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1386     /**
1387      * @tc.steps:step2. Set local devices error and sync.
1388      * @tc.expected: step2 sync fail.
1389      */
1390     communicatorAggregator_->MockGetLocalDeviceRes(-E_CLOUD_ERROR);
1391     BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1392     communicatorAggregator_->MockGetLocalDeviceRes(E_OK);
1393     for (const auto &table : lastProcess_.tableProcess) {
1394         EXPECT_EQ(table.second.downLoadInfo.total, 0u);
1395         EXPECT_EQ(table.second.downLoadInfo.failCount, 0u);
1396         EXPECT_EQ(table.second.upLoadInfo.total, 0u);
1397     }
1398 }
1399 
1400 /**
1401  * @tc.name: NormalSync032
1402  * @tc.desc: Test some record upload fail in 1 batch.
1403  * @tc.type: FUNC
1404  * @tc.require:
1405  * @tc.author: liaoyonghuang
1406  */
1407 HWTEST_F(DistributedDBCloudKvTest, NormalSync032, TestSize.Level1)
1408 {
1409     /**
1410      * @tc.steps:step1. put 10 records.
1411      * @tc.expected: step1 ok.
1412      */
1413     vector<Entry> entries;
1414     int count = 10; // put 10 records.
1415     for (int i = 0; i < count; i++) {
1416         std::string keyStr = "k_" + std::to_string(i);
1417         std::string valueStr = "v_" + std::to_string(i);
1418         Key key(keyStr.begin(), keyStr.end());
1419         Value value(valueStr.begin(), valueStr.end());
1420         entries.push_back({key, value});
1421     }
1422     EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
1423     /**
1424      * @tc.steps:step2. sync and set the last record upload fail.
1425      * @tc.expected: step2 sync fail and upLoadInfo.failCount is 1.
1426      */
1427     int uploadFailId = 0;
1428     virtualCloudDb_->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6df255ee1502(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1429         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1430         uploadFailId++;
1431         if (uploadFailId == 10) { // 10 is the last record
1432             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1433             return CLOUD_ERROR;
1434         }
1435         return OK;
1436     });
1437     BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1438     for (const auto &table : lastProcess_.tableProcess) {
1439         EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1440         EXPECT_EQ(table.second.upLoadInfo.successCount, 9u);
1441         EXPECT_EQ(table.second.upLoadInfo.insertCount, 9u);
1442         EXPECT_EQ(table.second.upLoadInfo.failCount, 1u);
1443     }
1444     virtualCloudDb_->ForkUpload(nullptr);
1445 }
1446 
1447 /**
1448  * @tc.name: NormalSync033
1449  * @tc.desc: test sync with different operation type and check upLoadInfo
1450  * @tc.type: FUNC
1451  * @tc.require:
1452  * @tc.author: liaoyonghuang
1453  */
1454 HWTEST_F(DistributedDBCloudKvTest, NormalSync033, TestSize.Level1)
1455 {
1456     /**
1457      * @tc.steps:step1. put local records {k1, v1} {k2, v2} and sync to cloud.
1458      * @tc.expected: step1 ok.
1459      */
1460     Key key1 = {'k', '1'};
1461     Value value1 = {'v', '1'};
1462     kvDelegatePtrS1_->Put(key1, value1);
1463     Key key2 = {'k', '2'};
1464     Value value2 = {'v', '2'};
1465     kvDelegatePtrS1_->Put(key2, value2);
1466     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1467     /**
1468      * @tc.steps:step2. put {k3, v3}, delete {k1, v1}, and put {k2, v3}
1469      * @tc.expected: step2 ok.
1470      */
1471     Key key3 = {'k', '3'};
1472     Value value3 = {'v', '3'};
1473     kvDelegatePtrS1_->Put(key3, value3);
1474     kvDelegatePtrS1_->Delete(key1);
1475     kvDelegatePtrS1_->Put(key2, value3);
1476     /**
1477      * @tc.steps:step3. sync and check upLoadInfo
1478      * @tc.expected: step3 ok.
1479      */
1480     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1481     for (const auto &table : lastProcess_.tableProcess) {
1482         EXPECT_EQ(table.second.upLoadInfo.total, 3u);
1483         EXPECT_EQ(table.second.upLoadInfo.batchIndex, 3u);
1484         EXPECT_EQ(table.second.upLoadInfo.successCount, 3u);
1485         EXPECT_EQ(table.second.upLoadInfo.insertCount, 1u);
1486         EXPECT_EQ(table.second.upLoadInfo.deleteCount, 1u);
1487         EXPECT_EQ(table.second.upLoadInfo.updateCount, 1u);
1488         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1489     }
1490 }
1491 
1492 /**
1493  * @tc.name: NormalSync034
1494  * @tc.desc: test sync data which size > maxUploadSize.
1495  * @tc.type: FUNC
1496  * @tc.require:
1497  * @tc.author: liaoyonghuang
1498  */
1499 HWTEST_F(DistributedDBCloudKvTest, NormalSync034, TestSize.Level1)
1500 {
1501     /**
1502      * @tc.steps:step1. put data which size > maxUploadSize.
1503      * @tc.expected: step1 ok.
1504      */
1505     CloudSyncConfig config;
1506     int maxUploadSize = 1024000;
1507     int maxUploadCount = 40;
1508     config.maxUploadSize = maxUploadSize;
1509     config.maxUploadCount = maxUploadCount;
1510     kvDelegatePtrS1_->SetCloudSyncConfig(config);
1511     Key key = {'k'};
1512     Value value = {'v'};
1513     value.insert(value.end(), maxUploadSize, '0');
1514     kvDelegatePtrS1_->Put(key, value);
1515     /**
1516      * @tc.steps:step2. put entries which count > maxUploadCount.
1517      * @tc.expected: step2 ok.
1518      */
1519     vector<Entry> entries;
1520     for (int i = 0; i < maxUploadCount; i++) {
1521         std::string keyStr = "k_" + std::to_string(i);
1522         std::string valueStr = "v_" + std::to_string(i);
1523         Key key(keyStr.begin(), keyStr.end());
1524         Value value(valueStr.begin(), valueStr.end());
1525         entries.push_back({key, value});
1526     }
1527     EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
1528     /**
1529      * @tc.steps:step3. sync and check upLoadInfo.batchIndex.
1530      * @tc.expected: step3 ok.
1531      */
1532     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1533     for (const auto &table : lastProcess_.tableProcess) {
1534         EXPECT_EQ(table.second.upLoadInfo.batchIndex, 2u);
1535     }
1536 }
1537 
1538 /**
1539  * @tc.name: NormalSync035
1540  * @tc.desc:test sync after export and import
1541  * @tc.type: FUNC
1542  * @tc.require:
1543  * @tc.author: caihaoting
1544  */
1545 HWTEST_F(DistributedDBCloudKvTest, NormalSync035, TestSize.Level1)
1546 {
1547     /**
1548      * @tc.steps:step1. put local records {k1, v1} and sync to cloud.
1549      * @tc.expected: step1 ok.
1550      */
1551     Key key1 = {'k', '1'};
1552     Value value1 = {'v', '1'};
1553     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, value1), OK);
__anon6df255ee1602(const std::string &origin) 1554     kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1555         LOGW("origin is %s", origin.c_str());
1556         return origin + "1";
1557     });
1558     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1559     Value actualValue1;
1560     EXPECT_EQ(kvDelegatePtrS1_->Get(key1, actualValue1), OK);
1561     EXPECT_EQ(actualValue1, value1);
1562     auto result = kvDelegatePtrS1_->GetCloudVersion("");
1563     EXPECT_EQ(result.first, OK);
1564     for (const auto &item : result.second) {
1565         EXPECT_EQ(item.second, "1");
1566     }
1567     /**
1568      * @tc.steps:step2. export and import.
1569      * @tc.expected: step2 export and import ok.
1570      */
1571     std::string singleExportFileName = g_testDir + "/NormalSync034.$$";
1572     CipherPassword passwd;
1573     EXPECT_EQ(kvDelegatePtrS1_->Export(singleExportFileName, passwd), OK);
1574     EXPECT_EQ(kvDelegatePtrS1_->Import(singleExportFileName, passwd), OK);
1575     /**
1576      * @tc.steps:step3. put {k1, v2} and sync to cloud.
1577      * @tc.expected: step3 ok.
1578      */
1579     Value value2 = {'v', '2'};
1580     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, value2), OK);
1581     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1582     Value actualValue2;
1583     EXPECT_EQ(kvDelegatePtrS1_->Get(key1, actualValue2), OK);
1584     EXPECT_EQ(actualValue2, value2);
1585     result = kvDelegatePtrS1_->GetCloudVersion("");
1586     EXPECT_EQ(result.first, OK);
1587     for (const auto &item : result.second) {
1588         EXPECT_EQ(item.second, "11");
1589     }
1590     kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
1591 }
1592 
1593 /**
1594  * @tc.name: NormalSync036
1595  * @tc.desc: test sync data with SetCloudSyncConfig.
1596  * @tc.type: FUNC
1597  * @tc.require:
1598  * @tc.author: caihaoting
1599  */
1600 HWTEST_F(DistributedDBCloudKvTest, NormalSync036, TestSize.Level1)
1601 {
1602     /**
1603      * @tc.steps:step1. put data and SetCloudSyncConfig.
1604      * @tc.expected: step1. ok.
1605      */
1606     CloudSyncConfig config;
1607     int maxUploadCount = 40;
1608     config.maxUploadCount = maxUploadCount;
1609     kvDelegatePtrS1_->SetCloudSyncConfig(config);
1610     Key key = {'k', '1'};
1611     Value value = {'v', '1'};
1612     kvDelegatePtrS1_->Put(key, value);
1613     /**
1614      * @tc.steps:step2. sync.
1615      * @tc.expected: step2. ok.
1616      */
1617     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1618 }
1619 
1620 /**
1621  * @tc.name: NormalSync037
1622  * @tc.desc: test update sync data while local and cloud record device is same and not local device.
1623  * @tc.type: FUNC
1624  * @tc.require:
1625  * @tc.author: caihaoting
1626  */
1627 HWTEST_F(DistributedDBCloudKvTest, NormalSync037, TestSize.Level1)
1628 {
1629     /**
1630      * @tc.steps:step1. deviceA put {k1, v1} and sync to cloud.
1631      * @tc.expected: step1. ok.
1632      */
1633     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1634     Key key = {'k', '1'};
1635     Value expectValue1 = {'v', '1'};
1636     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue1), OK);
1637     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1638     /**
1639      * @tc.steps:step2. deviceB sync to cloud.
1640      * @tc.expected: step2. ok.
1641      */
1642     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1643     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1644     Value actualValue1;
1645     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue1), OK);
1646     EXPECT_EQ(actualValue1, expectValue1);
1647     /**
1648      * @tc.steps:step3. deviceA put {k1, v2} and sync to cloud.
1649      * @tc.expected: step3. ok.
1650      */
1651     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1652     Value expectValue2 = {'v', '2'};
1653     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue2), OK);
1654     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1655     /**
1656      * @tc.steps:step4. deviceB sync to cloud.
1657      * @tc.expected: step4. ok.
1658      */
1659     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1660     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1661     Value actualValue2;
1662     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue2), OK);
1663     EXPECT_EQ(actualValue2, expectValue2);
1664 }
1665 
1666 /**
1667  * @tc.name: NormalSync038
1668  * @tc.desc: test insert sync data while local and cloud record device is same and not local device.
1669  * @tc.type: FUNC
1670  * @tc.require:
1671  * @tc.author: caihaoting
1672  */
1673 HWTEST_F(DistributedDBCloudKvTest, NormalSync038, TestSize.Level1)
1674 {
1675     /**
1676      * @tc.steps:step1. deviceA put {k1, v1} and sync to cloud.
1677      * @tc.expected: step1. ok.
1678      */
1679     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1680     Key key1 = {'k', '1'};
1681     Value expectValue1 = {'v', '1'};
1682     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
1683     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1684     /**
1685      * @tc.steps:step2. deviceB sync to cloud.
1686      * @tc.expected: step2. ok.
1687      */
1688     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1689     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1690     Value actualValue1;
1691     EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue1), OK);
1692     EXPECT_EQ(actualValue1, expectValue1);
1693     /**
1694      * @tc.steps:step3. deviceA put {k2, v2} and sync to cloud.
1695      * @tc.expected: step3. ok.
1696      */
1697     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1698     Key key2 = {'k', '2'};
1699     Value expectValue2 = {'v', '2'};
1700     ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
1701     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1702     /**
1703      * @tc.steps:step4. deviceB sync to cloud.
1704      * @tc.expected: step4. ok.
1705      */
1706     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1707     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1708     actualValue1.clear();
1709     EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue1), OK);
1710     EXPECT_EQ(actualValue1, expectValue1);
1711     Value actualValue2;
1712     EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), OK);
1713     EXPECT_EQ(actualValue2, expectValue2);
1714 }
1715 
1716 /**
1717  * @tc.name: NormalSync039
1718  * @tc.desc: test delete sync data while local and cloud record device is same and not local device.
1719  * @tc.type: FUNC
1720  * @tc.require:
1721  * @tc.author: caihaoting
1722  */
1723 HWTEST_F(DistributedDBCloudKvTest, NormalSync039, TestSize.Level1)
1724 {
1725     /**
1726      * @tc.steps:step1. deviceA put {k1, v1} {k2, v2} and sync to cloud.
1727      * @tc.expected: step1. ok.
1728      */
1729     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1730     Key key1 = {'k', '1'};
1731     Value expectValue1 = {'v', '1'};
1732     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
1733     Key key2 = {'k', '2'};
1734     Value expectValue2 = {'v', '2'};
1735     ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
1736     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1737     /**
1738      * @tc.steps:step2. deviceB sync to cloud.
1739      * @tc.expected: step2. ok.
1740      */
1741     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1742     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1743     Value actualValue1;
1744     EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue1), OK);
1745     EXPECT_EQ(actualValue1, expectValue1);
1746     Value actualValue2;
1747     EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), OK);
1748     EXPECT_EQ(actualValue2, expectValue2);
1749     /**
1750      * @tc.steps:step3. deviceA delete {k2, v2} and sync to cloud.
1751      * @tc.expected: step3. ok.
1752      */
1753     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1754     ASSERT_EQ(kvDelegatePtrS1_->Delete(key2), OK);
1755     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1756     /**
1757      * @tc.steps:step4. deviceB sync to cloud.
1758      * @tc.expected: step4. ok.
1759      */
1760     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1761     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1762     actualValue1.clear();
1763     EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue1), OK);
1764     EXPECT_EQ(actualValue1, expectValue1);
1765     actualValue2.clear();
1766     EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), NOT_FOUND);
1767     EXPECT_NE(actualValue2, expectValue2);
1768 }
1769 
1770 /**
1771  * @tc.name: NormalSync040
1772  * @tc.desc: test update sync data while local and cloud record device is different and not local device.
1773  * @tc.type: FUNC
1774  * @tc.require:
1775  * @tc.author: caihaoting
1776  */
1777 HWTEST_F(DistributedDBCloudKvTest, NormalSync040, TestSize.Level1)
1778 {
1779     /**
1780      * @tc.steps:step1. deviceA put {k1, v1} and sync to cloud.
1781      * @tc.expected: step1. ok.
1782      */
1783     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1784     Key key = {'k', '1'};
1785     Value expectValue1 = {'v', '1'};
1786     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue1), OK);
1787     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1788     /**
1789      * @tc.steps:step2. deviceB sync to cloud.
1790      * @tc.expected: step2. ok.
1791      */
1792     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1793     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1794     Value actualValue1;
1795     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue1), OK);
1796     EXPECT_EQ(actualValue1, expectValue1);
1797     /**
1798      * @tc.steps:step3. deviceA put {k1, v2} and sync to cloud.
1799      * @tc.expected: step3. ok.
1800      */
1801     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1802     Value expectValue2 = {'v', '2'};
1803     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue2), OK);
1804     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1805     /**
1806      * @tc.steps:step4. deviceB put {k1, v3} sync to cloud.
1807      * @tc.expected: step4. ok.
1808      */
1809     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1810     Value expectValue3 = {'v', '3'};
1811     ASSERT_EQ(kvDelegatePtrS2_->Put(key, expectValue3), OK);
1812     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1813     Value actualValue2;
1814     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue2), OK);
1815     EXPECT_NE(actualValue2, expectValue2);
1816     EXPECT_EQ(actualValue2, expectValue3);
1817 }
1818 
1819 /**
1820  * @tc.name: NormalSync041
1821  * @tc.desc: Test concurrent sync and close DB.
1822  * @tc.type: FUNC
1823  * @tc.require:
1824  * @tc.author: liaoyonghuang
1825  */
1826 HWTEST_F(DistributedDBCloudKvTest, NormalSync041, TestSize.Level1)
1827 {
1828     /**
1829      * @tc.steps:step1. put data to cloud.
1830      * @tc.expected: step1 ok.
1831      */
1832     Key key = {'k', '1'};
1833     Value value = {'v', '1'};
1834     kvDelegatePtrS1_->Put(key, value);
1835     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1836 
1837     /**
1838      * @tc.steps:step2. sync and close DB concurrently.
1839      * @tc.expected: step2 ok.
1840      */
1841     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
1842     KvStoreNbDelegate::Option option;
1843     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
__anon6df255ee1702(const std::string &tableName, VBucket &extend) 1844     virtualCloudDb_->ForkQuery([](const std::string &tableName, VBucket &extend) {
1845         std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep for 200ms
1846     });
1847     KvStoreDelegateManager &mgr = g_mgr;
__anon6df255ee1802() 1848     std::thread syncThread([&mgr, &kvDelegatePtrS3_]() {
1849         std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep for 100ms
1850         EXPECT_EQ(mgr.CloseKvStore(kvDelegatePtrS3_), OK);
1851     });
1852     EXPECT_EQ(kvDelegatePtrS3_->Sync(g_CloudSyncoption, nullptr), OK);
1853     syncThread.join();
1854 }
1855 
1856 /**
1857  * @tc.name: NormalSync042
1858  * @tc.desc: Test some record upload fail in 1 batch and get cloud version successfully.
1859  * @tc.type: FUNC
1860  * @tc.require:
1861  * @tc.author: caihaoting
1862  */
1863 HWTEST_F(DistributedDBCloudKvTest, NormalSync042, TestSize.Level1)
1864 {
1865     /**
1866      * @tc.steps:step1. put 10 records.
1867      * @tc.expected: step1 ok.
1868      */
1869     vector<Entry> entries;
1870     int count = 10; // put 10 records.
1871     for (int i = 0; i < count; i++) {
1872         std::string keyStr = "k_" + std::to_string(i);
1873         std::string valueStr = "v_" + std::to_string(i);
1874         Key key(keyStr.begin(), keyStr.end());
1875         Value value(valueStr.begin(), valueStr.end());
1876         entries.push_back({key, value});
1877     }
1878     EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
__anon6df255ee1902(const std::string &origin) 1879     kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1880         LOGW("origin is %s", origin.c_str());
1881         return origin + "1";
1882     });
1883     /**
1884      * @tc.steps:step2. sync and set the last record upload fail.
1885      * @tc.expected: step2 sync fail and upLoadInfo.failCount is 1.
1886      */
1887     int uploadFailId = 0;
1888     virtualCloudDb_->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6df255ee1a02(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1889         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1890         uploadFailId++;
1891         if (uploadFailId == 10) { // 10 is the last record
1892             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1893             return CLOUD_ERROR;
1894         }
1895         return OK;
1896     });
1897     BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1898     for (const auto &table : lastProcess_.tableProcess) {
1899         EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1900         EXPECT_EQ(table.second.upLoadInfo.successCount, 9u);
1901         EXPECT_EQ(table.second.upLoadInfo.insertCount, 9u);
1902         EXPECT_EQ(table.second.upLoadInfo.failCount, 1u);
1903     }
1904     /**
1905      * @tc.steps:step3. get cloud version successfully.
1906      * @tc.expected: step3 OK.
1907      */
1908     auto result = kvDelegatePtrS1_->GetCloudVersion("");
1909     EXPECT_EQ(result.first, OK);
1910     for (const auto &item : result.second) {
1911         EXPECT_EQ(item.second, "1");
1912     }
1913     kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
1914     virtualCloudDb_->ForkUpload(nullptr);
1915 }
1916 
1917 /**
1918  * @tc.name: NormalSync043
1919  * @tc.desc: Test some record upload fail in 1 batch and get cloud version failed.
1920  * @tc.type: FUNC
1921  * @tc.require:
1922  * @tc.author: caihaoting
1923  */
1924 HWTEST_F(DistributedDBCloudKvTest, NormalSync043, TestSize.Level1)
1925 {
1926     /**
1927      * @tc.steps:step1. put 10 records.
1928      * @tc.expected: step1 ok.
1929      */
1930     vector<Entry> entries;
1931     int count = 10; // put 10 records.
1932     for (int i = 0; i < count; i++) {
1933         std::string keyStr = "k_" + std::to_string(i);
1934         std::string valueStr = "v_" + std::to_string(i);
1935         Key key(keyStr.begin(), keyStr.end());
1936         Value value(valueStr.begin(), valueStr.end());
1937         entries.push_back({key, value});
1938     }
1939     EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
__anon6df255ee1b02(const std::string &origin) 1940     kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1941         LOGW("origin is %s", origin.c_str());
1942         return origin + "1";
1943     });
1944     /**
1945      * @tc.steps:step2. sync and set all record upload fail.
1946      * @tc.expected: step2 sync fail and upLoadInfo.failCount is 10.
1947      */
1948     virtualCloudDb_->ForkInsertConflict([](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6df255ee1c02(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1949         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1950         extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1951         return CLOUD_ERROR;
1952     });
1953     BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1954     for (const auto &table : lastProcess_.tableProcess) {
1955         EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1956         EXPECT_EQ(table.second.upLoadInfo.successCount, 0u);
1957         EXPECT_EQ(table.second.upLoadInfo.insertCount, 0u);
1958         EXPECT_EQ(table.second.upLoadInfo.failCount, 10u);
1959     }
1960     /**
1961      * @tc.steps:step3. get cloud version failed.
1962      * @tc.expected: step3 NOT_FOUND.
1963      */
1964     auto result = kvDelegatePtrS1_->GetCloudVersion("");
1965     EXPECT_EQ(result.first, NOT_FOUND);
1966     for (const auto &item : result.second) {
1967         EXPECT_EQ(item.second, "");
1968     }
1969     kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
1970     virtualCloudDb_->ForkUpload(nullptr);
1971 }
1972 
1973 /**
1974  * @tc.name: NormalSync044
1975  * @tc.desc: Test RemoveDeviceData with FLAG_ONLY option
1976  * @tc.type: FUNC
1977  * @tc.require:
1978  * @tc.author: zhangtao
1979  */
1980 HWTEST_F(DistributedDBCloudKvTest, NormalSync044, TestSize.Level1)
1981 {
1982     /**
1983      * @tc.steps: step1. store1 put (k1,v1) and (k2,v2)
1984      * @tc.expected: step1. both put ok
1985      */
1986     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
__anon6df255ee1d02(const std::string &origin) 1987     kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1988         LOGW("origin is %s", origin.c_str());
1989         return origin + "1";
1990     });
1991     Key key1 = {'k', '1'};
1992     Value expectValue1 = {'v', '1'};
1993     Key key2 = {'k', '2'};
1994     Value expectValue2 = {'v', '2'};
1995     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
1996     ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
1997     /**
1998      * @tc.steps: step2. DEVICE_A with store1 sync and DEVICE_B with store2 sync
1999      * @tc.expected: step2. both sync ok, and store2 got (k1,v1) and (k2,v2)
2000      */
2001     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2002     LOGW("Store1 sync end");
2003     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2004     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2005     LOGW("Store2 sync end");
2006     Value actualValue;
2007     EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue), OK);
2008     EXPECT_EQ(actualValue, expectValue1);
2009     EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue), OK);
2010     EXPECT_EQ(actualValue, expectValue2);
2011     /**
2012      * @tc.steps: step3. store2 RevoveDeviceData with FLAG_ONLY option
2013      * @tc.expected: step3. store2 delete DEVICE_A's version CloudVersion data successfully
2014      */
2015     auto result = kvDelegatePtrS2_->GetCloudVersion("");
2016     EXPECT_EQ(result.first, OK);
2017     for (auto item : result.second) {
2018         EXPECT_EQ(item.second, "1");
2019     }
2020     EXPECT_EQ(kvDelegatePtrS2_->RemoveDeviceData("DEVICES_A", ClearMode::FLAG_ONLY), OK);
2021     kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
2022     result = kvDelegatePtrS2_->GetCloudVersion("");
2023     EXPECT_EQ(result.first, NOT_FOUND);
2024     for (auto item : result.second) {
2025         EXPECT_EQ(item.second, "");
2026     }
2027 }
2028 
2029 /**
2030  * @tc.name: NormalSync045
2031  * @tc.desc: Test some record upload fail in 1 batch and extend size greater than record size
2032  * @tc.type: FUNC
2033  * @tc.require:
2034  * @tc.author: zhangtao
2035  */
2036 HWTEST_F(DistributedDBCloudKvTest, NormalSync045, TestSize.Level1)
2037 {
2038     /**
2039      * @tc.steps:step1. put 10 records.
2040      * @tc.expected: step1 ok.
2041      */
2042     vector<Entry> entries;
2043     int count = 10; // put 10 records.
2044     for (int i = 0; i < count; i++) {
2045         std::string keyStr = "k_" + std::to_string(i);
2046         std::string valueStr = "v_" + std::to_string(i);
2047         Key key(keyStr.begin(), keyStr.end());
2048         Value value(valueStr.begin(), valueStr.end());
2049         entries.push_back({key, value});
2050     }
2051     EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
2052     /**
2053      * @tc.steps:step2. sync and add one empty extend as result
2054      * @tc.expected: step2 sync fail and upLoadInfo.failCount is 10. 1 batch failed.
2055      */
2056     std::atomic<int> missCount = -1;
2057     virtualCloudDb_->SetClearExtend(missCount);
2058     BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
2059     for (const auto &table : lastProcess_.tableProcess) {
2060         EXPECT_EQ(table.second.upLoadInfo.total, 10u);
2061         EXPECT_EQ(table.second.upLoadInfo.successCount, 0u);
2062         EXPECT_EQ(table.second.upLoadInfo.insertCount, 0u);
2063         EXPECT_EQ(table.second.upLoadInfo.failCount, 10u);
2064     }
2065     virtualCloudDb_->ForkUpload(nullptr);
2066 }
2067 
2068 /**
2069  * @tc.name: NormalSync046
2070  * @tc.desc: Test RemoveDeviceData with FLAG_ONLY option and empty deviceName
2071  * @tc.type: FUNC
2072  * @tc.require:
2073  * @tc.author: chenghuitao
2074  */
2075 HWTEST_F(DistributedDBCloudKvTest, NormalSync046, TestSize.Level1)
2076 {
2077     /**
2078      * @tc.steps: step1. store1 put (k1,v1) and (k2,v2)
2079      * @tc.expected: step1. both put ok
2080      */
2081     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
__anon6df255ee1e02(const std::string &origin) 2082     kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
2083         LOGW("origin is %s", origin.c_str());
2084         return origin + "1";
2085     });
2086     Key key1 = {'k', '1'};
2087     Value expectValue1 = {'v', '1'};
2088     Key key2 = {'k', '2'};
2089     Value expectValue2 = {'v', '2'};
2090     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
2091     ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
2092     /**
2093      * @tc.steps: step2. DEVICE_A with store1 sync and DEVICE_B with store2 sync
2094      * @tc.expected: step2. both sync ok, and store2 got (k1,v1) and (k2,v2)
2095      */
2096     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2097     LOGW("Store1 sync end");
2098     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2099     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2100     LOGW("Store2 sync end");
2101     Value actualValue;
2102     EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue), OK);
2103     EXPECT_EQ(actualValue, expectValue1);
2104     EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue), OK);
2105     EXPECT_EQ(actualValue, expectValue2);
2106     /**
2107      * @tc.steps: step3. store2 RevoveDeviceData with FLAG_ONLY option
2108      * @tc.expected: step3. store2 delete DEVICE_A's version CloudVersion data successfully
2109      */
2110     auto result = kvDelegatePtrS2_->GetCloudVersion("");
2111     EXPECT_EQ(result.first, OK);
2112     for (auto item : result.second) {
2113         EXPECT_EQ(item.second, "1");
2114     }
2115     EXPECT_EQ(kvDelegatePtrS2_->RemoveDeviceData("", ClearMode::FLAG_ONLY), OK);
2116     kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
2117     result = kvDelegatePtrS2_->GetCloudVersion("");
2118     EXPECT_EQ(result.first, NOT_FOUND);
2119     for (auto item : result.second) {
2120         EXPECT_EQ(item.second, "");
2121     }
2122 }
2123 
2124 /**
2125  * @tc.name: NormalSync047
2126  * @tc.desc: Test multi users sync when user1 sync fail.
2127  * @tc.type: FUNC
2128  * @tc.require:
2129  * @tc.author: suyue
2130  */
2131 HWTEST_F(DistributedDBCloudKvTest, NormalSync047, TestSize.Level1)
2132 {
2133     /**
2134      * @tc.steps: step1. put 20 records.
2135      * @tc.expected: step1. ok.
2136      */
2137     vector<Entry> entries;
2138     int count = 20; // put 20 records.
2139     for (int i = 0; i < count; i++) {
2140         std::string keyStr = "k_" + std::to_string(i);
2141         std::string valueStr = "v_" + std::to_string(i);
2142         Key key(keyStr.begin(), keyStr.end());
2143         Value value(valueStr.begin(), valueStr.end());
2144         entries.push_back({key, value});
2145     }
2146     EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
2147 
2148     /**
2149      * @tc.steps: step2. multi users sync and set user1 fail.
2150      * @tc.expected: step2. user1 sync fail and other user sync success.
2151      */
2152     int uploadFailId = 0;
2153     virtualCloudDb_->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6df255ee1f02(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2154         vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2155         uploadFailId++;
2156         if (uploadFailId > 15) { // the first 15 records success
2157             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
2158             return CLOUD_ERROR;
2159         }
2160         return OK;
2161     });
2162     CloudSyncOption option;
2163     option.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
2164     option.users.push_back(USER_ID);
2165     option.users.push_back(USER_ID_2);
2166     option.devices.push_back("cloud");
2167     SyncAndGetProcessInfo(kvDelegatePtrS1_, option);
2168 
2169     vector<DBStatus> userStatus = {CLOUD_ERROR, OK};
2170     vector<Info> userExpectInfo = {{1u, 20u, 15u, 5u, 15u, 0u, 0u}, {1u, 20u, 20u, 0u, 20u, 0u, 0u}};
2171     EXPECT_TRUE(CheckUserSyncInfo(option.users, userStatus, userExpectInfo));
2172     virtualCloudDb_->ForkUpload(nullptr);
2173 }
2174 
2175 /**
2176  * @tc.name: NormalSync048
2177  * @tc.desc: test sync data while cloud delete on record and local do not have this record.
2178  * @tc.type: FUNC
2179  * @tc.require:
2180  * @tc.author: tankaisheng
2181  */
2182 HWTEST_F(DistributedDBCloudKvTest, NormalSync048, TestSize.Level1)
2183 {
2184     /**
2185      * @tc.steps: step1. deviceB put {k1, v1} {k2, v2} and sync to cloud
2186      * @tc.expected: step1. ok.
2187      */
2188     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2189     Key key1 = {'k', '1'};
2190     Value expectValue1 = {'v', '1'};
2191     Key key2 = {'k', '2'};
2192     Value expectValue2 = {'v', '2'};
2193     ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
2194     ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
2195     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2196 
2197     /**
2198      * @tc.steps: step2. deviceB delete {k1, v1} and sync to cloud
2199      * @tc.expected: step2. ok.
2200      */
2201     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2202     ASSERT_EQ(kvDelegatePtrS1_->Delete(key1), OK);
2203     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2204 
2205     /**
2206      * @tc.steps: step3. deviceA sync to cloud
2207      * @tc.expected: step3. ok.
2208      */
2209     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
2210     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2211     Value actualValue2;
2212     ASSERT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), OK);
2213     ASSERT_EQ(actualValue2, expectValue2);
2214 }
2215 
2216 /**
2217  * @tc.name: NormalSync049
2218  * @tc.desc: test sync data with invalid modify_time.
2219  * @tc.type: FUNC
2220  * @tc.require:
2221  * @tc.author: liaoyonghuang
2222  */
2223 HWTEST_F(DistributedDBCloudKvTest, NormalSync049, TestSize.Level1)
2224 {
2225     /**
2226      * @tc.steps: step1. put {k, v}
2227      * @tc.expected: step1. ok.
2228      */
2229     vector<Entry> entries;
2230     int count = 31; // put 31 records.
2231     for (int i = 0; i < count; i++) {
2232         std::string keyStr = "k_" + std::to_string(i);
2233         std::string valueStr = "v_" + std::to_string(i);
2234         Key key(keyStr.begin(), keyStr.end());
2235         Value value(valueStr.begin(), valueStr.end());
2236         entries.push_back({key, value});
2237     }
2238     kvDelegatePtrS1_->PutBatch(entries);
2239     /**
2240      * @tc.steps: step2. modify time in local and sync
2241      * @tc.expected: step2. ok.
2242      */
2243     sqlite3 *db_;
2244     uint64_t openFlag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
2245     std::string fileUrl = g_testDir + "/" \
2246         "2d23c8a0ffadafcaa03507a4ec2290c83babddcab07c0e2945fbba93efc7eec0/single_ver/main/gen_natural_store.db";
2247     ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db_, openFlag, nullptr) == SQLITE_OK);
2248     std::string sql = "UPDATE sync_data SET modify_time = modify_time + modify_time where rowid>0";
2249     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
2250     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
2251     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2252     /**
2253      * @tc.steps: step3. put {k, v2} in another device
2254      * @tc.expected: step3. ok.
2255      */
2256     Value newValue = {'v'};
2257     for (int i = 0; i < count; i++) {
2258         std::string keyStr = "k_" + std::to_string(i);
2259         Key key(keyStr.begin(), keyStr.end());
2260         entries.push_back({key, newValue});
2261     }
2262     kvDelegatePtrS2_->PutBatch(entries);
2263     /**
2264      * @tc.steps: step4. sync and check data
2265      * @tc.expected: step4. ok.
2266      */
2267     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2268     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2269     Value actualValue;
2270     for (int i = 0; i < count; i++) {
2271         std::string keyStr = "k_" + std::to_string(i);
2272         Key key(keyStr.begin(), keyStr.end());
2273         EXPECT_EQ(kvDelegatePtrS1_->Get(key, actualValue), OK);
2274         EXPECT_EQ(actualValue, newValue);
2275     }
2276 }
2277 
2278 /**
2279  * @tc.name: NormalSync050
2280  * @tc.desc: test upload with conflic error and chekck upload info
2281  * @tc.type: FUNC
2282  * @tc.require:
2283  * @tc.author: liaoyonghuang
2284  */
2285 HWTEST_F(DistributedDBCloudKvTest, NormalSync050, TestSize.Level1)
2286 {
2287     /**
2288      * @tc.steps: step1. Set the retry count to 1
2289      * @tc.expected: step1. ok.
2290      */
2291     CloudSyncConfig config;
2292     config.maxRetryConflictTimes = 1;
2293     kvDelegatePtrS1_->SetCloudSyncConfig(config);
2294     /**
2295      * @tc.steps: step2. Put {k1, v1} {k2, v2} locally
2296      * @tc.expected: step2. ok.
2297      */
2298     kvDelegatePtrS1_->Put(KEY_1, VALUE_1);
2299     kvDelegatePtrS1_->Put(KEY_2, VALUE_2);
2300     /**
2301      * @tc.steps: step3. Set CLOUD_VERSION_CONFLICT when upload 2nd record, and do sync
2302      * @tc.expected: step3. ok.
2303      */
2304     int recordIndex = 0;
2305     virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6df255ee2002(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2306         vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2307         recordIndex++;
2308         if (recordIndex == 2) { // set 2nd record return CLOUD_RECORD_EXIST_CONFLICT
2309             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
2310             return CLOUD_VERSION_CONFLICT;
2311         }
2312         return OK;
2313     });
2314     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2315     /**
2316      * @tc.steps: step4. Check last process
2317      * @tc.expected: step4. ok.
2318      */
2319     for (const auto &table : lastProcess_.tableProcess) {
2320         EXPECT_EQ(table.second.upLoadInfo.total, 2u);
2321         EXPECT_EQ(table.second.upLoadInfo.successCount, 2u);
2322         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
2323         EXPECT_EQ(table.second.upLoadInfo.insertCount, 2u);
2324     }
2325     virtualCloudDb_->ForkInsertConflict(nullptr);
2326 }
2327 
2328 /**
2329  * @tc.name: NormalSync051
2330  * @tc.desc: test upload with conflict error and exceeds retry limit
2331  * @tc.type: FUNC
2332  * @tc.require:
2333  * @tc.author: suyuchen
2334  */
2335 HWTEST_F(DistributedDBCloudKvTest, NormalSync051, TestSize.Level1)
2336 {
2337     /**
2338      * @tc.steps: step1. Set the retry count to 0
2339      * @tc.expected: step1. ok.
2340      */
2341     CloudSyncConfig config;
2342     config.maxRetryConflictTimes = 0;
2343     kvDelegatePtrS1_->SetCloudSyncConfig(config);
2344     /**
2345      * @tc.steps: step2. Put {k1, v1} {k2, v2} locally
2346      * @tc.expected: step2. ok.
2347      */
2348     kvDelegatePtrS1_->Put(KEY_1, VALUE_1);
2349     kvDelegatePtrS1_->Put(KEY_2, VALUE_2);
2350     /**
2351      * @tc.steps: step3. Set CLOUD_VERSION_CONFLICT when upload 2nd record, and do sync
2352      * @tc.expected: step3. CLOUD_VERSION_CONFLICT.
2353      */
2354     int recordIndex = 0;
2355     virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6df255ee2102(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2356         vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2357         recordIndex++;
2358         if (recordIndex == 2) { // set 2nd record return CLOUD_VERSION_CONFLICT
2359             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
2360             return CLOUD_VERSION_CONFLICT;
2361         }
2362         return OK;
2363     });
2364     BlockSync(kvDelegatePtrS1_, CLOUD_VERSION_CONFLICT, g_CloudSyncoption);
2365     /**
2366      * @tc.steps: step4. Check last process
2367      * @tc.expected: step4. ok.
2368      */
2369     for (const auto &table : lastProcess_.tableProcess) {
2370         EXPECT_EQ(table.second.upLoadInfo.total, 2u);
2371         EXPECT_EQ(table.second.upLoadInfo.successCount, 1u);
2372         EXPECT_EQ(table.second.upLoadInfo.failCount, 1u);
2373         EXPECT_EQ(table.second.upLoadInfo.insertCount, 1u);
2374     }
2375     virtualCloudDb_->ForkInsertConflict(nullptr);
2376 }
2377 
2378 /**
2379  * @tc.name: NormalSync052
2380  * @tc.desc: test upload with version conflict error under force push mode
2381  * @tc.type: FUNC
2382  * @tc.require:
2383  * @tc.author: suyuchen
2384  */
2385 HWTEST_F(DistributedDBCloudKvTest, NormalSync052, TestSize.Level1)
2386 {
2387     /**
2388      * @tc.steps: step1. Set the retry count to 2
2389      * @tc.expected: step1. ok.
2390      */
2391     CloudSyncConfig config;
2392     config.maxRetryConflictTimes = 2;
2393     kvDelegatePtrS1_->SetCloudSyncConfig(config);
2394     /**
2395      * @tc.steps: step2. Put {k1, v1} {k2, v2} locally, then sync to cloud
2396      * @tc.expected: step2. ok.
2397      */
2398     kvDelegatePtrS1_->Put(KEY_1, VALUE_1);
2399     kvDelegatePtrS1_->Put(KEY_2, VALUE_2);
2400     /**
2401      * @tc.steps: step3. Set CLOUD_VERSION_CONFLICT when upload 2nd record, and do sync
2402      * @tc.expected: step3. OK.
2403      */
2404     int recordIndex = 0;
2405     virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6df255ee2202(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2406         vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2407         recordIndex++;
2408         if (recordIndex == 2) { // set 2nd record return CLOUD_VERSION_CONFLICT
2409             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
2410             return CLOUD_VERSION_CONFLICT;
2411         }
2412         return OK;
2413     });
2414     g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
2415     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2416     g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
2417     /**
2418      * @tc.steps: step4. Check last process
2419      * @tc.expected: step4. ok.
2420      */
2421     for (const auto &table : lastProcess_.tableProcess) {
2422         EXPECT_EQ(table.second.upLoadInfo.total, 3u);
2423         EXPECT_EQ(table.second.upLoadInfo.successCount, 3u);
2424         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
2425         EXPECT_EQ(table.second.upLoadInfo.insertCount, 2u);
2426     }
2427     virtualCloudDb_->ForkInsertConflict(nullptr);
2428 }
2429 
2430 /**
2431  * @tc.name: NormalSync053
2432  * @tc.desc: test upload with version conflict error under force push mode, which has both update and insert
2433  * @tc.type: FUNC
2434  * @tc.require:
2435  * @tc.author: suyuchen
2436  */
2437 HWTEST_F(DistributedDBCloudKvTest, NormalSync053, TestSize.Level1)
2438 {
2439     /**
2440      * @tc.steps: step1. Set the retry count to 0
2441      * @tc.expected: step1. ok.
2442      */
2443     CloudSyncConfig config;
2444     config.maxRetryConflictTimes = 0;
2445     kvDelegatePtrS1_->SetCloudSyncConfig(config);
2446     /**
2447      * @tc.steps: step2. Put {k1, v1} {k2, v2} locally, then sync to cloud
2448      * @tc.expected: step2. ok.
2449      */
2450     kvDelegatePtrS1_->Put(KEY_1, VALUE_1);
2451     kvDelegatePtrS1_->Put(KEY_2, VALUE_2);
2452     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2453     /**
2454      * @tc.steps: step3. Put {k3, v3} locally, and update k1 k2
2455      * @tc.expected: step3. ok.
2456      */
2457     kvDelegatePtrS1_->Put(KEY_1, VALUE_2);
2458     kvDelegatePtrS1_->Put(KEY_2, VALUE_3);
2459     kvDelegatePtrS1_->Put(KEY_3, VALUE_3);
2460     /**
2461      * @tc.steps: step4. Set CLOUD_VERSION_CONFLICT when upload 2nd record, and do sync
2462      * @tc.expected: step4. CLOUD_VERSION_CONFLICT.
2463      */
2464     int recordIndex = 0;
2465     virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6df255ee2302(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2466         vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2467         recordIndex++;
2468         if (recordIndex == 2) { // set 2nd record return CLOUD_VERSION_CONFLICT
2469             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
2470             return CLOUD_VERSION_CONFLICT;
2471         }
2472         return OK;
2473     });
2474     g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
2475     BlockSync(kvDelegatePtrS1_, CLOUD_VERSION_CONFLICT, g_CloudSyncoption);
2476     g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
2477     /**
2478      * @tc.steps: step5. Check last process
2479      * @tc.expected: step5. ok.
2480      */
2481     for (const auto &table : lastProcess_.tableProcess) {
2482         EXPECT_EQ(table.second.upLoadInfo.total, 3u);
2483         EXPECT_EQ(table.second.upLoadInfo.successCount, 1u);
2484         EXPECT_EQ(table.second.upLoadInfo.failCount, 2u);
2485         EXPECT_EQ(table.second.upLoadInfo.updateCount, 1u);
2486         EXPECT_EQ(table.second.upLoadInfo.insertCount, 0u);
2487     }
2488     virtualCloudDb_->ForkInsertConflict(nullptr);
2489 }
2490 
2491 /**
2492  * @tc.name: NormalSync054
2493  * @tc.desc: test get and put interface execute
2494  * @tc.type: FUNC
2495  * @tc.require:
2496  * @tc.author: tankaisheng
2497  */
2498 HWTEST_F(DistributedDBCloudKvTest, NormalSync054, TestSize.Level1)
2499 {
2500     /**
2501      * @tc.steps:step1. deviceA put {k1, v1} and sync to cloud.
2502      * @tc.expected: step1. ok.
2503      */
2504     communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
2505     Key key = {'k', '1'};
2506     Value expectValue1 = {'v', '1'};
2507     ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue1), OK);
2508     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2509 
2510     /**
2511      * @tc.steps:step2. deviceB sync to cloud.
2512      * @tc.expected: step2. ok.
2513      */
2514     communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2515     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2516     Value actualValue1;
2517     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue1), OK);
2518     EXPECT_EQ(actualValue1, expectValue1);
2519 
2520     EXPECT_EQ(kvDelegatePtrS2_->StartTransaction(), OK);
2521     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue1), OK);
2522     EXPECT_EQ(actualValue1, expectValue1);
2523     EXPECT_TRUE(kvDelegatePtrS2_->Commit() == OK);
2524 }
2525 
2526 /**
2527  * @tc.name: KvSupportEncryptTest001
2528  * @tc.desc: Test sync when different security label and encryption para is set
2529  * @tc.type: FUNC
2530  * @tc.require:
2531  * @tc.author: suyue
2532  */
2533 HWTEST_F(DistributedDBCloudKvTest, KvSupportEncryptTest001, TestSize.Level0)
2534 {
2535     /**
2536      * @tc.steps: step1. sync when security label is S4 and encryption is not supported
2537      * @tc.expected: step1. return SECURITY_OPTION_CHECK_ERROR.
2538      */
2539     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2540     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
2541     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
2542     KvStoreNbDelegate::Option option;
2543     option.secOption.securityLabel = S4;
2544     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
2545     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, SECURITY_OPTION_CHECK_ERROR);
2546 
2547     /**
2548      * @tc.steps: step2. sync when security label is S4 and encryption is supported
2549      * @tc.expected: step2. return OK.
2550      */
2551     CloudSyncConfig config;
2552     config.isSupportEncrypt = true;
2553     kvDelegatePtrS3_->SetCloudSyncConfig(config);
2554     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, OK);
2555     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
2556 
2557     /**
2558      * @tc.steps: step3. sync when security label is not set and encryption is supported
2559      * @tc.expected: step3. return ok.
2560      */
2561     kvDelegatePtrS1_->SetCloudSyncConfig(config);
2562     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2563 
2564     /**
2565      * @tc.steps: step4. sync when security label is not set and encryption is not supported
2566      * @tc.expected: step4. return ok.
2567      */
2568     config.isSupportEncrypt = false;
2569     kvDelegatePtrS1_->SetCloudSyncConfig(config);
2570     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2571     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
2572 }
2573 
2574 /**
2575  * @tc.name: KvSupportEncryptTest002
2576  * @tc.desc: Test sync when encryption para is set multiple times
2577  * @tc.type: FUNC
2578  * @tc.require:
2579  * @tc.author: suyue
2580  */
2581 HWTEST_F(DistributedDBCloudKvTest, KvSupportEncryptTest002, TestSize.Level1)
2582 {
2583     /**
2584      * @tc.steps: step1. sync when isSupportEncrypt is set to true for the first time
2585      * @tc.expected: step1. return OK.
2586      */
2587     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2588     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
2589     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
2590     KvStoreNbDelegate::Option option;
2591     option.secOption.securityLabel = S4;
2592     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
2593     CloudSyncConfig config;
2594     config.isSupportEncrypt = true;
2595     kvDelegatePtrS3_->SetCloudSyncConfig(config);
2596     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, OK);
2597 
2598     /**
2599      * @tc.steps: step2. sync when isSupportEncrypt is set to false for the second time
2600      * @tc.expected: step2. return SECURITY_OPTION_CHECK_ERROR.
2601      */
2602     config.isSupportEncrypt = false;
2603     kvDelegatePtrS3_->SetCloudSyncConfig(config);
2604     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, SECURITY_OPTION_CHECK_ERROR);
2605 
2606     /**
2607      * @tc.steps: step3. sync when isSupportEncrypt is set to true for the third time
2608      * @tc.expected: step3. return OK.
2609      */
2610     config.isSupportEncrypt = true;
2611     kvDelegatePtrS3_->SetCloudSyncConfig(config);
2612     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, OK);
2613     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
2614     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
2615 }
2616 
2617 /**
2618  * @tc.name: KvSupportEncryptTest003
2619  * @tc.desc: Test GetCloudSyncConfig when GetCloudKvStore is nullptr
2620  * @tc.type: FUNC
2621  * @tc.require:
2622  * @tc.author: suyue
2623  */
2624 HWTEST_F(DistributedDBCloudKvTest, KvSupportEncryptTest003, TestSize.Level1)
2625 {
2626     SQLiteSingleVerNaturalStore store;
2627     EXPECT_EQ(store.GetCloudKvStore(), nullptr);
2628     CloudSyncConfig config = store.GetCloudSyncConfig();
2629     EXPECT_EQ(config.isSupportEncrypt, false);
2630 }
2631 
2632 /**
2633  * @tc.name: KvSupportEncryptTest004
2634  * @tc.desc: Test sync when security label change and different encryption para is set
2635  * @tc.type: FUNC
2636  * @tc.require:
2637  * @tc.author: suyue
2638  */
2639 HWTEST_F(DistributedDBCloudKvTest, KvSupportEncryptTest004, TestSize.Level1)
2640 {
2641     /**
2642      * @tc.steps: step1. sync when security label is S2 and different encryption para is set
2643      * @tc.expected: step1. return OK.
2644      */
2645     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2646     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
2647     KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
2648     KvStoreNbDelegate::Option option;
2649     option.secOption.securityLabel = S2;
2650     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
2651     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, OK);
2652     CloudSyncConfig config;
2653     config.isSupportEncrypt = true;
2654     kvDelegatePtrS3_->SetCloudSyncConfig(config);
2655     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, OK);
2656     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
2657 
2658     /**
2659      * @tc.steps: step2. sync when security label is S4 and different encryption para is set
2660      * @tc.expected: step2. support encrypt return OK, not support encrypt return SECURITY_OPTION_CHECK_ERROR.
2661      */
2662     option.secOption.securityLabel = S4;
2663     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
2664     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, SECURITY_OPTION_CHECK_ERROR);
2665     config.isSupportEncrypt = true;
2666     kvDelegatePtrS3_->SetCloudSyncConfig(config);
2667     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, OK);
2668     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
2669 
2670     /**
2671      * @tc.steps: step3. sync when security label is S1 and different encryption para is set
2672      * @tc.expected: step3. return OK.
2673      */
2674     option.secOption.securityLabel = S1;
2675     EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
2676     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, OK);
2677     config.isSupportEncrypt = true;
2678     kvDelegatePtrS3_->SetCloudSyncConfig(config);
2679     BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, OK);
2680     CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
2681     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
2682 }
2683 }
2684