1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "db_base64_utils.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "kv_virtual_device.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kvdb_manager.h"
26 #include "platform_specific.h"
27 #include "process_system_api_adapter_impl.h"
28 #include "sqlite_cloud_kv_executor_utils.h"
29 #include "virtual_communicator_aggregator.h"
30 #include "virtual_cloud_db.h"
31
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 using namespace std;
36
37 namespace {
38 static std::string HWM_HEAD = "naturalbase_cloud_meta_sync_data_";
39 string g_testDir;
40 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
41 CloudSyncOption g_CloudSyncoption;
42 const std::string USER_ID_2 = "user2";
43 const std::string USER_ID_3 = "user3";
44 const int64_t WAIT_TIME = 5;
45 class DistributedDBCloudKvSyncerTest : public testing::Test {
46 public:
47 static void SetUpTestCase();
48 static void TearDownTestCase();
49 void SetUp();
50 void TearDown();
51 protected:
52 DBStatus GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId, KvStoreNbDelegate::Option option,
53 bool invalidSchema = false);
54 void CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId);
55 void BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
56 DBStatus expectSyncResult = OK);
57 static DataBaseSchema GetDataBaseSchema(bool invalidSchema);
58 void GetSingleStore();
59 void ReleaseSingleStore();
60 void BlockCompensatedSync(int &actSyncCnt, int expSyncCnt);
61 void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
62 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
63 std::shared_ptr<VirtualCloudDb> virtualCloudDb2_ = nullptr;
64 KvStoreConfig config_;
65 KvStoreNbDelegate* kvDelegatePtrS1_ = nullptr;
66 KvStoreNbDelegate* kvDelegatePtrS2_ = nullptr;
67 SyncProcess lastProcess_;
68 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
69 KvVirtualDevice *deviceB_ = nullptr;
70 SQLiteSingleVerNaturalStore *singleStore_ = nullptr;
71 std::mutex comSyncMutex;
72 std::condition_variable comSyncCv;
73 };
74
SetUpTestCase()75 void DistributedDBCloudKvSyncerTest::SetUpTestCase()
76 {
77 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
78 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
79 LOGE("rm test db files error!");
80 }
81 g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
82 g_CloudSyncoption.users.push_back(USER_ID);
83 g_CloudSyncoption.devices.push_back("cloud");
84
85 string dir = g_testDir + "/single_ver";
86 DIR* dirTmp = opendir(dir.c_str());
87 if (dirTmp == nullptr) {
88 OS::MakeDBDirectory(dir);
89 } else {
90 closedir(dirTmp);
91 }
92 }
93
TearDownTestCase()94 void DistributedDBCloudKvSyncerTest::TearDownTestCase()
95 {
96 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
97 LOGE("rm test db files error!");
98 }
99 }
100
SetUp()101 void DistributedDBCloudKvSyncerTest::SetUp()
102 {
103 DistributedDBToolsUnitTest::PrintTestCaseInfo();
104 config_.dataDir = g_testDir;
105 /**
106 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
107 */
108 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
109 virtualCloudDb2_ = std::make_shared<VirtualCloudDb>();
110 g_mgr.SetKvStoreConfig(config_);
111 KvStoreNbDelegate::Option option1;
112 ASSERT_EQ(GetKvStore(kvDelegatePtrS1_, STORE_ID_1, option1), OK);
113 // set aggregator after get store1, only store2 can sync with p2p
114 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
115 ASSERT_TRUE(communicatorAggregator_ != nullptr);
116 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
117 KvStoreNbDelegate::Option option2;
118 ASSERT_EQ(GetKvStore(kvDelegatePtrS2_, STORE_ID_2, option2), OK);
119
120 deviceB_ = new (std::nothrow) KvVirtualDevice("DEVICE_B");
121 ASSERT_TRUE(deviceB_ != nullptr);
122 auto syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
123 ASSERT_TRUE(syncInterfaceB != nullptr);
124 ASSERT_EQ(deviceB_->Initialize(communicatorAggregator_, syncInterfaceB), E_OK);
125 GetSingleStore();
126 }
127
TearDown()128 void DistributedDBCloudKvSyncerTest::TearDown()
129 {
130 ReleaseSingleStore();
131 CloseKvStore(kvDelegatePtrS1_, STORE_ID_1);
132 CloseKvStore(kvDelegatePtrS2_, STORE_ID_2);
133 virtualCloudDb_ = nullptr;
134 virtualCloudDb2_ = nullptr;
135 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
136 LOGE("rm test db files error!");
137 }
138
139 if (deviceB_ != nullptr) {
140 delete deviceB_;
141 deviceB_ = nullptr;
142 }
143
144 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
145 communicatorAggregator_ = nullptr;
146 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
147 }
148
BlockSync(KvStoreNbDelegate * delegate,DBStatus expectDBStatus,CloudSyncOption option,DBStatus expectSyncResult)149 void DistributedDBCloudKvSyncerTest::BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus,
150 CloudSyncOption option, DBStatus expectSyncResult)
151 {
152 if (delegate == nullptr) {
153 return;
154 }
155 std::mutex dataMutex;
156 std::condition_variable cv;
157 bool finish = false;
158 SyncProcess last;
159 auto callback = [expectDBStatus, &last, &cv, &dataMutex, &finish, &option](const std::map<std::string,
160 SyncProcess> &process) {
161 size_t notifyCnt = 0;
162 for (const auto &item: process) {
163 LOGD("user = %s, status = %d, errCode = %d", item.first.c_str(), item.second.process, item.second.errCode);
164 if (item.second.process != DistributedDB::FINISHED) {
165 continue;
166 }
167 EXPECT_EQ(item.second.errCode, expectDBStatus);
168 {
169 std::lock_guard<std::mutex> autoLock(dataMutex);
170 notifyCnt++;
171 std::set<std::string> userSet(option.users.begin(), option.users.end());
172 if (notifyCnt == userSet.size()) {
173 finish = true;
174 last = item.second;
175 cv.notify_one();
176 }
177 }
178 }
179 };
180 auto actualRet = delegate->Sync(option, callback);
181 EXPECT_EQ(actualRet, expectSyncResult);
182 if (actualRet == OK) {
183 std::unique_lock<std::mutex> uniqueLock(dataMutex);
184 cv.wait(uniqueLock, [&finish]() {
185 return finish;
186 });
187 }
188 lastProcess_ = last;
189 }
190
GetDataBaseSchema(bool invalidSchema)191 DataBaseSchema DistributedDBCloudKvSyncerTest::GetDataBaseSchema(bool invalidSchema)
192 {
193 DataBaseSchema schema;
194 TableSchema tableSchema;
195 tableSchema.name = invalidSchema ? "invalid_schema_name" : CloudDbConstant::CLOUD_KV_TABLE_NAME;
196 Field field;
197 field.colName = CloudDbConstant::CLOUD_KV_FIELD_KEY;
198 field.type = TYPE_INDEX<std::string>;
199 field.primary = true;
200 tableSchema.fields.push_back(field);
201 field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE;
202 field.primary = false;
203 tableSchema.fields.push_back(field);
204 field.colName = CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE;
205 tableSchema.fields.push_back(field);
206 field.colName = CloudDbConstant::CLOUD_KV_FIELD_VALUE;
207 tableSchema.fields.push_back(field);
208 field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME;
209 field.type = TYPE_INDEX<int64_t>;
210 tableSchema.fields.push_back(field);
211 schema.tables.push_back(tableSchema);
212 return schema;
213 }
214
GetSingleStore()215 void DistributedDBCloudKvSyncerTest::GetSingleStore()
216 {
217 KvDBProperties prop;
218 prop.SetStringProp(KvDBProperties::USER_ID, USER_ID);
219 prop.SetStringProp(KvDBProperties::APP_ID, APP_ID);
220 prop.SetStringProp(KvDBProperties::STORE_ID, STORE_ID_1);
221
222 std::string hashIdentifier = DBCommon::TransferHashString(
223 DBCommon::GenerateIdentifierId(STORE_ID_1, APP_ID, USER_ID, "", 0));
224 prop.SetStringProp(DBProperties::IDENTIFIER_DATA, hashIdentifier);
225 prop.SetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::SINGLE_VER_TYPE_SQLITE);
226 int errCode = E_OK;
227 singleStore_ = static_cast<SQLiteSingleVerNaturalStore *>(KvDBManager::OpenDatabase(prop, errCode));
228 ASSERT_NE(singleStore_, nullptr);
229 }
230
ReleaseSingleStore()231 void DistributedDBCloudKvSyncerTest::ReleaseSingleStore()
232 {
233 RefObject::DecObjRef(singleStore_);
234 singleStore_ = nullptr;
235 }
236
BlockCompensatedSync(int & actSyncCnt,int expSyncCnt)237 void DistributedDBCloudKvSyncerTest::BlockCompensatedSync(int &actSyncCnt, int expSyncCnt)
238 {
239 {
240 std::unique_lock<std::mutex> lock(comSyncMutex);
241 bool result = comSyncCv.wait_for(lock, std::chrono::seconds(WAIT_TIME),
242 [&actSyncCnt, expSyncCnt]() { return actSyncCnt == expSyncCnt; });
243 ASSERT_EQ(result, true);
244 }
245 }
246
247
GetKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId,KvStoreNbDelegate::Option option,bool invalidSchema)248 DBStatus DistributedDBCloudKvSyncerTest::GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId,
249 KvStoreNbDelegate::Option option, bool invalidSchema)
250 {
251 DBStatus openRet = OK;
252 g_mgr.GetKvStore(storeId, option, [&openRet, &delegate](DBStatus status, KvStoreNbDelegate *openDelegate) {
253 openRet = status;
254 delegate = openDelegate;
255 });
256 EXPECT_EQ(openRet, OK);
257 EXPECT_NE(delegate, nullptr);
258
259 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
260 cloudDbs[USER_ID] = virtualCloudDb_;
261 cloudDbs[USER_ID_2] = virtualCloudDb2_;
262 delegate->SetCloudDB(cloudDbs);
263 std::map<std::string, DataBaseSchema> schemas;
264 schemas[USER_ID] = GetDataBaseSchema(invalidSchema);
265 schemas[USER_ID_2] = GetDataBaseSchema(invalidSchema);
266 return delegate->SetCloudDbSchema(schemas);
267 }
268
CloseKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId)269 void DistributedDBCloudKvSyncerTest::CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId)
270 {
271 if (delegate != nullptr) {
272 ASSERT_EQ(g_mgr.CloseKvStore(delegate), OK);
273 delegate = nullptr;
274 DBStatus status = g_mgr.DeleteKvStore(storeId);
275 LOGD("delete kv store status %d store %s", status, storeId.c_str());
276 ASSERT_EQ(status, OK);
277 }
278 }
279
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)280 void DistributedDBCloudKvSyncerTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
281 {
282 sqlite3 *db_;
283 uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
284 std::string fileUrl = g_testDir + "/" \
285 "2d23c8a0ffadafcaa03507a4ec2290c83babddcab07c0e2945fbba93efc7eec0/single_ver/main/gen_natural_store.db";
286 EXPECT_EQ(sqlite3_open_v2(fileUrl.c_str(), &db_, flag, nullptr), SQLITE_OK);
287
288 std::string sql = "SELECT count(*) FROM naturalbase_kv_aux_sync_data_log WHERE ";
289 switch (opType) {
290 case OpType::INSERT:
291 sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
292 " cloud_gid != '' AND version !='' AND cloud_flag=cloud_flag|0x10";
293 break;
294 case OpType::UPDATE:
295 sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
296 " cloud_gid == '' AND version =='' AND cloud_flag=cloud_flag|0x10";
297 break;
298 case OpType::DELETE:
299 sql += " cloud_gid == '' AND version ==''";
300 break;
301 default:
302 break;
303 }
304 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
305 reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
306 sqlite3_close_v2(db_);
307 }
308
309 /**
310 * @tc.name: UploadAbnormalSync001
311 * @tc.desc: Test upload update record, cloud returned record not found.
312 * @tc.type: FUNC
313 * @tc.require:
314 * @tc.author: bty
315 */
316 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync001, TestSize.Level0)
317 {
318 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
319 ASSERT_NE(cloudHook, nullptr);
320
321 /**
322 * @tc.steps:step1. Device A inserts data and synchronizes
323 * @tc.expected: step1 OK.
324 */
325 Key key = {'k'};
326 Value value = {'v'};
327 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
328 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
329
330 /**
331 * @tc.steps:step2. Device A update data and synchronizes, cloud returned record not found
332 * @tc.expected: step2 OK.
333 */
334 Value value2 = {'x'};
335 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
336 int upIdx = 0;
__anon9d833e5f0602(const std::string &tableName, VBucket &extend) 337 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
338 LOGD("cloud db upload index:%d", ++upIdx);
339 if (upIdx == 1) { // 1 is index
340 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
341 }
342 });
343 int syncCnt = 0;
__anon9d833e5f0702null344 cloudHook->SetSyncFinishHook([&syncCnt, this] {
345 LOGD("sync finish times:%d", ++syncCnt);
346 if (syncCnt == 1) { // 1 is the first sync
347 CheckUploadAbnormal(OpType::UPDATE, 1L); // 1 is expected count
348 } else {
349 CheckUploadAbnormal(OpType::UPDATE, 1L, true); // 1 is expected count
350 }
351 comSyncCv.notify_all();
352 });
353 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
354 BlockCompensatedSync(syncCnt, 2); // 2 is sync times
355 virtualCloudDb_->ForkUpload(nullptr);
356 cloudHook->SetSyncFinishHook(nullptr);
357 }
358
359 /**
360 * @tc.name: UploadAbnormalSync002
361 * @tc.desc: Test upload insert record, cloud returned record already existed.
362 * @tc.type: FUNC
363 * @tc.require:
364 * @tc.author: bty
365 */
366 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync002, TestSize.Level0)
367 {
368 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
369 ASSERT_NE(cloudHook, nullptr);
370
371 /**
372 * @tc.steps:step1. Device A inserts k-v and synchronizes
373 * @tc.expected: step1 OK.
374 */
375 Key key = {'k'};
376 Value value = {'v'};
377 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
378 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
379
380 /**
381 * @tc.steps:step2. Device A insert k2-v2 and synchronizes, Device B insert k2-v2 and sync before A upload
382 * @tc.expected: step2 OK.
383 */
384 Key key2 = {'x'};
385 Value value2 = {'y'};
386 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, value2), OK);
387 int upIdx = 0;
__anon9d833e5f0802(const std::string &tableName, VBucket &extend) 388 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
389 LOGD("cloud db upload index:%d", ++upIdx);
390 if (upIdx == 2) { // 2 is index
391 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
392 }
393 });
394 int doUpIdx = 0;
__anon9d833e5f0902null395 cloudHook->SetDoUploadHook([&doUpIdx, key2, value2, this] {
396 LOGD("begin upload index:%d", ++doUpIdx);
397 ASSERT_EQ(kvDelegatePtrS2_->Put(key2, value2), OK);
398 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
399 });
400 int syncCnt = 0;
__anon9d833e5f0a02null401 cloudHook->SetSyncFinishHook([&syncCnt, this] {
402 LOGD("sync finish times:%d", ++syncCnt);
403 if (syncCnt == 1) { // 1 is the normal sync
404 CheckUploadAbnormal(OpType::INSERT, 1L); // 1 is expected count
405 } else {
406 CheckUploadAbnormal(OpType::INSERT, 2L, true); // 2 is expected count
407 }
408 comSyncCv.notify_all();
409 });
410 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
411 BlockCompensatedSync(syncCnt, 2); // 2 is sync times
412 virtualCloudDb_->ForkUpload(nullptr);
413 cloudHook->SetSyncFinishHook(nullptr);
414 cloudHook->SetDoUploadHook(nullptr);
415 }
416
417 /**
418 * @tc.name: UploadAbnormalSync003
419 * @tc.desc: Test upload delete record, cloud returned record not found.
420 * @tc.type: FUNC
421 * @tc.require:
422 * @tc.author: bty
423 */
424 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync003, TestSize.Level0)
425 {
426 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
427 ASSERT_NE(cloudHook, nullptr);
428
429 /**
430 * @tc.steps:step1. Device A inserts data and synchronizes
431 * @tc.expected: step1 OK.
432 */
433 Key key = {'k'};
434 Value value = {'v'};
435 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
436 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
437 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
438
439 /**
440 * @tc.steps:step2. Device A delete data and synchronizes, Device B delete data and sync before A upload
441 * @tc.expected: step2 OK.
442 */
443 ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
444 int upIdx = 0;
__anon9d833e5f0b02(const std::string &tableName, VBucket &extend) 445 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
446 LOGD("cloud db upload index:%d", ++upIdx);
447 if (upIdx == 2) { // 2 is index
448 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
449 }
450 });
451 int doUpIdx = 0;
__anon9d833e5f0c02null452 cloudHook->SetDoUploadHook([&doUpIdx, key, this] {
453 LOGD("begin upload index:%d", ++doUpIdx);
454 ASSERT_EQ(kvDelegatePtrS2_->Delete(key), OK);
455 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
456 });
457 int syncCnt = 0;
__anon9d833e5f0d02null458 cloudHook->SetSyncFinishHook([&syncCnt, this] {
459 LOGD("sync finish times:%d", ++syncCnt);
460 if (syncCnt == 1) { // 1 is the normal sync
461 CheckUploadAbnormal(OpType::DELETE, 1L); // 1 is expected count
462 } else {
463 CheckUploadAbnormal(OpType::DELETE, 1L, true); // 1 is expected count
464 }
465 comSyncCv.notify_all();
466 });
467 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
468 BlockCompensatedSync(syncCnt, 1); // 1 is sync times
469 virtualCloudDb_->ForkUpload(nullptr);
470 cloudHook->SetSyncFinishHook(nullptr);
471 cloudHook->SetDoUploadHook(nullptr);
472 }
473
474 /**
475 * @tc.name: QueryParsingProcessTest001
476 * @tc.desc: Test Query parsing process.
477 * @tc.type: FUNC
478 * @tc.require:
479 * @tc.author: luoguo
480 */
481 HWTEST_F(DistributedDBCloudKvSyncerTest, QueryParsingProcessTest001, TestSize.Level0)
482 {
483 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
484 ASSERT_NE(cloudHook, nullptr);
485
486 /**
487 * @tc.steps:step1. Device A inserts data and synchronizes
488 * @tc.expected: step1 OK.
489 */
490 Key key = {'k'};
491 Value value = {'v'};
492 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
493
494 /**
495 * @tc.steps:step2. Test Query parsing Process
496 * @tc.expected: step2 OK.
497 */
498 QuerySyncObject syncObject;
499 std::vector<VBucket> syncDataPk;
500 VBucket bucket;
501 bucket.insert_or_assign(std::string("k"), std::string("k"));
502 syncDataPk.push_back(bucket);
503 std::string tableName = "sync_data";
504 ASSERT_EQ(CloudStorageUtils::GetSyncQueryByPk(tableName, syncDataPk, true, syncObject), E_OK);
505
506 Bytes bytes;
507 bytes.resize(syncObject.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
508 Parcel parcel(bytes.data(), bytes.size());
509 ASSERT_EQ(syncObject.SerializeData(parcel, SOFTWARE_VERSION_CURRENT), E_OK);
510
511 /**
512 * @tc.steps:step3. Check Node's type is QueryNodeType::IN.
513 * @tc.expected: step3 OK.
514 */
515 std::vector<QueryNode> queryNodes;
516 syncObject.ParserQueryNodes(bytes, queryNodes);
517 ASSERT_EQ(queryNodes[0].type, QueryNodeType::IN);
518 }
519
520 /**
521 * @tc.name: DeviceCollaborationTest001
522 * @tc.desc: Check force override data
523 * @tc.type: FUNC
524 * @tc.require:
525 * @tc.author: zqq
526 */
527 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest001, TestSize.Level0)
528 {
529 /**
530 * @tc.steps: step1. open db with DEVICE_COLLABORATION.
531 * @tc.expected: step1. return E_OK.
532 */
533 KvStoreNbDelegate* kvDelegatePtrS3 = nullptr;
534 KvStoreNbDelegate::Option option;
535 option.conflictResolvePolicy = ConflictResolvePolicy::DEVICE_COLLABORATION;
536 ASSERT_EQ(GetKvStore(kvDelegatePtrS3, STORE_ID_3, option), OK);
537 ASSERT_NE(kvDelegatePtrS3, nullptr);
538 KvStoreNbDelegate* kvDelegatePtrS4 = nullptr;
539 ASSERT_EQ(GetKvStore(kvDelegatePtrS4, STORE_ID_4, option), OK);
540 ASSERT_NE(kvDelegatePtrS4, nullptr);
541 /**
542 * @tc.steps: step2. db3 put (k1,v1) sync to db4.
543 * @tc.expected: step2. db4 get (k1,v1).
544 */
545 Key key = {'k'};
546 Value value = {'v'};
547 EXPECT_EQ(kvDelegatePtrS3->Put(key, value), OK);
548 communicatorAggregator_->SetLocalDeviceId("DB3");
549 BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
550 communicatorAggregator_->SetLocalDeviceId("DB4");
551 BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
552 Value actualValue;
553 EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
554 EXPECT_EQ(actualValue, value);
555 /**
556 * @tc.steps: step3. db4 delete (k1,v1) db3 sync again to db4.
557 * @tc.expected: step3. db4 get (k1,v1).
558 */
559 EXPECT_EQ(kvDelegatePtrS4->Delete(key), OK);
560 communicatorAggregator_->SetLocalDeviceId("DB3");
561 EXPECT_EQ(kvDelegatePtrS3->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
562 BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
563 communicatorAggregator_->SetLocalDeviceId("DB4");
564 BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
565 EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
566 EXPECT_EQ(actualValue, value);
567 CloseKvStore(kvDelegatePtrS3, STORE_ID_3);
568 CloseKvStore(kvDelegatePtrS4, STORE_ID_4);
569 }
570 }