1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16
17 #include "cloud/assets_download_manager.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/mock_icloud_sync_storage_interface.h"
20 #include "cloud/virtual_asset_loader.h"
21 #include "cloud/virtual_cloud_data_translate.h"
22 #include "cloud/virtual_cloud_syncer.h"
23 #include "cloud_db_sync_utils_test.h"
24 #include "distributeddb_data_generate_unit_test.h"
25 #include "distributeddb_tools_unit_test.h"
26 #include "res_finalizer.h"
27 #include "rdb_data_generator.h"
28 #include "relational_store_client.h"
29 #include "relational_store_manager.h"
30 #include "runtime_config.h"
31 #include "virtual_communicator_aggregator.h"
32
33 using namespace testing::ext;
34 using namespace DistributedDB;
35 using namespace DistributedDBUnitTest;
36 using namespace std;
37
38 namespace {
39 string g_testDir;
40 const std::string QUERY_INCONSISTENT_SQL =
41 "select count(*) from naturalbase_rdb_aux_AsyncDownloadAssetsTest_log where flag&0x20!=0;";
42 IRelationalStore *g_store = nullptr;
43 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
44 RelationalStoreManager g_mgr(APP_ID, USER_ID);
45 typedef struct SkipAssetTestParam {
46 DBStatus downloadRes;
47 bool useBatch;
48 bool useAsync;
49 int startIndex;
50 int expectInconsistentCount;
51 DBStatus expectSyncRes;
52 } SkipAssetTestParamT;
53 class DistributedDBCloudAsyncDownloadAssetsTest : public testing::Test {
54 public:
55 static void SetUpTestCase();
56 static void TearDownTestCase();
57 void SetUp() override;
58 void TearDown() override;
59 protected:
60 static DataBaseSchema GetSchema(bool multiTables = false);
61 static TableSchema GetTableSchema(const std::string &tableName, bool withoutAsset = false);
62 static CloudSyncOption GetAsyncCloudSyncOption();
63 static int GetAssetFieldCount();
64 void InitStore();
65 void CloseDb();
66 void DoSkipAssetDownload(SkipAssetTestParamT param);
67 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, int32_t begin, int32_t end);
68 void DeleteLocalData(sqlite3 *&db, const std::string &tableName);
69 void CheckLogTable(sqlite3 *&db, const std::string &tableName, int count);
70 void UpdateLocalAndCheckUploadCount(const bool &isAsync, const int &dataCount, const int &expectCount);
71 std::string storePath_;
72 sqlite3 *db_ = nullptr;
73 RelationalStoreDelegate *delegate_ = nullptr;
74 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
75 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
76 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
77 };
78
SetUpTestCase()79 void DistributedDBCloudAsyncDownloadAssetsTest::SetUpTestCase()
80 {
81 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
82 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
83 LOGE("rm test db files error!");
84 }
85 }
86
TearDownTestCase()87 void DistributedDBCloudAsyncDownloadAssetsTest::TearDownTestCase()
88 {
89 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
90 LOGE("rm test db files error!");
91 }
92 }
93
SetUp()94 void DistributedDBCloudAsyncDownloadAssetsTest::SetUp()
95 {
96 DistributedDBToolsUnitTest::PrintTestCaseInfo();
97 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
98 InitStore();
99 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
100 ASSERT_TRUE(communicatorAggregator_ != nullptr);
101 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
102 }
103
TearDown()104 void DistributedDBCloudAsyncDownloadAssetsTest::TearDown()
105 {
106 RefObject::DecObjRef(g_store);
107 CloseDb();
108 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
109 LOGE("rm test db files error.");
110 }
111 virtualCloudDb_ = nullptr;
112 virtualAssetLoader_ = nullptr;
113 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
114 communicatorAggregator_ = nullptr;
115 }
116
GetSchema(bool multiTables)117 DataBaseSchema DistributedDBCloudAsyncDownloadAssetsTest::GetSchema(bool multiTables)
118 {
119 DataBaseSchema schema;
120 schema.tables.push_back(GetTableSchema("AsyncDownloadAssetsTest"));
121 if (multiTables) {
122 schema.tables.push_back(GetTableSchema("TABLE1"));
123 schema.tables.push_back(GetTableSchema("TABLE2"));
124 }
125 return schema;
126 }
127
GetTableSchema(const std::string & tableName,bool withoutAsset)128 TableSchema DistributedDBCloudAsyncDownloadAssetsTest::GetTableSchema(const std::string &tableName, bool withoutAsset)
129 {
130 TableSchema tableSchema;
131 tableSchema.name = tableName;
132 Field field;
133 field.primary = true;
134 field.type = TYPE_INDEX<int64_t>;
135 field.colName = "pk";
136 tableSchema.fields.push_back(field);
137 field.primary = false;
138 field.colName = "int_field";
139 tableSchema.fields.push_back(field);
140 if (withoutAsset) {
141 return tableSchema;
142 }
143 field.type = TYPE_INDEX<Assets>;
144 field.colName = "assets_1";
145 tableSchema.fields.push_back(field);
146 field.colName = "asset_1";
147 field.type = TYPE_INDEX<Asset>;
148 tableSchema.fields.push_back(field);
149 return tableSchema;
150 }
151
GetAsyncCloudSyncOption()152 CloudSyncOption DistributedDBCloudAsyncDownloadAssetsTest::GetAsyncCloudSyncOption()
153 {
154 CloudSyncOption option;
155 std::vector<std::string> tables;
156 auto schema = GetSchema();
157 for (const auto &table : schema.tables) {
158 tables.push_back(table.name);
159 LOGW("[DistributedDBCloudAsyncDownloadAssetsTest] Sync with table %s", table.name.c_str());
160 }
161 option.devices = {"cloud"};
162 option.query = Query::Select().FromTable(tables);
163 option.mode = SYNC_MODE_CLOUD_MERGE;
164 option.asyncDownloadAssets = true;
165 return option;
166 }
167
GetAssetFieldCount()168 int DistributedDBCloudAsyncDownloadAssetsTest::GetAssetFieldCount()
169 {
170 int count = 0;
171 auto schema = GetSchema();
172 for (const auto &table : schema.tables) {
173 for (const auto &field : table.fields) {
174 if (field.type == TYPE_INDEX<Assets> || field.type == TYPE_INDEX<Asset>) {
175 count++;
176 }
177 }
178 }
179 return count;
180 }
181
GetRelationalStore()182 const RelationalSyncAbleStorage *GetRelationalStore()
183 {
184 RelationalDBProperties properties;
185 CloudDBSyncUtilsTest::InitStoreProp(g_testDir + "/" + STORE_ID_1 + ".db", APP_ID, USER_ID, STORE_ID_1, properties);
186 int errCode = E_OK;
187 g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
188 if (g_store == nullptr) {
189 LOGE("Get db failed:%d", errCode);
190 return nullptr;
191 }
192 return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
193 }
194
InitStore()195 void DistributedDBCloudAsyncDownloadAssetsTest::InitStore()
196 {
197 if (storePath_.empty()) {
198 storePath_ = g_testDir + "/" + STORE_ID_1 + ".db";
199 }
200 db_ = RelationalTestUtils::CreateDataBase(storePath_);
201 ASSERT_NE(db_, nullptr);
202 auto schema = GetSchema(true);
203 EXPECT_EQ(RDBDataGenerator::InitDatabase(schema, *db_), SQLITE_OK);
204 ASSERT_EQ(g_mgr.OpenStore(storePath_, STORE_ID_1, {}, delegate_), OK);
205 ASSERT_NE(delegate_, nullptr);
206 for (const auto &table : schema.tables) {
207 EXPECT_EQ(delegate_->CreateDistributedTable(table.name, TableSyncType::CLOUD_COOPERATION), OK);
208 LOGI("[DistributedDBCloudAsyncDownloadAssetsTest] CreateDistributedTable %s", table.name.c_str());
209 }
210 g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
211 ASSERT_NE(g_cloudStoreHook, nullptr);
212 virtualCloudDb_ = make_shared<VirtualCloudDb>();
213 ASSERT_NE(virtualCloudDb_, nullptr);
214 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
215 virtualAssetLoader_ = make_shared<VirtualAssetLoader>();
216 ASSERT_NE(virtualAssetLoader_, nullptr);
217 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
218 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
219
220 ASSERT_EQ(delegate_->SetCloudDbSchema(schema), DBStatus::OK);
221 }
222
CheckInconsistentCount(sqlite3 * db,int64_t expectCount)223 void CheckInconsistentCount(sqlite3 *db, int64_t expectCount)
224 {
225 EXPECT_EQ(sqlite3_exec(db, QUERY_INCONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
226 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
227 }
228
CloseDb()229 void DistributedDBCloudAsyncDownloadAssetsTest::CloseDb()
230 {
231 if (db_ != nullptr) {
232 sqlite3_close_v2(db_);
233 db_ = nullptr;
234 }
235 if (delegate_ != nullptr) {
236 EXPECT_EQ(g_mgr.CloseStore(delegate_), OK);
237 delegate_ = nullptr;
238 }
239 }
240
241 /**
242 * @tc.name: AsyncDownloadAssetConfig001
243 * @tc.desc: Test config with valid and invalid param.
244 * @tc.type: FUNC
245 * @tc.require:
246 * @tc.author: zqq
247 */
248 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig001, TestSize.Level0)
249 {
250 /**
251 * @tc.steps: step1. Set valid param
252 * @tc.expected: step1.ok
253 */
254 AsyncDownloadAssetsConfig config;
255 AssetsDownloadManager manager;
256 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
257 config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK;
258 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
259 config.maxDownloadAssetsCount = CloudDbConstant::MAX_ASYNC_DOWNLOAD_ASSETS;
260 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
261
262 /**
263 * @tc.steps: step2. Set invalid param
264 * @tc.expected: step2.invalid args
265 */
266 config.maxDownloadTask += 1u;
267 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), -E_INVALID_ARGS);
268 config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK;
269 config.maxDownloadAssetsCount += 1u;
270 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), -E_INVALID_ARGS);
271 }
272
273 /**
274 * @tc.name: AsyncDownloadAssetConfig002
275 * @tc.desc: Test config work correctly.
276 * @tc.type: FUNC
277 * @tc.require:
278 * @tc.author: zqq
279 */
280 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig002, TestSize.Level1)
281 {
282 /**
283 * @tc.steps: step1. Set valid param twice
284 * @tc.expected: step1. ok
285 */
286 AsyncDownloadAssetsConfig config;
287 config.maxDownloadTask = 10;
288 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
289 config.maxDownloadTask = 1;
290 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
291 /**
292 * @tc.steps: step2. Insert cloud data
293 * @tc.expected: step2. ok
294 */
295 const int cloudCount = 20;
296 auto schema = GetSchema();
297 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
298 /**
299 * @tc.steps: step3. Begin download first, block async task
300 * @tc.expected: step3. ok
301 */
302 auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
303 int finishCount = 0;
304 std::mutex finishMutex;
305 std::condition_variable cv;
__anonb165af940202(void *) 306 auto finishAction = [&finishCount, &finishMutex, &cv](void *) {
307 std::lock_guard<std::mutex> autoLock(finishMutex);
308 finishCount++;
309 cv.notify_all();
310 };
311 auto [errCode, listener] = manager->BeginDownloadWithListener(finishAction);
312 ASSERT_EQ(errCode, E_OK);
313 ASSERT_EQ(listener, nullptr);
314 ASSERT_EQ(manager->GetCurrentDownloadCount(), 1u);
315 std::tie(errCode, listener) = manager->BeginDownloadWithListener(finishAction);
316 ASSERT_EQ(errCode, -E_MAX_LIMITS);
317 ASSERT_NE(listener, nullptr);
318 /**
319 * @tc.steps: step4. Async cloud data
320 * @tc.expected: step4. ok and async task still one
321 */
322 CloudSyncOption option = GetAsyncCloudSyncOption();
323 RelationalTestUtils::CloudBlockSync(option, delegate_);
324 EXPECT_EQ(manager->GetCurrentDownloadCount(), 1u);
325 /**
326 * @tc.steps: step5. Notify async task finish
327 * @tc.expected: step5. wait util another async task finish
328 */
329 manager->FinishDownload();
330 std::unique_lock uniqueLock(finishMutex);
__anonb165af940302() 331 auto res = cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finishCount]() {
332 return finishCount >= 2; // 2 async task
333 });
334 EXPECT_TRUE(res);
335 listener->Drop(true);
336 }
337
338 /**
339 * @tc.name: AsyncDownloadAssetConfig003
340 * @tc.desc: Test asyncDownloadAssets and compensatedSyncOnly both true.
341 * @tc.type: FUNC
342 * @tc.require:
343 * @tc.author: tankaisheng
344 */
345 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig003, TestSize.Level1)
346 {
347 /**
348 * @tc.steps: step1. Insert cloud data
349 * @tc.expected: step1. ok
350 */
351 const int cloudCount = 10;
352 auto schema = GetSchema();
353 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
354 /**
355 * @tc.steps: step2. set compensatedSyncOnly true and sync return NOT_SUPPORT.
356 * @tc.expected: step2. NOT_SUPPORT
357 */
358 CloudSyncOption option = GetAsyncCloudSyncOption();
359 option.compensatedSyncOnly = true;
360 DBStatus result = delegate_->Sync(option, nullptr);
361 EXPECT_EQ(result, NOT_SUPPORT);
362 }
363
364 /**
365 * @tc.name: FinishListener001
366 * @tc.desc: Test listen download finish event.
367 * @tc.type: FUNC
368 * @tc.require:
369 * @tc.author: zqq
370 */
371 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, FinishListener001, TestSize.Level0)
372 {
373 /**
374 * @tc.steps: step1. Begin download first time
375 * @tc.expected: step1.ok
376 */
377 auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
378 std::atomic<bool> finished = false;
__anonb165af940402(void *) 379 auto finishAction = [&finished](void *) {
380 EXPECT_TRUE(finished);
381 };
382 auto [errCode, listener] = manager->BeginDownloadWithListener(finishAction);
383 ASSERT_EQ(errCode, E_OK);
384 ASSERT_EQ(listener, nullptr);
385 /**
386 * @tc.steps: step2. Begin download twice
387 * @tc.expected: step2. -E_MAX_LIMITS because default one task
388 */
389 std::tie(errCode, listener) = manager->BeginDownloadWithListener(finishAction);
390 EXPECT_EQ(errCode, -E_MAX_LIMITS);
391 EXPECT_NE(listener, nullptr);
392 /**
393 * @tc.steps: step3. Finish download
394 * @tc.expected: step3. finished is true in listener
395 */
396 finished = true;
397 manager->FinishDownload();
398 listener->Drop(true);
399 }
400
401 /**
402 * @tc.name: AsyncComplexDownload001
403 * @tc.desc: Test complex async download.
404 * @tc.type: FUNC
405 * @tc.require:
406 * @tc.author: zqq
407 */
408 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncComplexDownload001, TestSize.Level1)
409 {
410 /**
411 * @tc.steps: step1. Set max download task 1
412 * @tc.expected: step1. ok
413 */
414 AsyncDownloadAssetsConfig config;
415 config.maxDownloadTask = 1;
416 config.maxDownloadAssetsCount = 1;
417 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
418 /**
419 * @tc.steps: step2. Insert cloud data
420 * @tc.expected: step2. ok
421 */
422 const int cloudCount = 10;
423 auto schema = GetSchema();
424 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
425 /**
426 * @tc.steps: step3. Async cloud data
427 * @tc.expected: step3. ok
428 */
429 CloudSyncOption option = GetAsyncCloudSyncOption();
430 RelationalTestUtils::CloudBlockSync(option, delegate_);
431 /**
432 * @tc.steps: step3. Block download cloud data
433 * @tc.expected: step3. ok
434 */
435 option.asyncDownloadAssets = false;
436 RelationalTestUtils::CloudBlockSync(option, delegate_);
437 }
438
439 /**
440 * @tc.name: AsyncComplexDownload002
441 * @tc.desc: Test complex async download.
442 * @tc.type: FUNC
443 * @tc.require:
444 * @tc.author: zqq
445 */
446 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncComplexDownload002, TestSize.Level1)
447 {
448 /**
449 * @tc.steps: step1. Set max download task 1
450 * @tc.expected: step1. ok
451 */
452 AsyncDownloadAssetsConfig config;
453 config.maxDownloadTask = 1;
454 config.maxDownloadAssetsCount = 1;
455 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
456 /**
457 * @tc.steps: step2. Insert cloud data
458 * @tc.expected: step2. ok
459 */
460 const int cloudCount = 10;
461 auto schema = GetSchema();
462 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
463 /**
464 * @tc.steps: step3. Complex cloud data
465 * @tc.expected: step3. ok
466 */
467 CloudSyncOption option = GetAsyncCloudSyncOption();
468 for (int i = 0; i < 10; ++i) { // loop 10 times
469 option.asyncDownloadAssets = false;
470 RelationalTestUtils::CloudBlockSync(option, delegate_);
471 option.asyncDownloadAssets = true;
472 RelationalTestUtils::CloudBlockSync(option, delegate_);
473 }
474 }
475
476 /**
477 * @tc.name: AsyncAbnormalDownload001
478 * @tc.desc: Test abnormal async download.
479 * @tc.type: FUNC
480 * @tc.require:
481 * @tc.author: zqq
482 */
483 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload001, TestSize.Level4)
484 {
485 /**
486 * @tc.steps: step1. Set max download task 1
487 * @tc.expected: step1. ok
488 */
489 AsyncDownloadAssetsConfig config;
490 config.maxDownloadTask = 1;
491 config.maxDownloadAssetsCount = 1;
492 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
493 /**
494 * @tc.steps: step2. Insert cloud data
495 * @tc.expected: step2. ok
496 */
497 const int cloudCount = 10;
498 auto schema = GetSchema();
499 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
500 /**
501 * @tc.steps: step3. Fork download abnormal
502 */
503 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
504 /**
505 * @tc.steps: step4. Async cloud data
506 * @tc.expected: step4. ok
507 */
508 CloudSyncOption option = GetAsyncCloudSyncOption();
509 RelationalTestUtils::CloudBlockSync(option, delegate_);
510 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
511 EXPECT_EQ(status, OK);
512 EXPECT_EQ(downloadCount, cloudCount * GetAssetFieldCount());
513 EXPECT_FALSE(RelationalTestUtils::IsExistEmptyHashAsset(db_, GetTableSchema("AsyncDownloadAssetsTest")));
514 std::this_thread::sleep_for(std::chrono::seconds(1));
515 /**
516 * @tc.steps: step5. Async cloud data with download ok
517 * @tc.expected: step5. ok
518 */
519 virtualAssetLoader_->SetDownloadStatus(OK);
520 LOGW("set download ok");
521 int count = 0;
522 std::mutex countMutex;
523 std::condition_variable cv;
524 virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonb165af940502(const std::string &tableName, std::map<std::string, Assets> &) 525 std::map<std::string, Assets> &) {
526 std::lock_guard<std::mutex> autoLock(countMutex);
527 count++;
528 if (count == 1) {
529 std::this_thread::sleep_for(std::chrono::seconds(1));
530 }
531 cv.notify_all();
532 });
533 RelationalTestUtils::CloudBlockSync(option, delegate_);
534 std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonb165af940602() 535 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
536 return count >= cloudCount;
537 });
538 std::this_thread::sleep_for(std::chrono::seconds(1));
539 std::tie(status, downloadCount) = delegate_->GetDownloadingAssetsCount();
540 EXPECT_EQ(status, OK);
541 EXPECT_EQ(downloadCount, 0);
542 virtualAssetLoader_->ForkDownload(nullptr);
543 }
544
545 /**
546 * @tc.name: AsyncAbnormalDownload002
547 * @tc.desc: Test abnormal sync download.
548 * @tc.type: FUNC
549 * @tc.require:
550 * @tc.author: zqq
551 */
552 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload002, TestSize.Level1)
553 {
554 /**
555 * @tc.steps: step1. Insert cloud data
556 * @tc.expected: step1. ok
557 */
558 const int cloudCount = 10;
559 auto schema = GetSchema();
560 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
561 /**
562 * @tc.steps: step2. Fork download abnormal
563 */
564 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
565 /**
566 * @tc.steps: step3. Sync cloud data
567 * @tc.expected: step3. DB_ERROR and not exist downloading assets count
568 */
569 CloudSyncOption option = GetAsyncCloudSyncOption();
570 option.asyncDownloadAssets = false;
571 RelationalTestUtils::CloudBlockSync(option, delegate_, OK, CLOUD_ERROR);
572 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
573 EXPECT_EQ(status, OK);
574 EXPECT_EQ(downloadCount, 0);
575 virtualAssetLoader_->SetDownloadStatus(OK);
576 }
577
578 /**
579 * @tc.name: AsyncAbnormalDownload003
580 * @tc.desc: Test abnormal async download.
581 * @tc.type: FUNC
582 * @tc.require:
583 * @tc.author: zqq
584 */
585 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload003, TestSize.Level4)
586 {
587 /**
588 * @tc.steps: step1. Set max download task 1
589 * @tc.expected: step1. ok
590 */
591 AsyncDownloadAssetsConfig config;
592 config.maxDownloadTask = 1;
593 config.maxDownloadAssetsCount = 4; // 1 record has 2 asset, config 1 batch has 2 record
594 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
595 /**
596 * @tc.steps: step2. Insert cloud data
597 * @tc.expected: step2. ok
598 */
599 const int cloudCount = 5;
600 auto schema = GetSchema();
601 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
602 /**
603 * @tc.steps: step3. Fork download abnormal
604 */
605 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
606 /**
607 * @tc.steps: step4. Sync cloud data
608 * @tc.expected: step4. DB_ERROR and exist downloading assets count
609 */
610 CloudSyncOption option = GetAsyncCloudSyncOption();
611 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
612 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
613 EXPECT_EQ(status, OK);
614 EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
615 CheckInconsistentCount(db_, 5);
616 std::this_thread::sleep_for(std::chrono::seconds(1));
617 /**
618 * @tc.steps: step5. Sync cloud data again and block upload
619 * @tc.expected: step5. DB_ERROR and exist downloading assets count
620 */
621 EXPECT_EQ(RDBDataGenerator::InsertLocalDBData(cloudCount + 1, 1, db_, GetSchema()), E_OK);
622 virtualAssetLoader_->SetDownloadStatus(OK);
__anonb165af940702(const std::string &, VBucket &) 623 virtualCloudDb_->ForkUpload([delegate = delegate_](const std::string &, VBucket &) {
624 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
625 auto [ret, count] = delegate->GetDownloadingAssetsCount();
626 EXPECT_EQ(ret, OK);
627 EXPECT_EQ(count, 0);
628 });
629 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
630 virtualAssetLoader_->ForkBatchDownload(nullptr);
631 }
632
633 /**
634 * @tc.name: AsyncAbnormalDownload004
635 * @tc.desc: Test update trigger retain 0x1000 flag.
636 * @tc.type: FUNC
637 * @tc.require:
638 * @tc.author: zqq
639 */
640 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload004, TestSize.Level1)
641 {
642 /**
643 * @tc.steps: step1. Set async config
644 * @tc.expected: step1. ok
645 */
646 AsyncDownloadAssetsConfig config;
647 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
648 /**
649 * @tc.steps: step2. Insert cloud data
650 * @tc.expected: step2. ok
651 */
652 const int cloudCount = 1;
653 auto schema = GetSchema();
654 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
655 /**
656 * @tc.steps: step3. Fork download abnormal
657 */
658 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
659 /**
660 * @tc.steps: step4. Sync cloud data
661 * @tc.expected: step4. DB_ERROR and exist downloading assets count
662 */
663 CloudSyncOption option = GetAsyncCloudSyncOption();
664 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
665 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
666 EXPECT_EQ(status, OK);
667 EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
668 /**
669 * @tc.steps: step5. Update local data
670 * @tc.expected: step5. Exist downloading assets count
671 */
672 EXPECT_EQ(RDBDataGenerator::UpsertLocalDBData(0, cloudCount, db_,
673 GetTableSchema("AsyncDownloadAssetsTest", true)), OK);
674 std::tie(status, downloadCount) = delegate_->GetDownloadingAssetsCount();
675 EXPECT_EQ(status, OK);
676 EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
677 }
678
679 /**
680 * @tc.name: AsyncAbnormalDownload005
681 * @tc.desc: Test download assets which was locked
682 * @tc.type: FUNC
683 * @tc.require:
684 * @tc.author: liaoyonghuang
685 */
686 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload005, TestSize.Level1)
687 {
688 /**
689 * @tc.steps: step1. Set async config
690 * @tc.expected: step1. ok
691 */
692 AsyncDownloadAssetsConfig config;
693 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
694 /**
695 * @tc.steps: step2. Init data
696 * @tc.expected: step2. ok
697 */
698 const int cloudCount = 10;
699 auto schema = GetSchema();
700 ASSERT_TRUE(!schema.tables.empty());
701 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
702 CloudSyncOption option = GetAsyncCloudSyncOption();
703 option.asyncDownloadAssets = false;
704 RelationalTestUtils::CloudBlockSync(option, delegate_, OK, OK);
705 /**
706 * @tc.steps: step3. Update cloud data and lock local data
707 * @tc.expected: step3. ok
708 */
709 auto [records, extends] =
710 RDBDataGenerator::GenerateDataRecords(0, cloudCount, 0, schema.tables.front().fields);
711 Asset asset = {.name = "asset_1", .hash = "new_hash"};
712 for (auto &record : records) {
713 record.insert_or_assign("asset_1", asset);
714 }
715 std::string table = schema.tables.front().name;
716 EXPECT_EQ(virtualCloudDb_->BatchUpdate(table, std::move(records), extends), OK);
717 virtualAssetLoader_->ForkDownload([&](const std::string &tableName,
__anonb165af940802(const std::string &tableName, std::map<std::string, Assets> &) 718 std::map<std::string, Assets> &) {
719 std::vector<std::vector<uint8_t>> hashKey;
720 CloudDBSyncUtilsTest::GetHashKey(table, "data_key < 5", db_, hashKey); // lock half of the data
721 EXPECT_EQ(Lock(table, hashKey, db_), OK);
722 });
723 /**
724 * @tc.steps: step4. Sync and check data
725 * @tc.expected: step4. ok
726 */
727 option.asyncDownloadAssets = true;
728 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
729 std::this_thread::sleep_for(std::chrono::seconds(1));
730 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
731 EXPECT_EQ(status, OK);
732 EXPECT_EQ(downloadCount, cloudCount / 2); // half of the data was not downloaded due to being locked
733 virtualAssetLoader_->ForkDownload(nullptr);
734 }
735
736 /**
737 * @tc.name: AsyncNormalDownload001
738 * @tc.desc: Test abnormal async download.
739 * @tc.type: FUNC
740 * @tc.require:
741 * @tc.author: zqq
742 */
743 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload001, TestSize.Level1)
744 {
745 /**
746 * @tc.steps: step1. Register observer
747 * @tc.expected: step1. ok
748 */
749 auto rdbObserver = new(std::nothrow) RelationalStoreObserverUnitTest();
750 ASSERT_NE(rdbObserver, nullptr);
__anonb165af940902() 751 ResFinalizer resFinalizer([rdbObserver, this]() {
752 delegate_->UnRegisterObserver(rdbObserver);
753 delete rdbObserver;
754 });
755 EXPECT_EQ(delegate_->RegisterObserver(rdbObserver), OK);
756 /**
757 * @tc.steps: step2. Insert cloud data
758 * @tc.expected: step2. ok
759 */
760 const int cloudCount = 1;
761 auto schema = GetSchema();
762 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
763 EXPECT_EQ(RDBDataGenerator::InsertLocalDBData(cloudCount + 1, 1, db_, GetSchema()), E_OK);
764 /**
765 * @tc.steps: step3. Block upload while async donwload asset
766 * @tc.expected: step3. Sync ok
767 */
768 auto hook = RelationalTestUtils::GetRDBStorageHook(USER_ID, APP_ID, STORE_ID_1, storePath_);
769 ASSERT_NE(hook, nullptr);
__anonb165af940a02() 770 hook->SetBeforeUploadTransaction([]() {
771 int count = 1;
772 const int maxLoop = 5;
773 do {
774 std::this_thread::sleep_for(std::chrono::seconds(1));
775 count++;
776 } while (RuntimeContext::GetInstance()->GetAssetsDownloadManager()->GetCurrentDownloadCount() > 0 &&
777 count < maxLoop);
778 LOGW("AsyncNormalDownload001 End hook");
779 });
780 CloudSyncOption option = GetAsyncCloudSyncOption();
781 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
782 EXPECT_TRUE(rdbObserver->IsAssetChange(GetTableSchema("AsyncDownloadAssetsTest").name));
783 hook->SetBeforeUploadTransaction(nullptr);
784 }
785
786 /**
787 * @tc.name: AsyncNormalDownload002
788 * @tc.desc: Test sync download when download task pool is full
789 * @tc.type: FUNC
790 * @tc.require:
791 * @tc.author: lhy
792 */
793 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload002, TestSize.Level4)
794 {
795 /**
796 * @tc.steps: step1. Set max download task 1
797 * @tc.expected: step1. Ok
798 */
799 AsyncDownloadAssetsConfig config;
800 config.maxDownloadTask = 1;
801 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
802 /**
803 * @tc.steps: step2. Insert cloud data
804 * @tc.expected: step2. Ok
805 */
806 const int cloudCount = 1;
807 auto schema = GetSchema();
808 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
809 /**
810 * @tc.steps: step3. Fork download abnormal
811 */
812 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
813 /**
814 * @tc.steps: step4. Async cloud data, with abnormal result
815 * @tc.expected: step4. Ok
816 */
817 CloudSyncOption option = GetAsyncCloudSyncOption();
818 RelationalTestUtils::CloudBlockSync(option, delegate_);
819 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
820 EXPECT_EQ(status, OK);
821 EXPECT_EQ(downloadCount, cloudCount * GetAssetFieldCount());
822 EXPECT_FALSE(RelationalTestUtils::IsExistEmptyHashAsset(db_, GetTableSchema("AsyncDownloadAssetsTest")));
823 /**
824 * @tc.steps: step5. Wait for failed download to finish
825 * @tc.expected: step5. Download task changes from 1 to 0
826 */
827 auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
828 std::this_thread::sleep_for(std::chrono::seconds(1));
829 EXPECT_EQ(manager->GetCurrentDownloadCount(), 0u);
830 /**
831 * @tc.steps: step6. Start a new download task to reach maxDownloadTask
832 * @tc.expected: step6. 1 task is downloading
833 */
834 auto [errCode, listener] = manager->BeginDownloadWithListener(nullptr);
835 ASSERT_EQ(errCode, E_OK);
836 ASSERT_EQ(listener, nullptr);
837 ASSERT_EQ(manager->GetCurrentDownloadCount(), 1u);
838 /**
839 * @tc.steps: step7. Set download status to ok then try sync while task pool is full
840 * @tc.expected: step7. Download should be waiting instead of doing compensated sync
841 */
842 virtualAssetLoader_->SetDownloadStatus(OK);
843 LOGW("set download ok");
844 int count = 0;
845 std::mutex countMutex;
846 std::condition_variable cv;
847 virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonb165af940b02(const std::string &tableName, std::map<std::string, Assets> &) 848 std::map<std::string, Assets> &) {
849 std::lock_guard<std::mutex> autoLock(countMutex);
850 count++;
851 if (count == 1) {
852 std::this_thread::sleep_for(std::chrono::seconds(1));
853 }
854 cv.notify_all();
855 });
856 RelationalTestUtils::CloudBlockSync(option, delegate_);
857 std::this_thread::sleep_for(std::chrono::seconds(1));
858 EXPECT_EQ(count, 0);
859 /**
860 * @tc.steps: step8. Finish the download task, to let sync continue
861 * @tc.expected: step8. Only 1 actural download through virtualAssetLoader_
862 */
863 manager->FinishDownload();
864 std::this_thread::sleep_for(std::chrono::seconds(1));
865 std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonb165af940c02() 866 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
867 return count >= cloudCount;
868 });
869 EXPECT_EQ(count, 1);
870 virtualAssetLoader_->ForkDownload(nullptr);
871 }
872
873 /**
874 * @tc.name: AsyncNormalDownload004
875 * @tc.desc: Test multiple tables and multiple batches of asset downloads
876 * @tc.type: FUNC
877 * @tc.require:
878 * @tc.author: liaoyonghuang
879 */
880 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload004, TestSize.Level1)
881 {
882 /**
883 * @tc.steps: step1. Set max download task 1
884 * @tc.expected: step1. Ok
885 */
886 AsyncDownloadAssetsConfig config;
887 config.maxDownloadAssetsCount = 25;
888 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
889 /**
890 * @tc.steps: step2. Insert cloud data
891 * @tc.expected: step2. Ok
892 */
893 const int cloudCount = 100;
894 std::string table1 = "TABLE1";
895 std::string table2 = "TABLE2";
896 DataBaseSchema schema;
897 schema.tables.push_back(GetTableSchema(table1));
898 schema.tables.push_back(GetTableSchema(table2));
899 EXPECT_EQ(RDBDataGenerator::InitDatabase(schema, *db_), SQLITE_OK);
900 auto [record1, extend1] = RDBDataGenerator::GenerateDataRecords(0, cloudCount, 0, GetTableSchema(table1).fields);
901 EXPECT_EQ(virtualCloudDb_->BatchInsertWithGid(table1, std::move(record1), extend1), OK);
902 auto [record2, extend2] = RDBDataGenerator::GenerateDataRecords(0, cloudCount, cloudCount,
903 GetTableSchema(table2).fields);
904 EXPECT_EQ(virtualCloudDb_->BatchInsertWithGid(table2, std::move(record2), extend2), OK);
905 /**
906 * @tc.steps: step3. async asset task submit
907 * @tc.expected: step3. Ok
908 */
909 int assetsDownloadTime = 0;
910 virtualAssetLoader_->ForkDownload([&table1, &table2, &assetsDownloadTime](const std::string &tableName,
__anonb165af940d02(const std::string &tableName, std::map<std::string, Assets> &) 911 std::map<std::string, Assets> &) {
912 if (assetsDownloadTime < 100) { // 100 assets
913 EXPECT_EQ(tableName, table1);
914 } else {
915 EXPECT_EQ(tableName, table2);
916 }
917 assetsDownloadTime++;
918 });
919 CloudSyncOption option;
920 option.devices = {"cloud"};
921 option.asyncDownloadAssets = true;
922 Query query = Query::Select().FromTable({table1, table2});
923 option.query = query;
924 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
925 std::this_thread::sleep_for(std::chrono::seconds(5));
926 EXPECT_EQ(assetsDownloadTime, 200);
927 virtualAssetLoader_->ForkDownload(nullptr);
928 }
929
930 /**
931 * @tc.name: AsyncNormalDownload005
932 * @tc.desc: Test concurrent async download of assets and sync data.
933 * @tc.type: FUNC
934 * @tc.require:
935 * @tc.author: liaoyonghuang
936 */
937 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload005, TestSize.Level1)
938 {
939 /**
940 * @tc.steps: step1. Set max download task 1
941 * @tc.expected: step1. ok
942 */
943 AsyncDownloadAssetsConfig config;
944 config.maxDownloadTask = 12;
945 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
946 /**
947 * @tc.steps: step2. Insert cloud data and sync concurrently
948 * @tc.expected: step2. ok
949 */
950 const int cloudCount = 200;
951 auto schema = GetSchema();
952 int threadNum = 10;
953 CloudSyncOption option = GetAsyncCloudSyncOption();
954 thread *syncThreads[threadNum];
955 for (int i = 0; i < threadNum; i++) {
__anonb165af940e02() 956 syncThreads[i] = new thread([&]() {
957 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
958 RelationalTestUtils::CloudBlockSync(option, delegate_);
959 DeleteLocalData(db_, "AsyncDownloadAssetsTest");
960 });
961 }
962 for (auto &thread : syncThreads) {
963 thread->join();
964 delete thread;
965 }
966 }
967
968 /**
969 * @tc.name: AsyncAbnormalDownload006
970 * @tc.desc: Test abnormal async download.
971 * @tc.type: FUNC
972 * @tc.require:
973 * @tc.author: suyue
974 */
975 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload006, TestSize.Level4)
976 {
977 /**
978 * @tc.steps: step1. Set config and insert 70 cloud data
979 * @tc.expected: step1. ok
980 */
981 AsyncDownloadAssetsConfig config;
982 config.maxDownloadTask = 5;
983 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
984 const int cloudCount = 70;
985 auto schema = GetSchema();
986 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
987
988 /**
989 * @tc.steps: step2. Fork download abnormal for 0-10 records
990 * @tc.expected: step2. ok
991 */
992 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
993 uint32_t failNum = 10;
994 const DownloadFailRange setRange = {.isAllFail = false, .failBeginIndex = 0, .failEndIndex = failNum};
995 virtualAssetLoader_->SetDownloadFailRange(setRange);
996
997 /**
998 * @tc.steps: step3. Async cloud data
999 * @tc.expected: step3. ok
1000 */
1001 int count = 0;
1002 std::mutex countMutex;
1003 std::condition_variable cv;
1004 virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonb165af940f02(const std::string &tableName, std::map<std::string, Assets> &) 1005 std::map<std::string, Assets> &) {
1006 std::lock_guard<std::mutex> autoLock(countMutex);
1007 count++;
1008 if (count == 1) {
1009 std::this_thread::sleep_for(std::chrono::seconds(1));
1010 }
1011 cv.notify_all();
1012 });
1013
1014 CloudSyncOption option = GetAsyncCloudSyncOption();
1015 RelationalTestUtils::CloudBlockSync(option, delegate_);
1016 std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonb165af941002() 1017 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
1018 return count >= cloudCount;
1019 });
1020 std::this_thread::sleep_for(std::chrono::seconds(1));
1021 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
1022 EXPECT_EQ(status, OK);
1023 EXPECT_EQ(downloadCount, static_cast<int32_t>(failNum * 2)); // 1 record has 2 asset
1024
1025 virtualAssetLoader_->SetDownloadStatus(OK);
1026 virtualAssetLoader_->Reset();
1027 virtualAssetLoader_->ForkDownload(nullptr);
1028 }
1029
UpdateLocalData(sqlite3 * & db,const std::string & tableName,int32_t begin,int32_t end)1030 void DistributedDBCloudAsyncDownloadAssetsTest::UpdateLocalData(
1031 sqlite3 *&db, const std::string &tableName, int32_t begin, int32_t end)
1032 {
1033 const string sql = "update " + tableName + " set int_field = int_field+1 " + "where pk>=" + std::to_string(begin) +
1034 " and pk<=" + std::to_string(end) + ";";
1035 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
1036 LOGW("update local data finished");
1037 }
1038
DeleteLocalData(sqlite3 * & db,const std::string & tableName)1039 void DistributedDBCloudAsyncDownloadAssetsTest::DeleteLocalData(sqlite3 *&db, const std::string &tableName)
1040 {
1041 const string sql = "delete from " + tableName + " where pk >= 0;";
1042 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
1043 LOGW("delete local data finished");
1044 }
1045
CheckLogTable(sqlite3 * & db,const std::string & tableName,int count)1046 void DistributedDBCloudAsyncDownloadAssetsTest::CheckLogTable(sqlite3 *&db, const std::string &tableName, int count)
1047 {
1048 const string sql = "select COUNT(*) from " + DBCommon::GetLogTableName(tableName) + " where data_key>0;";
1049 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1050 reinterpret_cast<void *>(count), nullptr), SQLITE_OK);
1051 LOGW("check log table finished");
1052 }
1053
UpdateLocalAndCheckUploadCount(const bool & isAsync,const int & dataCount,const int & expectCount)1054 void DistributedDBCloudAsyncDownloadAssetsTest::UpdateLocalAndCheckUploadCount(const bool &isAsync,
1055 const int &dataCount, const int &expectCount)
1056 {
1057 /**
1058 * @tc.steps: step1. Set async config
1059 * @tc.expected: step1. ok
1060 */
1061 AsyncDownloadAssetsConfig config;
1062 config.maxDownloadTask = 12;
1063 config.maxDownloadAssetsCount = 100;
1064 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1065 /**
1066 * @tc.steps: step2. Init data, set download status false and sync
1067 * @tc.expected: step2. async download will return OK and sync download will return CLOUD_ERROR
1068 */
1069 const int cloudCount = dataCount;
1070 auto schema = GetSchema();
1071 ASSERT_TRUE(!schema.tables.empty());
1072 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1073 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
1074 CloudSyncOption option = GetAsyncCloudSyncOption();
1075 option.asyncDownloadAssets = isAsync;
1076 DBStatus expectStatus = isAsync ? OK : CLOUD_ERROR;
1077 RelationalTestUtils::CloudBlockSync(option, delegate_, OK, expectStatus);
1078 std::this_thread::sleep_for(std::chrono::seconds(1));
1079 /**
1080 * @tc.steps: step3. Modify all local data
1081 * @tc.expected: step3. OK
1082 */
1083 UpdateLocalData(db_, "AsyncDownloadAssetsTest", 0, cloudCount);
1084 std::this_thread::sleep_for(std::chrono::seconds(1));
1085 /**
1086 * @tc.steps: step4. Fork upload to count number of uploaded records
1087 * @tc.expected: step4. OK
1088 */
1089 int count = 0;
1090 std::mutex countMutex;
1091 std::condition_variable cv;
1092 virtualCloudDb_->ForkUpload([&count, &countMutex, &cv](const std::string&, VBucket&) {
1093 std::lock_guard<std::mutex> autoLock(countMutex);
1094 count++;
1095 cv.notify_all();
1096 });
1097 virtualAssetLoader_->SetDownloadStatus(OK);
1098 virtualAssetLoader_->Reset();
1099 /**
1100 * @tc.steps: step5. Sync again and check upload count
1101 * @tc.expected: step5. OK
1102 */
1103 RelationalTestUtils::CloudBlockSync(option, delegate_);
1104 std::unique_lock<std::mutex> uniqueLock(countMutex);
1105 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count, &cloudCount, &isAsync]() {
1106 return !isAsync || count >= cloudCount;
1107 });
1108 EXPECT_EQ(count, expectCount);
1109 /**
1110 * @tc.steps: step6. Release resources
1111 * @tc.expected: step6. OK
1112 */
1113 virtualAssetLoader_->Reset();
1114 virtualAssetLoader_->ForkDownload(nullptr);
1115 virtualCloudDb_->ForkUpload(nullptr);
1116 }
1117
1118 /**
1119 * @tc.name: AsyncAbnormalDownload007
1120 * @tc.desc: Test in async download mode and asset is not downloaded, the local data can be uploaded
1121 * @tc.type: FUNC
1122 * @tc.require:
1123 * @tc.author: tankaisheng
1124 */
1125 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload007, TestSize.Level1)
1126 {
1127 int cloudCount = 10;
1128 EXPECT_NO_FATAL_FAILURE(UpdateLocalAndCheckUploadCount(true, cloudCount, cloudCount));
1129 }
1130
1131 /**
1132 * @tc.name: AsyncAbnormalDownload009
1133 * @tc.desc: Test in sync download mode and asset is not downloaded, the local data will be ignored when upload
1134 * @tc.type: FUNC
1135 * @tc.require:
1136 * @tc.author: liuhongyang
1137 */
1138 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload009, TestSize.Level1)
1139 {
1140 int cloudCount = 10;
1141 UpdateLocalAndCheckUploadCount(false, cloudCount, 0);
1142 }
1143
ModifySkippedAsset(int rowIndex,std::map<std::string,Assets> & assets,DBStatus fakeStatus)1144 DBStatus ModifySkippedAsset(int rowIndex, std::map<std::string, Assets> &assets, DBStatus fakeStatus)
1145 {
1146 if (rowIndex != 1) {
1147 return OK;
1148 }
1149 for (auto &asset : assets) {
1150 for (auto &item : asset.second) {
1151 if (item.name == "asset_1" + std::to_string(rowIndex)) {
1152 item.status = static_cast<uint32_t>(AssetStatus::ABNORMAL);
1153 }
1154 }
1155 }
1156 return fakeStatus;
1157 }
1158
DoSkipAssetDownload(SkipAssetTestParamT param)1159 void DistributedDBCloudAsyncDownloadAssetsTest::DoSkipAssetDownload(SkipAssetTestParamT param)
1160 {
1161 /**
1162 * @tc.steps: step1 change max download task
1163 * @tc.expected: step1. Ok
1164 */
1165 AsyncDownloadAssetsConfig config;
1166 config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK; // maximum of tasks
1167 config.maxDownloadAssetsCount = CloudDbConstant::MAX_ASYNC_DOWNLOAD_ASSETS; // maximum of asset counts
1168 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1169 /**
1170 * @tc.steps: step2 Insert cloud data
1171 * @tc.expected: step2. Ok
1172 */
1173 const int cloudCount = 5;
1174 auto schema = GetSchema();
1175 std::string tableName = "AsyncDownloadAssetsTest";
1176 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(param.startIndex, cloudCount, 0, schema, virtualCloudDb_), OK);
1177 CloudDBSyncUtilsTest::CheckLocalRecordNum(db_, tableName, param.startIndex);
1178 /**
1179 * @tc.steps: step3 Fork download abnormal
1180 * @tc.expected: step3. Ok
1181 */
1182 int count = 0;
1183 std::mutex countMutex;
1184 std::condition_variable cv;
1185 if (param.useBatch) {
1186 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1187 virtualAssetLoader_->ForkBatchDownload([&count, &countMutex, &cv, param](int rowIndex,
1188 std::map<std::string, Assets> &assets) {
1189 std::lock_guard<std::mutex> autoLock(countMutex);
1190 count++;
1191 auto ret = ModifySkippedAsset(rowIndex, assets, param.downloadRes);
1192 if (count == 1) {
1193 std::this_thread::sleep_for(std::chrono::seconds(1));
1194 }
1195 cv.notify_all();
1196 return ret;
1197 });
1198 } else {
1199 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1200 virtualAssetLoader_->SetDownloadStatus(param.downloadRes);
1201 }
1202 /**
1203 * @tc.steps: step4. sync cloud data
1204 * @tc.expected: step4. Ok
1205 */
1206 CloudSyncOption option = GetAsyncCloudSyncOption();
1207 option.asyncDownloadAssets = param.useAsync;
1208 RelationalTestUtils::CloudBlockSync(option, delegate_, param.expectSyncRes);
1209 /**
1210 * @tc.steps: step5. wait for sync to finish
1211 * @tc.expected: step5. check local record number and inconsistent count
1212 */
1213 std::unique_lock<std::mutex> uniqueLock(countMutex);
1214 auto res = cv.wait_for(uniqueLock, std::chrono::seconds(DBConstant::MAX_SYNC_TIMEOUT),
1215 [&count, param, cloudCount] {
1216 return count >= cloudCount || !param.useBatch;
1217 });
1218 EXPECT_TRUE(res);
1219 std::this_thread::sleep_for(std::chrono::seconds(1));
1220 CloudDBSyncUtilsTest::CheckLocalRecordNum(db_, tableName, param.startIndex + cloudCount);
1221 CheckInconsistentCount(db_, param.expectInconsistentCount);
1222 /**
1223 * @tc.steps: step6. clear
1224 */
1225 virtualAssetLoader_->SetDownloadStatus(OK);
1226 virtualAssetLoader_->Reset();
1227 virtualAssetLoader_->ForkBatchDownload(nullptr);
1228 }
1229
1230 /**
1231 * @tc.name: SkipAssetDownloadTest001
1232 * @tc.desc: Test async batch download returns Skip_Assets
1233 * @tc.type: FUNC
1234 * @tc.require:
1235 * @tc.author: liuhongyang
1236 */
1237 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, SkipAssetDownloadTest001, TestSize.Level1)
1238 {
1239 /**
1240 * @tc.expected: step1. sync return OK, and has 1 inconsistent records
1241 */
1242 SkipAssetTestParamT param = {.downloadRes = SKIP_ASSET, .useBatch = true, .useAsync = true,
1243 .startIndex = 0, .expectInconsistentCount = 1, .expectSyncRes = OK};
1244 DoSkipAssetDownload(param);
1245 }
1246
1247 /**
1248 * @tc.name: SkipAssetDownloadTest002
1249 * @tc.desc: Test sync batch download returns Skip_Assets
1250 * @tc.type: FUNC
1251 * @tc.require:
1252 * @tc.author: liuhongyang
1253 */
1254 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, SkipAssetDownloadTest002, TestSize.Level1)
1255 {
1256 /**
1257 * @tc.expected: step1. sync return OK, and has 1 inconsistent records
1258 */
1259 SkipAssetTestParamT param = {.downloadRes = SKIP_ASSET, .useBatch = true, .useAsync = false,
1260 .startIndex = 0, .expectInconsistentCount = 1, .expectSyncRes = OK};
1261 DoSkipAssetDownload(param);
1262 }
1263
1264 /**
1265 * @tc.name: SkipAssetDownloadTest003
1266 * @tc.desc: Test sync one-by-one download returns Skip_Assets
1267 * @tc.type: FUNC
1268 * @tc.require:
1269 * @tc.author: liuhongyang
1270 */
1271 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, SkipAssetDownloadTest003, TestSize.Level1)
1272 {
1273 /**
1274 * @tc.expected: step1. sync return OK, and has 5 inconsistent records
1275 */
1276 SkipAssetTestParamT param = {.downloadRes = SKIP_ASSET, .useBatch = false, .useAsync = false,
1277 .startIndex = 0, .expectInconsistentCount = 5, .expectSyncRes = OK};
1278 DoSkipAssetDownload(param);
1279 }
1280
1281 /**
1282 * @tc.name: AsyncAbnormalDownload008
1283 * @tc.desc: Test the total count of download after download failure
1284 * @tc.type: FUNC
1285 * @tc.require:
1286 * @tc.author: bty
1287 */
1288 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload008, TestSize.Level1)
1289 {
1290 /**
1291 * @tc.steps: step1. Set max download task 1
1292 * @tc.expected: step1. ok
1293 */
1294 AsyncDownloadAssetsConfig config;
1295 config.maxDownloadTask = 1;
1296 config.maxDownloadAssetsCount = 2;
1297 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1298 /**
1299 * @tc.steps: step2. Insert cloud data
1300 * @tc.expected: step2. ok
1301 */
1302 const int cloudCount = 4;
1303 auto schema = GetSchema();
1304 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1305 /**
1306 * @tc.steps: step3. Fork download return cloud error
1307 * @tc.expected: step3. CLOUD_ERROR
1308 */
1309 int downloadIndex = 0;
1310 int failedCount = 2;
1311 virtualAssetLoader_->ForkBatchDownload([&downloadIndex, failedCount](
__anonb165af941502( int rowIndex, std::map<std::string, Assets> &assets) 1312 int rowIndex, std::map<std::string, Assets> &assets) {
1313 downloadIndex++;
1314 if (downloadIndex <= failedCount) {
1315 return CLOUD_ERROR;
1316 }
1317 return OK;
1318 });
1319 /**
1320 * @tc.steps: step4. Sync
1321 * @tc.expected: step4. ok
1322 */
1323 CloudSyncOption option = GetAsyncCloudSyncOption();
1324 RelationalTestUtils::CloudBlockSync(option, delegate_);
1325 std::this_thread::sleep_for(std::chrono::seconds(1));
1326 /**
1327 * @tc.steps: step5. Check download count
1328 * @tc.expected: step5. ok
1329 */
1330 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
1331 EXPECT_EQ(status, OK);
1332 EXPECT_EQ(downloadCount, cloudCount);
1333 virtualAssetLoader_->ForkBatchDownload(nullptr);
1334 }
1335
1336 /**
1337 * @tc.name: AsyncAbnormalDownload010
1338 * @tc.desc: Test assets is async downloading when delete local data.
1339 * @tc.type: FUNC
1340 * @tc.require:
1341 * @tc.author: tankaisheng
1342 */
1343 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload010, TestSize.Level1)
1344 {
1345 /**
1346 * @tc.steps: step1. Set max download task 1
1347 * @tc.expected: step1. ok
1348 */
1349 AsyncDownloadAssetsConfig config;
1350 config.maxDownloadTask = 1;
1351 config.maxDownloadAssetsCount = 2;
1352 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1353 /**
1354 * @tc.steps: step2. Insert cloud data
1355 * @tc.expected: step2. ok
1356 */
1357 const int cloudCount = 2000;
1358 auto schema = GetSchema();
1359 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1360 /**
1361 * @tc.steps: step3. Sync
1362 * @tc.expected: step3. ok
1363 */
1364 CloudSyncOption option = GetAsyncCloudSyncOption();
1365 RelationalTestUtils::CloudBlockSync(option, delegate_);
1366 /**
1367 * @tc.steps: step4. Delete local data.
1368 * @tc.expected: step4. ok
1369 */
1370 DeleteLocalData(db_, "AsyncDownloadAssetsTest");
1371 /**
1372 * @tc.steps: step5. Check log table.
1373 * @tc.expected: step5. ok
1374 */
1375 CheckLogTable(db_, "AsyncDownloadAssetsTest", 0);
1376 }
1377
1378 /**
1379 * @tc.name: TriggerAsyncTask001
1380 * @tc.desc: Test trigger async task.
1381 * @tc.type: FUNC
1382 * @tc.require:
1383 * @tc.author: zqq
1384 */
1385 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, TriggerAsyncTask001, TestSize.Level1)
1386 {
1387 auto storage = std::make_shared<MockICloudSyncStorageInterface>();
1388 ASSERT_NE(storage, nullptr);
1389 auto proxy = StorageProxy::GetCloudDb(storage.get());
1390 ASSERT_NE(proxy, nullptr);
1391 auto syncer = new(std::nothrow) VirtualCloudSyncer(proxy);
1392 ASSERT_NE(syncer, nullptr);
1393 /**
1394 * @tc.steps: step1. Trigger async task with not exist table contain assets.
1395 */
1396 syncer->TriggerAsyncTask();
1397 /**
1398 * @tc.steps: step2. Trigger async task with exist table contain assets.
1399 */
1400 EXPECT_CALL(*storage, IsExistTableContainAssets).WillRepeatedly(testing::Return(true));
__anonb165af941602() 1401 EXPECT_CALL(*storage, GetDownloadAssetTable).WillRepeatedly([]() {
1402 std::pair<int, std::vector<std::string>> res;
1403 return res;
1404 });
1405 syncer->TriggerAsyncTask();
1406 syncer->WaitTaskFinished();
1407 /**
1408 * @tc.steps: step3. Async trigger and wait finished.
1409 */
__anonb165af941702() 1410 std::thread t1([syncer]() {
1411 for (int i = 0; i < 1000; ++i) {
1412 syncer->TriggerAsyncTask();
1413 }
1414 });
__anonb165af941802() 1415 std::thread t2([syncer]() {
1416 for (int i = 0; i < 1000; ++i) {
1417 syncer->WaitTaskFinished();
1418 }
1419 });
1420 t1.join();
1421 t2.join();
1422 syncer->Close();
1423 RefObject::KillAndDecObjRef(syncer);
1424 }
1425
1426 /**
1427 * @tc.name: AsyncAbnormalDownload011
1428 * @tc.desc: Test set reference when async downloading.
1429 * @tc.type: FUNC
1430 * @tc.require:
1431 * @tc.author: tankaisheng
1432 */
1433 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload011, TestSize.Level1)
1434 {
1435 /**
1436 * @tc.steps: step1. Set max download task 1
1437 * @tc.expected: step1. ok
1438 */
1439 AsyncDownloadAssetsConfig config;
1440 config.maxDownloadTask = 1;
1441 config.maxDownloadAssetsCount = 2;
1442 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1443 /**
1444 * @tc.steps: step2. Insert cloud data
1445 * @tc.expected: step2. ok
1446 */
1447 const int cloudCount = 100;
1448 auto schema = GetSchema();
1449 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1450 /**
1451 * @tc.steps: step3. Fork download set reference
1452 * @tc.expected: step3. ok
1453 */
1454 TableReferenceProperty tableReferenceProperty;
1455 tableReferenceProperty.sourceTableName = "AsyncDownloadAssetsTest";
1456 tableReferenceProperty.targetTableName = "TABLE1";
1457 std::map<std::string, std::string> columns;
1458 columns["int_field"] = "int_field";
1459 tableReferenceProperty.columns = columns;
1460 CloudSyncOption option = GetAsyncCloudSyncOption();
1461
1462 std::mutex mtx;
1463 std::condition_variable cv;
1464 bool t2_started = false;
1465
__anonb165af941902() 1466 std::thread t1([this, option, &mtx, &cv, &t2_started]() {
1467 {
1468 std::unique_lock<std::mutex> lock(mtx);
1469 t2_started = true;
1470 cv.notify_one();
1471 }
1472 RelationalTestUtils::CloudBlockSync(option, delegate_);
1473 });
1474
__anonb165af941a02null1475 std::thread t2([this, tableReferenceProperty, &mtx, &cv, &t2_started] {
1476 std::unique_lock<std::mutex> lock(mtx);
1477 cv.wait(lock, [&]{ return t2_started; });
1478 EXPECT_EQ(delegate_->SetReference({tableReferenceProperty}), OK);
1479 });
1480
1481 t1.join();
1482 t2.join();
1483 /**
1484 * @tc.steps: step5. Check download count
1485 * @tc.expected: step5. ok
1486 */
1487 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
1488 EXPECT_EQ(status, OK);
1489 EXPECT_NE(downloadCount, 0);
1490 virtualAssetLoader_->ForkBatchDownload(nullptr);
1491 }
1492 }