1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16
17 #include <utility>
18 #include "cloud_db_constant.h"
19 #include "cloud_db_data_utils.h"
20 #include "cloud_db_types.h"
21 #include "cloud_db_proxy.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "kv_store_errno.h"
24 #include "mock_icloud_sync_storage_interface.h"
25 #include "virtual_cloud_db.h"
26 #include "virtual_cloud_syncer.h"
27
28 using namespace std;
29 using namespace testing::ext;
30 using namespace DistributedDB;
31
32 namespace {
33 constexpr const char *TABLE_NAME = "Table";
GetFields()34 std::vector<Field> GetFields()
35 {
36 return {
37 {
38 .colName = "col1",
39 .type = TYPE_INDEX<int64_t>,
40 .primary = true,
41 .nullable = false
42 },
43 {
44 .colName = "col2",
45 .type = TYPE_INDEX<std::string>,
46 .primary = false
47 },
48 {
49 .colName = "col3",
50 .type = TYPE_INDEX<Bytes>,
51 .primary = false
52 }
53 };
54 }
55
ModifyRecords(std::vector<VBucket> & expectRecord)56 void ModifyRecords(std::vector<VBucket> &expectRecord)
57 {
58 std::vector<VBucket> tempRecord;
59 for (const auto &record: expectRecord) {
60 VBucket bucket;
61 for (auto &[field, val] : record) {
62 LOGD("modify field %s", field.c_str());
63 if (val.index() == TYPE_INDEX<int64_t>) {
64 int64_t v = std::get<int64_t>(val);
65 bucket.insert({ field, static_cast<int64_t>(v + 1) });
66 } else {
67 bucket.insert({ field, val });
68 }
69 }
70 tempRecord.push_back(bucket);
71 }
72 expectRecord = tempRecord;
73 }
74
Sync(CloudSyncer * cloudSyncer,int & callCount)75 DBStatus Sync(CloudSyncer *cloudSyncer, int &callCount)
76 {
77 std::mutex processMutex;
78 std::condition_variable cv;
79 SyncProcess syncProcess;
80 const auto callback = [&callCount, &syncProcess, &processMutex, &cv](
81 const std::map<std::string, SyncProcess> &process) {
82 {
83 std::lock_guard<std::mutex> autoLock(processMutex);
84 syncProcess = process.begin()->second;
85 if (!process.empty()) {
86 syncProcess = process.begin()->second;
87 } else {
88 SyncProcess tmpProcess;
89 syncProcess = tmpProcess;
90 }
91 callCount++;
92 }
93 cv.notify_all();
94 };
95 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
96 {
97 LOGI("begin to wait sync");
98 std::unique_lock<std::mutex> uniqueLock(processMutex);
99 cv.wait(uniqueLock, [&syncProcess]() {
100 return syncProcess.process == ProcessStatus::FINISHED;
101 });
102 LOGI("end to wait sync");
103 }
104 return syncProcess.errCode;
105 }
106
107 class DistributedDBCloudDBProxyTest : public testing::Test {
108 public:
109 static void SetUpTestCase();
110 static void TearDownTestCase();
111 void SetUp() override;
112 void TearDown() override;
113
114 protected:
115 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
116 };
117
SetUpTestCase()118 void DistributedDBCloudDBProxyTest::SetUpTestCase()
119 {
120 }
121
TearDownTestCase()122 void DistributedDBCloudDBProxyTest::TearDownTestCase()
123 {
124 }
125
SetUp()126 void DistributedDBCloudDBProxyTest::SetUp()
127 {
128 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
129 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
130 }
131
TearDown()132 void DistributedDBCloudDBProxyTest::TearDown()
133 {
134 virtualCloudDb_ = nullptr;
135 }
136
137 /**
138 * @tc.name: CloudDBProxyTest001
139 * @tc.desc: Verify cloud db init and close function.
140 * @tc.type: FUNC
141 * @tc.require:
142 * @tc.author: zhangqiquan
143 */
144 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest001, TestSize.Level0)
145 {
146 /**
147 * @tc.steps: step1. set cloud db to proxy
148 * @tc.expected: step1. E_OK
149 */
150 CloudDBProxy proxy;
151 proxy.SetCloudDB(virtualCloudDb_);
152 /**
153 * @tc.steps: step2. proxy close cloud db with cloud error
154 * @tc.expected: step2. -E_CLOUD_ERROR
155 */
156 virtualCloudDb_->SetCloudError(true);
157 EXPECT_EQ(proxy.Close(), -E_CLOUD_ERROR);
158 /**
159 * @tc.steps: step3. proxy close cloud db again
160 * @tc.expected: step3. E_OK because cloud db has been set nullptr
161 */
162 EXPECT_EQ(proxy.Close(), E_OK);
163 virtualCloudDb_->SetCloudError(false);
164 EXPECT_EQ(proxy.Close(), E_OK);
165 }
166
167 /**
168 * @tc.name: CloudDBProxyTest002
169 * @tc.desc: Verify cloud db insert function.
170 * @tc.type: FUNC
171 * @tc.require:
172 * @tc.author: zhangqiquan
173 */
174 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest002, TestSize.Level0)
175 {
176 /**
177 * @tc.steps: step1. set cloud db to proxy
178 * @tc.expected: step1. E_OK
179 */
180 CloudDBProxy proxy;
181 proxy.SetCloudDB(virtualCloudDb_);
182 /**
183 * @tc.steps: step2. insert data to cloud db
184 * @tc.expected: step2. OK
185 */
186 TableSchema schema = {
187 .name = TABLE_NAME,
188 .fields = GetFields()
189 };
190 std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
191 std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
192 Info uploadInfo;
193 std::vector<VBucket> insert = expectRecords;
194 EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), OK);
195
196 VBucket extend;
197 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
198 std::vector<VBucket> actualRecords;
199 EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
200 /**
201 * @tc.steps: step3. proxy query data
202 * @tc.expected: step3. data is equal to expect
203 */
204 ASSERT_EQ(actualRecords.size(), expectRecords.size());
205 for (size_t i = 0; i < actualRecords.size(); ++i) {
206 for (const auto &field: schema.fields) {
207 Type expect = expectRecords[i][field.colName];
208 Type actual = actualRecords[i][field.colName];
209 EXPECT_EQ(expect.index(), actual.index());
210 }
211 }
212 /**
213 * @tc.steps: step4. proxy close cloud db
214 * @tc.expected: step4. E_OK
215 */
216 EXPECT_EQ(proxy.Close(), E_OK);
217 }
218
219 /**
220 * @tc.name: CloudDBProxyTest003
221 * @tc.desc: Verify cloud db update function.
222 * @tc.type: FUNC
223 * @tc.require:
224 * @tc.author: zhangqiquan
225 */
226 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest003, TestSize.Level0)
227 {
228 TableSchema schema = {
229 .name = TABLE_NAME,
230 .fields = GetFields()
231 };
232 /**
233 * @tc.steps: step1. set cloud db to proxy
234 * @tc.expected: step1. E_OK
235 */
236 CloudDBProxy proxy;
237 proxy.SetCloudDB(virtualCloudDb_);
238 /**
239 * @tc.steps: step2. insert data to cloud db
240 * @tc.expected: step2. OK
241 */
242 std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
243 std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
244 Info uploadInfo;
245 std::vector<VBucket> insert = expectRecords;
246 EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), OK);
247 /**
248 * @tc.steps: step3. update data to cloud db
249 * @tc.expected: step3. E_OK
250 */
251 ModifyRecords(expectRecords);
252 std::vector<VBucket> update = expectRecords;
253 EXPECT_EQ(proxy.BatchUpdate(TABLE_NAME, update, expectExtends, uploadInfo), OK);
254 /**
255 * @tc.steps: step3. proxy close cloud db
256 * @tc.expected: step3. E_OK
257 */
258 VBucket extend;
259 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
260 std::vector<VBucket> actualRecords;
261 EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
262 ASSERT_EQ(actualRecords.size(), expectRecords.size());
263 for (size_t i = 0; i < actualRecords.size(); ++i) {
264 for (const auto &field: schema.fields) {
265 Type expect = expectRecords[i][field.colName];
266 Type actual = actualRecords[i][field.colName];
267 EXPECT_EQ(expect.index(), actual.index());
268 }
269 }
270 /**
271 * @tc.steps: step4. proxy close cloud db
272 * @tc.expected: step4. E_OK
273 */
274 EXPECT_EQ(proxy.Close(), E_OK);
275 }
276
277 /**
278 * @tc.name: CloudDBProxyTest004
279 * @tc.desc: Verify cloud db heartbeat and lock function.
280 * @tc.type: FUNC
281 * @tc.require:
282 * @tc.author: zhangqiquan
283 */
284 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest004, TestSize.Level3)
285 {
286 /**
287 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
288 * @tc.expected: step1. E_OK
289 */
290 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
291 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
292 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
293 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
294 ASSERT_NE(cloudSyncer, nullptr);
295 cloudSyncer->SetCloudDB(virtualCloudDb_);
296 cloudSyncer->SetSyncAction(true, false);
__anon6a98ce310402() 297 cloudSyncer->SetDownloadFunc([]() {
298 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
299 return E_OK;
300 });
301 /**
302 * @tc.steps: step2. call sync and wait sync finish
303 * @tc.expected: step2. E_OK
304 */
305 std::mutex processMutex;
306 std::condition_variable cv;
307 SyncProcess syncProcess;
308 LOGI("[CloudDBProxyTest004] Call cloud sync");
__anon6a98ce310502(const std::map<std::string, SyncProcess> &process) 309 const auto callback = [&syncProcess, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
310 {
311 std::lock_guard<std::mutex> autoLock(processMutex);
312 if (process.size() >= 1u) {
313 syncProcess = std::move(process.begin()->second);
314 } else {
315 SyncProcess tmpProcess;
316 syncProcess = tmpProcess;
317 }
318 }
319 cv.notify_all();
320 };
321 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
322 std::this_thread::sleep_for(std::chrono::seconds(1));
323 EXPECT_TRUE(virtualCloudDb_->GetLockStatus());
324 {
325 LOGI("[CloudDBProxyTest004] begin to wait sync");
326 std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon6a98ce310602() 327 cv.wait(uniqueLock, [&syncProcess]() {
328 return syncProcess.process == ProcessStatus::FINISHED;
329 });
330 LOGI("[CloudDBProxyTest004] end to wait sync");
331 EXPECT_EQ(syncProcess.errCode, OK);
332 }
333 /**
334 * @tc.steps: step3. get cloud lock status and heartbeat count
335 * @tc.expected: step3. cloud is unlock and more than twice heartbeat
336 */
337 EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
338 EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 2);
339 virtualCloudDb_->ClearHeartbeatCount();
340 cloudSyncer->Close();
341 RefObject::KillAndDecObjRef(cloudSyncer);
342 }
343
344 /**
345 * @tc.name: CloudDBProxyTest005
346 * @tc.desc: Verify sync failed after cloud error.
347 * @tc.type: FUNC
348 * @tc.require:
349 * @tc.author: zhangqiquan
350 */
351 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest005, TestSize.Level0)
352 {
353 /**
354 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
355 * @tc.expected: step1. E_OK
356 */
357 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
358 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
359 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
360 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
361 ASSERT_NE(cloudSyncer, nullptr);
362 cloudSyncer->SetCloudDB(virtualCloudDb_);
363 cloudSyncer->SetSyncAction(false, false);
364 virtualCloudDb_->SetCloudError(true);
365 /**
366 * @tc.steps: step2. call sync and wait sync finish
367 * @tc.expected: step2. CLOUD_ERROR by lock error
368 */
369 int callCount = 0;
370 EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
371 /**
372 * @tc.steps: step3. get cloud lock status and heartbeat count
373 * @tc.expected: step3. cloud is unlock and no heartbeat
374 */
375 EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
376 EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 0);
377 virtualCloudDb_->ClearHeartbeatCount();
378 cloudSyncer->Close();
379 RefObject::KillAndDecObjRef(cloudSyncer);
380 }
381
382 /**
383 * @tc.name: CloudDBProxyTest006
384 * @tc.desc: Verify sync failed by heartbeat failed.
385 * @tc.type: FUNC
386 * @tc.require:
387 * @tc.author: zhangqiquan
388 */
389 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest006, TestSize.Level3)
390 {
391 /**
392 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
393 * @tc.expected: step1. E_OK
394 */
395 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
396 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
397 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
398 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
399 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
400 ASSERT_NE(cloudSyncer, nullptr);
401 cloudSyncer->SetCloudDB(virtualCloudDb_);
402 cloudSyncer->SetSyncAction(true, false);
__anon6a98ce310702() 403 cloudSyncer->SetDownloadFunc([cloudSyncer]() {
404 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
405 cloudSyncer->Notify(false);
406 return E_OK;
407 });
408 virtualCloudDb_->SetHeartbeatError(true);
409 /**
410 * @tc.steps: step2. call sync and wait sync finish
411 * @tc.expected: step2. sync failed by heartbeat error
412 */
413 int callCount = 0;
414 EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
415 RuntimeContext::GetInstance()->StopTaskPool();
416 EXPECT_EQ(callCount, 1);
417 /**
418 * @tc.steps: step3. get cloud lock status and heartbeat count
419 * @tc.expected: step3. cloud is unlock and twice heartbeat
420 */
421 EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
422 EXPECT_EQ(virtualCloudDb_->GetHeartbeatCount(), 2);
423 virtualCloudDb_->ClearHeartbeatCount();
424 cloudSyncer->Close();
425 RefObject::KillAndDecObjRef(cloudSyncer);
426 }
427
428 /**
429 * @tc.name: CloudDBProxyTest007
430 * @tc.desc: Verify syncer close after notify finish.
431 * @tc.type: FUNC
432 * @tc.require:
433 * @tc.author: zhangqiquan
434 */
435 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest007, TestSize.Level4)
436 {
437 /**
438 * @tc.steps: step1. set cloud db to proxy
439 * @tc.expected: step1. E_OK
440 */
441 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
442 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
443 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
444 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
445 ASSERT_NE(cloudSyncer, nullptr);
446 cloudSyncer->SetCloudDB(virtualCloudDb_);
447 cloudSyncer->SetSyncAction(false, false);
448 /**
449 * @tc.steps: step2. call sync and wait sync finish
450 * @tc.expected: step2. notify before close finished
451 */
452 std::atomic<bool> close = false;
453 int callCount = 0;
454 std::mutex callMutex;
455 std::condition_variable cv;
456 const auto callback = [&close, &callCount, &callMutex, &cv](
__anon6a98ce310802( const std::map<std::string, SyncProcess> &) 457 const std::map<std::string, SyncProcess> &) {
458 std::this_thread::sleep_for(std::chrono::seconds(5)); // block notify 5s
459 {
460 std::lock_guard<std::mutex> autoLock(callMutex);
461 callCount++;
462 }
463 cv.notify_all();
464 EXPECT_EQ(close, false);
465 };
466 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
467 /**
468 * @tc.steps: step3. wait notify finished
469 */
470 std::this_thread::sleep_for(std::chrono::seconds(2)); // block 2s
471 cloudSyncer->Close();
472 close = true;
473 {
474 LOGI("begin to wait sync");
475 std::unique_lock<std::mutex> uniqueLock(callMutex);
__anon6a98ce310902() 476 cv.wait(uniqueLock, [&callCount]() {
477 return callCount > 0;
478 });
479 LOGI("end to wait sync");
480 }
481 RefObject::KillAndDecObjRef(cloudSyncer);
482 }
483
484 /**
485 * @tc.name: CloudDBProxyTest008
486 * @tc.desc: Verify cloud db heartbeat with diff status.
487 * @tc.type: FUNC
488 * @tc.require:
489 * @tc.author: zhangqiquan
490 */
491 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest008, TestSize.Level0)
492 {
493 /**
494 * @tc.steps: step1. set cloud db to proxy
495 * @tc.expected: step1. E_OK
496 */
497 CloudDBProxy proxy;
498 proxy.SetCloudDB(virtualCloudDb_);
499 /**
500 * @tc.steps: step2. proxy heartbeat with diff status
501 */
502 virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
503 int errCode = proxy.HeartBeat();
504 EXPECT_EQ(errCode, -E_CLOUD_NETWORK_ERROR);
505 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_NETWORK_ERROR);
506
507 virtualCloudDb_->SetActionStatus(CLOUD_SYNC_UNSET);
508 errCode = proxy.HeartBeat();
509 EXPECT_EQ(errCode, -E_CLOUD_SYNC_UNSET);
510 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_SYNC_UNSET);
511
512 virtualCloudDb_->SetActionStatus(CLOUD_FULL_RECORDS);
513 errCode = proxy.HeartBeat();
514 EXPECT_EQ(errCode, -E_CLOUD_FULL_RECORDS);
515 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_FULL_RECORDS);
516
517 virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
518 errCode = proxy.HeartBeat();
519 EXPECT_EQ(errCode, -E_CLOUD_LOCK_ERROR);
520 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_LOCK_ERROR);
521
522 virtualCloudDb_->SetActionStatus(DB_ERROR);
523 errCode = proxy.HeartBeat();
524 EXPECT_EQ(errCode, -E_CLOUD_ERROR);
525 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_ERROR);
526
527 /**
528 * @tc.steps: step3. proxy close cloud db
529 * @tc.expected: step3. E_OK
530 */
531 EXPECT_EQ(proxy.Close(), E_OK);
532 }
533
534 /**
535 * @tc.name: CloudDBProxyTest009
536 * @tc.desc: Verify cloud db closed and current task exit .
537 * @tc.type: FUNC
538 * @tc.require:
539 * @tc.author: zhangqiquan
540 */
541 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest009, TestSize.Level3)
542 {
543 /**
544 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
545 * @tc.expected: step1. E_OK
546 */
547 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
548 ASSERT_NE(iCloud, nullptr);
549 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
550 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
551 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
552 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
553 ASSERT_NE(cloudSyncer, nullptr);
554 cloudSyncer->SetCloudDB(virtualCloudDb_);
555 cloudSyncer->SetSyncAction(true, false);
__anon6a98ce310a02() 556 cloudSyncer->SetDownloadFunc([]() {
557 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
558 return -E_CLOUD_ERROR;
559 });
560 /**
561 * @tc.steps: step2. call sync and wait sync finish
562 * @tc.expected: step2. E_OK
563 */
564 std::mutex processMutex;
565 bool finished = false;
566 std::condition_variable cv;
567 LOGI("[CloudDBProxyTest009] Call cloud sync");
__anon6a98ce310b02(const std::map<std::string, SyncProcess> &process) 568 const auto callback = [&finished, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
569 {
570 std::lock_guard<std::mutex> autoLock(processMutex);
571 for (const auto &item: process) {
572 if (item.second.process == DistributedDB::FINISHED) {
573 finished = true;
574 EXPECT_EQ(item.second.errCode, DB_CLOSED);
575 }
576 }
577 }
578 cv.notify_all();
579 };
580 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
581 std::this_thread::sleep_for(std::chrono::seconds(1));
582 cloudSyncer->Close();
583 {
584 LOGI("[CloudDBProxyTest009] begin to wait sync");
585 std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon6a98ce310c02() 586 cv.wait(uniqueLock, [&finished]() {
587 return finished;
588 });
589 LOGI("[CloudDBProxyTest009] end to wait sync");
590 }
591 RefObject::KillAndDecObjRef(cloudSyncer);
592 }
593
594 /**
595 * @tc.name: CloudDBProxyTest010
596 * @tc.desc: Verify cloud db lock with diff status.
597 * @tc.type: FUNC
598 * @tc.require:
599 * @tc.author: zhangqiquan
600 */
601 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest010, TestSize.Level0)
602 {
603 /**
604 * @tc.steps: step1. set cloud db to proxy
605 * @tc.expected: step1. E_OK
606 */
607 CloudDBProxy proxy;
608 proxy.SetCloudDB(virtualCloudDb_);
609 /**
610 * @tc.steps: step2. proxy lock with diff status
611 */
612 virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
613 auto ret = proxy.Lock();
614 EXPECT_EQ(ret.first, -E_CLOUD_NETWORK_ERROR);
615 EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_NETWORK_ERROR);
616
617 virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
618 ret = proxy.Lock();
619 EXPECT_EQ(ret.first, -E_CLOUD_LOCK_ERROR);
620 EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_LOCK_ERROR);
621 /**
622 * @tc.steps: step3. proxy close cloud db
623 * @tc.expected: step3. E_OK
624 */
625 EXPECT_EQ(proxy.Close(), E_OK);
626 }
627
628 /**
629 * @tc.name: CloudSyncQueue001
630 * @tc.desc: Verify sync task count decrease after sync finished.
631 * @tc.type: FUNC
632 * @tc.require:
633 * @tc.author: zhangqiquan
634 */
635 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue001, TestSize.Level2)
636 {
637 /**
638 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
639 * @tc.expected: step1. E_OK
640 */
641 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
642 ASSERT_NE(iCloud, nullptr);
643 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
644 ASSERT_NE(cloudSyncer, nullptr);
645 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
646 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
647 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
648 cloudSyncer->SetCloudDB(virtualCloudDb_);
649 cloudSyncer->SetSyncAction(true, false);
__anon6a98ce310d02() 650 cloudSyncer->SetDownloadFunc([cloudSyncer]() {
651 EXPECT_EQ(cloudSyncer->GetQueueCount(), 1u);
652 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
653 return E_OK;
654 });
655 /**
656 * @tc.steps: step2. call sync and wait sync finish
657 */
658 int callCount = 0;
659 EXPECT_EQ(Sync(cloudSyncer, callCount), OK);
660 RuntimeContext::GetInstance()->StopTaskPool();
661 EXPECT_EQ(callCount, 1);
662 }
663
664 /**
665 * @tc.name: CloudSyncerTest001
666 * @tc.desc: Verify syncer notify by queue schedule.
667 * @tc.type: FUNC
668 * @tc.require:
669 * @tc.author: zhangqiquan
670 */
671 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncerTest001, TestSize.Level2)
672 {
673 /**
674 * @tc.steps: step1. set cloud db to proxy
675 * @tc.expected: step1. E_OK
676 */
677 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
678 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
679 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
680 EXPECT_CALL(*iCloud, GetIdentify).WillRepeatedly(testing::Return("CloudSyncerTest001"));
681 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
682 std::atomic<int> callCount = 0;
__anon6a98ce310e02(const std::map<std::string, SyncProcess> &) 683 cloudSyncer->SetCurrentTaskInfo([&callCount](const std::map<std::string, SyncProcess> &) {
684 callCount++;
685 int before = callCount;
686 LOGD("on callback %d", before);
687 std::this_thread::sleep_for(std::chrono::seconds(1));
688 EXPECT_EQ(before, callCount);
689 }, 1u);
690 const int notifyCount = 2;
691 for (int i = 0; i < notifyCount; ++i) {
692 cloudSyncer->Notify();
693 }
694 cloudSyncer->SetCurrentTaskInfo(nullptr, 0); // 0 is invalid task id
695 cloudSyncer->Close();
696 RefObject::KillAndDecObjRef(cloudSyncer);
697 }
698 }