1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "cloud/cloud_db_constant.h"
19 #include "db_base64_utils.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "kv_virtual_device.h"
23 #include "kv_store_nb_delegate.h"
24 #include "kv_store_nb_delegate_impl.h"
25 #include "platform_specific.h"
26 #include "process_system_api_adapter_impl.h"
27 #include "virtual_communicator_aggregator.h"
28 #include "virtual_cloud_db.h"
29 #include "sqlite_utils.h"
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 using namespace DistributedDBUnitTest;
33 using namespace std;
34
35 namespace {
36 static std::string HWM_HEAD = "naturalbase_cloud_meta_sync_data_";
37 string g_testDir;
38 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
39 CloudSyncOption g_CloudSyncoption;
40 const std::string USER_ID_2 = "user2";
41 const std::string USER_ID_3 = "user3";
42 const Key KEY_1 = {'k', '1'};
43 const Key KEY_2 = {'k', '2'};
44 const Key KEY_3 = {'k', '3'};
45 const Value VALUE_1 = {'v', '1'};
46 const Value VALUE_2 = {'v', '2'};
47 const Value VALUE_3 = {'v', '3'};
48 class DistributedDBCloudKvTest : public testing::Test {
49 public:
50 static void SetUpTestCase();
51 static void TearDownTestCase();
52 void SetUp();
53 void TearDown();
54 void InsertRecord(int num);
55 void SetDeviceId(const Key &key, const std::string &deviceId);
56 void SetFlag(const Key &key, LogInfoFlag flag);
57 int CheckFlag(const Key &key, LogInfoFlag flag);
58 int CheckLogTable(const std::string &deviceId);
59 int CheckWaterMark(const std::string &key);
60 int ChangeUserId(const std::string &deviceId, const std::string &wantUserId);
61 int ChangeHashKey(const std::string &deviceId);
62 protected:
63 DBStatus GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId, KvStoreNbDelegate::Option option,
64 bool invalidSchema = false);
65 void CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId);
66 void BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
67 DBStatus expectSyncResult = OK);
68 void SyncAndGetProcessInfo(KvStoreNbDelegate *delegate, CloudSyncOption option);
69 bool CheckUserSyncInfo(const vector<std::string> users, const vector<DBStatus> userStatus,
70 const vector<Info> userExpectInfo);
71 static DataBaseSchema GetDataBaseSchema(bool invalidSchema);
72 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
73 std::shared_ptr<VirtualCloudDb> virtualCloudDb2_ = nullptr;
74 KvStoreConfig config_;
75 KvStoreNbDelegate* kvDelegatePtrS1_ = nullptr;
76 KvStoreNbDelegate* kvDelegatePtrS2_ = nullptr;
77 SyncProcess lastProcess_;
78 std::map<std::string, SyncProcess> lastSyncProcess_;
79 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
80 KvVirtualDevice *deviceB_ = nullptr;
81 };
82
SetUpTestCase()83 void DistributedDBCloudKvTest::SetUpTestCase()
84 {
85 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
86 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
87 LOGE("rm test db files error!");
88 }
89 g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
90 g_CloudSyncoption.users.push_back(USER_ID);
91 g_CloudSyncoption.devices.push_back("cloud");
92
93 string dir = g_testDir + "/single_ver";
94 DIR* dirTmp = opendir(dir.c_str());
95 if (dirTmp == nullptr) {
96 OS::MakeDBDirectory(dir);
97 } else {
98 closedir(dirTmp);
99 }
100 }
101
TearDownTestCase()102 void DistributedDBCloudKvTest::TearDownTestCase()
103 {
104 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
105 LOGE("rm test db files error!");
106 }
107 }
108
SetUp()109 void DistributedDBCloudKvTest::SetUp()
110 {
111 DistributedDBToolsUnitTest::PrintTestCaseInfo();
112 config_.dataDir = g_testDir;
113 /**
114 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
115 */
116 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
117 virtualCloudDb2_ = std::make_shared<VirtualCloudDb>();
118 g_mgr.SetKvStoreConfig(config_);
119 KvStoreNbDelegate::Option option1;
120 ASSERT_EQ(GetKvStore(kvDelegatePtrS1_, STORE_ID_1, option1), OK);
121 // set aggregator after get store1, only store2 can sync with p2p
122 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
123 ASSERT_TRUE(communicatorAggregator_ != nullptr);
124 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
125 KvStoreNbDelegate::Option option2;
126 ASSERT_EQ(GetKvStore(kvDelegatePtrS2_, STORE_ID_2, option2), OK);
127
128 deviceB_ = new (std::nothrow) KvVirtualDevice("DEVICE_B");
129 ASSERT_TRUE(deviceB_ != nullptr);
130 auto syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
131 ASSERT_TRUE(syncInterfaceB != nullptr);
132 ASSERT_EQ(deviceB_->Initialize(communicatorAggregator_, syncInterfaceB), E_OK);
133 }
134
TearDown()135 void DistributedDBCloudKvTest::TearDown()
136 {
137 CloseKvStore(kvDelegatePtrS1_, STORE_ID_1);
138 CloseKvStore(kvDelegatePtrS2_, STORE_ID_2);
139 virtualCloudDb_ = nullptr;
140 virtualCloudDb2_ = nullptr;
141 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
142 LOGE("rm test db files error!");
143 }
144
145 if (deviceB_ != nullptr) {
146 delete deviceB_;
147 deviceB_ = nullptr;
148 }
149
150 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
151 communicatorAggregator_ = nullptr;
152 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
153 }
154
BlockSync(KvStoreNbDelegate * delegate,DBStatus expectDBStatus,CloudSyncOption option,DBStatus expectSyncResult)155 void DistributedDBCloudKvTest::BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
156 DBStatus expectSyncResult)
157 {
158 if (delegate == nullptr) {
159 return;
160 }
161 std::mutex dataMutex;
162 std::condition_variable cv;
163 bool finish = false;
164 SyncProcess last;
165 auto callback = [expectDBStatus, &last, &cv, &dataMutex, &finish, &option](const std::map<std::string,
166 SyncProcess> &process) {
167 size_t notifyCnt = 0;
168 for (const auto &item: process) {
169 LOGD("user = %s, status = %d", item.first.c_str(), item.second.process);
170 if (item.second.process != DistributedDB::FINISHED) {
171 continue;
172 }
173 EXPECT_EQ(item.second.errCode, expectDBStatus);
174 {
175 std::lock_guard<std::mutex> autoLock(dataMutex);
176 notifyCnt++;
177 std::set<std::string> userSet(option.users.begin(), option.users.end());
178 if (notifyCnt == userSet.size()) {
179 finish = true;
180 last = item.second;
181 cv.notify_one();
182 }
183 }
184 }
185 };
186 auto actualRet = delegate->Sync(option, callback);
187 EXPECT_EQ(actualRet, expectSyncResult);
188 if (actualRet == OK) {
189 std::unique_lock<std::mutex> uniqueLock(dataMutex);
190 cv.wait(uniqueLock, [&finish]() {
191 return finish;
192 });
193 }
194 lastProcess_ = last;
195 }
196
GetDataBaseSchema(bool invalidSchema)197 DataBaseSchema DistributedDBCloudKvTest::GetDataBaseSchema(bool invalidSchema)
198 {
199 DataBaseSchema schema;
200 TableSchema tableSchema;
201 tableSchema.name = invalidSchema ? "invalid_schema_name" : CloudDbConstant::CLOUD_KV_TABLE_NAME;
202 Field field;
203 field.colName = CloudDbConstant::CLOUD_KV_FIELD_KEY;
204 field.type = TYPE_INDEX<std::string>;
205 field.primary = true;
206 tableSchema.fields.push_back(field);
207 field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE;
208 field.primary = false;
209 tableSchema.fields.push_back(field);
210 field.colName = CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE;
211 tableSchema.fields.push_back(field);
212 field.colName = CloudDbConstant::CLOUD_KV_FIELD_VALUE;
213 tableSchema.fields.push_back(field);
214 field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME;
215 field.type = TYPE_INDEX<int64_t>;
216 tableSchema.fields.push_back(field);
217 schema.tables.push_back(tableSchema);
218 return schema;
219 }
220
GetKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId,KvStoreNbDelegate::Option option,bool invalidSchema)221 DBStatus DistributedDBCloudKvTest::GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId,
222 KvStoreNbDelegate::Option option, bool invalidSchema)
223 {
224 DBStatus openRet = OK;
225 g_mgr.GetKvStore(storeId, option, [&openRet, &delegate](DBStatus status, KvStoreNbDelegate *openDelegate) {
226 openRet = status;
227 delegate = openDelegate;
228 });
229 EXPECT_EQ(openRet, OK);
230 EXPECT_NE(delegate, nullptr);
231
232 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
233 cloudDbs[USER_ID] = virtualCloudDb_;
234 cloudDbs[USER_ID_2] = virtualCloudDb2_;
235 delegate->SetCloudDB(cloudDbs);
236 std::map<std::string, DataBaseSchema> schemas;
237 schemas[USER_ID] = GetDataBaseSchema(invalidSchema);
238 schemas[USER_ID_2] = GetDataBaseSchema(invalidSchema);
239 return delegate->SetCloudDbSchema(schemas);
240 }
241
CloseKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId)242 void DistributedDBCloudKvTest::CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId)
243 {
244 if (delegate != nullptr) {
245 ASSERT_EQ(g_mgr.CloseKvStore(delegate), OK);
246 delegate = nullptr;
247 DBStatus status = g_mgr.DeleteKvStore(storeId);
248 LOGD("delete kv store status %d store %s", status, storeId.c_str());
249 ASSERT_EQ(status, OK);
250 }
251 }
252
SyncAndGetProcessInfo(KvStoreNbDelegate * delegate,CloudSyncOption option)253 void DistributedDBCloudKvTest::SyncAndGetProcessInfo(KvStoreNbDelegate *delegate, CloudSyncOption option)
254 {
255 if (delegate == nullptr) {
256 return;
257 }
258 std::mutex dataMutex;
259 std::condition_variable cv;
260 bool isFinish = false;
261 vector<std::map<std::string, SyncProcess>> lists;
262 auto callback = [&cv, &dataMutex, &isFinish, &option, &lists](const std::map<std::string, SyncProcess> &process) {
263 size_t notifyCnt = 0;
264 for (const auto &item: process) {
265 LOGD("user = %s, status = %d", item.first.c_str(), item.second.process);
266 if (item.second.process != DistributedDB::FINISHED) {
267 continue;
268 }
269 {
270 std::lock_guard<std::mutex> autoLock(dataMutex);
271 notifyCnt++;
272 std::set<std::string> userSet(option.users.begin(), option.users.end());
273 if (notifyCnt == userSet.size()) {
274 isFinish = true;
275 cv.notify_one();
276 }
277 lists.push_back(process);
278 }
279 }
280 };
281 auto ret = delegate->Sync(option, callback);
282 EXPECT_EQ(ret, OK);
283 if (ret == OK) {
284 std::unique_lock<std::mutex> uniqueLock(dataMutex);
285 cv.wait(uniqueLock, [&isFinish]() {
286 return isFinish;
287 });
288 }
289 lastSyncProcess_ = lists.back();
290 }
291
CheckUserSyncInfo(const vector<std::string> users,const vector<DBStatus> userStatus,const vector<Info> userExpectInfo)292 bool DistributedDBCloudKvTest::CheckUserSyncInfo(const vector<std::string> users, const vector<DBStatus> userStatus,
293 const vector<Info> userExpectInfo)
294 {
295 uint32_t idx = 0;
296 for (auto &it: lastSyncProcess_) {
297 if ((idx >= users.size()) || (idx >= userStatus.size()) || (idx >= userExpectInfo.size())) {
298 return false;
299 }
300 string user = it.first;
301 if (user.compare(0, user.length(), users[idx]) != 0) {
302 return false;
303 }
304 SyncProcess actualSyncProcess = it.second;
305 EXPECT_EQ(actualSyncProcess.process, FINISHED);
306 EXPECT_EQ(actualSyncProcess.errCode, userStatus[idx]);
307 for (const auto &table : actualSyncProcess.tableProcess) {
308 EXPECT_EQ(table.second.upLoadInfo.total, userExpectInfo[idx].total);
309 EXPECT_EQ(table.second.upLoadInfo.successCount, userExpectInfo[idx].successCount);
310 EXPECT_EQ(table.second.upLoadInfo.insertCount, userExpectInfo[idx].insertCount);
311 EXPECT_EQ(table.second.upLoadInfo.failCount, userExpectInfo[idx].failCount);
312 }
313 idx++;
314 }
315 return true;
316 }
317
318 /**
319 * @tc.name: SubUser001
320 * @tc.desc: Test get and delete db with sub user.
321 * @tc.type: FUNC
322 * @tc.require:
323 * @tc.author: liaoyonghuang
324 */
325 HWTEST_F(DistributedDBCloudKvTest, SubUser001, TestSize.Level0)
326 {
327 /**
328 * @tc.steps: step1. Get db with subUser
329 * @tc.expected: step1.ok
330 */
331 KvStoreDelegateManager manager(APP_ID, USER_ID, "subUser");
332 KvStoreConfig subUserConfig = config_;
333 subUserConfig.dataDir = config_.dataDir + "/subUser";
334 OS::MakeDBDirectory(subUserConfig.dataDir);
335 manager.SetKvStoreConfig(subUserConfig);
336 KvStoreNbDelegate::Option option;
337 DBStatus openRet = OK;
338 KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
339 manager.GetKvStore(STORE_ID_1, option,
__anonc761b38c0702(DBStatus status, KvStoreNbDelegate *openDelegate) 340 [&openRet, &kvDelegatePtrS3_](DBStatus status, KvStoreNbDelegate *openDelegate) {
341 openRet = status;
342 kvDelegatePtrS3_ = openDelegate;
343 });
344 EXPECT_EQ(openRet, OK);
345 ASSERT_NE(kvDelegatePtrS3_, nullptr);
346 /**
347 * @tc.steps: step2. close and delete db
348 * @tc.expected: step2.ok
349 */
350 ASSERT_EQ(manager.CloseKvStore(kvDelegatePtrS3_), OK);
351 DBStatus status = manager.DeleteKvStore(STORE_ID_1);
352 EXPECT_EQ(status, OK);
353 }
354
355 /**
356 * @tc.name: NormalSync001
357 * @tc.desc: Test normal push sync for add data.
358 * @tc.type: FUNC
359 * @tc.require:
360 * @tc.author: zhangqiquan
361 */
362 HWTEST_F(DistributedDBCloudKvTest, NormalSync001, TestSize.Level0)
363 {
364 Key key = {'k'};
365 Value expectValue = {'v'};
366 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
__anonc761b38c0802(const std::string &origin) 367 kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
368 LOGW("origin is %s", origin.c_str());
369 return origin + "1";
370 });
371 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
372 EXPECT_EQ(virtualCloudDb_->GetPrepareTraceId(), std::string(""));
373 for (const auto &table : lastProcess_.tableProcess) {
374 EXPECT_EQ(table.second.upLoadInfo.total, 1u);
375 EXPECT_EQ(table.second.upLoadInfo.insertCount, 1u);
376 }
377 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
378 for (const auto &table : lastProcess_.tableProcess) {
379 EXPECT_EQ(table.second.downLoadInfo.total, 2u); // download 2 records
380 EXPECT_EQ(table.second.downLoadInfo.insertCount, 2u); // download 2 records
381 }
382 Value actualValue;
383 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
384 EXPECT_EQ(actualValue, expectValue);
385 kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
386 auto result = kvDelegatePtrS2_->GetCloudVersion("");
387 EXPECT_EQ(result.first, OK);
388 for (const auto &item : result.second) {
389 EXPECT_EQ(item.second, "1");
390 }
391 }
392
393 /**
394 * @tc.name: NormalSync002
395 * @tc.desc: Test normal push pull sync for add data.
396 * @tc.type: FUNC
397 * @tc.require:
398 * @tc.author: zhangqiquan
399 */
400 HWTEST_F(DistributedDBCloudKvTest, NormalSync002, TestSize.Level0)
401 {
402 /**
403 * @tc.steps: step1. store1 put (k1,v1) store2 put (k2,v2)
404 * @tc.expected: step1. both put ok
405 */
406 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
__anonc761b38c0902(const std::string &origin) 407 kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
408 LOGW("origin is %s", origin.c_str());
409 return origin + "1";
410 });
__anonc761b38c0a02(const std::string &origin) 411 kvDelegatePtrS2_->SetGenCloudVersionCallback([](const std::string &origin) {
412 LOGW("origin is %s", origin.c_str());
413 return origin + "1";
414 });
415 Key key1 = {'k', '1'};
416 Value expectValue1 = {'v', '1'};
417 Key key2 = {'k', '2'};
418 Value expectValue2 = {'v', '2'};
419 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
420 ASSERT_EQ(kvDelegatePtrS2_->Put(key2, expectValue2), OK);
421 /**
422 * @tc.steps: step2. both store1 and store2 sync
423 * @tc.expected: step2. both sync ok, and store2 got (k1,v1) store1 not exist (k2,v2)
424 */
425 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
426 LOGW("Store1 sync end");
427 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
428 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
429 LOGW("Store2 sync end");
430 Value actualValue;
431 EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue), OK);
432 std::vector<Entry> entries;
433 EXPECT_EQ(kvDelegatePtrS2_->GetDeviceEntries(std::string("DEVICES_A"), entries), OK);
434 EXPECT_EQ(entries.size(), 1u); // 1 record
435 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
436 EXPECT_EQ(actualValue, expectValue1);
437 EXPECT_EQ(kvDelegatePtrS1_->Get(key2, actualValue), NOT_FOUND);
438 /**
439 * @tc.steps: step3. store1 sync again
440 * @tc.expected: step3. sync ok store1 got (k2,v2)
441 */
442 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
443 LOGW("Store1 sync end");
444 EXPECT_EQ(kvDelegatePtrS1_->Get(key2, actualValue), OK);
445 EXPECT_EQ(actualValue, expectValue2);
446 auto result = kvDelegatePtrS2_->GetCloudVersion("");
447 EXPECT_EQ(result.first, OK);
448 for (const auto &item : result.second) {
449 EXPECT_EQ(DBBase64Utils::DecodeIfNeed(item.first), item.first);
450 }
451 kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
452 kvDelegatePtrS2_->SetGenCloudVersionCallback(nullptr);
453 }
454
455 /**
456 * @tc.name: NormalSync003
457 * @tc.desc: Test normal pull sync for update data.
458 * @tc.type: FUNC
459 * @tc.require:
460 * @tc.author: zhangqiquan
461 */
462 HWTEST_F(DistributedDBCloudKvTest, NormalSync003, TestSize.Level0)
463 {
464 /**
465 * @tc.steps: step1. store1 put (k1,v1) store2 put (k1,v2)
466 * @tc.expected: step1. both put ok
467 */
468 Key key = {'k', '1'};
469 Value expectValue1 = {'v', '1'};
470 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue1), OK);
471 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep for 100ms
472 Value expectValue2 = {'v', '2'};
473 ASSERT_EQ(kvDelegatePtrS2_->Put(key, expectValue2), OK);
474 /**
475 * @tc.steps: step2. both store1 and store2 sync
476 * @tc.expected: step2. both sync ok and store2 got (k1,v2)
477 */
478 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
479 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
480 Value actualValue;
481 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
482 EXPECT_EQ(actualValue, expectValue2);
483 /**
484 * @tc.steps: step2. store1 sync again
485 * @tc.expected: step2. sync ok and store1 got (k1,v2)
486 */
487 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
488 EXPECT_EQ(kvDelegatePtrS1_->Get(key, actualValue), OK);
489 EXPECT_EQ(actualValue, expectValue2);
490 }
491
492 /**
493 * @tc.name: NormalSync004
494 * @tc.desc: Test normal push sync for delete data.
495 * @tc.type: FUNC
496 * @tc.require:
497 * @tc.author: zhangqiquan
498 */
499 HWTEST_F(DistributedDBCloudKvTest, NormalSync004, TestSize.Level0)
500 {
501 /**
502 * @tc.steps: step1. store1 put (k1,v1) and both sync
503 * @tc.expected: step1. put ok and both sync ok
504 */
505 Key key = {'k'};
506 Value expectValue = {'v'};
507 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
508 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
509 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
510 Value actualValue;
511 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
512 EXPECT_EQ(actualValue, expectValue);
513 /**
514 * @tc.steps: step2. store1 delete (k1,v1) and both sync
515 * @tc.expected: step2. both put ok
516 */
517 ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
518 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
519 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
520 actualValue.clear();
521 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), NOT_FOUND);
522 EXPECT_NE(actualValue, expectValue);
523 }
524
525 /**
526 * @tc.name: NormalSync005
527 * @tc.desc: Test normal push sync for add data.
528 * @tc.type: FUNC
529 * @tc.require:
530 * @tc.author: zhangqiquan
531 */
532 HWTEST_F(DistributedDBCloudKvTest, NormalSync005, TestSize.Level1)
533 {
534 for (int i = 0; i < 60; ++i) { // sync 60 records
535 Key key = {'k'};
536 Value expectValue = {'v'};
537 key.push_back(static_cast<uint8_t>(i));
538 expectValue.push_back(static_cast<uint8_t>(i));
539 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
540 }
541 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
542 for (const auto &process : lastProcess_.tableProcess) {
543 EXPECT_EQ(process.second.upLoadInfo.insertCount, 60u); // sync 60 records
544 }
545 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
546 for (const auto &process : lastProcess_.tableProcess) {
547 EXPECT_EQ(process.second.downLoadInfo.insertCount, 60u); // sync 60 records
548 }
549 }
550
551 /**
552 * @tc.name: NormalSync006
553 * @tc.desc: Test normal push sync with insert delete update.
554 * @tc.type: FUNC
555 * @tc.require:
556 * @tc.author: zhangqiquan
557 */
558 HWTEST_F(DistributedDBCloudKvTest, NormalSync006, TestSize.Level0)
559 {
560 Key k1 = {'k', '1'};
561 Key k2 = {'k', '2'};
562 Value v1 = {'v', '1'};
563 Value v2 = {'v', '2'};
564 Value v3 = {'v', '3'};
565 ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v1), OK);
566 ASSERT_EQ(kvDelegatePtrS1_->Put(k2, v2), OK);
567 ASSERT_EQ(kvDelegatePtrS1_->Put(k2, v3), OK);
568 ASSERT_EQ(kvDelegatePtrS1_->Delete(k1), OK);
569 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
570 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
571 Value actualValue;
572 EXPECT_EQ(kvDelegatePtrS2_->Get(k1, actualValue), NOT_FOUND);
573 EXPECT_EQ(kvDelegatePtrS2_->Get(k2, actualValue), OK);
574 EXPECT_EQ(actualValue, v3);
575 }
576
577 /**
578 * @tc.name: NormalSync007
579 * @tc.desc: Test normal push sync with download and upload.
580 * @tc.type: FUNC
581 * @tc.require:
582 * @tc.author: zhangqiquan
583 */
584 HWTEST_F(DistributedDBCloudKvTest, NormalSync007, TestSize.Level0)
585 {
586 Key k1 = {'k', '1'};
587 Key k2 = {'k', '2'};
588 Key k3 = {'k', '3'};
589 Key k4 = {'k', '4'};
590 Value v1 = {'v', '1'};
591 Value v2 = {'v', '2'};
592 Value v3 = {'v', '3'};
593 ASSERT_EQ(kvDelegatePtrS2_->Put(k1, v1), OK);
594 ASSERT_EQ(kvDelegatePtrS2_->Put(k2, v1), OK);
595 ASSERT_EQ(kvDelegatePtrS2_->Put(k3, v1), OK);
596 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
597 ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v2), OK);
598 ASSERT_EQ(kvDelegatePtrS1_->Put(k2, v2), OK);
599 ASSERT_EQ(kvDelegatePtrS1_->Put(k4, v2), OK);
600 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
601 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
602 ASSERT_EQ(kvDelegatePtrS2_->Put(k4, v3), OK);
603 ASSERT_EQ(kvDelegatePtrS1_->Delete(k2), OK);
604 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
605 }
606
607 /**
608 * @tc.name: NormalSync008
609 * @tc.desc: Test complex sync.
610 * @tc.type: FUNC
611 * @tc.require:
612 * @tc.author: zhangqiquan
613 */
614 HWTEST_F(DistributedDBCloudKvTest, NormalSync008, TestSize.Level0)
615 {
616 Key k1 = {'k', '1'};
617 Value v1 = {'v', '1'};
618 deviceB_->PutData(k1, v1, 1u, 0); // 1 is current timestamp
619 deviceB_->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true);
620 Value actualValue;
621 EXPECT_EQ(kvDelegatePtrS2_->Get(k1, actualValue), OK);
622 EXPECT_EQ(actualValue, v1);
623 CloudSyncOption option;
624 option.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
625 option.users.push_back(USER_ID);
626 option.devices.push_back("cloud");
627 BlockSync(kvDelegatePtrS2_, OK, option);
628 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
629 EXPECT_EQ(kvDelegatePtrS1_->Get(k1, actualValue), NOT_FOUND);
630 }
631
632 /**
633 * @tc.name: NormalSync009
634 * @tc.desc: Test normal push sync with download and upload.
635 * @tc.type: FUNC
636 * @tc.require:
637 * @tc.author: zhangqiquan
638 */
639 HWTEST_F(DistributedDBCloudKvTest, NormalSync009, TestSize.Level0)
640 {
641 Key k1 = {'k', '1'};
642 Key k2 = {'k', '2'};
643 Key k3 = {'k', '3'};
644 Value v1 = {'v', '1'};
645 Value v2 = {'v', '2'};
646 Value v3 = {'v', '3'};
647 ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v1), OK);
648 ASSERT_EQ(kvDelegatePtrS1_->Put(k2, v1), OK);
649 ASSERT_EQ(kvDelegatePtrS1_->Delete(k1), OK);
650 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
651 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
652 ASSERT_EQ(kvDelegatePtrS2_->Put(k1, v2), OK);
653 ASSERT_EQ(kvDelegatePtrS2_->Put(k3, v2), OK);
654 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
655 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
656 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
657 }
658
659 /**
660 * @tc.name: NormalSync010
661 * @tc.desc: Test normal push sync for add data with different user.
662 * @tc.type: FUNC
663 * @tc.require:
664 * @tc.author: zhangshijie
665 */
666 HWTEST_F(DistributedDBCloudKvTest, NormalSync010, TestSize.Level0)
667 {
668 // add <k1, v1>, sync to cloud with user1
669 Key key1 = {'k', '1'};
670 Value expectValue1 = {'v', '1'};
671 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
672 CloudSyncOption option;
673 option.users.push_back(USER_ID);
674 option.devices.push_back("cloud");
675 BlockSync(kvDelegatePtrS1_, OK, option);
676 for (const auto &table : lastProcess_.tableProcess) {
677 EXPECT_EQ(table.second.upLoadInfo.total, 1u);
678 }
679
680 // add <k2, v2>, sync to cloud with user2
681 Key key2 = {'k', '2'};
682 Value expectValue2 = {'v', '2'};
683 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
684 option.users.clear();
685 option.users.push_back(USER_ID_2);
686 BlockSync(kvDelegatePtrS1_, OK, option);
687 for (const auto &table : lastProcess_.tableProcess) {
688 EXPECT_EQ(table.second.upLoadInfo.total, 2u);
689 }
690
691 option.users.clear();
692 option.users.push_back(USER_ID);
693 option.users.push_back(USER_ID_2);
694 BlockSync(kvDelegatePtrS2_, OK, option);
695 for (const auto &table : lastProcess_.tableProcess) {
696 EXPECT_EQ(table.second.downLoadInfo.total, 2u);
697 }
698 Value actualValue;
699 EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue), OK);
700 EXPECT_EQ(actualValue, expectValue1);
701 Value actualValue2;
702 EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), OK);
703 EXPECT_EQ(actualValue2, expectValue2);
704 }
705
706 /**
707 * @tc.name: NormalSync011
708 * @tc.desc: Do not synchronize when security label is S4.
709 * @tc.type: FUNC
710 * @tc.require:
711 * @tc.author: liaoyonghuang
712 */
713 HWTEST_F(DistributedDBCloudKvTest, NormalSync011, TestSize.Level0)
714 {
715 std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
716 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
717 KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
718
719 KvStoreNbDelegate::Option option;
720 option.secOption.securityLabel = S4;
721 EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
722 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
723 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
724 BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, SECURITY_OPTION_CHECK_ERROR);
725 CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
726 }
727
728 /**
729 * @tc.name: NormalSync012
730 * @tc.desc: Test normal push sync with memory db.
731 * @tc.type: FUNC
732 * @tc.require:
733 * @tc.author: zhangqiquan
734 */
735 HWTEST_F(DistributedDBCloudKvTest, NormalSync012, TestSize.Level0)
736 {
737 KvStoreNbDelegate *memoryDB1 = nullptr;
738 KvStoreNbDelegate::Option option1;
739 option1.isMemoryDb = true;
740 GetKvStore(memoryDB1, STORE_ID_3, option1);
741 ASSERT_NE(memoryDB1, nullptr);
742 KvStoreNbDelegate *memoryDB2 = nullptr;
743 KvStoreNbDelegate::Option option2;
744 option2.isMemoryDb = true;
745 GetKvStore(memoryDB2, STORE_ID_4, option2);
746 EXPECT_NE(memoryDB2, nullptr);
747 Key key1 = {'k', '1'};
748 Value expectValue1 = {'v', '1'};
749 EXPECT_EQ(memoryDB1->Put(key1, expectValue1), OK);
750 BlockSync(memoryDB1, OK, g_CloudSyncoption);
751 BlockSync(memoryDB2, OK, g_CloudSyncoption);
752 EXPECT_EQ(g_mgr.CloseKvStore(memoryDB1), OK);
753 EXPECT_EQ(g_mgr.CloseKvStore(memoryDB2), OK);
754 }
755
756 /**
757 * @tc.name: NormalSync013
758 * @tc.desc: Test the wrong schema.
759 * @tc.type: FUNC
760 * @tc.require:
761 * @tc.author: liaoyonghuang
762 */
763 HWTEST_F(DistributedDBCloudKvTest, NormalSync013, TestSize.Level0)
764 {
765 KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
766 KvStoreNbDelegate::Option option;
767 EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option, true), INVALID_SCHEMA);
768 CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
769 }
770
771 /**
772 * @tc.name: NormalSync014
773 * @tc.desc: Test sync after user change.
774 * @tc.type: FUNC
775 * @tc.require:
776 * @tc.author: liaoyonghuang
777 */
778 HWTEST_F(DistributedDBCloudKvTest, NormalSync014, TestSize.Level1)
779 {
780 /**
781 * @tc.steps: step1. kvDelegatePtrS1_ put and sync data (k1, v1)
782 * @tc.expected: step1.ok
783 */
784 g_mgr.SetSyncActivationCheckCallback([] (const std::string &userId, const std::string &appId,
__anonc761b38c0b02(const std::string &userId, const std::string &appId, const std::string &storeId)785 const std::string &storeId)-> bool {
786 return true;
787 });
788 KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
789 KvStoreNbDelegate::Option option;
790 option.syncDualTupleMode = true;
791 GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option);
792 Key key = {'k', '1'};
793 Value value = {'v', '1'};
794 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
795 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
796 /**
797 * @tc.steps: step2. Set sync block time 2s, and change user in sync block time
798 * @tc.expected: step2. Sync return USER_CHANGED.
799 */
800 virtualCloudDb_->SetBlockTime(2000); // 2000ms
__anonc761b38c0c02() 801 std::thread thread([&]() {
802 std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // sleep for 1000ms
803 g_mgr.SetSyncActivationCheckCallback([] (const std::string &userId, const std::string &appId,
804 const std::string &storeId)-> bool {
805 return false;
806 });
807 RuntimeContext::GetInstance()->NotifyUserChanged();
808 });
809 BlockSync(kvDelegatePtrS3_, USER_CHANGED, g_CloudSyncoption);
810 thread.join();
811 CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
812 g_mgr.SetSyncActivationCheckCallback(nullptr);
813 }
814
815 /**
816 * @tc.name: NormalSync015
817 * @tc.desc: Test sync in all process.
818 * @tc.type: FUNC
819 * @tc.require:
820 * @tc.author: zhangqiquan
821 */
822 HWTEST_F(DistributedDBCloudKvTest, NormalSync015, TestSize.Level0)
823 {
824 Key key = {'k'};
825 Value expectValue = {'v'};
826 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
827 auto option = g_CloudSyncoption;
828 auto action = static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE)
829 | static_cast<uint32_t>(LockAction::DELETE) | static_cast<uint32_t>(LockAction::DOWNLOAD);
830 option.lockAction = static_cast<LockAction>(action);
831 BlockSync(kvDelegatePtrS1_, OK, option);
832 for (const auto &table : lastProcess_.tableProcess) {
833 EXPECT_EQ(table.second.upLoadInfo.total, 1u);
834 }
835 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
836 Value actualValue;
837 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
838 EXPECT_EQ(actualValue, expectValue);
839 }
840
841 /**
842 * @tc.name: NormalSync016
843 * @tc.desc: Device A and device B have the same key data,
844 * and then devices B and A perform cloud synchronization sequentially.
845 * Finally, device A updates the data and performs cloud synchronization.
846 * Test if there is new data inserted into the cloud database.
847 * @tc.type: FUNC
848 * @tc.require:
849 * @tc.author: liaoyonghuang
850 */
851 HWTEST_F(DistributedDBCloudKvTest, NormalSync016, TestSize.Level0)
852 {
853 Key key = {'k', '1'};
854 Value value1 = {'v', '1'};
855 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value1), OK);
856 Value value2 = {'v', '2'};
857 ASSERT_EQ(kvDelegatePtrS2_->Put(key, value2), OK);
858 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
859 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
860
861 Value value3 = {'v', '3'};
862 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value3), OK);
__anonc761b38c0e02(VBucket &record) 863 virtualCloudDb_->SetInsertHook([](VBucket &record) {
864 for (auto &recordData : record) {
865 std::string insertKey = "key";
866 Type insertValue = "k1";
867 EXPECT_FALSE(recordData.first == insertKey && recordData.second == insertValue);
868 }
869 });
870 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
871 virtualCloudDb_->SetInsertHook(nullptr);
872 }
873
874 /**
875 * @tc.name: NormalSync017
876 * @tc.desc: Test duplicate addition, deletion, and sync.
877 * @tc.type: FUNC
878 * @tc.require:
879 * @tc.author: liaoyonghuang
880 */
881 HWTEST_F(DistributedDBCloudKvTest, NormalSync017, TestSize.Level0)
882 {
883 Key key = {'k'};
884 Value value = {'v'};
885 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
886 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
887 ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
888 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
889 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
890 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
891 }
892
893 /**
894 * @tc.name: NormalSync018
895 * @tc.desc: Test putBatch and sync with memory db.
896 * @tc.type: FUNC
897 * @tc.require:
898 * @tc.author: liaoyonghuang
899 */
900 HWTEST_F(DistributedDBCloudKvTest, NormalSync018, TestSize.Level0)
901 {
902 /**
903 * @tc.steps:step1. Get two Memory DB.
904 * @tc.expected: step1 OK.
905 */
906 KvStoreNbDelegate *memoryDB1 = nullptr;
907 KvStoreNbDelegate::Option option1;
908 option1.isMemoryDb = true;
909 GetKvStore(memoryDB1, STORE_ID_3, option1);
910 ASSERT_NE(memoryDB1, nullptr);
911 KvStoreNbDelegate *memoryDB2 = nullptr;
912 KvStoreNbDelegate::Option option2;
913 option2.isMemoryDb = true;
914 GetKvStore(memoryDB2, STORE_ID_4, option2);
915 EXPECT_NE(memoryDB2, nullptr);
916
917 /**
918 * @tc.steps:step2. put 301 records and sync to cloud.
919 * @tc.expected: step2 OK.
920 */
921 vector<Entry> entries;
922 int count = 301; // put 301 records.
923 for (int i = 0; i < count; i++) {
924 std::string keyStr = "k_" + std::to_string(i);
925 std::string valueStr = "v_" + std::to_string(i);
926 Key key(keyStr.begin(), keyStr.end());
927 Value value(valueStr.begin(), valueStr.end());
928 entries.push_back({key, value});
929 }
930 EXPECT_EQ(memoryDB1->PutBatch(entries), OK);
931 BlockSync(memoryDB1, OK, g_CloudSyncoption);
932
933 /**
934 * @tc.steps:step3. Sync from cloud and check values.
935 * @tc.expected: step3 OK.
936 */
937 BlockSync(memoryDB2, OK, g_CloudSyncoption);
938 for (int i = 0; i < count; i++) {
939 std::string keyStr = "k_" + std::to_string(i);
940 std::string valueStr = "v_" + std::to_string(i);
941 Key key(keyStr.begin(), keyStr.end());
942 Value expectValue(valueStr.begin(), valueStr.end());
943 Value actualValue;
944 EXPECT_EQ(memoryDB2->Get(key, actualValue), OK);
945 EXPECT_EQ(actualValue, expectValue);
946 }
947 EXPECT_EQ(g_mgr.CloseKvStore(memoryDB1), OK);
948 EXPECT_EQ(g_mgr.CloseKvStore(memoryDB2), OK);
949 }
950
951 /**
952 * @tc.name: NormalSync019
953 * @tc.desc: Test dataItem has same time.
954 * @tc.type: FUNC
955 * @tc.require:
956 * @tc.author: zhangqiquan
957 */
958 HWTEST_F(DistributedDBCloudKvTest, NormalSync019, TestSize.Level0)
959 {
960 Key k1 = {'k', '1'};
961 Value v1 = {'v', '1'};
962 ASSERT_EQ(kvDelegatePtrS2_->Put(k1, v1), OK);
963 deviceB_->Sync(SyncMode::SYNC_MODE_PULL_ONLY, true);
964
965 VirtualDataItem dataItem;
966 deviceB_->GetData(k1, dataItem);
967 EXPECT_EQ(dataItem.timestamp, dataItem.writeTimestamp);
968 }
969
970 /**
971 * @tc.name: NormalSync020
972 * @tc.desc: Test sync with two users.
973 * @tc.type: FUNC
974 * @tc.require:
975 * @tc.author: liaoyonghuang
976 */
977 HWTEST_F(DistributedDBCloudKvTest, NormalSync020, TestSize.Level0)
978 {
979 /**
980 * @tc.steps:step1. Inserts a piece of data.
981 * @tc.expected: step1 OK.
982 */
983 Key k1 = {'k', '1'};
984 Value v1 = {'v', '1'};
985 ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v1), OK);
986 /**
987 * @tc.steps:step2. sync with two users.
988 * @tc.expected: step2 OK.
989 */
990 CloudSyncOption option;
991 option.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
992 option.users.push_back(USER_ID);
993 option.users.push_back(USER_ID_2);
994 option.devices.push_back("cloud");
995 BlockSync(kvDelegatePtrS1_, OK, option);
996 /**
997 * @tc.steps:step3. Check upLoadInfo.batchIndex of two users.
998 * @tc.expected: Both users have a upLoadInfo.batchIndex of 1.
999 */
1000 for (const auto &table : lastProcess_.tableProcess) {
1001 EXPECT_EQ(table.second.upLoadInfo.batchIndex, 1u);
1002 }
1003 }
1004
1005 /**
1006 * @tc.name: NormalSync021
1007 * @tc.desc: Test Get Func to get cloudVersion.
1008 * @tc.type: FUNC
1009 * @tc.require:
1010 * @tc.author: caihaoting
1011 */
1012 HWTEST_F(DistributedDBCloudKvTest, NormalSync021, TestSize.Level0)
1013 {
1014 /**
1015 * @tc.steps:step1. store2 GetCloudVersion.
1016 * @tc.expected: step1 OK.
1017 */
1018 Key key = {'k'};
1019 Value expectValue = {'v'};
1020 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue), OK);
__anonc761b38c0f02(const std::string &origin) 1021 kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1022 LOGW("origin is %s", origin.c_str());
1023 return origin + "1";
1024 });
1025 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1026 for (const auto &table : lastProcess_.tableProcess) {
1027 EXPECT_EQ(table.second.upLoadInfo.total, 1u);
1028 }
1029 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1030 Value actualValue;
1031 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
1032 EXPECT_EQ(actualValue, expectValue);
1033 kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
1034 auto result = kvDelegatePtrS2_->GetCloudVersion("");
1035 EXPECT_EQ(result.first, OK);
1036 for (auto item : result.second) {
1037 EXPECT_EQ(item.second, "1");
1038 }
1039 /**
1040 * @tc.steps:step2. store2 GetCloudVersion.
1041 * @tc.expected: step2 NOT_FOUND.
1042 */
1043 Key keyB;
1044 Value actualValueB;
1045 std::string deviceB = DBCommon::TransferStringToHex(DBCommon::TransferHashString("DEVICE_B"));
1046 std::string versionDeviceBStr = "naturalbase_cloud_version_" + deviceB;
1047 const char *buffer = versionDeviceBStr.c_str();
1048 for (uint32_t i = 0; i < versionDeviceBStr.size(); i++) {
1049 keyB.emplace_back(buffer[i]);
1050 }
1051 EXPECT_EQ(kvDelegatePtrS2_->Get(keyB, actualValueB), NOT_FOUND);
1052 }
1053
1054 /**
1055 * @tc.name: NormalSync022
1056 * @tc.desc: Test Cloud sync without schema.
1057 * @tc.type: FUNC
1058 * @tc.require:
1059 * @tc.author: zhangqiquan
1060 */
1061 HWTEST_F(DistributedDBCloudKvTest, NormalSync022, TestSize.Level0)
1062 {
1063 /**
1064 * @tc.steps:step1. Get Memory DB.
1065 * @tc.expected: step1 OK.
1066 */
1067 KvStoreNbDelegate *memoryDB1 = nullptr;
1068 KvStoreNbDelegate::Option option;
1069 option.isMemoryDb = true;
1070 DBStatus openRet = OK;
__anonc761b38c1002(DBStatus status, KvStoreNbDelegate *openDelegate) 1071 g_mgr.GetKvStore(STORE_ID_4, option, [&openRet, &memoryDB1](DBStatus status, KvStoreNbDelegate *openDelegate) {
1072 openRet = status;
1073 memoryDB1 = openDelegate;
1074 });
1075 EXPECT_EQ(openRet, OK);
1076 ASSERT_NE(memoryDB1, nullptr);
1077 /**
1078 * @tc.steps:step2. Sync without cloud schema.
1079 * @tc.expected: step2 SCHEMA_MISMATCH.
1080 */
1081 BlockSync(memoryDB1, OK, g_CloudSyncoption, CLOUD_ERROR);
1082 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
1083 cloudDbs[USER_ID] = virtualCloudDb_;
1084 cloudDbs[USER_ID_2] = virtualCloudDb2_;
1085 memoryDB1->SetCloudDB(cloudDbs);
1086 BlockSync(memoryDB1, OK, g_CloudSyncoption, SCHEMA_MISMATCH);
1087 EXPECT_EQ(g_mgr.CloseKvStore(memoryDB1), OK);
1088 }
1089
1090 /**
1091 * @tc.name: NormalSync023
1092 * @tc.desc: Test normal local delete before cloud delete.
1093 * @tc.type: FUNC
1094 * @tc.require:
1095 * @tc.author: zhangqiquan
1096 */
1097 HWTEST_F(DistributedDBCloudKvTest, NormalSync023, TestSize.Level0)
1098 {
1099 Key k1 = {'k', '1'};
1100 Value v1 = {'v', '1'};
1101 ASSERT_EQ(kvDelegatePtrS1_->Put(k1, v1), OK);
1102 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
1103 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1104 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1105 ASSERT_EQ(kvDelegatePtrS2_->Delete(k1), OK);
1106 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
1107 ASSERT_EQ(kvDelegatePtrS1_->Delete(k1), OK);
1108 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1109 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1110 }
1111
1112 /**
1113 * @tc.name: NormalSync024
 * @tc.desc: Test that data re-inserted after a synced deletion is inserted into the cloud again.
1115 * @tc.type: FUNC
1116 * @tc.require:
1117 * @tc.author: liaoyonghuang
1118 */
1119 HWTEST_F(DistributedDBCloudKvTest, NormalSync024, TestSize.Level0)
1120 {
1121 /**
1122 * @tc.steps:step1. Device A inserts data and synchronizes, then Device B synchronizes.
1123 * @tc.expected: step1 OK.
1124 */
1125 Key key = {'k'};
1126 Value value = {'v'};
1127 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
1128 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1129 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1130 /**
1131 * @tc.steps:step2. Device A deletes data and synchronizes, then Device B synchronizes.
1132 * @tc.expected: step2 OK.
1133 */
1134 ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
1135 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1136 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1137 /**
1138 * @tc.steps:step3. Device B inserts data and synchronizes it.
1139 * @tc.expected: step3 OK.
1140 */
1141 int insertNum = 0;
__anonc761b38c1102(VBucket &record) 1142 virtualCloudDb_->SetInsertHook([&insertNum](VBucket &record) {
1143 insertNum++;
1144 });
1145 ASSERT_EQ(kvDelegatePtrS2_->Put(key, value), OK);
1146 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1147 EXPECT_TRUE(insertNum > 0);
1148 virtualCloudDb_->SetInsertHook(nullptr);
1149 }
1150
1151 /**
1152 * @tc.name: NormalSync025
1153 * @tc.desc: Test merge sync.
1154 * @tc.type: FUNC
1155 * @tc.require:
1156 * @tc.author: zhangqiquan
1157 */
1158 HWTEST_F(DistributedDBCloudKvTest, NormalSync025, TestSize.Level0)
1159 {
1160 virtualCloudDb_->SetBlockTime(1000); // block query 1000ms
1161 auto option = g_CloudSyncoption;
1162 option.merge = false;
1163 EXPECT_EQ(kvDelegatePtrS1_->Sync(option, nullptr), OK);
1164 option.merge = true;
1165 EXPECT_EQ(kvDelegatePtrS1_->Sync(option, nullptr), OK);
1166 BlockSync(kvDelegatePtrS1_, CLOUD_SYNC_TASK_MERGED, option);
1167 }
1168
1169 /**
1170 * @tc.name: NormalSync026
1171 * @tc.desc: Test delete when sync mode DEVICE_COLLABORATION.
1172 * @tc.type: FUNC
1173 * @tc.require:
1174 * @tc.author: liaoyonghuang
1175 */
1176 HWTEST_F(DistributedDBCloudKvTest, NormalSync026, TestSize.Level0)
1177 {
1178 /**
1179 * @tc.steps:step1. Create a database with the DEVICE_COLLABORATION mode on device1.
1180 * @tc.expected: step1 OK.
1181 */
1182 KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
1183 KvStoreNbDelegate::Option option;
1184 option.conflictResolvePolicy = DEVICE_COLLABORATION;
1185 EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
1186 /**
1187 * @tc.steps:step2. put 1 record and sync.
1188 * @tc.expected: step2 OK.
1189 */
1190 Key key = {'k'};
1191 Value expectValue1 = {'v', '1'};
1192 ASSERT_EQ(kvDelegatePtrS3_->Put(key, expectValue1), OK);
1193 BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption);
1194 /**
1195 * @tc.steps:step3. Update this record on device2.
1196 * @tc.expected: step3 OK.
1197 */
1198 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1199 ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
1200 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1201 /**
1202 * @tc.steps:step4. device1 sync.
1203 * @tc.expected: step4. The record was not covered by the cloud and cloud was covered.
1204 */
1205 BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption);
1206 Value actualValue1;
1207 EXPECT_EQ(kvDelegatePtrS3_->Get(key, actualValue1), OK);
1208 EXPECT_EQ(actualValue1, expectValue1);
1209 CloseKvStore(kvDelegatePtrS3_, STORE_ID_3);
1210 }
1211
1212 /**
1213 * @tc.name: NormalSync027
1214 * @tc.desc: Test lock failed during data insert.
1215 * @tc.type: FUNC
1216 * @tc.require:
1217 * @tc.author: liaoyonghuang
1218 */
1219 HWTEST_F(DistributedDBCloudKvTest, NormalSync027, TestSize.Level0)
1220 {
1221 /**
1222 * @tc.steps:step1. put 1 record and sync.
1223 * @tc.expected: step1 OK.
1224 */
1225 Key key = {'k'};
1226 Value value = {'v'};
1227 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
1228 /**
1229 * @tc.steps:step2. Lock cloud DB, and do sync.
1230 * @tc.expected: step2 sync failed due to lock fail, and failCount is 1.
1231 */
1232 virtualCloudDb_->Lock();
1233 BlockSync(kvDelegatePtrS1_, CLOUD_LOCK_ERROR, g_CloudSyncoption);
1234 for (const auto &table : lastProcess_.tableProcess) {
1235 EXPECT_EQ(table.second.upLoadInfo.total, 1u);
1236 EXPECT_EQ(table.second.upLoadInfo.successCount, 0u);
1237 EXPECT_EQ(table.second.upLoadInfo.failCount, 1u);
1238 }
1239 virtualCloudDb_->UnLock();
1240 }
1241
1242 /**
1243 * @tc.name: NormalSync028
1244 * @tc.desc: Test multi user sync.
1245 * @tc.type: FUNC
1246 * @tc.require:
1247 * @tc.author: caihaoting
1248 */
1249 HWTEST_F(DistributedDBCloudKvTest, NormalSync028, TestSize.Level0)
1250 {
1251 /**
1252 * @tc.steps:step1. put 1 record and sync.
1253 * @tc.expected: step1 OK.
1254 */
1255 Key key = {'k'};
1256 Value value = {'v'};
1257 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
1258 auto option = g_CloudSyncoption;
1259 option.users = {USER_ID, USER_ID_2};
1260 BlockSync(kvDelegatePtrS1_, OK, option);
1261 option.users = {USER_ID_2};
1262 BlockSync(kvDelegatePtrS2_, OK, option);
1263 option.users = {USER_ID, USER_ID_2};
1264 BlockSync(kvDelegatePtrS2_, OK, option);
1265 EXPECT_EQ(lastProcess_.tableProcess[USER_ID_2].downLoadInfo.total, 0u);
1266 }
1267
1268 /**
1269 * @tc.name: NormalSync029
 * @tc.desc: Test that sync tasks of different users are merged.
1271 * @tc.type: FUNC
1272 * @tc.require:
1273 * @tc.author: zhangqiquan
1274 */
1275 HWTEST_F(DistributedDBCloudKvTest, NormalSync029, TestSize.Level3)
1276 {
1277 /**
1278 * @tc.steps:step1. block virtual cloud.
1279 */
1280 virtualCloudDb_->SetBlockTime(1000); // block 1s
1281 /**
1282 * @tc.steps:step2. Call sync and let it run with block cloud.
1283 * @tc.expected: step2 sync ok.
1284 */
1285 auto option = g_CloudSyncoption;
1286 std::mutex finishMutex;
1287 std::condition_variable cv;
1288 std::vector<std::map<std::string, SyncProcess>> processRes;
__anonc761b38c1202(const std::map<std::string, SyncProcess> &process) 1289 auto callback = [&processRes, &finishMutex, &cv](const std::map<std::string, SyncProcess> &process) {
1290 for (const auto &item : process) {
1291 if (item.second.process != ProcessStatus::FINISHED) {
1292 return;
1293 }
1294 }
1295 {
1296 std::lock_guard<std::mutex> autoLock(finishMutex);
1297 processRes.push_back(process);
1298 }
1299 cv.notify_all();
1300 };
1301 EXPECT_EQ(kvDelegatePtrS1_->Sync(option, callback), OK);
1302 /**
1303 * @tc.steps:step3. USER0 sync first and USER2 sync second.
1304 * @tc.expected: step3 sync ok and USER2 task has been merged.
1305 */
1306 option.merge = true;
1307 option.users = {USER_ID};
1308 EXPECT_EQ(kvDelegatePtrS1_->Sync(option, callback), OK);
1309 option.users = {USER_ID_2};
1310 EXPECT_EQ(kvDelegatePtrS1_->Sync(option, callback), OK);
1311 /**
1312 * @tc.steps:step4. Wait all task finished.
1313 * @tc.expected: step4 Second sync task has USER0 and USER2.
1314 */
1315 virtualCloudDb_->SetBlockTime(0); // cancel block for test case run quickly
1316 std::unique_lock<std::mutex> uniqueLock(finishMutex);
__anonc761b38c1302() 1317 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MAX_TIMEOUT), [&processRes]() {
1318 return processRes.size() >= 3; // call sync 3 times
1319 });
1320 EXPECT_EQ(processRes[processRes.size() - 1].size(), 2u); // check the last process has 2 user
1321 }
1322
1323 /**
1324 * @tc.name: NormalSync030
1325 * @tc.desc: Test sync while set null cloudDB
1326 * @tc.type: FUNC
1327 * @tc.require:
1328 * @tc.author: caihaoting
1329 */
1330 HWTEST_F(DistributedDBCloudKvTest, NormalSync030, TestSize.Level0)
1331 {
1332 /**
1333 * @tc.steps:step1. Create a database.
1334 * @tc.expected: step1 OK.
1335 */
1336 KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
1337 KvStoreNbDelegate::Option option;
1338 DBStatus ret = OK;
__anonc761b38c1402(DBStatus status, KvStoreNbDelegate *openDelegate) 1339 g_mgr.GetKvStore(STORE_ID_3, option, [&ret, &kvDelegatePtrS3_](DBStatus status, KvStoreNbDelegate *openDelegate) {
1340 ret = status;
1341 kvDelegatePtrS3_ = openDelegate;
1342 });
1343 EXPECT_EQ(ret, OK);
1344 ASSERT_NE(kvDelegatePtrS3_, nullptr);
1345 /**
1346 * @tc.steps:step2. Put {k, v}.
1347 * @tc.expected: step2 OK.
1348 */
1349 Key key = {'k'};
1350 Value value = {'v'};
1351 ASSERT_EQ(kvDelegatePtrS3_->Put(key, value), OK);
1352 /**
1353 * @tc.steps:step3. Set null cloudDB.
1354 * @tc.expected: step3 INVALID_ARGS.
1355 */
1356 BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, CLOUD_ERROR);
1357 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
1358 cloudDbs[USER_ID] = nullptr;
1359 cloudDbs[USER_ID_2] = virtualCloudDb2_;
1360 EXPECT_EQ(kvDelegatePtrS3_->SetCloudDB(cloudDbs), INVALID_ARGS);
1361 /**
1362 * @tc.steps:step4. Sync while set null cloudDB.
1363 * @tc.expected: step4 CLOUD_ERROR.
1364 */
1365 BlockSync(kvDelegatePtrS3_, OK, g_CloudSyncoption, CLOUD_ERROR);
1366 EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrS3_), OK);
1367 }
1368
1369 /**
1370 * @tc.name: NormalSync031
1371 * @tc.desc: Test sync with error local device
1372 * @tc.type: FUNC
1373 * @tc.require:
1374 * @tc.author: zhangqiquan
1375 */
1376 HWTEST_F(DistributedDBCloudKvTest, NormalSync031, TestSize.Level0)
1377 {
1378 /**
1379 * @tc.steps:step1. put 1 record and sync.
1380 * @tc.expected: step1 OK.
1381 */
1382 Key key = {'k'};
1383 Value value = {'v'};
1384 ASSERT_EQ(kvDelegatePtrS2_->Put(key, value), OK);
1385 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1386 /**
1387 * @tc.steps:step2. Set local devices error and sync.
1388 * @tc.expected: step2 sync fail.
1389 */
1390 communicatorAggregator_->MockGetLocalDeviceRes(-E_CLOUD_ERROR);
1391 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1392 communicatorAggregator_->MockGetLocalDeviceRes(E_OK);
1393 for (const auto &table : lastProcess_.tableProcess) {
1394 EXPECT_EQ(table.second.downLoadInfo.total, 0u);
1395 EXPECT_EQ(table.second.downLoadInfo.failCount, 0u);
1396 EXPECT_EQ(table.second.upLoadInfo.total, 0u);
1397 }
1398 }
1399
1400 /**
1401 * @tc.name: NormalSync032
1402 * @tc.desc: Test some record upload fail in 1 batch.
1403 * @tc.type: FUNC
1404 * @tc.require:
1405 * @tc.author: liaoyonghuang
1406 */
1407 HWTEST_F(DistributedDBCloudKvTest, NormalSync032, TestSize.Level0)
1408 {
1409 /**
1410 * @tc.steps:step1. put 10 records.
1411 * @tc.expected: step1 ok.
1412 */
1413 vector<Entry> entries;
1414 int count = 10; // put 10 records.
1415 for (int i = 0; i < count; i++) {
1416 std::string keyStr = "k_" + std::to_string(i);
1417 std::string valueStr = "v_" + std::to_string(i);
1418 Key key(keyStr.begin(), keyStr.end());
1419 Value value(valueStr.begin(), valueStr.end());
1420 entries.push_back({key, value});
1421 }
1422 EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
1423 /**
1424 * @tc.steps:step2. sync and set the last record upload fail.
1425 * @tc.expected: step2 sync fail and upLoadInfo.failCount is 1.
1426 */
1427 int uploadFailId = 0;
1428 virtualCloudDb_->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anonc761b38c1502(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1429 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1430 uploadFailId++;
1431 if (uploadFailId == 10) { // 10 is the last record
1432 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1433 return CLOUD_ERROR;
1434 }
1435 return OK;
1436 });
1437 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1438 for (const auto &table : lastProcess_.tableProcess) {
1439 EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1440 EXPECT_EQ(table.second.upLoadInfo.successCount, 9u);
1441 EXPECT_EQ(table.second.upLoadInfo.insertCount, 9u);
1442 EXPECT_EQ(table.second.upLoadInfo.failCount, 1u);
1443 }
1444 virtualCloudDb_->ForkUpload(nullptr);
1445 }
1446
1447 /**
1448 * @tc.name: NormalSync033
1449 * @tc.desc: test sync with different operation type and check upLoadInfo
1450 * @tc.type: FUNC
1451 * @tc.require:
1452 * @tc.author: liaoyonghuang
1453 */
1454 HWTEST_F(DistributedDBCloudKvTest, NormalSync033, TestSize.Level0)
1455 {
1456 /**
1457 * @tc.steps:step1. put local records {k1, v1} {k2, v2} and sync to cloud.
1458 * @tc.expected: step1 ok.
1459 */
1460 Key key1 = {'k', '1'};
1461 Value value1 = {'v', '1'};
1462 kvDelegatePtrS1_->Put(key1, value1);
1463 Key key2 = {'k', '2'};
1464 Value value2 = {'v', '2'};
1465 kvDelegatePtrS1_->Put(key2, value2);
1466 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1467 /**
1468 * @tc.steps:step2. put {k3, v3}, delete {k1, v1}, and put {k2, v3}
1469 * @tc.expected: step2 ok.
1470 */
1471 Key key3 = {'k', '3'};
1472 Value value3 = {'v', '3'};
1473 kvDelegatePtrS1_->Put(key3, value3);
1474 kvDelegatePtrS1_->Delete(key1);
1475 kvDelegatePtrS1_->Put(key2, value3);
1476 /**
1477 * @tc.steps:step3. sync and check upLoadInfo
1478 * @tc.expected: step3 ok.
1479 */
1480 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1481 for (const auto &table : lastProcess_.tableProcess) {
1482 EXPECT_EQ(table.second.upLoadInfo.total, 3u);
1483 EXPECT_EQ(table.second.upLoadInfo.batchIndex, 3u);
1484 EXPECT_EQ(table.second.upLoadInfo.successCount, 3u);
1485 EXPECT_EQ(table.second.upLoadInfo.insertCount, 1u);
1486 EXPECT_EQ(table.second.upLoadInfo.deleteCount, 1u);
1487 EXPECT_EQ(table.second.upLoadInfo.updateCount, 1u);
1488 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1489 }
1490 }
1491
1492 /**
1493 * @tc.name: NormalSync034
1494 * @tc.desc: test sync data which size > maxUploadSize.
1495 * @tc.type: FUNC
1496 * @tc.require:
1497 * @tc.author: liaoyonghuang
1498 */
1499 HWTEST_F(DistributedDBCloudKvTest, NormalSync034, TestSize.Level0)
1500 {
1501 /**
1502 * @tc.steps:step1. put data which size > maxUploadSize.
1503 * @tc.expected: step1 ok.
1504 */
1505 CloudSyncConfig config;
1506 int maxUploadSize = 1024000;
1507 int maxUploadCount = 40;
1508 config.maxUploadSize = maxUploadSize;
1509 config.maxUploadCount = maxUploadCount;
1510 kvDelegatePtrS1_->SetCloudSyncConfig(config);
1511 Key key = {'k'};
1512 Value value = {'v'};
1513 value.insert(value.end(), maxUploadSize, '0');
1514 kvDelegatePtrS1_->Put(key, value);
1515 /**
1516 * @tc.steps:step2. put entries which count > maxUploadCount.
1517 * @tc.expected: step2 ok.
1518 */
1519 vector<Entry> entries;
1520 for (int i = 0; i < maxUploadCount; i++) {
1521 std::string keyStr = "k_" + std::to_string(i);
1522 std::string valueStr = "v_" + std::to_string(i);
1523 Key key(keyStr.begin(), keyStr.end());
1524 Value value(valueStr.begin(), valueStr.end());
1525 entries.push_back({key, value});
1526 }
1527 EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
1528 /**
1529 * @tc.steps:step3. sync and check upLoadInfo.batchIndex.
1530 * @tc.expected: step3 ok.
1531 */
1532 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1533 for (const auto &table : lastProcess_.tableProcess) {
1534 EXPECT_EQ(table.second.upLoadInfo.batchIndex, 2u);
1535 }
1536 }
1537
1538 /**
1539 * @tc.name: NormalSync035
 * @tc.desc: test sync after export and import
1541 * @tc.type: FUNC
1542 * @tc.require:
1543 * @tc.author: caihaoting
1544 */
1545 HWTEST_F(DistributedDBCloudKvTest, NormalSync035, TestSize.Level0)
1546 {
1547 /**
1548 * @tc.steps:step1. put local records {k1, v1} and sync to cloud.
1549 * @tc.expected: step1 ok.
1550 */
1551 Key key1 = {'k', '1'};
1552 Value value1 = {'v', '1'};
1553 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, value1), OK);
__anonc761b38c1602(const std::string &origin) 1554 kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1555 LOGW("origin is %s", origin.c_str());
1556 return origin + "1";
1557 });
1558 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1559 Value actualValue1;
1560 EXPECT_EQ(kvDelegatePtrS1_->Get(key1, actualValue1), OK);
1561 EXPECT_EQ(actualValue1, value1);
1562 auto result = kvDelegatePtrS1_->GetCloudVersion("");
1563 EXPECT_EQ(result.first, OK);
1564 for (const auto &item : result.second) {
1565 EXPECT_EQ(item.second, "1");
1566 }
1567 /**
1568 * @tc.steps:step2. export and import.
1569 * @tc.expected: step2 export and import ok.
1570 */
1571 std::string singleExportFileName = g_testDir + "/NormalSync034.$$";
1572 CipherPassword passwd;
1573 EXPECT_EQ(kvDelegatePtrS1_->Export(singleExportFileName, passwd), OK);
1574 EXPECT_EQ(kvDelegatePtrS1_->Import(singleExportFileName, passwd), OK);
1575 /**
1576 * @tc.steps:step3. put {k1, v2} and sync to cloud.
1577 * @tc.expected: step3 ok.
1578 */
1579 Value value2 = {'v', '2'};
1580 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, value2), OK);
1581 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1582 Value actualValue2;
1583 EXPECT_EQ(kvDelegatePtrS1_->Get(key1, actualValue2), OK);
1584 EXPECT_EQ(actualValue2, value2);
1585 result = kvDelegatePtrS1_->GetCloudVersion("");
1586 EXPECT_EQ(result.first, OK);
1587 for (const auto &item : result.second) {
1588 EXPECT_EQ(item.second, "11");
1589 }
1590 kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
1591 }
1592
1593 /**
1594 * @tc.name: NormalSync036
1595 * @tc.desc: test sync data with SetCloudSyncConfig.
1596 * @tc.type: FUNC
1597 * @tc.require:
1598 * @tc.author: caihaoting
1599 */
1600 HWTEST_F(DistributedDBCloudKvTest, NormalSync036, TestSize.Level0)
1601 {
1602 /**
1603 * @tc.steps:step1. put data and SetCloudSyncConfig.
1604 * @tc.expected: step1. ok.
1605 */
1606 CloudSyncConfig config;
1607 int maxUploadCount = 40;
1608 config.maxUploadCount = maxUploadCount;
1609 kvDelegatePtrS1_->SetCloudSyncConfig(config);
1610 Key key = {'k', '1'};
1611 Value value = {'v', '1'};
1612 kvDelegatePtrS1_->Put(key, value);
1613 /**
1614 * @tc.steps:step2. sync.
1615 * @tc.expected: step2. ok.
1616 */
1617 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1618 }
1619
1620 /**
1621 * @tc.name: NormalSync037
1622 * @tc.desc: test update sync data while local and cloud record device is same and not local device.
1623 * @tc.type: FUNC
1624 * @tc.require:
1625 * @tc.author: caihaoting
1626 */
1627 HWTEST_F(DistributedDBCloudKvTest, NormalSync037, TestSize.Level0)
1628 {
1629 /**
1630 * @tc.steps:step1. deviceA put {k1, v1} and sync to cloud.
1631 * @tc.expected: step1. ok.
1632 */
1633 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1634 Key key = {'k', '1'};
1635 Value expectValue1 = {'v', '1'};
1636 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue1), OK);
1637 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1638 /**
1639 * @tc.steps:step2. deviceB sync to cloud.
1640 * @tc.expected: step2. ok.
1641 */
1642 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1643 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1644 Value actualValue1;
1645 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue1), OK);
1646 EXPECT_EQ(actualValue1, expectValue1);
1647 /**
1648 * @tc.steps:step3. deviceA put {k1, v2} and sync to cloud.
1649 * @tc.expected: step3. ok.
1650 */
1651 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1652 Value expectValue2 = {'v', '2'};
1653 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue2), OK);
1654 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1655 /**
1656 * @tc.steps:step4. deviceB sync to cloud.
1657 * @tc.expected: step4. ok.
1658 */
1659 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1660 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1661 Value actualValue2;
1662 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue2), OK);
1663 EXPECT_EQ(actualValue2, expectValue2);
1664 }
1665
1666 /**
1667 * @tc.name: NormalSync038
1668 * @tc.desc: test insert sync data while local and cloud record device is same and not local device.
1669 * @tc.type: FUNC
1670 * @tc.require:
1671 * @tc.author: caihaoting
1672 */
1673 HWTEST_F(DistributedDBCloudKvTest, NormalSync038, TestSize.Level0)
1674 {
1675 /**
1676 * @tc.steps:step1. deviceA put {k1, v1} and sync to cloud.
1677 * @tc.expected: step1. ok.
1678 */
1679 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1680 Key key1 = {'k', '1'};
1681 Value expectValue1 = {'v', '1'};
1682 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
1683 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1684 /**
1685 * @tc.steps:step2. deviceB sync to cloud.
1686 * @tc.expected: step2. ok.
1687 */
1688 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1689 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1690 Value actualValue1;
1691 EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue1), OK);
1692 EXPECT_EQ(actualValue1, expectValue1);
1693 /**
1694 * @tc.steps:step3. deviceA put {k2, v2} and sync to cloud.
1695 * @tc.expected: step3. ok.
1696 */
1697 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1698 Key key2 = {'k', '2'};
1699 Value expectValue2 = {'v', '2'};
1700 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
1701 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1702 /**
1703 * @tc.steps:step4. deviceB sync to cloud.
1704 * @tc.expected: step4. ok.
1705 */
1706 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1707 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1708 actualValue1.clear();
1709 EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue1), OK);
1710 EXPECT_EQ(actualValue1, expectValue1);
1711 Value actualValue2;
1712 EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), OK);
1713 EXPECT_EQ(actualValue2, expectValue2);
1714 }
1715
1716 /**
1717 * @tc.name: NormalSync039
1718 * @tc.desc: test delete sync data while local and cloud record device is same and not local device.
1719 * @tc.type: FUNC
1720 * @tc.require:
1721 * @tc.author: caihaoting
1722 */
1723 HWTEST_F(DistributedDBCloudKvTest, NormalSync039, TestSize.Level0)
1724 {
1725 /**
1726 * @tc.steps:step1. deviceA put {k1, v1} {k2, v2} and sync to cloud.
1727 * @tc.expected: step1. ok.
1728 */
1729 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1730 Key key1 = {'k', '1'};
1731 Value expectValue1 = {'v', '1'};
1732 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
1733 Key key2 = {'k', '2'};
1734 Value expectValue2 = {'v', '2'};
1735 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
1736 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1737 /**
1738 * @tc.steps:step2. deviceB sync to cloud.
1739 * @tc.expected: step2. ok.
1740 */
1741 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1742 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1743 Value actualValue1;
1744 EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue1), OK);
1745 EXPECT_EQ(actualValue1, expectValue1);
1746 Value actualValue2;
1747 EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), OK);
1748 EXPECT_EQ(actualValue2, expectValue2);
1749 /**
1750 * @tc.steps:step3. deviceA delete {k2, v2} and sync to cloud.
1751 * @tc.expected: step3. ok.
1752 */
1753 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1754 ASSERT_EQ(kvDelegatePtrS1_->Delete(key2), OK);
1755 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1756 /**
1757 * @tc.steps:step4. deviceB sync to cloud.
1758 * @tc.expected: step4. ok.
1759 */
1760 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1761 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1762 actualValue1.clear();
1763 EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue1), OK);
1764 EXPECT_EQ(actualValue1, expectValue1);
1765 actualValue2.clear();
1766 EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), NOT_FOUND);
1767 EXPECT_NE(actualValue2, expectValue2);
1768 }
1769
1770 /**
1771 * @tc.name: NormalSync040
1772 * @tc.desc: test update sync data while local and cloud record device is different and not local device.
1773 * @tc.type: FUNC
1774 * @tc.require:
1775 * @tc.author: caihaoting
1776 */
1777 HWTEST_F(DistributedDBCloudKvTest, NormalSync040, TestSize.Level0)
1778 {
1779 /**
1780 * @tc.steps:step1. deviceA put {k1, v1} and sync to cloud.
1781 * @tc.expected: step1. ok.
1782 */
1783 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1784 Key key = {'k', '1'};
1785 Value expectValue1 = {'v', '1'};
1786 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue1), OK);
1787 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1788 /**
1789 * @tc.steps:step2. deviceB sync to cloud.
1790 * @tc.expected: step2. ok.
1791 */
1792 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1793 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1794 Value actualValue1;
1795 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue1), OK);
1796 EXPECT_EQ(actualValue1, expectValue1);
1797 /**
1798 * @tc.steps:step3. deviceA put {k1, v2} and sync to cloud.
1799 * @tc.expected: step3. ok.
1800 */
1801 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
1802 Value expectValue2 = {'v', '2'};
1803 ASSERT_EQ(kvDelegatePtrS1_->Put(key, expectValue2), OK);
1804 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1805 /**
1806 * @tc.steps:step4. deviceB put {k1, v3} sync to cloud.
1807 * @tc.expected: step4. ok.
1808 */
1809 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
1810 Value expectValue3 = {'v', '3'};
1811 ASSERT_EQ(kvDelegatePtrS2_->Put(key, expectValue3), OK);
1812 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
1813 Value actualValue2;
1814 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue2), OK);
1815 EXPECT_NE(actualValue2, expectValue2);
1816 EXPECT_EQ(actualValue2, expectValue3);
1817 }
1818
1819 /**
1820 * @tc.name: NormalSync041
1821 * @tc.desc: Test concurrent sync and close DB.
1822 * @tc.type: FUNC
1823 * @tc.require:
1824 * @tc.author: liaoyonghuang
1825 */
1826 HWTEST_F(DistributedDBCloudKvTest, NormalSync041, TestSize.Level1)
1827 {
1828 /**
1829 * @tc.steps:step1. put data to cloud.
1830 * @tc.expected: step1 ok.
1831 */
1832 Key key = {'k', '1'};
1833 Value value = {'v', '1'};
1834 kvDelegatePtrS1_->Put(key, value);
1835 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
1836
1837 /**
1838 * @tc.steps:step2. sync and close DB concurrently.
1839 * @tc.expected: step2 ok.
1840 */
1841 KvStoreNbDelegate* kvDelegatePtrS3_ = nullptr;
1842 KvStoreNbDelegate::Option option;
1843 EXPECT_EQ(GetKvStore(kvDelegatePtrS3_, STORE_ID_3, option), OK);
__anonc761b38c1702(const std::string &tableName, VBucket &extend) 1844 virtualCloudDb_->ForkQuery([](const std::string &tableName, VBucket &extend) {
1845 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep for 200ms
1846 });
1847 KvStoreDelegateManager &mgr = g_mgr;
__anonc761b38c1802() 1848 std::thread syncThread([&mgr, &kvDelegatePtrS3_]() {
1849 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep for 100ms
1850 EXPECT_EQ(mgr.CloseKvStore(kvDelegatePtrS3_), OK);
1851 });
1852 EXPECT_EQ(kvDelegatePtrS3_->Sync(g_CloudSyncoption, nullptr), OK);
1853 syncThread.join();
1854 }
1855
1856 /**
1857 * @tc.name: NormalSync042
1858 * @tc.desc: Test some record upload fail in 1 batch and get cloud version successfully.
1859 * @tc.type: FUNC
1860 * @tc.require:
1861 * @tc.author: caihaoting
1862 */
1863 HWTEST_F(DistributedDBCloudKvTest, NormalSync042, TestSize.Level0)
1864 {
1865 /**
1866 * @tc.steps:step1. put 10 records.
1867 * @tc.expected: step1 ok.
1868 */
1869 vector<Entry> entries;
1870 int count = 10; // put 10 records.
1871 for (int i = 0; i < count; i++) {
1872 std::string keyStr = "k_" + std::to_string(i);
1873 std::string valueStr = "v_" + std::to_string(i);
1874 Key key(keyStr.begin(), keyStr.end());
1875 Value value(valueStr.begin(), valueStr.end());
1876 entries.push_back({key, value});
1877 }
1878 EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
__anonc761b38c1902(const std::string &origin) 1879 kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1880 LOGW("origin is %s", origin.c_str());
1881 return origin + "1";
1882 });
1883 /**
1884 * @tc.steps:step2. sync and set the last record upload fail.
1885 * @tc.expected: step2 sync fail and upLoadInfo.failCount is 1.
1886 */
1887 int uploadFailId = 0;
1888 virtualCloudDb_->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anonc761b38c1a02(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1889 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1890 uploadFailId++;
1891 if (uploadFailId == 10) { // 10 is the last record
1892 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1893 return CLOUD_ERROR;
1894 }
1895 return OK;
1896 });
1897 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1898 for (const auto &table : lastProcess_.tableProcess) {
1899 EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1900 EXPECT_EQ(table.second.upLoadInfo.successCount, 9u);
1901 EXPECT_EQ(table.second.upLoadInfo.insertCount, 9u);
1902 EXPECT_EQ(table.second.upLoadInfo.failCount, 1u);
1903 }
1904 /**
1905 * @tc.steps:step3. get cloud version successfully.
1906 * @tc.expected: step3 OK.
1907 */
1908 auto result = kvDelegatePtrS1_->GetCloudVersion("");
1909 EXPECT_EQ(result.first, OK);
1910 for (const auto &item : result.second) {
1911 EXPECT_EQ(item.second, "1");
1912 }
1913 kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
1914 virtualCloudDb_->ForkUpload(nullptr);
1915 }
1916
1917 /**
1918 * @tc.name: NormalSync043
1919 * @tc.desc: Test some record upload fail in 1 batch and get cloud version failed.
1920 * @tc.type: FUNC
1921 * @tc.require:
1922 * @tc.author: caihaoting
1923 */
1924 HWTEST_F(DistributedDBCloudKvTest, NormalSync043, TestSize.Level0)
1925 {
1926 /**
1927 * @tc.steps:step1. put 10 records.
1928 * @tc.expected: step1 ok.
1929 */
1930 vector<Entry> entries;
1931 int count = 10; // put 10 records.
1932 for (int i = 0; i < count; i++) {
1933 std::string keyStr = "k_" + std::to_string(i);
1934 std::string valueStr = "v_" + std::to_string(i);
1935 Key key(keyStr.begin(), keyStr.end());
1936 Value value(valueStr.begin(), valueStr.end());
1937 entries.push_back({key, value});
1938 }
1939 EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
__anonc761b38c1b02(const std::string &origin) 1940 kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1941 LOGW("origin is %s", origin.c_str());
1942 return origin + "1";
1943 });
1944 /**
1945 * @tc.steps:step2. sync and set all record upload fail.
1946 * @tc.expected: step2 sync fail and upLoadInfo.failCount is 10.
1947 */
1948 virtualCloudDb_->ForkInsertConflict([](const std::string &tableName, VBucket &extend, VBucket &record,
__anonc761b38c1c02(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1949 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1950 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1951 return CLOUD_ERROR;
1952 });
1953 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1954 for (const auto &table : lastProcess_.tableProcess) {
1955 EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1956 EXPECT_EQ(table.second.upLoadInfo.successCount, 0u);
1957 EXPECT_EQ(table.second.upLoadInfo.insertCount, 0u);
1958 EXPECT_EQ(table.second.upLoadInfo.failCount, 10u);
1959 }
1960 /**
1961 * @tc.steps:step3. get cloud version failed.
1962 * @tc.expected: step3 NOT_FOUND.
1963 */
1964 auto result = kvDelegatePtrS1_->GetCloudVersion("");
1965 EXPECT_EQ(result.first, NOT_FOUND);
1966 for (const auto &item : result.second) {
1967 EXPECT_EQ(item.second, "");
1968 }
1969 kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
1970 virtualCloudDb_->ForkUpload(nullptr);
1971 }
1972
1973 /**
1974 * @tc.name: NormalSync044
1975 * @tc.desc: Test RemoveDeviceData with FLAG_ONLY option
1976 * @tc.type: FUNC
1977 * @tc.require:
1978 * @tc.author: zhangtao
1979 */
1980 HWTEST_F(DistributedDBCloudKvTest, NormalSync044, TestSize.Level0)
1981 {
1982 /**
1983 * @tc.steps: step1. store1 put (k1,v1) and (k2,v2)
1984 * @tc.expected: step1. both put ok
1985 */
1986 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
__anonc761b38c1d02(const std::string &origin) 1987 kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
1988 LOGW("origin is %s", origin.c_str());
1989 return origin + "1";
1990 });
1991 Key key1 = {'k', '1'};
1992 Value expectValue1 = {'v', '1'};
1993 Key key2 = {'k', '2'};
1994 Value expectValue2 = {'v', '2'};
1995 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
1996 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
1997 /**
1998 * @tc.steps: step2. DEVICE_A with store1 sync and DEVICE_B with store2 sync
1999 * @tc.expected: step2. both sync ok, and store2 got (k1,v1) and (k2,v2)
2000 */
2001 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2002 LOGW("Store1 sync end");
2003 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2004 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2005 LOGW("Store2 sync end");
2006 Value actualValue;
2007 EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue), OK);
2008 EXPECT_EQ(actualValue, expectValue1);
2009 EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue), OK);
2010 EXPECT_EQ(actualValue, expectValue2);
2011 /**
2012 * @tc.steps: step3. store2 RevoveDeviceData with FLAG_ONLY option
2013 * @tc.expected: step3. store2 delete DEVICE_A's version CloudVersion data successfully
2014 */
2015 auto result = kvDelegatePtrS2_->GetCloudVersion("");
2016 EXPECT_EQ(result.first, OK);
2017 for (auto item : result.second) {
2018 EXPECT_EQ(item.second, "1");
2019 }
2020 EXPECT_EQ(kvDelegatePtrS2_->RemoveDeviceData("DEVICES_A", ClearMode::FLAG_ONLY), OK);
2021 kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
2022 result = kvDelegatePtrS2_->GetCloudVersion("");
2023 EXPECT_EQ(result.first, NOT_FOUND);
2024 for (auto item : result.second) {
2025 EXPECT_EQ(item.second, "");
2026 }
2027 }
2028
2029 /**
2030 * @tc.name: NormalSync045
2031 * @tc.desc: Test some record upload fail in 1 batch and extend size greater than record size
2032 * @tc.type: FUNC
2033 * @tc.require:
2034 * @tc.author: zhangtao
2035 */
2036 HWTEST_F(DistributedDBCloudKvTest, NormalSync045, TestSize.Level0)
2037 {
2038 /**
2039 * @tc.steps:step1. put 10 records.
2040 * @tc.expected: step1 ok.
2041 */
2042 vector<Entry> entries;
2043 int count = 10; // put 10 records.
2044 for (int i = 0; i < count; i++) {
2045 std::string keyStr = "k_" + std::to_string(i);
2046 std::string valueStr = "v_" + std::to_string(i);
2047 Key key(keyStr.begin(), keyStr.end());
2048 Value value(valueStr.begin(), valueStr.end());
2049 entries.push_back({key, value});
2050 }
2051 EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
2052 /**
2053 * @tc.steps:step2. sync and add one empty extend as result
2054 * @tc.expected: step2 sync fail and upLoadInfo.failCount is 10. 1 batch failed.
2055 */
2056 std::atomic<int> missCount = -1;
2057 virtualCloudDb_->SetClearExtend(missCount);
2058 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
2059 for (const auto &table : lastProcess_.tableProcess) {
2060 EXPECT_EQ(table.second.upLoadInfo.total, 10u);
2061 EXPECT_EQ(table.second.upLoadInfo.successCount, 0u);
2062 EXPECT_EQ(table.second.upLoadInfo.insertCount, 0u);
2063 EXPECT_EQ(table.second.upLoadInfo.failCount, 10u);
2064 }
2065 virtualCloudDb_->ForkUpload(nullptr);
2066 }
2067
2068 /**
2069 * @tc.name: NormalSync046
2070 * @tc.desc: Test RemoveDeviceData with FLAG_ONLY option and empty deviceName
2071 * @tc.type: FUNC
2072 * @tc.require:
2073 * @tc.author: chenghuitao
2074 */
2075 HWTEST_F(DistributedDBCloudKvTest, NormalSync046, TestSize.Level0)
2076 {
2077 /**
2078 * @tc.steps: step1. store1 put (k1,v1) and (k2,v2)
2079 * @tc.expected: step1. both put ok
2080 */
2081 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
__anonc761b38c1e02(const std::string &origin) 2082 kvDelegatePtrS1_->SetGenCloudVersionCallback([](const std::string &origin) {
2083 LOGW("origin is %s", origin.c_str());
2084 return origin + "1";
2085 });
2086 Key key1 = {'k', '1'};
2087 Value expectValue1 = {'v', '1'};
2088 Key key2 = {'k', '2'};
2089 Value expectValue2 = {'v', '2'};
2090 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
2091 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
2092 /**
2093 * @tc.steps: step2. DEVICE_A with store1 sync and DEVICE_B with store2 sync
2094 * @tc.expected: step2. both sync ok, and store2 got (k1,v1) and (k2,v2)
2095 */
2096 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2097 LOGW("Store1 sync end");
2098 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2099 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2100 LOGW("Store2 sync end");
2101 Value actualValue;
2102 EXPECT_EQ(kvDelegatePtrS2_->Get(key1, actualValue), OK);
2103 EXPECT_EQ(actualValue, expectValue1);
2104 EXPECT_EQ(kvDelegatePtrS2_->Get(key2, actualValue), OK);
2105 EXPECT_EQ(actualValue, expectValue2);
2106 /**
2107 * @tc.steps: step3. store2 RevoveDeviceData with FLAG_ONLY option
2108 * @tc.expected: step3. store2 delete DEVICE_A's version CloudVersion data successfully
2109 */
2110 auto result = kvDelegatePtrS2_->GetCloudVersion("");
2111 EXPECT_EQ(result.first, OK);
2112 for (auto item : result.second) {
2113 EXPECT_EQ(item.second, "1");
2114 }
2115 EXPECT_EQ(kvDelegatePtrS2_->RemoveDeviceData("", ClearMode::FLAG_ONLY), OK);
2116 kvDelegatePtrS1_->SetGenCloudVersionCallback(nullptr);
2117 result = kvDelegatePtrS2_->GetCloudVersion("");
2118 EXPECT_EQ(result.first, NOT_FOUND);
2119 for (auto item : result.second) {
2120 EXPECT_EQ(item.second, "");
2121 }
2122 }
2123
2124 /**
2125 * @tc.name: NormalSync047
2126 * @tc.desc: Test multi users sync when user1 sync fail.
2127 * @tc.type: FUNC
2128 * @tc.require:
2129 * @tc.author: suyue
2130 */
2131 HWTEST_F(DistributedDBCloudKvTest, NormalSync047, TestSize.Level0)
2132 {
2133 /**
2134 * @tc.steps: step1. put 20 records.
2135 * @tc.expected: step1. ok.
2136 */
2137 vector<Entry> entries;
2138 int count = 20; // put 20 records.
2139 for (int i = 0; i < count; i++) {
2140 std::string keyStr = "k_" + std::to_string(i);
2141 std::string valueStr = "v_" + std::to_string(i);
2142 Key key(keyStr.begin(), keyStr.end());
2143 Value value(valueStr.begin(), valueStr.end());
2144 entries.push_back({key, value});
2145 }
2146 EXPECT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
2147
2148 /**
2149 * @tc.steps: step2. multi users sync and set user1 fail.
2150 * @tc.expected: step2. user1 sync fail and other user sync success.
2151 */
2152 int uploadFailId = 0;
2153 virtualCloudDb_->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anonc761b38c1f02(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2154 vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2155 uploadFailId++;
2156 if (uploadFailId > 15) { // the first 15 records success
2157 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
2158 return CLOUD_ERROR;
2159 }
2160 return OK;
2161 });
2162 CloudSyncOption option;
2163 option.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
2164 option.users.push_back(USER_ID);
2165 option.users.push_back(USER_ID_2);
2166 option.devices.push_back("cloud");
2167 SyncAndGetProcessInfo(kvDelegatePtrS1_, option);
2168
2169 vector<DBStatus> userStatus = {CLOUD_ERROR, OK};
2170 vector<Info> userExpectInfo = {{1u, 20u, 15u, 5u, 15u, 0u, 0u}, {1u, 20u, 20u, 0u, 20u, 0u, 0u}};
2171 EXPECT_TRUE(CheckUserSyncInfo(option.users, userStatus, userExpectInfo));
2172 virtualCloudDb_->ForkUpload(nullptr);
2173 }
2174
2175 /**
2176 * @tc.name: NormalSync048
 * @tc.desc: test sync data while cloud deletes one record and local does not have this record.
2178 * @tc.type: FUNC
2179 * @tc.require:
2180 * @tc.author: tankaisheng
2181 */
2182 HWTEST_F(DistributedDBCloudKvTest, NormalSync048, TestSize.Level0)
2183 {
2184 /**
2185 * @tc.steps: step1. deviceB put {k1, v1} {k2, v2} and sync to cloud
2186 * @tc.expected: step1. ok.
2187 */
2188 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2189 Key key1 = {'k', '1'};
2190 Value expectValue1 = {'v', '1'};
2191 Key key2 = {'k', '2'};
2192 Value expectValue2 = {'v', '2'};
2193 ASSERT_EQ(kvDelegatePtrS1_->Put(key1, expectValue1), OK);
2194 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, expectValue2), OK);
2195 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2196
2197 /**
2198 * @tc.steps: step2. deviceB delete {k1, v1} and sync to cloud
2199 * @tc.expected: step2. ok.
2200 */
2201 communicatorAggregator_->SetLocalDeviceId("DEVICES_B");
2202 ASSERT_EQ(kvDelegatePtrS1_->Delete(key1), OK);
2203 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2204
2205 /**
2206 * @tc.steps: step3. deviceA sync to cloud
2207 * @tc.expected: step3. ok.
2208 */
2209 communicatorAggregator_->SetLocalDeviceId("DEVICES_A");
2210 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2211 Value actualValue2;
2212 ASSERT_EQ(kvDelegatePtrS2_->Get(key2, actualValue2), OK);
2213 ASSERT_EQ(actualValue2, expectValue2);
2214 }
2215
2216 /**
2217 * @tc.name: NormalSync049
2218 * @tc.desc: test sync data with invalid modify_time.
2219 * @tc.type: FUNC
2220 * @tc.require:
2221 * @tc.author: liaoyonghuang
2222 */
2223 HWTEST_F(DistributedDBCloudKvTest, NormalSync049, TestSize.Level0)
2224 {
2225 /**
2226 * @tc.steps: step1. put {k, v}
2227 * @tc.expected: step1. ok.
2228 */
2229 vector<Entry> entries;
2230 int count = 31; // put 31 records.
2231 for (int i = 0; i < count; i++) {
2232 std::string keyStr = "k_" + std::to_string(i);
2233 std::string valueStr = "v_" + std::to_string(i);
2234 Key key(keyStr.begin(), keyStr.end());
2235 Value value(valueStr.begin(), valueStr.end());
2236 entries.push_back({key, value});
2237 }
2238 kvDelegatePtrS1_->PutBatch(entries);
2239 /**
2240 * @tc.steps: step2. modify time in local and sync
2241 * @tc.expected: step2. ok.
2242 */
2243 sqlite3 *db_;
2244 uint64_t openFlag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
2245 std::string fileUrl = g_testDir + "/" \
2246 "2d23c8a0ffadafcaa03507a4ec2290c83babddcab07c0e2945fbba93efc7eec0/single_ver/main/gen_natural_store.db";
2247 ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db_, openFlag, nullptr) == SQLITE_OK);
2248 std::string sql = "UPDATE sync_data SET modify_time = modify_time + modify_time where rowid>0";
2249 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
2250 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
2251 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2252 /**
2253 * @tc.steps: step3. put {k, v2} in another device
2254 * @tc.expected: step3. ok.
2255 */
2256 Value newValue = {'v'};
2257 for (int i = 0; i < count; i++) {
2258 std::string keyStr = "k_" + std::to_string(i);
2259 Key key(keyStr.begin(), keyStr.end());
2260 entries.push_back({key, newValue});
2261 }
2262 kvDelegatePtrS2_->PutBatch(entries);
2263 /**
2264 * @tc.steps: step4. sync and check data
2265 * @tc.expected: step4. ok.
2266 */
2267 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
2268 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2269 Value actualValue;
2270 for (int i = 0; i < count; i++) {
2271 std::string keyStr = "k_" + std::to_string(i);
2272 Key key(keyStr.begin(), keyStr.end());
2273 EXPECT_EQ(kvDelegatePtrS1_->Get(key, actualValue), OK);
2274 EXPECT_EQ(actualValue, newValue);
2275 }
2276 }
2277
2278 /**
2279 * @tc.name: NormalSync050
 * @tc.desc: test upload with conflict error and check upload info
2281 * @tc.type: FUNC
2282 * @tc.require:
2283 * @tc.author: liaoyonghuang
2284 */
2285 HWTEST_F(DistributedDBCloudKvTest, NormalSync050, TestSize.Level0)
2286 {
2287 /**
2288 * @tc.steps: step1. Set the retry count to 1
2289 * @tc.expected: step1. ok.
2290 */
2291 CloudSyncConfig config;
2292 config.maxRetryConflictTimes = 1;
2293 kvDelegatePtrS1_->SetCloudSyncConfig(config);
2294 /**
2295 * @tc.steps: step2. Put {k1, v1} {k2, v2} locally
2296 * @tc.expected: step2. ok.
2297 */
2298 kvDelegatePtrS1_->Put(KEY_1, VALUE_1);
2299 kvDelegatePtrS1_->Put(KEY_2, VALUE_2);
2300 /**
2301 * @tc.steps: step3. Set CLOUD_VERSION_CONFLICT when upload 2nd record, and do sync
2302 * @tc.expected: step3. ok.
2303 */
2304 int recordIndex = 0;
2305 virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anonc761b38c2002(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2306 vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2307 recordIndex++;
2308 if (recordIndex == 2) { // set 2nd record return CLOUD_RECORD_EXIST_CONFLICT
2309 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
2310 return CLOUD_VERSION_CONFLICT;
2311 }
2312 return OK;
2313 });
2314 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2315 /**
2316 * @tc.steps: step4. Check last process
2317 * @tc.expected: step4. ok.
2318 */
2319 for (const auto &table : lastProcess_.tableProcess) {
2320 EXPECT_EQ(table.second.upLoadInfo.total, 2u);
2321 EXPECT_EQ(table.second.upLoadInfo.successCount, 2u);
2322 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
2323 EXPECT_EQ(table.second.upLoadInfo.insertCount, 2u);
2324 }
2325 virtualCloudDb_->ForkInsertConflict(nullptr);
2326 }
2327
2328 /**
2329 * @tc.name: NormalSync051
2330 * @tc.desc: test upload with conflict error and exceeds retry limit
2331 * @tc.type: FUNC
2332 * @tc.require:
2333 * @tc.author: suyuchen
2334 */
2335 HWTEST_F(DistributedDBCloudKvTest, NormalSync051, TestSize.Level0)
2336 {
2337 /**
2338 * @tc.steps: step1. Set the retry count to 0
2339 * @tc.expected: step1. ok.
2340 */
2341 CloudSyncConfig config;
2342 config.maxRetryConflictTimes = 0;
2343 kvDelegatePtrS1_->SetCloudSyncConfig(config);
2344 /**
2345 * @tc.steps: step2. Put {k1, v1} {k2, v2} locally
2346 * @tc.expected: step2. ok.
2347 */
2348 kvDelegatePtrS1_->Put(KEY_1, VALUE_1);
2349 kvDelegatePtrS1_->Put(KEY_2, VALUE_2);
2350 /**
2351 * @tc.steps: step3. Set CLOUD_VERSION_CONFLICT when upload 2nd record, and do sync
2352 * @tc.expected: step3. CLOUD_VERSION_CONFLICT.
2353 */
2354 int recordIndex = 0;
2355 virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anonc761b38c2102(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2356 vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2357 recordIndex++;
2358 if (recordIndex == 2) { // set 2nd record return CLOUD_VERSION_CONFLICT
2359 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
2360 return CLOUD_VERSION_CONFLICT;
2361 }
2362 return OK;
2363 });
2364 BlockSync(kvDelegatePtrS1_, CLOUD_VERSION_CONFLICT, g_CloudSyncoption);
2365 /**
2366 * @tc.steps: step4. Check last process
2367 * @tc.expected: step4. ok.
2368 */
2369 for (const auto &table : lastProcess_.tableProcess) {
2370 EXPECT_EQ(table.second.upLoadInfo.total, 2u);
2371 EXPECT_EQ(table.second.upLoadInfo.successCount, 1u);
2372 EXPECT_EQ(table.second.upLoadInfo.failCount, 1u);
2373 EXPECT_EQ(table.second.upLoadInfo.insertCount, 1u);
2374 }
2375 virtualCloudDb_->ForkInsertConflict(nullptr);
2376 }
2377
2378 /**
2379 * @tc.name: NormalSync052
2380 * @tc.desc: test upload with version conflict error under force push mode
2381 * @tc.type: FUNC
2382 * @tc.require:
2383 * @tc.author: suyuchen
2384 */
2385 HWTEST_F(DistributedDBCloudKvTest, NormalSync052, TestSize.Level0)
2386 {
2387 /**
2388 * @tc.steps: step1. Set the retry count to 2
2389 * @tc.expected: step1. ok.
2390 */
2391 CloudSyncConfig config;
2392 config.maxRetryConflictTimes = 2;
2393 kvDelegatePtrS1_->SetCloudSyncConfig(config);
2394 /**
2395 * @tc.steps: step2. Put {k1, v1} {k2, v2} locally, then sync to cloud
2396 * @tc.expected: step2. ok.
2397 */
2398 kvDelegatePtrS1_->Put(KEY_1, VALUE_1);
2399 kvDelegatePtrS1_->Put(KEY_2, VALUE_2);
2400 /**
2401 * @tc.steps: step3. Set CLOUD_VERSION_CONFLICT when upload 2nd record, and do sync
2402 * @tc.expected: step3. OK.
2403 */
2404 int recordIndex = 0;
2405 virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anonc761b38c2202(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2406 vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2407 recordIndex++;
2408 if (recordIndex == 2) { // set 2nd record return CLOUD_VERSION_CONFLICT
2409 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
2410 return CLOUD_VERSION_CONFLICT;
2411 }
2412 return OK;
2413 });
2414 g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
2415 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2416 g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
2417 /**
2418 * @tc.steps: step4. Check last process
2419 * @tc.expected: step4. ok.
2420 */
2421 for (const auto &table : lastProcess_.tableProcess) {
2422 EXPECT_EQ(table.second.upLoadInfo.total, 3u);
2423 EXPECT_EQ(table.second.upLoadInfo.successCount, 3u);
2424 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
2425 EXPECT_EQ(table.second.upLoadInfo.insertCount, 2u);
2426 }
2427 virtualCloudDb_->ForkInsertConflict(nullptr);
2428 }
2429
2430 /**
2431 * @tc.name: NormalSync053
2432 * @tc.desc: test upload with version conflict error under force push mode, which has both update and insert
2433 * @tc.type: FUNC
2434 * @tc.require:
2435 * @tc.author: suyuchen
2436 */
2437 HWTEST_F(DistributedDBCloudKvTest, NormalSync053, TestSize.Level0)
2438 {
2439 /**
2440 * @tc.steps: step1. Set the retry count to 0
2441 * @tc.expected: step1. ok.
2442 */
2443 CloudSyncConfig config;
2444 config.maxRetryConflictTimes = 0;
2445 kvDelegatePtrS1_->SetCloudSyncConfig(config);
2446 /**
2447 * @tc.steps: step2. Put {k1, v1} {k2, v2} locally, then sync to cloud
2448 * @tc.expected: step2. ok.
2449 */
2450 kvDelegatePtrS1_->Put(KEY_1, VALUE_1);
2451 kvDelegatePtrS1_->Put(KEY_2, VALUE_2);
2452 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
2453 /**
2454 * @tc.steps: step3. Put {k3, v3} locally, and update k1 k2
2455 * @tc.expected: step3. ok.
2456 */
2457 kvDelegatePtrS1_->Put(KEY_1, VALUE_2);
2458 kvDelegatePtrS1_->Put(KEY_2, VALUE_3);
2459 kvDelegatePtrS1_->Put(KEY_3, VALUE_3);
2460 /**
2461 * @tc.steps: step4. Set CLOUD_VERSION_CONFLICT when upload 2nd record, and do sync
2462 * @tc.expected: step4. CLOUD_VERSION_CONFLICT.
2463 */
2464 int recordIndex = 0;
2465 virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anonc761b38c2302(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 2466 vector<VirtualCloudDb::CloudData> &cloudDataVec) {
2467 recordIndex++;
2468 if (recordIndex == 2) { // set 2nd record return CLOUD_VERSION_CONFLICT
2469 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
2470 return CLOUD_VERSION_CONFLICT;
2471 }
2472 return OK;
2473 });
2474 g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
2475 BlockSync(kvDelegatePtrS1_, CLOUD_VERSION_CONFLICT, g_CloudSyncoption);
2476 g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
2477 /**
2478 * @tc.steps: step5. Check last process
2479 * @tc.expected: step5. ok.
2480 */
2481 for (const auto &table : lastProcess_.tableProcess) {
2482 EXPECT_EQ(table.second.upLoadInfo.total, 3u);
2483 EXPECT_EQ(table.second.upLoadInfo.successCount, 1u);
2484 EXPECT_EQ(table.second.upLoadInfo.failCount, 2u);
2485 EXPECT_EQ(table.second.upLoadInfo.updateCount, 1u);
2486 EXPECT_EQ(table.second.upLoadInfo.insertCount, 0u);
2487 }
2488 virtualCloudDb_->ForkInsertConflict(nullptr);
2489 }
2490 }
2491