1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "db_base64_utils.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "kv_virtual_device.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kvdb_manager.h"
26 #include "platform_specific.h"
27 #include "process_system_api_adapter_impl.h"
28 #include "sqlite_cloud_kv_executor_utils.h"
29 #include "virtual_communicator_aggregator.h"
30 #include "virtual_cloud_db.h"
31
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 using namespace std;
36
37 namespace {
38 static std::string HWM_HEAD = "naturalbase_cloud_meta_sync_data_";
39 string g_testDir;
40 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
41 CloudSyncOption g_CloudSyncoption;
42 const std::string USER_ID_2 = "user2";
43 const std::string USER_ID_3 = "user3";
44 const int64_t WAIT_TIME = 5;
45 DistributedDBToolsUnitTest g_tool;
46 class DistributedDBCloudKvSyncerTest : public testing::Test {
47 public:
48 static void SetUpTestCase();
49 static void TearDownTestCase();
50 void SetUp();
51 void TearDown();
52 protected:
53 DBStatus GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId, KvStoreNbDelegate::Option option,
54 bool invalidSchema = false);
55 void CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId);
56 void BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
57 DBStatus expectSyncResult = OK);
58 static DataBaseSchema GetDataBaseSchema(bool invalidSchema);
59 void PutKvBatchDataAndSyncCloud(CloudSyncOption &syncOption);
60 void GetSingleStore();
61 void ReleaseSingleStore();
62 void BlockCompensatedSync(int &actSyncCnt, int expSyncCnt);
63 void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
64 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
65 std::shared_ptr<VirtualCloudDb> virtualCloudDb2_ = nullptr;
66 KvStoreConfig config_;
67 KvStoreNbDelegate* kvDelegatePtrS1_ = nullptr;
68 KvStoreNbDelegate* kvDelegatePtrS2_ = nullptr;
69 SyncProcess lastProcess_;
70 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
71 KvVirtualDevice *deviceB_ = nullptr;
72 SQLiteSingleVerNaturalStore *singleStore_ = nullptr;
73 std::mutex comSyncMutex;
74 std::condition_variable comSyncCv;
75 };
76
SetUpTestCase()77 void DistributedDBCloudKvSyncerTest::SetUpTestCase()
78 {
79 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
80 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
81 LOGE("rm test db files error!");
82 }
83 g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
84 g_CloudSyncoption.users.push_back(USER_ID);
85 g_CloudSyncoption.devices.push_back("cloud");
86
87 string dir = g_testDir + "/single_ver";
88 DIR* dirTmp = opendir(dir.c_str());
89 if (dirTmp == nullptr) {
90 OS::MakeDBDirectory(dir);
91 } else {
92 closedir(dirTmp);
93 }
94 }
95
TearDownTestCase()96 void DistributedDBCloudKvSyncerTest::TearDownTestCase()
97 {
98 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
99 LOGE("rm test db files error!");
100 }
101 }
102
SetUp()103 void DistributedDBCloudKvSyncerTest::SetUp()
104 {
105 DistributedDBToolsUnitTest::PrintTestCaseInfo();
106 config_.dataDir = g_testDir;
107 /**
108 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
109 */
110 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
111 virtualCloudDb2_ = std::make_shared<VirtualCloudDb>();
112 g_mgr.SetKvStoreConfig(config_);
113 KvStoreNbDelegate::Option option1;
114 ASSERT_EQ(GetKvStore(kvDelegatePtrS1_, STORE_ID_1, option1), OK);
115 // set aggregator after get store1, only store2 can sync with p2p
116 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
117 ASSERT_TRUE(communicatorAggregator_ != nullptr);
118 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
119 KvStoreNbDelegate::Option option2;
120 ASSERT_EQ(GetKvStore(kvDelegatePtrS2_, STORE_ID_2, option2), OK);
121
122 deviceB_ = new (std::nothrow) KvVirtualDevice("DEVICE_B");
123 ASSERT_TRUE(deviceB_ != nullptr);
124 auto syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
125 ASSERT_TRUE(syncInterfaceB != nullptr);
126 ASSERT_EQ(deviceB_->Initialize(communicatorAggregator_, syncInterfaceB), E_OK);
127 GetSingleStore();
128 }
129
TearDown()130 void DistributedDBCloudKvSyncerTest::TearDown()
131 {
132 ReleaseSingleStore();
133 CloseKvStore(kvDelegatePtrS1_, STORE_ID_1);
134 CloseKvStore(kvDelegatePtrS2_, STORE_ID_2);
135 virtualCloudDb_ = nullptr;
136 virtualCloudDb2_ = nullptr;
137 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138 LOGE("rm test db files error!");
139 }
140
141 if (deviceB_ != nullptr) {
142 delete deviceB_;
143 deviceB_ = nullptr;
144 }
145
146 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
147 communicatorAggregator_ = nullptr;
148 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
149 }
150
BlockSync(KvStoreNbDelegate * delegate,DBStatus expectDBStatus,CloudSyncOption option,DBStatus expectSyncResult)151 void DistributedDBCloudKvSyncerTest::BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus,
152 CloudSyncOption option, DBStatus expectSyncResult)
153 {
154 if (delegate == nullptr) {
155 return;
156 }
157 std::mutex dataMutex;
158 std::condition_variable cv;
159 bool finish = false;
160 SyncProcess last;
161 auto callback = [expectDBStatus, &last, &cv, &dataMutex, &finish, &option](const std::map<std::string,
162 SyncProcess> &process) {
163 size_t notifyCnt = 0;
164 for (const auto &item: process) {
165 LOGD("user = %s, status = %d, errCode = %d", item.first.c_str(), item.second.process, item.second.errCode);
166 if (item.second.process != DistributedDB::FINISHED) {
167 continue;
168 }
169 EXPECT_EQ(item.second.errCode, expectDBStatus);
170 {
171 std::lock_guard<std::mutex> autoLock(dataMutex);
172 notifyCnt++;
173 std::set<std::string> userSet(option.users.begin(), option.users.end());
174 if (notifyCnt == userSet.size()) {
175 finish = true;
176 last = item.second;
177 cv.notify_one();
178 }
179 }
180 }
181 };
182 auto actualRet = delegate->Sync(option, callback);
183 EXPECT_EQ(actualRet, expectSyncResult);
184 if (actualRet == OK) {
185 std::unique_lock<std::mutex> uniqueLock(dataMutex);
186 cv.wait(uniqueLock, [&finish]() {
187 return finish;
188 });
189 }
190 lastProcess_ = last;
191 }
192
GetDataBaseSchema(bool invalidSchema)193 DataBaseSchema DistributedDBCloudKvSyncerTest::GetDataBaseSchema(bool invalidSchema)
194 {
195 DataBaseSchema schema;
196 TableSchema tableSchema;
197 tableSchema.name = invalidSchema ? "invalid_schema_name" : CloudDbConstant::CLOUD_KV_TABLE_NAME;
198 Field field;
199 field.colName = CloudDbConstant::CLOUD_KV_FIELD_KEY;
200 field.type = TYPE_INDEX<std::string>;
201 field.primary = true;
202 tableSchema.fields.push_back(field);
203 field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE;
204 field.primary = false;
205 tableSchema.fields.push_back(field);
206 field.colName = CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE;
207 tableSchema.fields.push_back(field);
208 field.colName = CloudDbConstant::CLOUD_KV_FIELD_VALUE;
209 tableSchema.fields.push_back(field);
210 field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME;
211 field.type = TYPE_INDEX<int64_t>;
212 tableSchema.fields.push_back(field);
213 schema.tables.push_back(tableSchema);
214 return schema;
215 }
216
GetSingleStore()217 void DistributedDBCloudKvSyncerTest::GetSingleStore()
218 {
219 KvDBProperties prop;
220 prop.SetStringProp(KvDBProperties::USER_ID, USER_ID);
221 prop.SetStringProp(KvDBProperties::APP_ID, APP_ID);
222 prop.SetStringProp(KvDBProperties::STORE_ID, STORE_ID_1);
223
224 std::string hashIdentifier = DBCommon::TransferHashString(
225 DBCommon::GenerateIdentifierId(STORE_ID_1, APP_ID, USER_ID, "", 0));
226 prop.SetStringProp(DBProperties::IDENTIFIER_DATA, hashIdentifier);
227 prop.SetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::SINGLE_VER_TYPE_SQLITE);
228 int errCode = E_OK;
229 singleStore_ = static_cast<SQLiteSingleVerNaturalStore *>(KvDBManager::OpenDatabase(prop, errCode));
230 ASSERT_NE(singleStore_, nullptr);
231 }
232
ReleaseSingleStore()233 void DistributedDBCloudKvSyncerTest::ReleaseSingleStore()
234 {
235 RefObject::DecObjRef(singleStore_);
236 singleStore_ = nullptr;
237 }
238
BlockCompensatedSync(int & actSyncCnt,int expSyncCnt)239 void DistributedDBCloudKvSyncerTest::BlockCompensatedSync(int &actSyncCnt, int expSyncCnt)
240 {
241 {
242 std::unique_lock<std::mutex> lock(comSyncMutex);
243 bool result = comSyncCv.wait_for(lock, std::chrono::seconds(WAIT_TIME),
244 [&actSyncCnt, expSyncCnt]() { return actSyncCnt == expSyncCnt; });
245 ASSERT_EQ(result, true);
246 }
247 }
248
249
GetKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId,KvStoreNbDelegate::Option option,bool invalidSchema)250 DBStatus DistributedDBCloudKvSyncerTest::GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId,
251 KvStoreNbDelegate::Option option, bool invalidSchema)
252 {
253 DBStatus openRet = OK;
254 g_mgr.GetKvStore(storeId, option, [&openRet, &delegate](DBStatus status, KvStoreNbDelegate *openDelegate) {
255 openRet = status;
256 delegate = openDelegate;
257 });
258 EXPECT_EQ(openRet, OK);
259 EXPECT_NE(delegate, nullptr);
260
261 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
262 cloudDbs[USER_ID] = virtualCloudDb_;
263 cloudDbs[USER_ID_2] = virtualCloudDb2_;
264 delegate->SetCloudDB(cloudDbs);
265 std::map<std::string, DataBaseSchema> schemas;
266 schemas[USER_ID] = GetDataBaseSchema(invalidSchema);
267 schemas[USER_ID_2] = GetDataBaseSchema(invalidSchema);
268 return delegate->SetCloudDbSchema(schemas);
269 }
270
CloseKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId)271 void DistributedDBCloudKvSyncerTest::CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId)
272 {
273 if (delegate != nullptr) {
274 ASSERT_EQ(g_mgr.CloseKvStore(delegate), OK);
275 delegate = nullptr;
276 DBStatus status = g_mgr.DeleteKvStore(storeId);
277 LOGD("delete kv store status %d store %s", status, storeId.c_str());
278 ASSERT_EQ(status, OK);
279 }
280 }
281
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)282 void DistributedDBCloudKvSyncerTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
283 {
284 sqlite3 *db_;
285 uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
286 std::string fileUrl = g_testDir + "/" \
287 "2d23c8a0ffadafcaa03507a4ec2290c83babddcab07c0e2945fbba93efc7eec0/single_ver/main/gen_natural_store.db";
288 EXPECT_EQ(sqlite3_open_v2(fileUrl.c_str(), &db_, flag, nullptr), SQLITE_OK);
289
290 std::string sql = "SELECT count(*) FROM naturalbase_kv_aux_sync_data_log WHERE ";
291 switch (opType) {
292 case OpType::INSERT:
293 sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
294 " cloud_gid != '' AND version !='' AND cloud_flag=cloud_flag|0x10";
295 break;
296 case OpType::UPDATE:
297 sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
298 " cloud_gid == '' AND version =='' AND cloud_flag=cloud_flag|0x10";
299 break;
300 case OpType::DELETE:
301 sql += " cloud_gid == '' AND version ==''";
302 break;
303 default:
304 break;
305 }
306 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
307 reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
308 sqlite3_close_v2(db_);
309 }
310
311 /**
312 * @tc.name: UploadAbnormalSync001
313 * @tc.desc: Test upload update record, cloud returned record not found.
314 * @tc.type: FUNC
315 * @tc.require:
316 * @tc.author: bty
317 */
318 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync001, TestSize.Level0)
319 {
320 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
321 ASSERT_NE(cloudHook, nullptr);
322
323 /**
324 * @tc.steps:step1. Device A inserts data and synchronizes
325 * @tc.expected: step1 OK.
326 */
327 Key key = {'k'};
328 Value value = {'v'};
329 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
330 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
331
332 /**
333 * @tc.steps:step2. Device A update data and synchronizes, cloud returned record not found
334 * @tc.expected: step2 OK.
335 */
336 Value value2 = {'x'};
337 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
338 int upIdx = 0;
__anon4869f29f0602(const std::string &tableName, VBucket &extend) 339 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
340 LOGD("cloud db upload index:%d", ++upIdx);
341 if (upIdx == 1) { // 1 is index
342 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
343 }
344 });
345 int syncCnt = 0;
__anon4869f29f0702null346 cloudHook->SetSyncFinishHook([&syncCnt, this] {
347 LOGD("sync finish times:%d", ++syncCnt);
348 if (syncCnt == 1) { // 1 is the first sync
349 CheckUploadAbnormal(OpType::UPDATE, 1L); // 1 is expected count
350 } else {
351 CheckUploadAbnormal(OpType::UPDATE, 1L, true); // 1 is expected count
352 }
353 comSyncCv.notify_all();
354 });
355 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
356 BlockCompensatedSync(syncCnt, 2); // 2 is sync times
357 virtualCloudDb_->ForkUpload(nullptr);
358 cloudHook->SetSyncFinishHook(nullptr);
359 }
360
361 /**
362 * @tc.name: UploadAbnormalSync002
363 * @tc.desc: Test upload insert record, cloud returned record already existed.
364 * @tc.type: FUNC
365 * @tc.require:
366 * @tc.author: bty
367 */
368 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync002, TestSize.Level0)
369 {
370 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
371 ASSERT_NE(cloudHook, nullptr);
372
373 /**
374 * @tc.steps:step1. Device A inserts k-v and synchronizes
375 * @tc.expected: step1 OK.
376 */
377 Key key = {'k'};
378 Value value = {'v'};
379 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
380 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
381
382 /**
383 * @tc.steps:step2. Device A insert k2-v2 and synchronizes, Device B insert k2-v2 and sync before A upload
384 * @tc.expected: step2 OK.
385 */
386 Key key2 = {'x'};
387 Value value2 = {'y'};
388 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, value2), OK);
389 int upIdx = 0;
__anon4869f29f0802(const std::string &tableName, VBucket &extend) 390 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
391 LOGD("cloud db upload index:%d", ++upIdx);
392 if (upIdx == 2) { // 2 is index
393 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
394 }
395 });
396 int doUpIdx = 0;
__anon4869f29f0902null397 cloudHook->SetDoUploadHook([&doUpIdx, key2, value2, this] {
398 LOGD("begin upload index:%d", ++doUpIdx);
399 ASSERT_EQ(kvDelegatePtrS2_->Put(key2, value2), OK);
400 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
401 });
402 int syncCnt = 0;
__anon4869f29f0a02null403 cloudHook->SetSyncFinishHook([&syncCnt, this] {
404 LOGD("sync finish times:%d", ++syncCnt);
405 if (syncCnt == 1) { // 1 is the normal sync
406 CheckUploadAbnormal(OpType::INSERT, 1L); // 1 is expected count
407 } else {
408 CheckUploadAbnormal(OpType::INSERT, 2L, true); // 2 is expected count
409 }
410 comSyncCv.notify_all();
411 });
412 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
413 BlockCompensatedSync(syncCnt, 2); // 2 is sync times
414 virtualCloudDb_->ForkUpload(nullptr);
415 cloudHook->SetSyncFinishHook(nullptr);
416 cloudHook->SetDoUploadHook(nullptr);
417 }
418
419 /**
420 * @tc.name: UploadAbnormalSync003
421 * @tc.desc: Test upload delete record, cloud returned record not found.
422 * @tc.type: FUNC
423 * @tc.require:
424 * @tc.author: bty
425 */
426 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync003, TestSize.Level0)
427 {
428 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
429 ASSERT_NE(cloudHook, nullptr);
430
431 /**
432 * @tc.steps:step1. Device A inserts data and synchronizes
433 * @tc.expected: step1 OK.
434 */
435 Key key = {'k'};
436 Value value = {'v'};
437 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
438 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
439 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
440
441 /**
442 * @tc.steps:step2. Device A delete data and synchronizes, Device B delete data and sync before A upload
443 * @tc.expected: step2 OK.
444 */
445 ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
446 int upIdx = 0;
__anon4869f29f0b02(const std::string &tableName, VBucket &extend) 447 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
448 LOGD("cloud db upload index:%d", ++upIdx);
449 if (upIdx == 2) { // 2 is index
450 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
451 }
452 });
453 int doUpIdx = 0;
__anon4869f29f0c02null454 cloudHook->SetDoUploadHook([&doUpIdx, key, this] {
455 LOGD("begin upload index:%d", ++doUpIdx);
456 ASSERT_EQ(kvDelegatePtrS2_->Delete(key), OK);
457 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
458 });
459 int syncCnt = 0;
__anon4869f29f0d02null460 cloudHook->SetSyncFinishHook([&syncCnt, this] {
461 LOGD("sync finish times:%d", ++syncCnt);
462 if (syncCnt == 1) { // 1 is the normal sync
463 CheckUploadAbnormal(OpType::DELETE, 1L); // 1 is expected count
464 } else {
465 CheckUploadAbnormal(OpType::DELETE, 1L, true); // 1 is expected count
466 }
467 comSyncCv.notify_all();
468 });
469 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
470 BlockCompensatedSync(syncCnt, 1); // 1 is sync times
471 virtualCloudDb_->ForkUpload(nullptr);
472 cloudHook->SetSyncFinishHook(nullptr);
473 cloudHook->SetDoUploadHook(nullptr);
474 }
475
476 /**
477 * @tc.name: UploadAbnormalSync004
478 * @tc.desc: Test sync errCode is not in [27328512, 27394048).
479 * @tc.type: FUNC
480 * @tc.require:
481 * @tc.author: chenghuitao
482 */
483 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync004, TestSize.Level0)
484 {
485 /**
486 * @tc.steps:step1. Device A inserts data and synchronizes
487 * @tc.expected: step1 errCode outside DBStatus should be kept.
488 */
489 int errCode = 27394048; // an error not in [27328512, 27394048)
490 virtualCloudDb_->SetActionStatus(static_cast<DBStatus>(errCode));
491 Key key = {'k'};
492 Value value = {'v'};
493 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
494 BlockSync(kvDelegatePtrS1_, static_cast<DBStatus>(errCode), g_CloudSyncoption);
495 virtualCloudDb_->SetActionStatus(OK);
496 }
497
498 /**
499 * @tc.name: UploadAbnormalSync005
500 * @tc.desc: Test sync return CLOUD_DISABLED
501 * @tc.type: FUNC
502 * @tc.require:
503 * @tc.author: liaoyonghuang
504 */
505 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync005, TestSize.Level0)
506 {
507 /**
508 * @tc.steps:step1. Put 1 record to cloud
509 * @tc.expected: step1 OK
510 */
511 EXPECT_EQ(kvDelegatePtrS1_->Put(KEY_1, VALUE_1), OK);
512 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
513 /**
514 * @tc.steps:step2. Set error code CLOUD_DISABLED when query cloud data, and sync
515 * @tc.expected: step2 return CLOUD_DISABLED
516 */
517 virtualCloudDb_->SetActionStatus(CLOUD_DISABLED);
518 BlockSync(kvDelegatePtrS2_, CLOUD_DISABLED, g_CloudSyncoption);
519 }
520
521 /**
522 * @tc.name: QueryParsingProcessTest001
523 * @tc.desc: Test Query parsing process.
524 * @tc.type: FUNC
525 * @tc.require:
526 * @tc.author: luoguo
527 */
528 HWTEST_F(DistributedDBCloudKvSyncerTest, QueryParsingProcessTest001, TestSize.Level0)
529 {
530 auto cloudHook = (ICloudSyncStorageHook *)singleStore_->GetCloudKvStore();
531 ASSERT_NE(cloudHook, nullptr);
532
533 /**
534 * @tc.steps:step1. Device A inserts data and synchronizes
535 * @tc.expected: step1 OK.
536 */
537 Key key = {'k'};
538 Value value = {'v'};
539 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
540
541 /**
542 * @tc.steps:step2. Test Query parsing Process
543 * @tc.expected: step2 OK.
544 */
545 std::vector<QuerySyncObject> syncObject;
546 std::vector<VBucket> syncDataPk;
547 VBucket bucket;
548 bucket.insert_or_assign(std::string("k"), std::string("k"));
549 syncDataPk.push_back(bucket);
550 std::string tableName = "sync_data";
551 ASSERT_EQ(CloudStorageUtils::GetSyncQueryByPk(tableName, syncDataPk, true, syncObject), E_OK);
552
553 Bytes bytes;
554 bytes.resize(syncObject[0].CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
555 Parcel parcel(bytes.data(), bytes.size());
556 ASSERT_EQ(syncObject[0].SerializeData(parcel, SOFTWARE_VERSION_CURRENT), E_OK);
557
558 /**
559 * @tc.steps:step3. Check Node's type is QueryNodeType::IN.
560 * @tc.expected: step3 OK.
561 */
562 std::vector<QueryNode> queryNodes;
563 syncObject[0].ParserQueryNodes(bytes, queryNodes);
564 ASSERT_EQ(queryNodes[0].type, QueryNodeType::IN);
565 }
566
567 /**
568 * @tc.name: UploadFinished001
569 * @tc.desc: Test upload update record when do update.
570 * @tc.type: FUNC
571 * @tc.require:
572 * @tc.author: zhangqiquan
573 */
574 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadFinished001, TestSize.Level0)
575 {
576 Key key = {'k'};
577 Value value = {'v'};
578 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
579 Value newValue = {'v', '1'};
580 // update [k,v] to [k,v1] when upload
__anon4869f29f0e02(const std::string &, VBucket &) 581 virtualCloudDb_->ForkUpload([kvDelegatePtrS1 = kvDelegatePtrS1_, key, newValue](const std::string &, VBucket &) {
582 EXPECT_EQ(kvDelegatePtrS1->Put(key, newValue), OK);
583 });
584 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
585 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
586 Value actualValue;
587 // cloud download [k,v]
588 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
589 EXPECT_EQ(actualValue, value);
590 // sync again and get [k,v1]
591 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
592 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
593 EXPECT_EQ(kvDelegatePtrS1_->Get(key, actualValue), OK);
594 EXPECT_EQ(actualValue, newValue);
595 virtualCloudDb_->ForkUpload(nullptr);
596 }
597
598 /**
599 * @tc.name: SyncWithMultipleUsers001.
600 * @tc.desc: Test sync data with multiple users.
601 * @tc.type: FUNC
602 * @tc.require:
603 * @tc.author: liufuchenxing
604 */
605 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers001, TestSize.Level0)
606 {
607 Key key = {'k'};
608 Value value = {'v'};
609 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
610 CloudSyncOption syncOption;
611 syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
612 syncOption.users.push_back(USER_ID);
613 syncOption.users.push_back(USER_ID_2);
614 syncOption.devices.push_back("cloud");
615 BlockSync(kvDelegatePtrS1_, OK, syncOption);
616 BlockSync(kvDelegatePtrS2_, OK, syncOption);
617 Value actualValue;
618 // cloud download [k,v]
619 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
620 EXPECT_EQ(actualValue, value);
621 }
622
PutKvBatchDataAndSyncCloud(CloudSyncOption & syncOption)623 void DistributedDBCloudKvSyncerTest::PutKvBatchDataAndSyncCloud(CloudSyncOption &syncOption)
624 {
625 std::vector<Entry> entries;
626 for (int i = 0; i < 200; i++) { // 200 is number of data
627 std::string keyStr = "k_" + std::to_string(i);
628 std::string valueStr = "v_" + std::to_string(i);
629 Key key(keyStr.begin(), keyStr.end());
630 Value value(valueStr.begin(), valueStr.end());
631 Entry entry;
632 entry.key = key;
633 entry.value = value;
634 entries.push_back(entry);
635 }
636
637 ASSERT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
638 syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
639 syncOption.users.push_back(USER_ID);
640 syncOption.users.push_back(USER_ID_2);
641 syncOption.devices.push_back("cloud");
642 BlockSync(kvDelegatePtrS1_, OK, syncOption);
643 }
644
645 /**
646 * @tc.name: SyncWithMultipleUsers002.
647 * @tc.desc: test whether upload to the cloud after delete local data that does not have a gid.
648 * @tc.type: FUNC
649 * @tc.require:
650 * @tc.author: luoguo
651 */
652 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers002, TestSize.Level0)
653 {
654 /**
655 * @tc.steps: step1. kvDelegatePtrS1_ put 200 data and sync to cloud.
656 * @tc.expected: step1. return ok.
657 */
658 CloudSyncOption syncOption;
659 PutKvBatchDataAndSyncCloud(syncOption);
660
661 /**
662 * @tc.steps: step2. kvDelegatePtrS2_ only sync user0 from cloud.
663 * @tc.expected: step2. return ok.
664 */
665 syncOption.users.clear();
666 syncOption.users.push_back(USER_ID);
667 BlockSync(kvDelegatePtrS2_, OK, syncOption);
668
669 /**
670 * @tc.steps: step3. kvDelegatePtrS2_ delete 100 data.
671 * @tc.expected: step3. return ok.
672 */
673 std::vector<Key> keys;
674 for (int i = 0; i < 100; i++) {
675 std::string keyStr = "k_" + std::to_string(i);
676 Key key(keyStr.begin(), keyStr.end());
677 keys.push_back(key);
678 }
679 ASSERT_EQ(kvDelegatePtrS2_->DeleteBatch(keys), OK);
680
681 /**
682 * @tc.steps: step4. kvDelegatePtrS2_ sync to cloud with user0 user2.
683 * @tc.expected: step4. return ok.
684 */
685 syncOption.users.clear();
686 syncOption.users.push_back(USER_ID);
687 syncOption.users.push_back(USER_ID_2);
688
689 std::mutex dataMutex;
690 std::condition_variable cv;
691 bool finish = false;
692 uint32_t insertCount = 0;
693 auto callback = [&dataMutex, &syncOption, &finish, &insertCount, &cv](const std::map<std::string,
__anon4869f29f0f02(const std::map<std::string, SyncProcess> &process) 694 SyncProcess> &process) {
695 size_t notifyCnt = 0;
696 for (const auto &item : process) {
697 LOGD("user = %s, status = %d, errCode=%d", item.first.c_str(), item.second.process, item.second.errCode);
698 if (item.second.process != DistributedDB::FINISHED) {
699 continue;
700 }
701 EXPECT_EQ(item.second.errCode, OK);
702 {
703 std::lock_guard<std::mutex> autoLock(dataMutex);
704 notifyCnt++;
705 std::set<std::string> userSet(syncOption.users.begin(), syncOption.users.end());
706 if (notifyCnt == userSet.size()) {
707 finish = true;
708 std::map<std::string, TableProcessInfo> tableProcess(item.second.tableProcess);
709 insertCount = tableProcess["sync_data"].downLoadInfo.insertCount;
710 cv.notify_one();
711 }
712 }
713 }
714 };
715 auto actualRet = kvDelegatePtrS2_->Sync(syncOption, callback);
716 EXPECT_EQ(actualRet, OK);
717 if (actualRet == OK) {
718 std::unique_lock<std::mutex> uniqueLock(dataMutex);
__anon4869f29f1002() 719 cv.wait(uniqueLock, [&finish]() { return finish; });
720 }
721 /**
722 * @tc.steps: step5. check process info, user2's insertCount should be 0.
723 * @tc.expected: step5. return ok.
724 */
725 EXPECT_EQ(insertCount, 0u);
726 }
727
728 /**
729 * @tc.name: SyncWithMultipleUsers003.
730 * @tc.desc: Test sync data with multiple users same key.
731 * @tc.type: FUNC
732 * @tc.require:
733 * @tc.author: wangxiangdong
734 */
735 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers003, TestSize.Level0)
736 {
737 /**
738 * @tc.steps: step1. put k v by user1.
739 * @tc.expected: step1. return ok.
740 */
741 Key key = {'k', '1'};
742 Value value = {'v', '1'};
743 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
744 CloudSyncOption syncOption;
745 syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
746 syncOption.users.push_back(USER_ID);
747 syncOption.devices.push_back("cloud");
748 BlockSync(kvDelegatePtrS1_, OK, syncOption);
749 /**
750 * @tc.steps: step2. put k v2 by user1.
751 * @tc.expected: step2. return ok.
752 */
753 Value value2 = {'v', '2'};
754 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
755 syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
756 syncOption.users.clear();
757 syncOption.users.push_back(USER_ID_2);
758 BlockSync(kvDelegatePtrS1_, OK, syncOption);
759 /**
760 * @tc.steps: step3. sync by user1.
761 * @tc.expected: step3. return ok.
762 */
763 syncOption.users.clear();
764 syncOption.users.push_back(USER_ID_2);
765 BlockSync(kvDelegatePtrS2_, OK, syncOption);
766 Value actualValue1;
767 /**
768 * @tc.steps: step4. sync by user2.
769 * @tc.expected: step4. return ok.
770 */
771 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue1), OK);
772 syncOption.users.clear();
773 syncOption.users.push_back(USER_ID);
774 BlockSync(kvDelegatePtrS2_, OK, syncOption);
775 Value actualValue2;
776 /**
777 * @tc.steps: step5. get k1.
778 * @tc.expected: step5. get v2.
779 */
780 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue2), OK);
781 EXPECT_EQ(actualValue2, value2);
782 }
783
784 /**
785 * @tc.name: SyncWithMultipleUsers004.
786 * @tc.desc: Test sync data with same users.
787 * @tc.type: FUNC
788 * @tc.require:
789 * @tc.author: wangxiangdong
790 */
791 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers004, TestSize.Level0)
792 {
793 /**
794 * @tc.steps: step1. put k v by user1 and sync to cloud.
795 * @tc.expected: step1. return ok.
796 */
797 Key key = {'k', '1'};
798 Value value = {'v', '1'};
799 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
800 CloudSyncOption syncOption;
801 syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
802 syncOption.users.push_back(USER_ID);
803 syncOption.devices.push_back("cloud");
804 BlockSync(kvDelegatePtrS1_, OK, syncOption);
805 /**
806 * @tc.steps: step2. data p2p to device B.
807 * @tc.expected: step2. return ok.
808 */
809 std::vector<std::string> devices;
810 devices.push_back(deviceB_->GetDeviceId());
811 Query query = Query::Select().PrefixKey(key);
812 std::map<std::string, DBStatus> result;
813 DBStatus status = g_tool.SyncTest(kvDelegatePtrS2_, devices, SYNC_MODE_PUSH_ONLY, result, query);
814 ASSERT_TRUE(status == OK);
815 /**
816 * @tc.steps: step3. sync by user1.
817 * @tc.expected: step3. return ok.
818 */
819 syncOption.users.clear();
820 syncOption.users.push_back(USER_ID);
821 BlockSync(kvDelegatePtrS2_, OK, syncOption);
822 /**
823 * @tc.steps: step4. get k1.
824 * @tc.expected: step4. get v2.
825 */
826 Value actualValue;
827 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
828 EXPECT_EQ(actualValue, value);
829 }
830
831 /**
832 * @tc.name: AbnormalCloudKvExecutorTest001
833 * @tc.desc: Check SqliteCloudKvExecutorUtils interfaces abnormal scene.
834 * @tc.type: FUNC
835 * @tc.require:
836 * @tc.author: suyue
837 */
838 HWTEST_F(DistributedDBCloudKvSyncerTest, AbnormalCloudKvExecutorTest001, TestSize.Level0)
839 {
840 /**
841 * @tc.steps: step1. Call PutCloudData interface with different opType when para is invalid.
842 * @tc.expected: step1. return errCode.
843 */
844 SqliteCloudKvExecutorUtils cloudKvObj;
845 DownloadData downloadData;
846 downloadData.data = {{}};
847 int ret = cloudKvObj.PutCloudData(nullptr, true, downloadData);
848 EXPECT_EQ(ret, -E_CLOUD_ERROR);
849 downloadData.opType = {OpType::UPDATE_VERSION};
850 ret = cloudKvObj.PutCloudData(nullptr, true, downloadData);
851 EXPECT_EQ(ret, -E_CLOUD_ERROR);
852
853 downloadData.opType = {OpType::UPDATE_TIMESTAMP};
854 ret = cloudKvObj.PutCloudData(nullptr, true, downloadData);
855 EXPECT_EQ(ret, -E_INVALID_DB);
856 downloadData.opType = {OpType::DELETE};
857 ret = cloudKvObj.PutCloudData(nullptr, true, downloadData);
858 EXPECT_EQ(ret, -E_INVALID_DB);
859
860 /**
861 * @tc.steps: step2. Call CountAllCloudData interface when para is invalid.
862 * @tc.expected: step2. return -E_INVALID_ARGS.
863 */
864 QuerySyncObject querySyncObject;
865 std::pair<int, int64_t> res = cloudKvObj.CountAllCloudData({nullptr, true}, {}, "", true, querySyncObject);
866 EXPECT_EQ(res.first, -E_INVALID_ARGS);
867
868 /**
869 * @tc.steps: step3. Call SqliteCloudKvExecutorUtils interfaces when db is nullptr.
870 * @tc.expected: step3. return -E_INVALID_DB.
871 */
872 res = cloudKvObj.CountCloudData(nullptr, true, 0, "", true);
873 EXPECT_EQ(res.first, -E_INVALID_DB);
874 std::pair<int, CloudSyncData> ver = cloudKvObj.GetLocalCloudVersion(nullptr, true, "");
875 EXPECT_EQ(ver.first, -E_INVALID_DB);
876 std::vector<VBucket> dataVector;
877 ret = cloudKvObj.GetCloudVersionFromCloud(nullptr, true, "", dataVector);
878 EXPECT_EQ(ret, -E_INVALID_DB);
879 }
880
881 /**
882 * @tc.name: AbnormalCloudKvExecutorTest002
883 * @tc.desc: Check FillCloudLog interface
884 * @tc.type: FUNC
885 * @tc.require:
886 * @tc.author: suyue
887 */
888 HWTEST_F(DistributedDBCloudKvSyncerTest, AbnormalCloudKvExecutorTest002, TestSize.Level0)
889 {
890 /**
891 * @tc.steps: step1. Call FillCloudLog interface when db and para is nullptr.
892 * @tc.expected: step1. return -E_INVALID_ARGS.
893 */
894 SqliteCloudKvExecutorUtils cloudKvObj;
895 sqlite3 *db = nullptr;
896 CloudSyncData data;
897 CloudUploadRecorder recorder;
898 int ret = cloudKvObj.FillCloudLog({db, true}, OpType::INSERT, data, "", recorder);
899 EXPECT_EQ(ret, -E_INVALID_ARGS);
900
901 /**
902 * @tc.steps: step2. open db and Call FillCloudLog.
903 * @tc.expected: step2. return E_OK.
904 */
905 uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
906 std::string fileUrl = g_testDir + "/test.db";
907 EXPECT_EQ(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr), SQLITE_OK);
908 ASSERT_NE(db, nullptr);
909
910 data.isCloudVersionRecord = true;
911 ret = cloudKvObj.FillCloudLog({db, true}, OpType::INSERT, data, "", recorder);
912 EXPECT_EQ(ret, E_OK);
913 ret = cloudKvObj.FillCloudLog({db, true}, OpType::DELETE, data, "", recorder);
914 EXPECT_EQ(ret, E_OK);
915 EXPECT_EQ(sqlite3_close_v2(db), E_OK);
916 }
917
918 /**
919 * @tc.name: DeviceCollaborationTest001
920 * @tc.desc: Check force override data
921 * @tc.type: FUNC
922 * @tc.require:
923 * @tc.author: zqq
924 */
925 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest001, TestSize.Level0)
926 {
927 /**
928 * @tc.steps: step1. open db with DEVICE_COLLABORATION.
929 * @tc.expected: step1. return E_OK.
930 */
931 KvStoreNbDelegate* kvDelegatePtrS3 = nullptr;
932 KvStoreNbDelegate::Option option;
933 option.conflictResolvePolicy = ConflictResolvePolicy::DEVICE_COLLABORATION;
934 ASSERT_EQ(GetKvStore(kvDelegatePtrS3, STORE_ID_3, option), OK);
935 ASSERT_NE(kvDelegatePtrS3, nullptr);
936 KvStoreNbDelegate* kvDelegatePtrS4 = nullptr;
937 ASSERT_EQ(GetKvStore(kvDelegatePtrS4, STORE_ID_4, option), OK);
938 ASSERT_NE(kvDelegatePtrS4, nullptr);
939 /**
940 * @tc.steps: step2. db3 put (k1,v1) sync to db4.
941 * @tc.expected: step2. db4 get (k1,v1).
942 */
943 Key key = {'k'};
944 Value value = {'v'};
945 EXPECT_EQ(kvDelegatePtrS3->Put(key, value), OK);
946 communicatorAggregator_->SetLocalDeviceId("DB3");
947 BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
948 communicatorAggregator_->SetLocalDeviceId("DB4");
949 BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
950 Value actualValue;
951 EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
952 EXPECT_EQ(actualValue, value);
953 /**
954 * @tc.steps: step3. db4 delete (k1,v1) db3 sync again to db4.
955 * @tc.expected: step3. db4 get (k1,v1).
956 */
957 EXPECT_EQ(kvDelegatePtrS4->Delete(key), OK);
958 communicatorAggregator_->SetLocalDeviceId("DB3");
959 EXPECT_EQ(kvDelegatePtrS3->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
960 BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
961 communicatorAggregator_->SetLocalDeviceId("DB4");
962 BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
963 EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
964 EXPECT_EQ(actualValue, value);
965 CloseKvStore(kvDelegatePtrS3, STORE_ID_3);
966 CloseKvStore(kvDelegatePtrS4, STORE_ID_4);
967 }
968
969 /**
970 * @tc.name: DeviceCollaborationTest002
971 * @tc.desc: Check concurrent removeDeviceData and sync
972 * @tc.type: FUNC
973 * @tc.require:
974 * @tc.author: wangxiangdong
975 */
976 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest002, TestSize.Level0)
977 {
978 /**
979 * @tc.steps: step1. open db with DEVICE_COLLABORATION.
980 * @tc.expected: step1. return E_OK.
981 */
982 KvStoreNbDelegate* kvDelegatePtrS3 = nullptr;
983 KvStoreNbDelegate::Option option;
984 option.conflictResolvePolicy = ConflictResolvePolicy::DEVICE_COLLABORATION;
985 ASSERT_EQ(GetKvStore(kvDelegatePtrS3, STORE_ID_3, option), OK);
986 ASSERT_NE(kvDelegatePtrS3, nullptr);
987 KvStoreNbDelegate* kvDelegatePtrS4 = nullptr;
988 ASSERT_EQ(GetKvStore(kvDelegatePtrS4, STORE_ID_4, option), OK);
989 ASSERT_NE(kvDelegatePtrS4, nullptr);
990 /**
991 * @tc.steps: step2. db3 put 200 k-v data.
992 * @tc.expected: step2. OK.
993 */
994 std::vector<Entry> entries;
995 for (int i = 0; i < 200; i++) {
996 std::string keyStr = "k_" + std::to_string(i);
997 std::string valueStr = "v_" + std::to_string(i);
998 Key key(keyStr.begin(), keyStr.end());
999 Value value(valueStr.begin(), valueStr.end());
1000 Entry entry;
1001 entry.key = key;
1002 entry.value = value;
1003 entries.push_back(entry);
1004 }
1005
1006 ASSERT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
1007 communicatorAggregator_->SetLocalDeviceId("DB3");
1008 /**
1009 * @tc.steps: step3. when sync, try to removeDeviceData.
1010 * @tc.expected: step3. sync stop.
1011 */
__anon4869f29f1102(const std::map<std::string, SyncProcess> &process) 1012 auto callback = [](const std::map<std::string, SyncProcess> &process) {
1013 LOGD("process finished");
1014 };
1015 auto actualRet = kvDelegatePtrS3->Sync(g_CloudSyncoption, callback);
1016 EXPECT_EQ(actualRet, OK);
1017 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait for 100ms
1018 EXPECT_EQ(kvDelegatePtrS3->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
1019 communicatorAggregator_->SetLocalDeviceId("DB4");
1020 BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
1021 std::string keyStr = "k_1";
1022 Key key(keyStr.begin(), keyStr.end());
1023 Value actualValue;
1024 EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), NOT_FOUND);
1025 CloseKvStore(kvDelegatePtrS3, STORE_ID_3);
1026 CloseKvStore(kvDelegatePtrS4, STORE_ID_4);
1027 }
1028
1029 /**
1030 * @tc.name: DeviceCollaborationTest003
1031 * @tc.desc: Check concurrent removeDeviceData and sync
1032 * @tc.type: FUNC
1033 * @tc.require:
1034 * @tc.author: liaoyonghuang
1035 */
1036 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest003, TestSize.Level1)
1037 {
1038 /**
1039 * @tc.steps: step1. Block in sync
1040 * @tc.expected: step1. return E_OK.
1041 */
1042 ASSERT_EQ(kvDelegatePtrS1_->Put(KEY_1, VALUE_1), OK);
__anon4869f29f1202(const std::string &, VBucket &) 1043 virtualCloudDb_->ForkUpload([](const std::string &, VBucket &) {
1044 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1045 });
__anon4869f29f1302() 1046 std::thread syncThread([&]() {
1047 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
1048 });
1049 /**
1050 * @tc.steps: step2. RemoveDeviceData concurrently
1051 * @tc.expected: step2. return E_OK.
1052 */
1053 std::this_thread::sleep_for(std::chrono::milliseconds(100));
__anon4869f29f1402() 1054 std::thread removeThread1([&]() {
1055 EXPECT_EQ(kvDelegatePtrS1_->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
1056 });
__anon4869f29f1502() 1057 std::thread removeThread2([&]() {
1058 EXPECT_EQ(kvDelegatePtrS1_->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
1059 });
1060 syncThread.join();
1061 removeThread1.join();
1062 removeThread2.join();
1063 }
1064
1065 /**
1066 * @tc.name: SyncWithKvAndCloud001.
1067 * @tc.desc: Test sync data with same key and different sync way.
1068 * @tc.type: FUNC
1069 * @tc.require:
1070 * @tc.author: wangxiangdong
1071 */
1072 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithKvAndCloud001, TestSize.Level0)
1073 {
1074 /**
1075 * @tc.steps: step1. put k1 v1 by user1 and sync between devices.
1076 * @tc.expected: step1. return ok.
1077 */
1078 Key key = {'k', '1'};
1079 Value value = {'v', '1'};
1080 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
1081 std::vector<std::string> devices;
1082 devices.push_back(deviceB_->GetDeviceId());
1083 Query query = Query::Select().PrefixKey(key);
1084 std::map<std::string, DBStatus> result;
1085 DBStatus status = g_tool.SyncTest(kvDelegatePtrS2_, devices, SYNC_MODE_PUSH_PULL, result, query);
1086 ASSERT_TRUE(status == OK);
1087
1088 /**
1089 * @tc.steps: step2. insert k1 v2 and sync data to cloud.
1090 * @tc.expected: step2. return ok.
1091 */
1092 Value value2 = {'v', '2'};
1093 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
1094 CloudSyncOption syncOption;
1095 syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
1096 syncOption.users.push_back(USER_ID);
1097 syncOption.devices.push_back("cloud");
1098 BlockSync(kvDelegatePtrS1_, OK, syncOption);
1099 /**
1100 * @tc.steps: step3. sync by device2.
1101 * @tc.expected: step3. return ok.
1102 */
1103 syncOption.users.clear();
1104 syncOption.users.push_back(USER_ID);
1105 BlockSync(kvDelegatePtrS2_, OK, syncOption);
1106 /**
1107 * @tc.steps: step4. get k1.
1108 * @tc.expected: step4. get v2.
1109 */
1110 Value actualValue;
1111 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
1112 EXPECT_EQ(actualValue, value2);
1113 }
1114 }