1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16
17 #include "cloud/assets_download_manager.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/virtual_asset_loader.h"
20 #include "cloud/virtual_cloud_data_translate.h"
21 #include "cloud_db_sync_utils_test.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "res_finalizer.h"
25 #include "rdb_data_generator.h"
26 #include "relational_store_client.h"
27 #include "relational_store_manager.h"
28 #include "runtime_config.h"
29 #include "virtual_communicator_aggregator.h"
30
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35
36 namespace {
// Directory handed to TestDirInit()/RemoveTestDbFiles() by the fixture below.
std::string g_testDir;
// Counts the rows of the AsyncDownloadAssetsTest log table whose flag has bit 0x20 set.
const std::string QUERY_INCONSISTENT_SQL =
    "select count(*) from naturalbase_rdb_aux_AsyncDownloadAssetsTest_log "
    "where flag&0x20!=0;";
40 class DistributedDBCloudAsyncDownloadAssetsTest : public testing::Test {
41 public:
42 static void SetUpTestCase();
43 static void TearDownTestCase();
44 void SetUp() override;
45 void TearDown() override;
46 protected:
47 static DataBaseSchema GetSchema(bool multiTables = false);
48 static TableSchema GetTableSchema(const std::string &tableName, bool withoutAsset = false);
49 static CloudSyncOption GetAsyncCloudSyncOption();
50 static int GetAssetFieldCount();
51 void InitStore();
52 void CloseDb();
53 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, int32_t begin, int32_t end);
54 std::string storePath_;
55 sqlite3 *db_ = nullptr;
56 RelationalStoreDelegate *delegate_ = nullptr;
57 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
58 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
59 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
60 };
61
SetUpTestCase()62 void DistributedDBCloudAsyncDownloadAssetsTest::SetUpTestCase()
63 {
64 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
65 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
66 LOGE("rm test db files error!");
67 }
68 }
69
TearDownTestCase()70 void DistributedDBCloudAsyncDownloadAssetsTest::TearDownTestCase()
71 {
72 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
73 LOGE("rm test db files error!");
74 }
75 }
76
SetUp()77 void DistributedDBCloudAsyncDownloadAssetsTest::SetUp()
78 {
79 DistributedDBToolsUnitTest::PrintTestCaseInfo();
80 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
81 InitStore();
82 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
83 ASSERT_TRUE(communicatorAggregator_ != nullptr);
84 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
85 }
86
TearDown()87 void DistributedDBCloudAsyncDownloadAssetsTest::TearDown()
88 {
89 CloseDb();
90 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
91 LOGE("rm test db files error.");
92 }
93 virtualCloudDb_ = nullptr;
94 virtualAssetLoader_ = nullptr;
95 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
96 communicatorAggregator_ = nullptr;
97 }
98
GetSchema(bool multiTables)99 DataBaseSchema DistributedDBCloudAsyncDownloadAssetsTest::GetSchema(bool multiTables)
100 {
101 DataBaseSchema schema;
102 schema.tables.push_back(GetTableSchema("AsyncDownloadAssetsTest"));
103 if (multiTables) {
104 schema.tables.push_back(GetTableSchema("TABLE1"));
105 schema.tables.push_back(GetTableSchema("TABLE2"));
106 }
107 return schema;
108 }
109
GetTableSchema(const std::string & tableName,bool withoutAsset)110 TableSchema DistributedDBCloudAsyncDownloadAssetsTest::GetTableSchema(const std::string &tableName, bool withoutAsset)
111 {
112 TableSchema tableSchema;
113 tableSchema.name = tableName;
114 Field field;
115 field.primary = true;
116 field.type = TYPE_INDEX<int64_t>;
117 field.colName = "pk";
118 tableSchema.fields.push_back(field);
119 field.primary = false;
120 field.colName = "int_field";
121 tableSchema.fields.push_back(field);
122 if (withoutAsset) {
123 return tableSchema;
124 }
125 field.type = TYPE_INDEX<Assets>;
126 field.colName = "assets_1";
127 tableSchema.fields.push_back(field);
128 field.colName = "asset_1";
129 field.type = TYPE_INDEX<Asset>;
130 tableSchema.fields.push_back(field);
131 return tableSchema;
132 }
133
GetAsyncCloudSyncOption()134 CloudSyncOption DistributedDBCloudAsyncDownloadAssetsTest::GetAsyncCloudSyncOption()
135 {
136 CloudSyncOption option;
137 std::vector<std::string> tables;
138 auto schema = GetSchema();
139 for (const auto &table : schema.tables) {
140 tables.push_back(table.name);
141 LOGW("[DistributedDBCloudAsyncDownloadAssetsTest] Sync with table %s", table.name.c_str());
142 }
143 option.devices = {"cloud"};
144 option.query = Query::Select().FromTable(tables);
145 option.mode = SYNC_MODE_CLOUD_MERGE;
146 option.asyncDownloadAssets = true;
147 return option;
148 }
149
GetAssetFieldCount()150 int DistributedDBCloudAsyncDownloadAssetsTest::GetAssetFieldCount()
151 {
152 int count = 0;
153 auto schema = GetSchema();
154 for (const auto &table : schema.tables) {
155 for (const auto &field : table.fields) {
156 if (field.type == TYPE_INDEX<Assets> || field.type == TYPE_INDEX<Asset>) {
157 count++;
158 }
159 }
160 }
161 return count;
162 }
163
InitStore()164 void DistributedDBCloudAsyncDownloadAssetsTest::InitStore()
165 {
166 if (storePath_.empty()) {
167 storePath_ = g_testDir + "/" + STORE_ID_1 + ".db";
168 }
169 db_ = RelationalTestUtils::CreateDataBase(storePath_);
170 ASSERT_NE(db_, nullptr);
171 auto schema = GetSchema(true);
172 EXPECT_EQ(RDBDataGenerator::InitDatabase(schema, *db_), SQLITE_OK);
173 RelationalStoreManager mgr(APP_ID, USER_ID);
174 ASSERT_EQ(mgr.OpenStore(storePath_, STORE_ID_1, {}, delegate_), OK);
175 ASSERT_NE(delegate_, nullptr);
176 for (const auto &table : schema.tables) {
177 EXPECT_EQ(delegate_->CreateDistributedTable(table.name, TableSyncType::CLOUD_COOPERATION), OK);
178 LOGI("[DistributedDBCloudAsyncDownloadAssetsTest] CreateDistributedTable %s", table.name.c_str());
179 }
180 virtualCloudDb_ = make_shared<VirtualCloudDb>();
181 ASSERT_NE(virtualCloudDb_, nullptr);
182 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
183 virtualAssetLoader_ = make_shared<VirtualAssetLoader>();
184 ASSERT_NE(virtualAssetLoader_, nullptr);
185 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
186 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
187
188 ASSERT_EQ(delegate_->SetCloudDbSchema(schema), DBStatus::OK);
189 }
190
CheckInconsistentCount(sqlite3 * db,int64_t expectCount)191 void CheckInconsistentCount(sqlite3 *db, int64_t expectCount)
192 {
193 EXPECT_EQ(sqlite3_exec(db, QUERY_INCONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
194 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
195 }
196
CloseDb()197 void DistributedDBCloudAsyncDownloadAssetsTest::CloseDb()
198 {
199 if (db_ != nullptr) {
200 sqlite3_close_v2(db_);
201 db_ = nullptr;
202 }
203 if (delegate_ != nullptr) {
204 RelationalStoreManager mgr(APP_ID, USER_ID);
205 EXPECT_EQ(mgr.CloseStore(delegate_), OK);
206 delegate_ = nullptr;
207 }
208 }
209
210 /**
211 * @tc.name: AsyncDownloadAssetConfig001
212 * @tc.desc: Test config with valid and invalid param.
213 * @tc.type: FUNC
214 * @tc.require:
215 * @tc.author: zqq
216 */
217 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig001, TestSize.Level0)
218 {
219 /**
220 * @tc.steps: step1. Set valid param
221 * @tc.expected: step1.ok
222 */
223 AsyncDownloadAssetsConfig config;
224 AssetsDownloadManager manager;
225 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
226 config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK;
227 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
228 config.maxDownloadAssetsCount = CloudDbConstant::MAX_ASYNC_DOWNLOAD_ASSETS;
229 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
230
231 /**
232 * @tc.steps: step2. Set invalid param
233 * @tc.expected: step2.invalid args
234 */
235 config.maxDownloadTask += 1u;
236 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), -E_INVALID_ARGS);
237 config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK;
238 config.maxDownloadAssetsCount += 1u;
239 EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), -E_INVALID_ARGS);
240 }
241
242 /**
243 * @tc.name: AsyncDownloadAssetConfig002
244 * @tc.desc: Test config work correctly.
245 * @tc.type: FUNC
246 * @tc.require:
247 * @tc.author: zqq
248 */
249 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig002, TestSize.Level0)
250 {
251 /**
252 * @tc.steps: step1. Set valid param twice
253 * @tc.expected: step1. ok
254 */
255 AsyncDownloadAssetsConfig config;
256 config.maxDownloadTask = 10;
257 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
258 config.maxDownloadTask = 1;
259 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
260 /**
261 * @tc.steps: step2. Insert cloud data
262 * @tc.expected: step2. ok
263 */
264 const int cloudCount = 20;
265 auto schema = GetSchema();
266 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
267 /**
268 * @tc.steps: step3. Begin download first, block async task
269 * @tc.expected: step3. ok
270 */
271 auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
272 int finishCount = 0;
273 std::mutex finishMutex;
274 std::condition_variable cv;
__anonc27c8c720202(void *) 275 auto finishAction = [&finishCount, &finishMutex, &cv](void *) {
276 std::lock_guard<std::mutex> autoLock(finishMutex);
277 finishCount++;
278 cv.notify_all();
279 };
280 auto [errCode, listener] = manager->BeginDownloadWithListener(finishAction);
281 ASSERT_EQ(errCode, E_OK);
282 ASSERT_EQ(listener, nullptr);
283 ASSERT_EQ(manager->GetCurrentDownloadCount(), 1u);
284 std::tie(errCode, listener) = manager->BeginDownloadWithListener(finishAction);
285 ASSERT_EQ(errCode, -E_MAX_LIMITS);
286 ASSERT_NE(listener, nullptr);
287 /**
288 * @tc.steps: step4. Async cloud data
289 * @tc.expected: step4. ok and async task still one
290 */
291 CloudSyncOption option = GetAsyncCloudSyncOption();
292 RelationalTestUtils::CloudBlockSync(option, delegate_);
293 EXPECT_EQ(manager->GetCurrentDownloadCount(), 1u);
294 /**
295 * @tc.steps: step5. Notify async task finish
296 * @tc.expected: step5. wait util another async task finish
297 */
298 manager->FinishDownload();
299 std::unique_lock uniqueLock(finishMutex);
__anonc27c8c720302() 300 auto res = cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finishCount]() {
301 return finishCount >= 2; // 2 async task
302 });
303 EXPECT_TRUE(res);
304 listener->Drop(true);
305 }
306
307 /**
308 * @tc.name: AsyncDownloadAssetConfig003
309 * @tc.desc: Test asyncDownloadAssets and compensatedSyncOnly both true.
310 * @tc.type: FUNC
311 * @tc.require:
312 * @tc.author: tankaisheng
313 */
314 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig003, TestSize.Level0)
315 {
316 /**
317 * @tc.steps: step1. Insert cloud data
318 * @tc.expected: step1. ok
319 */
320 const int cloudCount = 10;
321 auto schema = GetSchema();
322 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
323 /**
324 * @tc.steps: step2. set compensatedSyncOnly true and sync return NOT_SUPPORT.
325 * @tc.expected: step2. NOT_SUPPORT
326 */
327 CloudSyncOption option = GetAsyncCloudSyncOption();
328 option.compensatedSyncOnly = true;
329 DBStatus result = delegate_->Sync(option, nullptr);
330 EXPECT_EQ(result, NOT_SUPPORT);
331 }
332
333 /**
334 * @tc.name: FinishListener001
335 * @tc.desc: Test listen download finish event.
336 * @tc.type: FUNC
337 * @tc.require:
338 * @tc.author: zqq
339 */
340 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, FinishListener001, TestSize.Level0)
341 {
342 /**
343 * @tc.steps: step1. Begin download first time
344 * @tc.expected: step1.ok
345 */
346 AssetsDownloadManager manager;
347 std::atomic<bool> finished = false;
__anonc27c8c720402(void *) 348 auto finishAction = [&finished](void *) {
349 EXPECT_TRUE(finished);
350 };
351 auto [errCode, listener] = manager.BeginDownloadWithListener(finishAction);
352 ASSERT_EQ(errCode, E_OK);
353 ASSERT_EQ(listener, nullptr);
354 /**
355 * @tc.steps: step2. Begin download twice
356 * @tc.expected: step2. -E_MAX_LIMITS because default one task
357 */
358 std::tie(errCode, listener) = manager.BeginDownloadWithListener(finishAction);
359 EXPECT_EQ(errCode, -E_MAX_LIMITS);
360 EXPECT_NE(listener, nullptr);
361 /**
362 * @tc.steps: step3. Finish download
363 * @tc.expected: step3. finished is true in listener
364 */
365 finished = true;
366 manager.FinishDownload();
367 listener->Drop(true);
368 }
369
370 /**
371 * @tc.name: AsyncComplexDownload001
372 * @tc.desc: Test complex async download.
373 * @tc.type: FUNC
374 * @tc.require:
375 * @tc.author: zqq
376 */
377 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncComplexDownload001, TestSize.Level0)
378 {
379 /**
380 * @tc.steps: step1. Set max download task 1
381 * @tc.expected: step1. ok
382 */
383 AsyncDownloadAssetsConfig config;
384 config.maxDownloadTask = 1;
385 config.maxDownloadAssetsCount = 1;
386 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
387 /**
388 * @tc.steps: step2. Insert cloud data
389 * @tc.expected: step2. ok
390 */
391 const int cloudCount = 10;
392 auto schema = GetSchema();
393 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
394 /**
395 * @tc.steps: step3. Async cloud data
396 * @tc.expected: step3. ok
397 */
398 CloudSyncOption option = GetAsyncCloudSyncOption();
399 RelationalTestUtils::CloudBlockSync(option, delegate_);
400 /**
401 * @tc.steps: step3. Block download cloud data
402 * @tc.expected: step3. ok
403 */
404 option.asyncDownloadAssets = false;
405 RelationalTestUtils::CloudBlockSync(option, delegate_);
406 }
407
408 /**
409 * @tc.name: AsyncComplexDownload002
410 * @tc.desc: Test complex async download.
411 * @tc.type: FUNC
412 * @tc.require:
413 * @tc.author: zqq
414 */
415 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncComplexDownload002, TestSize.Level0)
416 {
417 /**
418 * @tc.steps: step1. Set max download task 1
419 * @tc.expected: step1. ok
420 */
421 AsyncDownloadAssetsConfig config;
422 config.maxDownloadTask = 1;
423 config.maxDownloadAssetsCount = 1;
424 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
425 /**
426 * @tc.steps: step2. Insert cloud data
427 * @tc.expected: step2. ok
428 */
429 const int cloudCount = 10;
430 auto schema = GetSchema();
431 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
432 /**
433 * @tc.steps: step3. Complex cloud data
434 * @tc.expected: step3. ok
435 */
436 CloudSyncOption option = GetAsyncCloudSyncOption();
437 for (int i = 0; i < 10; ++i) { // loop 10 times
438 option.asyncDownloadAssets = false;
439 RelationalTestUtils::CloudBlockSync(option, delegate_);
440 option.asyncDownloadAssets = true;
441 RelationalTestUtils::CloudBlockSync(option, delegate_);
442 }
443 }
444
445 /**
446 * @tc.name: AsyncAbnormalDownload001
447 * @tc.desc: Test abnormal async download.
448 * @tc.type: FUNC
449 * @tc.require:
450 * @tc.author: zqq
451 */
452 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload001, TestSize.Level4)
453 {
454 /**
455 * @tc.steps: step1. Set max download task 1
456 * @tc.expected: step1. ok
457 */
458 AsyncDownloadAssetsConfig config;
459 config.maxDownloadTask = 1;
460 config.maxDownloadAssetsCount = 1;
461 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
462 /**
463 * @tc.steps: step2. Insert cloud data
464 * @tc.expected: step2. ok
465 */
466 const int cloudCount = 10;
467 auto schema = GetSchema();
468 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
469 /**
470 * @tc.steps: step3. Fork download abnormal
471 */
472 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
473 /**
474 * @tc.steps: step4. Async cloud data
475 * @tc.expected: step4. ok
476 */
477 CloudSyncOption option = GetAsyncCloudSyncOption();
478 RelationalTestUtils::CloudBlockSync(option, delegate_);
479 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
480 EXPECT_EQ(status, OK);
481 EXPECT_EQ(downloadCount, cloudCount * GetAssetFieldCount());
482 EXPECT_FALSE(RelationalTestUtils::IsExistEmptyHashAsset(db_, GetTableSchema("AsyncDownloadAssetsTest")));
483 std::this_thread::sleep_for(std::chrono::seconds(1));
484 /**
485 * @tc.steps: step5. Async cloud data with download ok
486 * @tc.expected: step5. ok
487 */
488 virtualAssetLoader_->SetDownloadStatus(OK);
489 LOGW("set download ok");
490 int count = 0;
491 std::mutex countMutex;
492 std::condition_variable cv;
493 virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonc27c8c720502(const std::string &tableName, std::map<std::string, Assets> &) 494 std::map<std::string, Assets> &) {
495 std::lock_guard<std::mutex> autoLock(countMutex);
496 count++;
497 if (count == 1) {
498 std::this_thread::sleep_for(std::chrono::seconds(1));
499 }
500 cv.notify_all();
501 });
502 RelationalTestUtils::CloudBlockSync(option, delegate_);
503 std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonc27c8c720602() 504 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
505 return count >= cloudCount;
506 });
507 std::this_thread::sleep_for(std::chrono::seconds(1));
508 std::tie(status, downloadCount) = delegate_->GetDownloadingAssetsCount();
509 EXPECT_EQ(status, OK);
510 EXPECT_EQ(downloadCount, 0);
511 virtualAssetLoader_->ForkDownload(nullptr);
512 }
513
514 /**
515 * @tc.name: AsyncAbnormalDownload002
516 * @tc.desc: Test abnormal sync download.
517 * @tc.type: FUNC
518 * @tc.require:
519 * @tc.author: zqq
520 */
521 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload002, TestSize.Level0)
522 {
523 /**
524 * @tc.steps: step1. Insert cloud data
525 * @tc.expected: step1. ok
526 */
527 const int cloudCount = 10;
528 auto schema = GetSchema();
529 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
530 /**
531 * @tc.steps: step2. Fork download abnormal
532 */
533 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
534 /**
535 * @tc.steps: step3. Sync cloud data
536 * @tc.expected: step3. DB_ERROR and not exist downloading assets count
537 */
538 CloudSyncOption option = GetAsyncCloudSyncOption();
539 option.asyncDownloadAssets = false;
540 RelationalTestUtils::CloudBlockSync(option, delegate_, OK, CLOUD_ERROR);
541 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
542 EXPECT_EQ(status, OK);
543 EXPECT_EQ(downloadCount, 0);
544 virtualAssetLoader_->SetDownloadStatus(OK);
545 }
546
547 /**
548 * @tc.name: AsyncAbnormalDownload003
549 * @tc.desc: Test abnormal async download.
550 * @tc.type: FUNC
551 * @tc.require:
552 * @tc.author: zqq
553 */
554 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload003, TestSize.Level4)
555 {
556 /**
557 * @tc.steps: step1. Set max download task 1
558 * @tc.expected: step1. ok
559 */
560 AsyncDownloadAssetsConfig config;
561 config.maxDownloadTask = 1;
562 config.maxDownloadAssetsCount = 4; // 1 record has 2 asset, config 1 batch has 2 record
563 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
564 /**
565 * @tc.steps: step2. Insert cloud data
566 * @tc.expected: step2. ok
567 */
568 const int cloudCount = 5;
569 auto schema = GetSchema();
570 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
571 /**
572 * @tc.steps: step3. Fork download abnormal
573 */
574 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
575 /**
576 * @tc.steps: step4. Sync cloud data
577 * @tc.expected: step4. DB_ERROR and exist downloading assets count
578 */
579 CloudSyncOption option = GetAsyncCloudSyncOption();
580 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
581 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
582 EXPECT_EQ(status, OK);
583 EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
584 CheckInconsistentCount(db_, 5);
585 std::this_thread::sleep_for(std::chrono::seconds(1));
586 /**
587 * @tc.steps: step5. Sync cloud data again and block upload
588 * @tc.expected: step5. DB_ERROR and exist downloading assets count
589 */
590 EXPECT_EQ(RDBDataGenerator::InsertLocalDBData(cloudCount + 1, 1, db_, GetSchema()), E_OK);
591 virtualAssetLoader_->SetDownloadStatus(OK);
__anonc27c8c720702(const std::string &, VBucket &) 592 virtualCloudDb_->ForkUpload([delegate = delegate_](const std::string &, VBucket &) {
593 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
594 auto [ret, count] = delegate->GetDownloadingAssetsCount();
595 EXPECT_EQ(ret, OK);
596 EXPECT_EQ(count, 0);
597 });
598 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
599 virtualAssetLoader_->ForkBatchDownload(nullptr);
600 }
601
602 /**
603 * @tc.name: AsyncAbnormalDownload004
604 * @tc.desc: Test update trigger retain 0x1000 flag.
605 * @tc.type: FUNC
606 * @tc.require:
607 * @tc.author: zqq
608 */
609 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload004, TestSize.Level0)
610 {
611 /**
612 * @tc.steps: step1. Set async config
613 * @tc.expected: step1. ok
614 */
615 AsyncDownloadAssetsConfig config;
616 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
617 /**
618 * @tc.steps: step2. Insert cloud data
619 * @tc.expected: step2. ok
620 */
621 const int cloudCount = 1;
622 auto schema = GetSchema();
623 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
624 /**
625 * @tc.steps: step3. Fork download abnormal
626 */
627 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
628 /**
629 * @tc.steps: step4. Sync cloud data
630 * @tc.expected: step4. DB_ERROR and exist downloading assets count
631 */
632 CloudSyncOption option = GetAsyncCloudSyncOption();
633 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
634 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
635 EXPECT_EQ(status, OK);
636 EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
637 /**
638 * @tc.steps: step5. Update local data
639 * @tc.expected: step5. Exist downloading assets count
640 */
641 EXPECT_EQ(RDBDataGenerator::UpsertLocalDBData(0, cloudCount, db_,
642 GetTableSchema("AsyncDownloadAssetsTest", true)), OK);
643 std::tie(status, downloadCount) = delegate_->GetDownloadingAssetsCount();
644 EXPECT_EQ(status, OK);
645 EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
646 }
647
648 /**
649 * @tc.name: AsyncAbnormalDownload005
650 * @tc.desc: Test download assets which was locked
651 * @tc.type: FUNC
652 * @tc.require:
653 * @tc.author: liaoyonghuang
654 */
655 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload005, TestSize.Level1)
656 {
657 /**
658 * @tc.steps: step1. Set async config
659 * @tc.expected: step1. ok
660 */
661 AsyncDownloadAssetsConfig config;
662 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
663 /**
664 * @tc.steps: step2. Init data
665 * @tc.expected: step2. ok
666 */
667 const int cloudCount = 10;
668 auto schema = GetSchema();
669 ASSERT_TRUE(!schema.tables.empty());
670 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
671 CloudSyncOption option = GetAsyncCloudSyncOption();
672 option.asyncDownloadAssets = false;
673 RelationalTestUtils::CloudBlockSync(option, delegate_, OK, OK);
674 /**
675 * @tc.steps: step3. Update cloud data and lock local data
676 * @tc.expected: step3. ok
677 */
678 auto [records, extends] =
679 RDBDataGenerator::GenerateDataRecords(0, cloudCount, 0, schema.tables.front().fields);
680 Asset asset = {.name = "asset_1", .hash = "new_hash"};
681 for (auto &record : records) {
682 record.insert_or_assign("asset_1", asset);
683 }
684 std::string table = schema.tables.front().name;
685 EXPECT_EQ(virtualCloudDb_->BatchUpdate(table, std::move(records), extends), OK);
686 virtualAssetLoader_->ForkDownload([&](const std::string &tableName,
__anonc27c8c720802(const std::string &tableName, std::map<std::string, Assets> &) 687 std::map<std::string, Assets> &) {
688 std::vector<std::vector<uint8_t>> hashKey;
689 CloudDBSyncUtilsTest::GetHashKey(table, "data_key < 5", db_, hashKey); // lock half of the data
690 EXPECT_EQ(Lock(table, hashKey, db_), OK);
691 });
692 /**
693 * @tc.steps: step4. Sync and check data
694 * @tc.expected: step4. ok
695 */
696 option.asyncDownloadAssets = true;
697 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
698 std::this_thread::sleep_for(std::chrono::seconds(1));
699 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
700 EXPECT_EQ(status, OK);
701 EXPECT_EQ(downloadCount, cloudCount / 2); // half of the data was not downloaded due to being locked
702 virtualAssetLoader_->ForkDownload(nullptr);
703 }
704
705 /**
706 * @tc.name: AsyncNormalDownload001
707 * @tc.desc: Test abnormal async download.
708 * @tc.type: FUNC
709 * @tc.require:
710 * @tc.author: zqq
711 */
712 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload001, TestSize.Level0)
713 {
714 /**
715 * @tc.steps: step1. Register observer
716 * @tc.expected: step1. ok
717 */
718 auto rdbObserver = new(std::nothrow) RelationalStoreObserverUnitTest();
719 ASSERT_NE(rdbObserver, nullptr);
__anonc27c8c720902() 720 ResFinalizer resFinalizer([rdbObserver, this]() {
721 delegate_->UnRegisterObserver(rdbObserver);
722 delete rdbObserver;
723 });
724 EXPECT_EQ(delegate_->RegisterObserver(rdbObserver), OK);
725 /**
726 * @tc.steps: step2. Insert cloud data
727 * @tc.expected: step2. ok
728 */
729 const int cloudCount = 1;
730 auto schema = GetSchema();
731 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
732 EXPECT_EQ(RDBDataGenerator::InsertLocalDBData(cloudCount + 1, 1, db_, GetSchema()), E_OK);
733 /**
734 * @tc.steps: step3. Block upload while async donwload asset
735 * @tc.expected: step3. Sync ok
736 */
737 auto hook = RelationalTestUtils::GetRDBStorageHook(USER_ID, APP_ID, STORE_ID_1, storePath_);
738 ASSERT_NE(hook, nullptr);
__anonc27c8c720a02() 739 hook->SetBeforeUploadTransaction([]() {
740 int count = 1;
741 const int maxLoop = 5;
742 do {
743 std::this_thread::sleep_for(std::chrono::seconds(1));
744 count++;
745 } while (RuntimeContext::GetInstance()->GetAssetsDownloadManager()->GetCurrentDownloadCount() > 0 &&
746 count < maxLoop);
747 LOGW("AsyncNormalDownload001 End hook");
748 });
749 CloudSyncOption option = GetAsyncCloudSyncOption();
750 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
751 EXPECT_TRUE(rdbObserver->IsAssetChange(GetTableSchema("AsyncDownloadAssetsTest").name));
752 hook->SetBeforeUploadTransaction(nullptr);
753 }
754
755 /**
756 * @tc.name: AsyncNormalDownload002
757 * @tc.desc: Test sync download when download task pool is full
758 * @tc.type: FUNC
759 * @tc.require:
760 * @tc.author: lhy
761 */
762 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload002, TestSize.Level4)
763 {
764 /**
765 * @tc.steps: step1. Set max download task 1
766 * @tc.expected: step1. Ok
767 */
768 AsyncDownloadAssetsConfig config;
769 config.maxDownloadTask = 1;
770 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
771 /**
772 * @tc.steps: step2. Insert cloud data
773 * @tc.expected: step2. Ok
774 */
775 const int cloudCount = 1;
776 auto schema = GetSchema();
777 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
778 /**
779 * @tc.steps: step3. Fork download abnormal
780 */
781 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
782 /**
783 * @tc.steps: step4. Async cloud data, with abnormal result
784 * @tc.expected: step4. Ok
785 */
786 CloudSyncOption option = GetAsyncCloudSyncOption();
787 RelationalTestUtils::CloudBlockSync(option, delegate_);
788 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
789 EXPECT_EQ(status, OK);
790 EXPECT_EQ(downloadCount, cloudCount * GetAssetFieldCount());
791 EXPECT_FALSE(RelationalTestUtils::IsExistEmptyHashAsset(db_, GetTableSchema("AsyncDownloadAssetsTest")));
792 /**
793 * @tc.steps: step5. Wait for failed download to finish
794 * @tc.expected: step5. Download task changes from 1 to 0
795 */
796 auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
797 EXPECT_EQ(manager->GetCurrentDownloadCount(), 1u);
798 std::this_thread::sleep_for(std::chrono::seconds(1));
799 EXPECT_EQ(manager->GetCurrentDownloadCount(), 0u);
800 /**
801 * @tc.steps: step6. Start a new download task to reach maxDownloadTask
802 * @tc.expected: step6. 1 task is downloading
803 */
804 auto [errCode, listener] = manager->BeginDownloadWithListener(nullptr);
805 ASSERT_EQ(errCode, E_OK);
806 ASSERT_EQ(listener, nullptr);
807 ASSERT_EQ(manager->GetCurrentDownloadCount(), 1u);
808 /**
809 * @tc.steps: step7. Set download status to ok then try sync while task pool is full
810 * @tc.expected: step7. Download should be waiting instead of doing compensated sync
811 */
812 virtualAssetLoader_->SetDownloadStatus(OK);
813 LOGW("set download ok");
814 int count = 0;
815 std::mutex countMutex;
816 std::condition_variable cv;
817 virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonc27c8c720b02(const std::string &tableName, std::map<std::string, Assets> &) 818 std::map<std::string, Assets> &) {
819 std::lock_guard<std::mutex> autoLock(countMutex);
820 count++;
821 if (count == 1) {
822 std::this_thread::sleep_for(std::chrono::seconds(1));
823 }
824 cv.notify_all();
825 });
826 RelationalTestUtils::CloudBlockSync(option, delegate_);
827 std::this_thread::sleep_for(std::chrono::seconds(1));
828 EXPECT_EQ(count, 0);
829 /**
830 * @tc.steps: step8. Finish the download task, to let sync continue
831 * @tc.expected: step8. Only 1 actural download through virtualAssetLoader_
832 */
833 manager->FinishDownload();
834 std::this_thread::sleep_for(std::chrono::seconds(1));
835 std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonc27c8c720c02() 836 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
837 return count >= cloudCount;
838 });
839 EXPECT_EQ(count, 1);
840 virtualAssetLoader_->ForkDownload(nullptr);
841 }
842
843 /**
844 * @tc.name: AsyncNormalDownload003
845 * @tc.desc: Test:async asset task paused
846 * @tc.type: FUNC
847 * @tc.require:
848 * @tc.author: bty
849 */
850 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload003, TestSize.Level4)
851 {
852 /**
853 * @tc.steps: step1. Set max download task 1
854 * @tc.expected: step1. Ok
855 */
856 AsyncDownloadAssetsConfig config;
857 config.maxDownloadTask = 1;
858 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
859 /**
860 * @tc.steps: step2. Insert cloud data
861 * @tc.expected: step2. Ok
862 */
863 const int cloudCount = 1;
864 auto schema = GetSchema();
865 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
866 /**
867 * @tc.steps: step3. async asset task submit
868 * @tc.expected: step3. Ok
869 */
870 int count = 0;
871 int expQueryTimes = 3;
872 std::mutex mutex;
873 std::condition_variable cond;
__anonc27c8c720d02(const std::string &, VBucket &extend) 874 virtualCloudDb_->ForkQuery([&count, &cond, expQueryTimes](const std::string &, VBucket &extend) {
875 count++;
876 if (count == 1) {
877 cond.notify_all();
878 std::this_thread::sleep_for(std::chrono::seconds(1));
879 }
880 if (count == expQueryTimes) {
881 std::string cursor;
882 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CURSOR_FIELD, extend, cursor);
883 EXPECT_EQ(cursor, std::string("1"));
884 }
885 });
__anonc27c8c720e02null886 std::thread t1([this]{
887 CloudSyncOption option = GetAsyncCloudSyncOption();
888 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
889 });
890 /**
891 * @tc.steps: step4. wait for async task query
892 * @tc.expected: step4. Ok
893 */
894 {
895 std::unique_lock<std::mutex> lock(mutex);
__anonc27c8c720f02() 896 (void)cond.wait_for(lock, std::chrono::seconds(1), [&count]() {
897 return count == 1;
898 });
899 }
900 /**
901 * @tc.steps: step5. priority task submit
902 * @tc.expected: step5. Ok
903 */
904 CloudSyncOption priOption = GetAsyncCloudSyncOption();
905 std::vector<int64_t> inValue = {3, 4};
906 priOption.priorityTask = true;
907 priOption.query = Query::Select().From("AsyncDownloadAssetsTest").In("pk", inValue);
908 priOption.asyncDownloadAssets = false;
909 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(priOption, delegate_));
910 t1.join();
911 virtualCloudDb_->ForkQuery(nullptr);
912 EXPECT_EQ(count, expQueryTimes);
913 }
914
915 /**
916 * @tc.name: AsyncNormalDownload004
917 * @tc.desc: Test multiple tables and multiple batches of asset downloads
918 * @tc.type: FUNC
919 * @tc.require:
920 * @tc.author: liaoyonghuang
921 */
922 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload004, TestSize.Level1)
923 {
924 /**
925 * @tc.steps: step1. Set max download task 1
926 * @tc.expected: step1. Ok
927 */
928 AsyncDownloadAssetsConfig config;
929 config.maxDownloadAssetsCount = 25;
930 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
931 /**
932 * @tc.steps: step2. Insert cloud data
933 * @tc.expected: step2. Ok
934 */
935 const int cloudCount = 100;
936 std::string table1 = "TABLE1";
937 std::string table2 = "TABLE2";
938 DataBaseSchema schema;
939 schema.tables.push_back(GetTableSchema(table1));
940 schema.tables.push_back(GetTableSchema(table2));
941 EXPECT_EQ(RDBDataGenerator::InitDatabase(schema, *db_), SQLITE_OK);
942 auto [record1, extend1] = RDBDataGenerator::GenerateDataRecords(0, cloudCount, 0, GetTableSchema(table1).fields);
943 EXPECT_EQ(virtualCloudDb_->BatchInsertWithGid(table1, std::move(record1), extend1), OK);
944 auto [record2, extend2] = RDBDataGenerator::GenerateDataRecords(0, cloudCount, cloudCount,
945 GetTableSchema(table2).fields);
946 EXPECT_EQ(virtualCloudDb_->BatchInsertWithGid(table2, std::move(record2), extend2), OK);
947 /**
948 * @tc.steps: step3. async asset task submit
949 * @tc.expected: step3. Ok
950 */
951 int assetsDownloadTime = 0;
952 virtualAssetLoader_->ForkDownload([&table1, &table2, &assetsDownloadTime](const std::string &tableName,
__anonc27c8c721002(const std::string &tableName, std::map<std::string, Assets> &) 953 std::map<std::string, Assets> &) {
954 if (assetsDownloadTime < 100) { // 100 assets
955 EXPECT_EQ(tableName, table1);
956 } else {
957 EXPECT_EQ(tableName, table2);
958 }
959 assetsDownloadTime++;
960 });
961 CloudSyncOption option;
962 option.devices = {"cloud"};
963 option.asyncDownloadAssets = true;
964 Query query = Query::Select().FromTable({table1, table2});
965 option.query = query;
966 EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
967 std::this_thread::sleep_for(std::chrono::seconds(5));
968 EXPECT_EQ(assetsDownloadTime, 200);
969 virtualAssetLoader_->ForkDownload(nullptr);
970 }
971
972 /**
973 * @tc.name: AsyncAbnormalDownload006
974 * @tc.desc: Test abnormal async download.
975 * @tc.type: FUNC
976 * @tc.require:
977 * @tc.author: suyue
978 */
979 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload006, TestSize.Level4)
980 {
981 /**
982 * @tc.steps: step1. Set config and insert 70 cloud data
983 * @tc.expected: step1. ok
984 */
985 AsyncDownloadAssetsConfig config;
986 config.maxDownloadTask = 5;
987 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
988 const int cloudCount = 70;
989 auto schema = GetSchema();
990 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
991
992 /**
993 * @tc.steps: step2. Fork download abnormal for 0-10 records
994 * @tc.expected: step2. ok
995 */
996 virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
997 uint32_t failNum = 10;
998 const DownloadFailRange setRange = {.isAllFail = false, .failBeginIndex = 0, .failEndIndex = failNum};
999 virtualAssetLoader_->SetDownloadFailRange(setRange);
1000
1001 /**
1002 * @tc.steps: step3. Async cloud data
1003 * @tc.expected: step3. ok
1004 */
1005 int count = 0;
1006 std::mutex countMutex;
1007 std::condition_variable cv;
1008 virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonc27c8c721102(const std::string &tableName, std::map<std::string, Assets> &) 1009 std::map<std::string, Assets> &) {
1010 std::lock_guard<std::mutex> autoLock(countMutex);
1011 count++;
1012 if (count == 1) {
1013 std::this_thread::sleep_for(std::chrono::seconds(1));
1014 }
1015 cv.notify_all();
1016 });
1017
1018 CloudSyncOption option = GetAsyncCloudSyncOption();
1019 RelationalTestUtils::CloudBlockSync(option, delegate_);
1020 std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonc27c8c721202() 1021 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
1022 return count >= cloudCount;
1023 });
1024 std::this_thread::sleep_for(std::chrono::seconds(1));
1025 auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
1026 EXPECT_EQ(status, OK);
1027 EXPECT_EQ(downloadCount, static_cast<int32_t>(failNum * 2)); // 1 record has 2 asset
1028
1029 virtualAssetLoader_->SetDownloadStatus(OK);
1030 virtualAssetLoader_->Reset();
1031 virtualAssetLoader_->ForkDownload(nullptr);
1032 }
1033
UpdateLocalData(sqlite3 * & db,const std::string & tableName,int32_t begin,int32_t end)1034 void DistributedDBCloudAsyncDownloadAssetsTest::UpdateLocalData(sqlite3 *&db,
1035 const std::string &tableName, int32_t begin, int32_t end)
1036 {
1037 const string sql = "update " + tableName + " set int_field = int_field+1 " + "where pk>=" + std::to_string(begin) +
1038 " and pk<=" + std::to_string(end) + ";";
1039 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
1040 LOGW("update local data finished");
1041 }
1042
1043 /**
1044 * @tc.name: AsyncAbnormalDownload007
1045 * @tc.desc: Test assets is downloading then update local data can upload
1046 * @tc.type: FUNC
1047 * @tc.require:
1048 * @tc.author: tankaisheng
1049 */
1050 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload007, TestSize.Level1)
1051 {
1052 /**
1053 * @tc.steps: step1. Set async config
1054 * @tc.expected: step1. ok
1055 */
1056 AsyncDownloadAssetsConfig config;
1057 EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1058 /**
1059 * @tc.steps: step2. Init data and sync
1060 * @tc.expected: step2. ok
1061 */
1062 const int cloudCount = 10;
1063 auto schema = GetSchema();
1064 ASSERT_TRUE(!schema.tables.empty());
1065 EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1066 CloudSyncOption option = GetAsyncCloudSyncOption();
1067 option.asyncDownloadAssets = false;
1068 RelationalTestUtils::CloudBlockSync(option, delegate_, OK, OK);
1069 /**
1070 * @tc.steps: step3. update cloud data
1071 * @tc.expected: step3. ok
1072 */
1073 std::vector<VBucket> record;
1074 std::vector<VBucket> extend;
1075 Timestamp now = TimeHelper::GetSysCurrentTime();
1076 VBucket data;
1077 data.insert_or_assign("pk", 10L);
1078 data.insert_or_assign("int_field", 11L);
1079 Assets assets = {{.name = "new_asset_1", .hash = "new_hash"}};
1080 data.insert_or_assign("assets_1", assets);
1081 record.push_back(data);
1082 VBucket log;
1083 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
1084 now / CloudDbConstant::TEN_THOUSAND));
1085 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
1086 now / CloudDbConstant::TEN_THOUSAND));
1087 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1088 extend.push_back(log);
1089 std::string table = schema.tables.front().name;
1090 EXPECT_EQ(virtualCloudDb_->BatchInsert(table, std::move(record), extend), OK);
1091
1092 /**
1093 * @tc.steps: step4. Sync and update local data then check upload count
1094 * @tc.expected: step4. ok
1095 */
1096 option.asyncDownloadAssets = true;
1097 std::atomic<bool> syncInProgress(true);
__anonc27c8c721302() 1098 std::thread syncThread([&]() {
1099 RelationalTestUtils::CloudBlockSync(option, delegate_);
1100 syncInProgress.store(false);
1101 });
1102
1103 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1104 UpdateLocalData(db_, "AsyncDownloadAssetsTest", 0, cloudCount);
1105
1106 int count = 10;
__anonc27c8c721402(const std::string&, VBucket&) 1107 virtualCloudDb_->ForkUpload([&count, delegate = delegate_](const std::string&, VBucket&) {
1108 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
1109 auto [ret, count] = delegate->GetDownloadingAssetsCount();
1110 EXPECT_EQ(ret, OK);
1111 EXPECT_EQ(count, 0);
1112 });
1113 syncThread.join();
1114 }
1115 }