• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <gtest/gtest.h>
16 
17 #include "cloud/assets_download_manager.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/virtual_asset_loader.h"
20 #include "cloud/virtual_cloud_data_translate.h"
21 #include "cloud_db_sync_utils_test.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "res_finalizer.h"
25 #include "rdb_data_generator.h"
26 #include "relational_store_client.h"
27 #include "relational_store_manager.h"
28 #include "runtime_config.h"
29 #include "virtual_communicator_aggregator.h"
30 
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35 
36 namespace {
// Test directory path; handed to DistributedDBToolsUnitTest::TestDirInit() in SetUpTestCase().
std::string g_testDir;
// Counts rows of the AsyncDownloadAssetsTest log table whose flag has bit 0x20 set.
// In SQLite '&' binds tighter than '!=', so the predicate reads as (flag & 0x20) != 0.
const std::string QUERY_INCONSISTENT_SQL{
    "select count(*) from naturalbase_rdb_aux_AsyncDownloadAssetsTest_log where flag&0x20!=0;"};
40 class DistributedDBCloudAsyncDownloadAssetsTest : public testing::Test {
41 public:
42     static void SetUpTestCase();
43     static void TearDownTestCase();
44     void SetUp() override;
45     void TearDown() override;
46 protected:
47     static DataBaseSchema GetSchema(bool multiTables = false);
48     static TableSchema GetTableSchema(const std::string &tableName, bool withoutAsset = false);
49     static CloudSyncOption GetAsyncCloudSyncOption();
50     static int GetAssetFieldCount();
51     void InitStore();
52     void CloseDb();
53     void UpdateLocalData(sqlite3 *&db, const std::string &tableName, int32_t begin, int32_t end);
54     std::string storePath_;
55     sqlite3 *db_ = nullptr;
56     RelationalStoreDelegate *delegate_ = nullptr;
57     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
58     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
59     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
60 };
61 
SetUpTestCase()62 void DistributedDBCloudAsyncDownloadAssetsTest::SetUpTestCase()
63 {
64     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
65     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
66         LOGE("rm test db files error!");
67     }
68 }
69 
TearDownTestCase()70 void DistributedDBCloudAsyncDownloadAssetsTest::TearDownTestCase()
71 {
72     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
73         LOGE("rm test db files error!");
74     }
75 }
76 
SetUp()77 void DistributedDBCloudAsyncDownloadAssetsTest::SetUp()
78 {
79     DistributedDBToolsUnitTest::PrintTestCaseInfo();
80     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
81     InitStore();
82     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
83     ASSERT_TRUE(communicatorAggregator_ != nullptr);
84     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
85 }
86 
TearDown()87 void DistributedDBCloudAsyncDownloadAssetsTest::TearDown()
88 {
89     CloseDb();
90     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
91         LOGE("rm test db files error.");
92     }
93     virtualCloudDb_ = nullptr;
94     virtualAssetLoader_ = nullptr;
95     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
96     communicatorAggregator_ = nullptr;
97 }
98 
GetSchema(bool multiTables)99 DataBaseSchema DistributedDBCloudAsyncDownloadAssetsTest::GetSchema(bool multiTables)
100 {
101     DataBaseSchema schema;
102     schema.tables.push_back(GetTableSchema("AsyncDownloadAssetsTest"));
103     if (multiTables) {
104         schema.tables.push_back(GetTableSchema("TABLE1"));
105         schema.tables.push_back(GetTableSchema("TABLE2"));
106     }
107     return schema;
108 }
109 
GetTableSchema(const std::string & tableName,bool withoutAsset)110 TableSchema DistributedDBCloudAsyncDownloadAssetsTest::GetTableSchema(const std::string &tableName, bool withoutAsset)
111 {
112     TableSchema tableSchema;
113     tableSchema.name = tableName;
114     Field field;
115     field.primary = true;
116     field.type = TYPE_INDEX<int64_t>;
117     field.colName = "pk";
118     tableSchema.fields.push_back(field);
119     field.primary = false;
120     field.colName = "int_field";
121     tableSchema.fields.push_back(field);
122     if (withoutAsset) {
123         return tableSchema;
124     }
125     field.type = TYPE_INDEX<Assets>;
126     field.colName = "assets_1";
127     tableSchema.fields.push_back(field);
128     field.colName = "asset_1";
129     field.type = TYPE_INDEX<Asset>;
130     tableSchema.fields.push_back(field);
131     return tableSchema;
132 }
133 
GetAsyncCloudSyncOption()134 CloudSyncOption DistributedDBCloudAsyncDownloadAssetsTest::GetAsyncCloudSyncOption()
135 {
136     CloudSyncOption option;
137     std::vector<std::string> tables;
138     auto schema = GetSchema();
139     for (const auto &table : schema.tables) {
140         tables.push_back(table.name);
141         LOGW("[DistributedDBCloudAsyncDownloadAssetsTest] Sync with table %s", table.name.c_str());
142     }
143     option.devices = {"cloud"};
144     option.query = Query::Select().FromTable(tables);
145     option.mode = SYNC_MODE_CLOUD_MERGE;
146     option.asyncDownloadAssets = true;
147     return option;
148 }
149 
GetAssetFieldCount()150 int DistributedDBCloudAsyncDownloadAssetsTest::GetAssetFieldCount()
151 {
152     int count = 0;
153     auto schema = GetSchema();
154     for (const auto &table : schema.tables) {
155         for (const auto &field : table.fields) {
156             if (field.type == TYPE_INDEX<Assets> || field.type == TYPE_INDEX<Asset>) {
157                 count++;
158             }
159         }
160     }
161     return count;
162 }
163 
InitStore()164 void DistributedDBCloudAsyncDownloadAssetsTest::InitStore()
165 {
166     if (storePath_.empty()) {
167         storePath_ = g_testDir + "/" + STORE_ID_1 + ".db";
168     }
169     db_ = RelationalTestUtils::CreateDataBase(storePath_);
170     ASSERT_NE(db_, nullptr);
171     auto schema = GetSchema(true);
172     EXPECT_EQ(RDBDataGenerator::InitDatabase(schema, *db_), SQLITE_OK);
173     RelationalStoreManager mgr(APP_ID, USER_ID);
174     ASSERT_EQ(mgr.OpenStore(storePath_, STORE_ID_1, {}, delegate_), OK);
175     ASSERT_NE(delegate_, nullptr);
176     for (const auto &table : schema.tables) {
177         EXPECT_EQ(delegate_->CreateDistributedTable(table.name, TableSyncType::CLOUD_COOPERATION), OK);
178         LOGI("[DistributedDBCloudAsyncDownloadAssetsTest] CreateDistributedTable %s", table.name.c_str());
179     }
180     virtualCloudDb_ = make_shared<VirtualCloudDb>();
181     ASSERT_NE(virtualCloudDb_, nullptr);
182     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
183     virtualAssetLoader_ = make_shared<VirtualAssetLoader>();
184     ASSERT_NE(virtualAssetLoader_, nullptr);
185     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
186     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
187 
188     ASSERT_EQ(delegate_->SetCloudDbSchema(schema), DBStatus::OK);
189 }
190 
CheckInconsistentCount(sqlite3 * db,int64_t expectCount)191 void CheckInconsistentCount(sqlite3 *db, int64_t expectCount)
192 {
193     EXPECT_EQ(sqlite3_exec(db, QUERY_INCONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
194         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
195 }
196 
CloseDb()197 void DistributedDBCloudAsyncDownloadAssetsTest::CloseDb()
198 {
199     if (db_ != nullptr) {
200         sqlite3_close_v2(db_);
201         db_ = nullptr;
202     }
203     if (delegate_ != nullptr) {
204         RelationalStoreManager mgr(APP_ID, USER_ID);
205         EXPECT_EQ(mgr.CloseStore(delegate_), OK);
206         delegate_ = nullptr;
207     }
208 }
209 
210 /**
211  * @tc.name: AsyncDownloadAssetConfig001
212  * @tc.desc: Test config with valid and invalid param.
213  * @tc.type: FUNC
214  * @tc.require:
215  * @tc.author: zqq
216  */
217 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig001, TestSize.Level0)
218 {
219     /**
220      * @tc.steps: step1. Set valid param
221      * @tc.expected: step1.ok
222      */
223     AsyncDownloadAssetsConfig config;
224     AssetsDownloadManager manager;
225     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
226     config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK;
227     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
228     config.maxDownloadAssetsCount = CloudDbConstant::MAX_ASYNC_DOWNLOAD_ASSETS;
229     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
230 
231     /**
232      * @tc.steps: step2. Set invalid param
233      * @tc.expected: step2.invalid args
234      */
235     config.maxDownloadTask += 1u;
236     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), -E_INVALID_ARGS);
237     config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK;
238     config.maxDownloadAssetsCount += 1u;
239     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), -E_INVALID_ARGS);
240 }
241 
242 /**
243  * @tc.name: AsyncDownloadAssetConfig002
244  * @tc.desc: Test config work correctly.
245  * @tc.type: FUNC
246  * @tc.require:
247  * @tc.author: zqq
248  */
249 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig002, TestSize.Level0)
250 {
251     /**
252      * @tc.steps: step1. Set valid param twice
253      * @tc.expected: step1. ok
254      */
255     AsyncDownloadAssetsConfig config;
256     config.maxDownloadTask = 10;
257     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
258     config.maxDownloadTask = 1;
259     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
260     /**
261      * @tc.steps: step2. Insert cloud data
262      * @tc.expected: step2. ok
263      */
264     const int cloudCount = 20;
265     auto schema = GetSchema();
266     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
267     /**
268      * @tc.steps: step3. Begin download first, block async task
269      * @tc.expected: step3. ok
270      */
271     auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
272     int finishCount = 0;
273     std::mutex finishMutex;
274     std::condition_variable cv;
__anonc27c8c720202(void *) 275     auto finishAction = [&finishCount, &finishMutex, &cv](void *) {
276         std::lock_guard<std::mutex> autoLock(finishMutex);
277         finishCount++;
278         cv.notify_all();
279     };
280     auto [errCode, listener] = manager->BeginDownloadWithListener(finishAction);
281     ASSERT_EQ(errCode, E_OK);
282     ASSERT_EQ(listener, nullptr);
283     ASSERT_EQ(manager->GetCurrentDownloadCount(), 1u);
284     std::tie(errCode, listener) = manager->BeginDownloadWithListener(finishAction);
285     ASSERT_EQ(errCode, -E_MAX_LIMITS);
286     ASSERT_NE(listener, nullptr);
287     /**
288      * @tc.steps: step4. Async cloud data
289      * @tc.expected: step4. ok and async task still one
290      */
291     CloudSyncOption option = GetAsyncCloudSyncOption();
292     RelationalTestUtils::CloudBlockSync(option, delegate_);
293     EXPECT_EQ(manager->GetCurrentDownloadCount(), 1u);
294     /**
295      * @tc.steps: step5. Notify async task finish
296      * @tc.expected: step5. wait util another async task finish
297      */
298     manager->FinishDownload();
299     std::unique_lock uniqueLock(finishMutex);
__anonc27c8c720302() 300     auto res = cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finishCount]() {
301         return finishCount >= 2; // 2 async task
302     });
303     EXPECT_TRUE(res);
304     listener->Drop(true);
305 }
306 
307 /**
308  * @tc.name: AsyncDownloadAssetConfig003
309  * @tc.desc: Test asyncDownloadAssets and compensatedSyncOnly both true.
310  * @tc.type: FUNC
311  * @tc.require:
312  * @tc.author: tankaisheng
313  */
314 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig003, TestSize.Level0)
315 {
316     /**
317      * @tc.steps: step1. Insert cloud data
318      * @tc.expected: step1. ok
319      */
320     const int cloudCount = 10;
321     auto schema = GetSchema();
322     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
323     /**
324      * @tc.steps: step2. set compensatedSyncOnly true and sync return NOT_SUPPORT.
325      * @tc.expected: step2. NOT_SUPPORT
326      */
327     CloudSyncOption option = GetAsyncCloudSyncOption();
328     option.compensatedSyncOnly = true;
329     DBStatus result = delegate_->Sync(option, nullptr);
330     EXPECT_EQ(result, NOT_SUPPORT);
331 }
332 
333 /**
334  * @tc.name: FinishListener001
335  * @tc.desc: Test listen download finish event.
336  * @tc.type: FUNC
337  * @tc.require:
338  * @tc.author: zqq
339  */
340 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, FinishListener001, TestSize.Level0)
341 {
342     /**
343      * @tc.steps: step1. Begin download first time
344      * @tc.expected: step1.ok
345      */
346     AssetsDownloadManager manager;
347     std::atomic<bool> finished = false;
__anonc27c8c720402(void *) 348     auto finishAction = [&finished](void *) {
349         EXPECT_TRUE(finished);
350     };
351     auto [errCode, listener] = manager.BeginDownloadWithListener(finishAction);
352     ASSERT_EQ(errCode, E_OK);
353     ASSERT_EQ(listener, nullptr);
354     /**
355      * @tc.steps: step2. Begin download twice
356      * @tc.expected: step2. -E_MAX_LIMITS because default one task
357      */
358     std::tie(errCode, listener) = manager.BeginDownloadWithListener(finishAction);
359     EXPECT_EQ(errCode, -E_MAX_LIMITS);
360     EXPECT_NE(listener, nullptr);
361     /**
362      * @tc.steps: step3. Finish download
363      * @tc.expected: step3. finished is true in listener
364      */
365     finished = true;
366     manager.FinishDownload();
367     listener->Drop(true);
368 }
369 
370 /**
371  * @tc.name: AsyncComplexDownload001
372  * @tc.desc: Test complex async download.
373  * @tc.type: FUNC
374  * @tc.require:
375  * @tc.author: zqq
376  */
377 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncComplexDownload001, TestSize.Level0)
378 {
379     /**
380      * @tc.steps: step1. Set max download task 1
381      * @tc.expected: step1. ok
382      */
383     AsyncDownloadAssetsConfig config;
384     config.maxDownloadTask = 1;
385     config.maxDownloadAssetsCount = 1;
386     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
387     /**
388      * @tc.steps: step2. Insert cloud data
389      * @tc.expected: step2. ok
390      */
391     const int cloudCount = 10;
392     auto schema = GetSchema();
393     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
394     /**
395      * @tc.steps: step3. Async cloud data
396      * @tc.expected: step3. ok
397      */
398     CloudSyncOption option = GetAsyncCloudSyncOption();
399     RelationalTestUtils::CloudBlockSync(option, delegate_);
400     /**
401      * @tc.steps: step3. Block download cloud data
402      * @tc.expected: step3. ok
403      */
404     option.asyncDownloadAssets = false;
405     RelationalTestUtils::CloudBlockSync(option, delegate_);
406 }
407 
408 /**
409  * @tc.name: AsyncComplexDownload002
410  * @tc.desc: Test complex async download.
411  * @tc.type: FUNC
412  * @tc.require:
413  * @tc.author: zqq
414  */
415 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncComplexDownload002, TestSize.Level0)
416 {
417     /**
418      * @tc.steps: step1. Set max download task 1
419      * @tc.expected: step1. ok
420      */
421     AsyncDownloadAssetsConfig config;
422     config.maxDownloadTask = 1;
423     config.maxDownloadAssetsCount = 1;
424     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
425     /**
426      * @tc.steps: step2. Insert cloud data
427      * @tc.expected: step2. ok
428      */
429     const int cloudCount = 10;
430     auto schema = GetSchema();
431     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
432     /**
433      * @tc.steps: step3. Complex cloud data
434      * @tc.expected: step3. ok
435      */
436     CloudSyncOption option = GetAsyncCloudSyncOption();
437     for (int i = 0; i < 10; ++i) { // loop 10 times
438         option.asyncDownloadAssets = false;
439         RelationalTestUtils::CloudBlockSync(option, delegate_);
440         option.asyncDownloadAssets = true;
441         RelationalTestUtils::CloudBlockSync(option, delegate_);
442     }
443 }
444 
445 /**
446  * @tc.name: AsyncAbnormalDownload001
447  * @tc.desc: Test abnormal async download.
448  * @tc.type: FUNC
449  * @tc.require:
450  * @tc.author: zqq
451  */
452 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload001, TestSize.Level4)
453 {
454     /**
455      * @tc.steps: step1. Set max download task 1
456      * @tc.expected: step1. ok
457      */
458     AsyncDownloadAssetsConfig config;
459     config.maxDownloadTask = 1;
460     config.maxDownloadAssetsCount = 1;
461     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
462     /**
463      * @tc.steps: step2. Insert cloud data
464      * @tc.expected: step2. ok
465      */
466     const int cloudCount = 10;
467     auto schema = GetSchema();
468     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
469     /**
470      * @tc.steps: step3. Fork download abnormal
471      */
472     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
473     /**
474      * @tc.steps: step4. Async cloud data
475      * @tc.expected: step4. ok
476      */
477     CloudSyncOption option = GetAsyncCloudSyncOption();
478     RelationalTestUtils::CloudBlockSync(option, delegate_);
479     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
480     EXPECT_EQ(status, OK);
481     EXPECT_EQ(downloadCount, cloudCount * GetAssetFieldCount());
482     EXPECT_FALSE(RelationalTestUtils::IsExistEmptyHashAsset(db_, GetTableSchema("AsyncDownloadAssetsTest")));
483     std::this_thread::sleep_for(std::chrono::seconds(1));
484     /**
485      * @tc.steps: step5. Async cloud data with download ok
486      * @tc.expected: step5. ok
487      */
488     virtualAssetLoader_->SetDownloadStatus(OK);
489     LOGW("set download ok");
490     int count = 0;
491     std::mutex countMutex;
492     std::condition_variable cv;
493     virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonc27c8c720502(const std::string &tableName, std::map<std::string, Assets> &) 494         std::map<std::string, Assets> &) {
495         std::lock_guard<std::mutex> autoLock(countMutex);
496         count++;
497         if (count == 1) {
498             std::this_thread::sleep_for(std::chrono::seconds(1));
499         }
500         cv.notify_all();
501     });
502     RelationalTestUtils::CloudBlockSync(option, delegate_);
503     std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonc27c8c720602() 504     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
505         return count >= cloudCount;
506     });
507     std::this_thread::sleep_for(std::chrono::seconds(1));
508     std::tie(status, downloadCount) = delegate_->GetDownloadingAssetsCount();
509     EXPECT_EQ(status, OK);
510     EXPECT_EQ(downloadCount, 0);
511     virtualAssetLoader_->ForkDownload(nullptr);
512 }
513 
514 /**
515  * @tc.name: AsyncAbnormalDownload002
516  * @tc.desc: Test abnormal sync download.
517  * @tc.type: FUNC
518  * @tc.require:
519  * @tc.author: zqq
520  */
521 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload002, TestSize.Level0)
522 {
523     /**
524      * @tc.steps: step1. Insert cloud data
525      * @tc.expected: step1. ok
526      */
527     const int cloudCount = 10;
528     auto schema = GetSchema();
529     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
530     /**
531      * @tc.steps: step2. Fork download abnormal
532      */
533     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
534     /**
535      * @tc.steps: step3. Sync cloud data
536      * @tc.expected: step3. DB_ERROR and not exist downloading assets count
537      */
538     CloudSyncOption option = GetAsyncCloudSyncOption();
539     option.asyncDownloadAssets = false;
540     RelationalTestUtils::CloudBlockSync(option, delegate_, OK, CLOUD_ERROR);
541     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
542     EXPECT_EQ(status, OK);
543     EXPECT_EQ(downloadCount, 0);
544     virtualAssetLoader_->SetDownloadStatus(OK);
545 }
546 
547 /**
548  * @tc.name: AsyncAbnormalDownload003
549  * @tc.desc: Test abnormal async download.
550  * @tc.type: FUNC
551  * @tc.require:
552  * @tc.author: zqq
553  */
554 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload003, TestSize.Level4)
555 {
556     /**
557      * @tc.steps: step1. Set max download task 1
558      * @tc.expected: step1. ok
559      */
560     AsyncDownloadAssetsConfig config;
561     config.maxDownloadTask = 1;
562     config.maxDownloadAssetsCount = 4; // 1 record has 2 asset, config 1 batch has 2 record
563     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
564     /**
565      * @tc.steps: step2. Insert cloud data
566      * @tc.expected: step2. ok
567      */
568     const int cloudCount = 5;
569     auto schema = GetSchema();
570     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
571     /**
572      * @tc.steps: step3. Fork download abnormal
573      */
574     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
575     /**
576      * @tc.steps: step4. Sync cloud data
577      * @tc.expected: step4. DB_ERROR and exist downloading assets count
578      */
579     CloudSyncOption option = GetAsyncCloudSyncOption();
580     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
581     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
582     EXPECT_EQ(status, OK);
583     EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
584     CheckInconsistentCount(db_, 5);
585     std::this_thread::sleep_for(std::chrono::seconds(1));
586     /**
587      * @tc.steps: step5. Sync cloud data again and block upload
588      * @tc.expected: step5. DB_ERROR and exist downloading assets count
589      */
590     EXPECT_EQ(RDBDataGenerator::InsertLocalDBData(cloudCount + 1, 1, db_, GetSchema()), E_OK);
591     virtualAssetLoader_->SetDownloadStatus(OK);
__anonc27c8c720702(const std::string &, VBucket &) 592     virtualCloudDb_->ForkUpload([delegate = delegate_](const std::string &, VBucket &) {
593         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
594         auto [ret, count] = delegate->GetDownloadingAssetsCount();
595         EXPECT_EQ(ret, OK);
596         EXPECT_EQ(count, 0);
597     });
598     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
599     virtualAssetLoader_->ForkBatchDownload(nullptr);
600 }
601 
602 /**
603  * @tc.name: AsyncAbnormalDownload004
604  * @tc.desc: Test update trigger retain 0x1000 flag.
605  * @tc.type: FUNC
606  * @tc.require:
607  * @tc.author: zqq
608  */
609 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload004, TestSize.Level0)
610 {
611     /**
612      * @tc.steps: step1. Set async config
613      * @tc.expected: step1. ok
614      */
615     AsyncDownloadAssetsConfig config;
616     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
617     /**
618      * @tc.steps: step2. Insert cloud data
619      * @tc.expected: step2. ok
620      */
621     const int cloudCount = 1;
622     auto schema = GetSchema();
623     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
624     /**
625      * @tc.steps: step3. Fork download abnormal
626      */
627     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
628     /**
629      * @tc.steps: step4. Sync cloud data
630      * @tc.expected: step4. DB_ERROR and exist downloading assets count
631      */
632     CloudSyncOption option = GetAsyncCloudSyncOption();
633     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
634     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
635     EXPECT_EQ(status, OK);
636     EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
637     /**
638      * @tc.steps: step5. Update local data
639      * @tc.expected: step5. Exist downloading assets count
640      */
641     EXPECT_EQ(RDBDataGenerator::UpsertLocalDBData(0, cloudCount, db_,
642         GetTableSchema("AsyncDownloadAssetsTest", true)), OK);
643     std::tie(status, downloadCount) = delegate_->GetDownloadingAssetsCount();
644     EXPECT_EQ(status, OK);
645     EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
646 }
647 
648 /**
649  * @tc.name: AsyncAbnormalDownload005
650  * @tc.desc: Test download assets which was locked
651  * @tc.type: FUNC
652  * @tc.require:
653  * @tc.author: liaoyonghuang
654  */
655 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload005, TestSize.Level1)
656 {
657     /**
658      * @tc.steps: step1. Set async config
659      * @tc.expected: step1. ok
660      */
661     AsyncDownloadAssetsConfig config;
662     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
663     /**
664      * @tc.steps: step2. Init data
665      * @tc.expected: step2. ok
666      */
667     const int cloudCount = 10;
668     auto schema = GetSchema();
669     ASSERT_TRUE(!schema.tables.empty());
670     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
671     CloudSyncOption option = GetAsyncCloudSyncOption();
672     option.asyncDownloadAssets = false;
673     RelationalTestUtils::CloudBlockSync(option, delegate_, OK, OK);
674     /**
675      * @tc.steps: step3. Update cloud data and lock local data
676      * @tc.expected: step3. ok
677      */
678     auto [records, extends] =
679         RDBDataGenerator::GenerateDataRecords(0, cloudCount, 0, schema.tables.front().fields);
680     Asset asset = {.name = "asset_1", .hash = "new_hash"};
681     for (auto &record : records) {
682         record.insert_or_assign("asset_1", asset);
683     }
684     std::string table = schema.tables.front().name;
685     EXPECT_EQ(virtualCloudDb_->BatchUpdate(table, std::move(records), extends), OK);
686     virtualAssetLoader_->ForkDownload([&](const std::string &tableName,
__anonc27c8c720802(const std::string &tableName, std::map<std::string, Assets> &) 687         std::map<std::string, Assets> &) {
688         std::vector<std::vector<uint8_t>> hashKey;
689         CloudDBSyncUtilsTest::GetHashKey(table, "data_key < 5", db_, hashKey); // lock half of the data
690         EXPECT_EQ(Lock(table, hashKey, db_), OK);
691     });
692     /**
693      * @tc.steps: step4. Sync and check data
694      * @tc.expected: step4. ok
695      */
696     option.asyncDownloadAssets = true;
697     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
698     std::this_thread::sleep_for(std::chrono::seconds(1));
699     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
700     EXPECT_EQ(status, OK);
701     EXPECT_EQ(downloadCount, cloudCount / 2); // half of the data was not downloaded due to being locked
702     virtualAssetLoader_->ForkDownload(nullptr);
703 }
704 
705 /**
706  * @tc.name: AsyncNormalDownload001
707  * @tc.desc: Test normal async download.
708  * @tc.type: FUNC
709  * @tc.require:
710  * @tc.author: zqq
711  */
712 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload001, TestSize.Level0)
713 {
714     /**
715      * @tc.steps: step1. Register observer
716      * @tc.expected: step1. ok
717      */
718     auto rdbObserver = new(std::nothrow) RelationalStoreObserverUnitTest();
719     ASSERT_NE(rdbObserver, nullptr);
__anonc27c8c720902() 720     ResFinalizer resFinalizer([rdbObserver, this]() {
721         delegate_->UnRegisterObserver(rdbObserver);
722         delete rdbObserver;
723     });
724     EXPECT_EQ(delegate_->RegisterObserver(rdbObserver), OK);
725     /**
726      * @tc.steps: step2. Insert cloud data
727      * @tc.expected: step2. ok
728      */
729     const int cloudCount = 1;
730     auto schema = GetSchema();
731     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
732     EXPECT_EQ(RDBDataGenerator::InsertLocalDBData(cloudCount + 1, 1, db_, GetSchema()), E_OK);
733     /**
734      * @tc.steps: step3. Block upload while async donwload asset
735      * @tc.expected: step3. Sync ok
736      */
737     auto hook = RelationalTestUtils::GetRDBStorageHook(USER_ID, APP_ID, STORE_ID_1, storePath_);
738     ASSERT_NE(hook, nullptr);
__anonc27c8c720a02() 739     hook->SetBeforeUploadTransaction([]() {
740         int count = 1;
741         const int maxLoop = 5;
742         do {
743             std::this_thread::sleep_for(std::chrono::seconds(1));
744             count++;
745         } while (RuntimeContext::GetInstance()->GetAssetsDownloadManager()->GetCurrentDownloadCount() > 0 &&
746             count < maxLoop);
747         LOGW("AsyncNormalDownload001 End hook");
748     });
749     CloudSyncOption option = GetAsyncCloudSyncOption();
750     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
751     EXPECT_TRUE(rdbObserver->IsAssetChange(GetTableSchema("AsyncDownloadAssetsTest").name));
752     hook->SetBeforeUploadTransaction(nullptr);
753 }
754 
755 /**
756  * @tc.name: AsyncNormalDownload002
757  * @tc.desc: Test sync download when download task pool is full
758  * @tc.type: FUNC
759  * @tc.require:
760  * @tc.author: lhy
761  */
762 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload002, TestSize.Level4)
763 {
764     /**
765      * @tc.steps: step1. Set max download task 1
766      * @tc.expected: step1. Ok
767      */
768     AsyncDownloadAssetsConfig config;
769     config.maxDownloadTask = 1;
770     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
771     /**
772      * @tc.steps: step2. Insert cloud data
773      * @tc.expected: step2. Ok
774      */
775     const int cloudCount = 1;
776     auto schema = GetSchema();
777     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
778     /**
779      * @tc.steps: step3. Fork download abnormal
780      */
781     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
782     /**
783      * @tc.steps: step4. Async cloud data, with abnormal result
784      * @tc.expected: step4. Ok
785      */
786     CloudSyncOption option = GetAsyncCloudSyncOption();
787     RelationalTestUtils::CloudBlockSync(option, delegate_);
788     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
789     EXPECT_EQ(status, OK);
790     EXPECT_EQ(downloadCount, cloudCount * GetAssetFieldCount());
791     EXPECT_FALSE(RelationalTestUtils::IsExistEmptyHashAsset(db_, GetTableSchema("AsyncDownloadAssetsTest")));
792     /**
793      * @tc.steps: step5. Wait for failed download to finish
794      * @tc.expected: step5. Download task changes from 1 to 0
795      */
796     auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
797     EXPECT_EQ(manager->GetCurrentDownloadCount(), 1u);
798     std::this_thread::sleep_for(std::chrono::seconds(1));
799     EXPECT_EQ(manager->GetCurrentDownloadCount(), 0u);
800     /**
801      * @tc.steps: step6. Start a new download task to reach maxDownloadTask
802      * @tc.expected: step6. 1 task is downloading
803      */
804     auto [errCode, listener] = manager->BeginDownloadWithListener(nullptr);
805     ASSERT_EQ(errCode, E_OK);
806     ASSERT_EQ(listener, nullptr);
807     ASSERT_EQ(manager->GetCurrentDownloadCount(), 1u);
808     /**
809      * @tc.steps: step7. Set download status to ok then try sync while task pool is full
810      * @tc.expected: step7. Download should be waiting instead of doing compensated sync
811      */
812     virtualAssetLoader_->SetDownloadStatus(OK);
813     LOGW("set download ok");
814     int count = 0;
815     std::mutex countMutex;
816     std::condition_variable cv;
817     virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonc27c8c720b02(const std::string &tableName, std::map<std::string, Assets> &) 818         std::map<std::string, Assets> &) {
819         std::lock_guard<std::mutex> autoLock(countMutex);
820         count++;
821         if (count == 1) {
822             std::this_thread::sleep_for(std::chrono::seconds(1));
823         }
824         cv.notify_all();
825     });
826     RelationalTestUtils::CloudBlockSync(option, delegate_);
827     std::this_thread::sleep_for(std::chrono::seconds(1));
828     EXPECT_EQ(count, 0);
829     /**
830      * @tc.steps: step8. Finish the download task, to let sync continue
831      * @tc.expected: step8. Only 1 actural download through virtualAssetLoader_
832      */
833     manager->FinishDownload();
834     std::this_thread::sleep_for(std::chrono::seconds(1));
835     std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonc27c8c720c02() 836     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
837         return count >= cloudCount;
838     });
839     EXPECT_EQ(count, 1);
840     virtualAssetLoader_->ForkDownload(nullptr);
841 }
842 
843 /**
844  * @tc.name: AsyncNormalDownload003
845  * @tc.desc: Test:async asset task paused
846  * @tc.type: FUNC
847  * @tc.require:
848  * @tc.author: bty
849  */
850 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload003, TestSize.Level4)
851 {
852     /**
853      * @tc.steps: step1. Set max download task 1
854      * @tc.expected: step1. Ok
855      */
856     AsyncDownloadAssetsConfig config;
857     config.maxDownloadTask = 1;
858     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
859     /**
860      * @tc.steps: step2. Insert cloud data
861      * @tc.expected: step2. Ok
862      */
863     const int cloudCount = 1;
864     auto schema = GetSchema();
865     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
866     /**
867      * @tc.steps: step3. async asset task submit
868      * @tc.expected: step3. Ok
869      */
870     int count = 0;
871     int expQueryTimes = 3;
872     std::mutex mutex;
873     std::condition_variable cond;
__anonc27c8c720d02(const std::string &, VBucket &extend) 874     virtualCloudDb_->ForkQuery([&count, &cond, expQueryTimes](const std::string &, VBucket &extend) {
875         count++;
876         if (count == 1) {
877             cond.notify_all();
878             std::this_thread::sleep_for(std::chrono::seconds(1));
879         }
880         if (count == expQueryTimes) {
881             std::string cursor;
882             CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CURSOR_FIELD, extend, cursor);
883             EXPECT_EQ(cursor, std::string("1"));
884         }
885     });
__anonc27c8c720e02null886     std::thread t1([this]{
887         CloudSyncOption option = GetAsyncCloudSyncOption();
888         EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
889     });
890     /**
891      * @tc.steps: step4. wait for async task query
892      * @tc.expected: step4. Ok
893      */
894     {
895         std::unique_lock<std::mutex> lock(mutex);
__anonc27c8c720f02() 896         (void)cond.wait_for(lock, std::chrono::seconds(1), [&count]() {
897             return count == 1;
898         });
899     }
900     /**
901      * @tc.steps: step5. priority task submit
902      * @tc.expected: step5. Ok
903      */
904     CloudSyncOption priOption = GetAsyncCloudSyncOption();
905     std::vector<int64_t> inValue = {3, 4};
906     priOption.priorityTask = true;
907     priOption.query = Query::Select().From("AsyncDownloadAssetsTest").In("pk", inValue);
908     priOption.asyncDownloadAssets = false;
909     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(priOption, delegate_));
910     t1.join();
911     virtualCloudDb_->ForkQuery(nullptr);
912     EXPECT_EQ(count, expQueryTimes);
913 }
914 
915 /**
916  * @tc.name: AsyncNormalDownload004
917  * @tc.desc: Test multiple tables and multiple batches of asset downloads
918  * @tc.type: FUNC
919  * @tc.require:
920  * @tc.author: liaoyonghuang
921  */
922 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload004, TestSize.Level1)
923 {
924     /**
925      * @tc.steps: step1. Set max download task 1
926      * @tc.expected: step1. Ok
927      */
928     AsyncDownloadAssetsConfig config;
929     config.maxDownloadAssetsCount = 25;
930     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
931     /**
932      * @tc.steps: step2. Insert cloud data
933      * @tc.expected: step2. Ok
934      */
935     const int cloudCount = 100;
936     std::string table1 = "TABLE1";
937     std::string table2 = "TABLE2";
938     DataBaseSchema schema;
939     schema.tables.push_back(GetTableSchema(table1));
940     schema.tables.push_back(GetTableSchema(table2));
941     EXPECT_EQ(RDBDataGenerator::InitDatabase(schema, *db_), SQLITE_OK);
942     auto [record1, extend1] = RDBDataGenerator::GenerateDataRecords(0, cloudCount, 0, GetTableSchema(table1).fields);
943     EXPECT_EQ(virtualCloudDb_->BatchInsertWithGid(table1, std::move(record1), extend1), OK);
944     auto [record2, extend2] = RDBDataGenerator::GenerateDataRecords(0, cloudCount, cloudCount,
945         GetTableSchema(table2).fields);
946     EXPECT_EQ(virtualCloudDb_->BatchInsertWithGid(table2, std::move(record2), extend2), OK);
947     /**
948      * @tc.steps: step3. async asset task submit
949      * @tc.expected: step3. Ok
950      */
951     int assetsDownloadTime = 0;
952     virtualAssetLoader_->ForkDownload([&table1, &table2, &assetsDownloadTime](const std::string &tableName,
__anonc27c8c721002(const std::string &tableName, std::map<std::string, Assets> &) 953         std::map<std::string, Assets> &) {
954         if (assetsDownloadTime < 100) { // 100 assets
955             EXPECT_EQ(tableName, table1);
956         } else {
957             EXPECT_EQ(tableName, table2);
958         }
959         assetsDownloadTime++;
960     });
961     CloudSyncOption option;
962     option.devices = {"cloud"};
963     option.asyncDownloadAssets = true;
964     Query query = Query::Select().FromTable({table1, table2});
965     option.query = query;
966     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
967     std::this_thread::sleep_for(std::chrono::seconds(5));
968     EXPECT_EQ(assetsDownloadTime, 200);
969     virtualAssetLoader_->ForkDownload(nullptr);
970 }
971 
972 /**
973  * @tc.name: AsyncAbnormalDownload006
974  * @tc.desc: Test abnormal async download.
975  * @tc.type: FUNC
976  * @tc.require:
977  * @tc.author: suyue
978  */
979 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload006, TestSize.Level4)
980 {
981     /**
982      * @tc.steps: step1. Set config and insert 70 cloud data
983      * @tc.expected: step1. ok
984      */
985     AsyncDownloadAssetsConfig config;
986     config.maxDownloadTask = 5;
987     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
988     const int cloudCount = 70;
989     auto schema = GetSchema();
990     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
991 
992     /**
993      * @tc.steps: step2. Fork download abnormal for 0-10 records
994      * @tc.expected: step2. ok
995      */
996     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
997     uint32_t failNum = 10;
998     const DownloadFailRange setRange = {.isAllFail = false, .failBeginIndex = 0, .failEndIndex = failNum};
999     virtualAssetLoader_->SetDownloadFailRange(setRange);
1000 
1001     /**
1002      * @tc.steps: step3. Async cloud data
1003      * @tc.expected: step3. ok
1004      */
1005     int count = 0;
1006     std::mutex countMutex;
1007     std::condition_variable cv;
1008     virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonc27c8c721102(const std::string &tableName, std::map<std::string, Assets> &) 1009         std::map<std::string, Assets> &) {
1010         std::lock_guard<std::mutex> autoLock(countMutex);
1011         count++;
1012         if (count == 1) {
1013             std::this_thread::sleep_for(std::chrono::seconds(1));
1014         }
1015         cv.notify_all();
1016     });
1017 
1018     CloudSyncOption option = GetAsyncCloudSyncOption();
1019     RelationalTestUtils::CloudBlockSync(option, delegate_);
1020     std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonc27c8c721202() 1021     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
1022         return count >= cloudCount;
1023     });
1024     std::this_thread::sleep_for(std::chrono::seconds(1));
1025     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
1026     EXPECT_EQ(status, OK);
1027     EXPECT_EQ(downloadCount, static_cast<int32_t>(failNum * 2)); // 1 record has 2 asset
1028 
1029     virtualAssetLoader_->SetDownloadStatus(OK);
1030     virtualAssetLoader_->Reset();
1031     virtualAssetLoader_->ForkDownload(nullptr);
1032 }
1033 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,int32_t begin,int32_t end)1034 void DistributedDBCloudAsyncDownloadAssetsTest::UpdateLocalData(sqlite3 *&db,
1035     const std::string &tableName, int32_t begin, int32_t end)
1036 {
1037     const string sql = "update " + tableName + " set int_field = int_field+1 " + "where pk>=" + std::to_string(begin) +
1038         " and pk<=" + std::to_string(end) + ";";
1039     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
1040     LOGW("update local data finished");
1041 }
1042 
1043 /**
1044  * @tc.name: AsyncAbnormalDownload007
1045  * @tc.desc: Test assets is downloading then update local data can upload
1046  * @tc.type: FUNC
1047  * @tc.require:
1048  * @tc.author: tankaisheng
1049  */
1050 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload007, TestSize.Level1)
1051 {
1052     /**
1053      * @tc.steps: step1. Set async config
1054      * @tc.expected: step1. ok
1055      */
1056     AsyncDownloadAssetsConfig config;
1057     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1058     /**
1059      * @tc.steps: step2. Init data and sync
1060      * @tc.expected: step2. ok
1061      */
1062     const int cloudCount = 10;
1063     auto schema = GetSchema();
1064     ASSERT_TRUE(!schema.tables.empty());
1065     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1066     CloudSyncOption option = GetAsyncCloudSyncOption();
1067     option.asyncDownloadAssets = false;
1068     RelationalTestUtils::CloudBlockSync(option, delegate_, OK, OK);
1069     /**
1070      * @tc.steps: step3. update cloud data
1071      * @tc.expected: step3. ok
1072      */
1073     std::vector<VBucket> record;
1074     std::vector<VBucket> extend;
1075     Timestamp now = TimeHelper::GetSysCurrentTime();
1076     VBucket data;
1077     data.insert_or_assign("pk", 10L);
1078     data.insert_or_assign("int_field", 11L);
1079     Assets assets = {{.name = "new_asset_1", .hash = "new_hash"}};
1080     data.insert_or_assign("assets_1", assets);
1081     record.push_back(data);
1082     VBucket log;
1083     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
1084         now / CloudDbConstant::TEN_THOUSAND));
1085     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
1086         now / CloudDbConstant::TEN_THOUSAND));
1087     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1088     extend.push_back(log);
1089     std::string table = schema.tables.front().name;
1090     EXPECT_EQ(virtualCloudDb_->BatchInsert(table, std::move(record), extend), OK);
1091 
1092     /**
1093      * @tc.steps: step4. Sync and update local data then check upload count
1094      * @tc.expected: step4. ok
1095      */
1096     option.asyncDownloadAssets = true;
1097     std::atomic<bool> syncInProgress(true);
__anonc27c8c721302() 1098     std::thread syncThread([&]() {
1099         RelationalTestUtils::CloudBlockSync(option, delegate_);
1100         syncInProgress.store(false);
1101     });
1102 
1103     std::this_thread::sleep_for(std::chrono::milliseconds(500));
1104     UpdateLocalData(db_, "AsyncDownloadAssetsTest", 0, cloudCount);
1105 
1106     int count = 10;
__anonc27c8c721402(const std::string&, VBucket&) 1107     virtualCloudDb_->ForkUpload([&count, delegate = delegate_](const std::string&, VBucket&) {
1108         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
1109         auto [ret, count] = delegate->GetDownloadingAssetsCount();
1110         EXPECT_EQ(ret, OK);
1111         EXPECT_EQ(count, 0);
1112     });
1113     syncThread.join();
1114 }
1115 }