• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <gtest/gtest.h>
16 
17 #include "cloud/assets_download_manager.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/mock_icloud_sync_storage_interface.h"
20 #include "cloud/virtual_asset_loader.h"
21 #include "cloud/virtual_cloud_data_translate.h"
22 #include "cloud/virtual_cloud_syncer.h"
23 #include "cloud_db_sync_utils_test.h"
24 #include "distributeddb_data_generate_unit_test.h"
25 #include "distributeddb_tools_unit_test.h"
26 #include "res_finalizer.h"
27 #include "rdb_data_generator.h"
28 #include "relational_store_client.h"
29 #include "relational_store_manager.h"
30 #include "runtime_config.h"
31 #include "virtual_communicator_aggregator.h"
32 
33 using namespace testing::ext;
34 using namespace DistributedDB;
35 using namespace DistributedDBUnitTest;
36 using namespace std;
37 
38 namespace {
39 string g_testDir;
40 const std::string QUERY_INCONSISTENT_SQL =
41     "select count(*) from naturalbase_rdb_aux_AsyncDownloadAssetsTest_log where flag&0x20!=0;";
42 IRelationalStore *g_store = nullptr;
43 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
44 RelationalStoreManager g_mgr(APP_ID, USER_ID);
45 typedef struct SkipAssetTestParam {
46     DBStatus downloadRes;
47     bool useBatch;
48     bool useAsync;
49     int startIndex;
50     int expectInconsistentCount;
51     DBStatus expectSyncRes;
52 } SkipAssetTestParamT;
53 class DistributedDBCloudAsyncDownloadAssetsTest : public testing::Test {
54 public:
55     static void SetUpTestCase();
56     static void TearDownTestCase();
57     void SetUp() override;
58     void TearDown() override;
59 protected:
60     static DataBaseSchema GetSchema(bool multiTables = false);
61     static TableSchema GetTableSchema(const std::string &tableName, bool withoutAsset = false);
62     static CloudSyncOption GetAsyncCloudSyncOption();
63     static int GetAssetFieldCount();
64     void InitStore();
65     void CloseDb();
66     void DoSkipAssetDownload(SkipAssetTestParamT param);
67     void UpdateLocalData(sqlite3 *&db, const std::string &tableName, int32_t begin, int32_t end);
68     void DeleteLocalData(sqlite3 *&db, const std::string &tableName);
69     void CheckLogTable(sqlite3 *&db, const std::string &tableName, int count);
70     void UpdateLocalAndCheckUploadCount(const bool &isAsync, const int &dataCount, const int &expectCount);
71     std::string storePath_;
72     sqlite3 *db_ = nullptr;
73     RelationalStoreDelegate *delegate_ = nullptr;
74     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
75     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
76     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
77 };
78 
SetUpTestCase()79 void DistributedDBCloudAsyncDownloadAssetsTest::SetUpTestCase()
80 {
81     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
82     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
83         LOGE("rm test db files error!");
84     }
85 }
86 
TearDownTestCase()87 void DistributedDBCloudAsyncDownloadAssetsTest::TearDownTestCase()
88 {
89     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
90         LOGE("rm test db files error!");
91     }
92 }
93 
SetUp()94 void DistributedDBCloudAsyncDownloadAssetsTest::SetUp()
95 {
96     DistributedDBToolsUnitTest::PrintTestCaseInfo();
97     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
98     InitStore();
99     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
100     ASSERT_TRUE(communicatorAggregator_ != nullptr);
101     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
102 }
103 
TearDown()104 void DistributedDBCloudAsyncDownloadAssetsTest::TearDown()
105 {
106     RefObject::DecObjRef(g_store);
107     CloseDb();
108     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
109         LOGE("rm test db files error.");
110     }
111     virtualCloudDb_ = nullptr;
112     virtualAssetLoader_ = nullptr;
113     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
114     communicatorAggregator_ = nullptr;
115 }
116 
GetSchema(bool multiTables)117 DataBaseSchema DistributedDBCloudAsyncDownloadAssetsTest::GetSchema(bool multiTables)
118 {
119     DataBaseSchema schema;
120     schema.tables.push_back(GetTableSchema("AsyncDownloadAssetsTest"));
121     if (multiTables) {
122         schema.tables.push_back(GetTableSchema("TABLE1"));
123         schema.tables.push_back(GetTableSchema("TABLE2"));
124     }
125     return schema;
126 }
127 
GetTableSchema(const std::string & tableName,bool withoutAsset)128 TableSchema DistributedDBCloudAsyncDownloadAssetsTest::GetTableSchema(const std::string &tableName, bool withoutAsset)
129 {
130     TableSchema tableSchema;
131     tableSchema.name = tableName;
132     Field field;
133     field.primary = true;
134     field.type = TYPE_INDEX<int64_t>;
135     field.colName = "pk";
136     tableSchema.fields.push_back(field);
137     field.primary = false;
138     field.colName = "int_field";
139     tableSchema.fields.push_back(field);
140     if (withoutAsset) {
141         return tableSchema;
142     }
143     field.type = TYPE_INDEX<Assets>;
144     field.colName = "assets_1";
145     tableSchema.fields.push_back(field);
146     field.colName = "asset_1";
147     field.type = TYPE_INDEX<Asset>;
148     tableSchema.fields.push_back(field);
149     return tableSchema;
150 }
151 
GetAsyncCloudSyncOption()152 CloudSyncOption DistributedDBCloudAsyncDownloadAssetsTest::GetAsyncCloudSyncOption()
153 {
154     CloudSyncOption option;
155     std::vector<std::string> tables;
156     auto schema = GetSchema();
157     for (const auto &table : schema.tables) {
158         tables.push_back(table.name);
159         LOGW("[DistributedDBCloudAsyncDownloadAssetsTest] Sync with table %s", table.name.c_str());
160     }
161     option.devices = {"cloud"};
162     option.query = Query::Select().FromTable(tables);
163     option.mode = SYNC_MODE_CLOUD_MERGE;
164     option.asyncDownloadAssets = true;
165     return option;
166 }
167 
GetAssetFieldCount()168 int DistributedDBCloudAsyncDownloadAssetsTest::GetAssetFieldCount()
169 {
170     int count = 0;
171     auto schema = GetSchema();
172     for (const auto &table : schema.tables) {
173         for (const auto &field : table.fields) {
174             if (field.type == TYPE_INDEX<Assets> || field.type == TYPE_INDEX<Asset>) {
175                 count++;
176             }
177         }
178     }
179     return count;
180 }
181 
GetRelationalStore()182 const RelationalSyncAbleStorage *GetRelationalStore()
183 {
184     RelationalDBProperties properties;
185     CloudDBSyncUtilsTest::InitStoreProp(g_testDir + "/" + STORE_ID_1 + ".db", APP_ID, USER_ID, STORE_ID_1, properties);
186     int errCode = E_OK;
187     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
188     if (g_store == nullptr) {
189         LOGE("Get db failed:%d", errCode);
190         return nullptr;
191     }
192     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
193 }
194 
InitStore()195 void DistributedDBCloudAsyncDownloadAssetsTest::InitStore()
196 {
197     if (storePath_.empty()) {
198         storePath_ = g_testDir + "/" + STORE_ID_1 + ".db";
199     }
200     db_ = RelationalTestUtils::CreateDataBase(storePath_);
201     ASSERT_NE(db_, nullptr);
202     auto schema = GetSchema(true);
203     EXPECT_EQ(RDBDataGenerator::InitDatabase(schema, *db_), SQLITE_OK);
204     ASSERT_EQ(g_mgr.OpenStore(storePath_, STORE_ID_1, {}, delegate_), OK);
205     ASSERT_NE(delegate_, nullptr);
206     for (const auto &table : schema.tables) {
207         EXPECT_EQ(delegate_->CreateDistributedTable(table.name, TableSyncType::CLOUD_COOPERATION), OK);
208         LOGI("[DistributedDBCloudAsyncDownloadAssetsTest] CreateDistributedTable %s", table.name.c_str());
209     }
210     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
211     ASSERT_NE(g_cloudStoreHook, nullptr);
212     virtualCloudDb_ = make_shared<VirtualCloudDb>();
213     ASSERT_NE(virtualCloudDb_, nullptr);
214     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
215     virtualAssetLoader_ = make_shared<VirtualAssetLoader>();
216     ASSERT_NE(virtualAssetLoader_, nullptr);
217     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
218     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
219 
220     ASSERT_EQ(delegate_->SetCloudDbSchema(schema), DBStatus::OK);
221 }
222 
CheckInconsistentCount(sqlite3 * db,int64_t expectCount)223 void CheckInconsistentCount(sqlite3 *db, int64_t expectCount)
224 {
225     EXPECT_EQ(sqlite3_exec(db, QUERY_INCONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
226         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
227 }
228 
CloseDb()229 void DistributedDBCloudAsyncDownloadAssetsTest::CloseDb()
230 {
231     if (db_ != nullptr) {
232         sqlite3_close_v2(db_);
233         db_ = nullptr;
234     }
235     if (delegate_ != nullptr) {
236         EXPECT_EQ(g_mgr.CloseStore(delegate_), OK);
237         delegate_ = nullptr;
238     }
239 }
240 
241 /**
242  * @tc.name: AsyncDownloadAssetConfig001
243  * @tc.desc: Test config with valid and invalid param.
244  * @tc.type: FUNC
245  * @tc.require:
246  * @tc.author: zqq
247  */
248 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig001, TestSize.Level0)
249 {
250     /**
251      * @tc.steps: step1. Set valid param
252      * @tc.expected: step1.ok
253      */
254     AsyncDownloadAssetsConfig config;
255     AssetsDownloadManager manager;
256     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
257     config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK;
258     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
259     config.maxDownloadAssetsCount = CloudDbConstant::MAX_ASYNC_DOWNLOAD_ASSETS;
260     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), E_OK);
261 
262     /**
263      * @tc.steps: step2. Set invalid param
264      * @tc.expected: step2.invalid args
265      */
266     config.maxDownloadTask += 1u;
267     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), -E_INVALID_ARGS);
268     config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK;
269     config.maxDownloadAssetsCount += 1u;
270     EXPECT_EQ(manager.SetAsyncDownloadAssetsConfig(config), -E_INVALID_ARGS);
271 }
272 
273 /**
274  * @tc.name: AsyncDownloadAssetConfig002
275  * @tc.desc: Test config work correctly.
276  * @tc.type: FUNC
277  * @tc.require:
278  * @tc.author: zqq
279  */
280 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig002, TestSize.Level1)
281 {
282     /**
283      * @tc.steps: step1. Set valid param twice
284      * @tc.expected: step1. ok
285      */
286     AsyncDownloadAssetsConfig config;
287     config.maxDownloadTask = 10;
288     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
289     config.maxDownloadTask = 1;
290     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
291     /**
292      * @tc.steps: step2. Insert cloud data
293      * @tc.expected: step2. ok
294      */
295     const int cloudCount = 20;
296     auto schema = GetSchema();
297     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
298     /**
299      * @tc.steps: step3. Begin download first, block async task
300      * @tc.expected: step3. ok
301      */
302     auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
303     int finishCount = 0;
304     std::mutex finishMutex;
305     std::condition_variable cv;
__anonb165af940202(void *) 306     auto finishAction = [&finishCount, &finishMutex, &cv](void *) {
307         std::lock_guard<std::mutex> autoLock(finishMutex);
308         finishCount++;
309         cv.notify_all();
310     };
311     auto [errCode, listener] = manager->BeginDownloadWithListener(finishAction);
312     ASSERT_EQ(errCode, E_OK);
313     ASSERT_EQ(listener, nullptr);
314     ASSERT_EQ(manager->GetCurrentDownloadCount(), 1u);
315     std::tie(errCode, listener) = manager->BeginDownloadWithListener(finishAction);
316     ASSERT_EQ(errCode, -E_MAX_LIMITS);
317     ASSERT_NE(listener, nullptr);
318     /**
319      * @tc.steps: step4. Async cloud data
320      * @tc.expected: step4. ok and async task still one
321      */
322     CloudSyncOption option = GetAsyncCloudSyncOption();
323     RelationalTestUtils::CloudBlockSync(option, delegate_);
324     EXPECT_EQ(manager->GetCurrentDownloadCount(), 1u);
325     /**
326      * @tc.steps: step5. Notify async task finish
327      * @tc.expected: step5. wait util another async task finish
328      */
329     manager->FinishDownload();
330     std::unique_lock uniqueLock(finishMutex);
__anonb165af940302() 331     auto res = cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finishCount]() {
332         return finishCount >= 2; // 2 async task
333     });
334     EXPECT_TRUE(res);
335     listener->Drop(true);
336 }
337 
338 /**
339  * @tc.name: AsyncDownloadAssetConfig003
340  * @tc.desc: Test asyncDownloadAssets and compensatedSyncOnly both true.
341  * @tc.type: FUNC
342  * @tc.require:
343  * @tc.author: tankaisheng
344  */
345 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncDownloadAssetConfig003, TestSize.Level1)
346 {
347     /**
348      * @tc.steps: step1. Insert cloud data
349      * @tc.expected: step1. ok
350      */
351     const int cloudCount = 10;
352     auto schema = GetSchema();
353     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
354     /**
355      * @tc.steps: step2. set compensatedSyncOnly true and sync return NOT_SUPPORT.
356      * @tc.expected: step2. NOT_SUPPORT
357      */
358     CloudSyncOption option = GetAsyncCloudSyncOption();
359     option.compensatedSyncOnly = true;
360     DBStatus result = delegate_->Sync(option, nullptr);
361     EXPECT_EQ(result, NOT_SUPPORT);
362 }
363 
364 /**
365  * @tc.name: FinishListener001
366  * @tc.desc: Test listen download finish event.
367  * @tc.type: FUNC
368  * @tc.require:
369  * @tc.author: zqq
370  */
371 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, FinishListener001, TestSize.Level0)
372 {
373     /**
374      * @tc.steps: step1. Begin download first time
375      * @tc.expected: step1.ok
376      */
377     auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
378     std::atomic<bool> finished = false;
__anonb165af940402(void *) 379     auto finishAction = [&finished](void *) {
380         EXPECT_TRUE(finished);
381     };
382     auto [errCode, listener] = manager->BeginDownloadWithListener(finishAction);
383     ASSERT_EQ(errCode, E_OK);
384     ASSERT_EQ(listener, nullptr);
385     /**
386      * @tc.steps: step2. Begin download twice
387      * @tc.expected: step2. -E_MAX_LIMITS because default one task
388      */
389     std::tie(errCode, listener) = manager->BeginDownloadWithListener(finishAction);
390     EXPECT_EQ(errCode, -E_MAX_LIMITS);
391     EXPECT_NE(listener, nullptr);
392     /**
393      * @tc.steps: step3. Finish download
394      * @tc.expected: step3. finished is true in listener
395      */
396     finished = true;
397     manager->FinishDownload();
398     listener->Drop(true);
399 }
400 
401 /**
402  * @tc.name: AsyncComplexDownload001
403  * @tc.desc: Test complex async download.
404  * @tc.type: FUNC
405  * @tc.require:
406  * @tc.author: zqq
407  */
408 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncComplexDownload001, TestSize.Level1)
409 {
410     /**
411      * @tc.steps: step1. Set max download task 1
412      * @tc.expected: step1. ok
413      */
414     AsyncDownloadAssetsConfig config;
415     config.maxDownloadTask = 1;
416     config.maxDownloadAssetsCount = 1;
417     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
418     /**
419      * @tc.steps: step2. Insert cloud data
420      * @tc.expected: step2. ok
421      */
422     const int cloudCount = 10;
423     auto schema = GetSchema();
424     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
425     /**
426      * @tc.steps: step3. Async cloud data
427      * @tc.expected: step3. ok
428      */
429     CloudSyncOption option = GetAsyncCloudSyncOption();
430     RelationalTestUtils::CloudBlockSync(option, delegate_);
431     /**
432      * @tc.steps: step3. Block download cloud data
433      * @tc.expected: step3. ok
434      */
435     option.asyncDownloadAssets = false;
436     RelationalTestUtils::CloudBlockSync(option, delegate_);
437 }
438 
439 /**
440  * @tc.name: AsyncComplexDownload002
441  * @tc.desc: Test complex async download.
442  * @tc.type: FUNC
443  * @tc.require:
444  * @tc.author: zqq
445  */
446 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncComplexDownload002, TestSize.Level1)
447 {
448     /**
449      * @tc.steps: step1. Set max download task 1
450      * @tc.expected: step1. ok
451      */
452     AsyncDownloadAssetsConfig config;
453     config.maxDownloadTask = 1;
454     config.maxDownloadAssetsCount = 1;
455     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
456     /**
457      * @tc.steps: step2. Insert cloud data
458      * @tc.expected: step2. ok
459      */
460     const int cloudCount = 10;
461     auto schema = GetSchema();
462     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
463     /**
464      * @tc.steps: step3. Complex cloud data
465      * @tc.expected: step3. ok
466      */
467     CloudSyncOption option = GetAsyncCloudSyncOption();
468     for (int i = 0; i < 10; ++i) { // loop 10 times
469         option.asyncDownloadAssets = false;
470         RelationalTestUtils::CloudBlockSync(option, delegate_);
471         option.asyncDownloadAssets = true;
472         RelationalTestUtils::CloudBlockSync(option, delegate_);
473     }
474 }
475 
476 /**
477  * @tc.name: AsyncAbnormalDownload001
478  * @tc.desc: Test abnormal async download.
479  * @tc.type: FUNC
480  * @tc.require:
481  * @tc.author: zqq
482  */
483 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload001, TestSize.Level4)
484 {
485     /**
486      * @tc.steps: step1. Set max download task 1
487      * @tc.expected: step1. ok
488      */
489     AsyncDownloadAssetsConfig config;
490     config.maxDownloadTask = 1;
491     config.maxDownloadAssetsCount = 1;
492     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
493     /**
494      * @tc.steps: step2. Insert cloud data
495      * @tc.expected: step2. ok
496      */
497     const int cloudCount = 10;
498     auto schema = GetSchema();
499     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
500     /**
501      * @tc.steps: step3. Fork download abnormal
502      */
503     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
504     /**
505      * @tc.steps: step4. Async cloud data
506      * @tc.expected: step4. ok
507      */
508     CloudSyncOption option = GetAsyncCloudSyncOption();
509     RelationalTestUtils::CloudBlockSync(option, delegate_);
510     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
511     EXPECT_EQ(status, OK);
512     EXPECT_EQ(downloadCount, cloudCount * GetAssetFieldCount());
513     EXPECT_FALSE(RelationalTestUtils::IsExistEmptyHashAsset(db_, GetTableSchema("AsyncDownloadAssetsTest")));
514     std::this_thread::sleep_for(std::chrono::seconds(1));
515     /**
516      * @tc.steps: step5. Async cloud data with download ok
517      * @tc.expected: step5. ok
518      */
519     virtualAssetLoader_->SetDownloadStatus(OK);
520     LOGW("set download ok");
521     int count = 0;
522     std::mutex countMutex;
523     std::condition_variable cv;
524     virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonb165af940502(const std::string &tableName, std::map<std::string, Assets> &) 525         std::map<std::string, Assets> &) {
526         std::lock_guard<std::mutex> autoLock(countMutex);
527         count++;
528         if (count == 1) {
529             std::this_thread::sleep_for(std::chrono::seconds(1));
530         }
531         cv.notify_all();
532     });
533     RelationalTestUtils::CloudBlockSync(option, delegate_);
534     std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonb165af940602() 535     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
536         return count >= cloudCount;
537     });
538     std::this_thread::sleep_for(std::chrono::seconds(1));
539     std::tie(status, downloadCount) = delegate_->GetDownloadingAssetsCount();
540     EXPECT_EQ(status, OK);
541     EXPECT_EQ(downloadCount, 0);
542     virtualAssetLoader_->ForkDownload(nullptr);
543 }
544 
545 /**
546  * @tc.name: AsyncAbnormalDownload002
547  * @tc.desc: Test abnormal sync download.
548  * @tc.type: FUNC
549  * @tc.require:
550  * @tc.author: zqq
551  */
552 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload002, TestSize.Level1)
553 {
554     /**
555      * @tc.steps: step1. Insert cloud data
556      * @tc.expected: step1. ok
557      */
558     const int cloudCount = 10;
559     auto schema = GetSchema();
560     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
561     /**
562      * @tc.steps: step2. Fork download abnormal
563      */
564     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
565     /**
566      * @tc.steps: step3. Sync cloud data
567      * @tc.expected: step3. DB_ERROR and not exist downloading assets count
568      */
569     CloudSyncOption option = GetAsyncCloudSyncOption();
570     option.asyncDownloadAssets = false;
571     RelationalTestUtils::CloudBlockSync(option, delegate_, OK, CLOUD_ERROR);
572     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
573     EXPECT_EQ(status, OK);
574     EXPECT_EQ(downloadCount, 0);
575     virtualAssetLoader_->SetDownloadStatus(OK);
576 }
577 
578 /**
579  * @tc.name: AsyncAbnormalDownload003
580  * @tc.desc: Test abnormal async download.
581  * @tc.type: FUNC
582  * @tc.require:
583  * @tc.author: zqq
584  */
585 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload003, TestSize.Level4)
586 {
587     /**
588      * @tc.steps: step1. Set max download task 1
589      * @tc.expected: step1. ok
590      */
591     AsyncDownloadAssetsConfig config;
592     config.maxDownloadTask = 1;
593     config.maxDownloadAssetsCount = 4; // 1 record has 2 asset, config 1 batch has 2 record
594     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
595     /**
596      * @tc.steps: step2. Insert cloud data
597      * @tc.expected: step2. ok
598      */
599     const int cloudCount = 5;
600     auto schema = GetSchema();
601     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
602     /**
603      * @tc.steps: step3. Fork download abnormal
604      */
605     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
606     /**
607      * @tc.steps: step4. Sync cloud data
608      * @tc.expected: step4. DB_ERROR and exist downloading assets count
609      */
610     CloudSyncOption option = GetAsyncCloudSyncOption();
611     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
612     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
613     EXPECT_EQ(status, OK);
614     EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
615     CheckInconsistentCount(db_, 5);
616     std::this_thread::sleep_for(std::chrono::seconds(1));
617     /**
618      * @tc.steps: step5. Sync cloud data again and block upload
619      * @tc.expected: step5. DB_ERROR and exist downloading assets count
620      */
621     EXPECT_EQ(RDBDataGenerator::InsertLocalDBData(cloudCount + 1, 1, db_, GetSchema()), E_OK);
622     virtualAssetLoader_->SetDownloadStatus(OK);
__anonb165af940702(const std::string &, VBucket &) 623     virtualCloudDb_->ForkUpload([delegate = delegate_](const std::string &, VBucket &) {
624         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
625         auto [ret, count] = delegate->GetDownloadingAssetsCount();
626         EXPECT_EQ(ret, OK);
627         EXPECT_EQ(count, 0);
628     });
629     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
630     virtualAssetLoader_->ForkBatchDownload(nullptr);
631 }
632 
633 /**
634  * @tc.name: AsyncAbnormalDownload004
635  * @tc.desc: Test update trigger retain 0x1000 flag.
636  * @tc.type: FUNC
637  * @tc.require:
638  * @tc.author: zqq
639  */
640 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload004, TestSize.Level1)
641 {
642     /**
643      * @tc.steps: step1. Set async config
644      * @tc.expected: step1. ok
645      */
646     AsyncDownloadAssetsConfig config;
647     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
648     /**
649      * @tc.steps: step2. Insert cloud data
650      * @tc.expected: step2. ok
651      */
652     const int cloudCount = 1;
653     auto schema = GetSchema();
654     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
655     /**
656      * @tc.steps: step3. Fork download abnormal
657      */
658     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
659     /**
660      * @tc.steps: step4. Sync cloud data
661      * @tc.expected: step4. DB_ERROR and exist downloading assets count
662      */
663     CloudSyncOption option = GetAsyncCloudSyncOption();
664     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
665     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
666     EXPECT_EQ(status, OK);
667     EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
668     /**
669      * @tc.steps: step5. Update local data
670      * @tc.expected: step5. Exist downloading assets count
671      */
672     EXPECT_EQ(RDBDataGenerator::UpsertLocalDBData(0, cloudCount, db_,
673         GetTableSchema("AsyncDownloadAssetsTest", true)), OK);
674     std::tie(status, downloadCount) = delegate_->GetDownloadingAssetsCount();
675     EXPECT_EQ(status, OK);
676     EXPECT_EQ(downloadCount, cloudCount * 2); // 1 record has 2 asset
677 }
678 
679 /**
680  * @tc.name: AsyncAbnormalDownload005
681  * @tc.desc: Test download assets which was locked
682  * @tc.type: FUNC
683  * @tc.require:
684  * @tc.author: liaoyonghuang
685  */
686 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload005, TestSize.Level1)
687 {
688     /**
689      * @tc.steps: step1. Set async config
690      * @tc.expected: step1. ok
691      */
692     AsyncDownloadAssetsConfig config;
693     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
694     /**
695      * @tc.steps: step2. Init data
696      * @tc.expected: step2. ok
697      */
698     const int cloudCount = 10;
699     auto schema = GetSchema();
700     ASSERT_TRUE(!schema.tables.empty());
701     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
702     CloudSyncOption option = GetAsyncCloudSyncOption();
703     option.asyncDownloadAssets = false;
704     RelationalTestUtils::CloudBlockSync(option, delegate_, OK, OK);
705     /**
706      * @tc.steps: step3. Update cloud data and lock local data
707      * @tc.expected: step3. ok
708      */
709     auto [records, extends] =
710         RDBDataGenerator::GenerateDataRecords(0, cloudCount, 0, schema.tables.front().fields);
711     Asset asset = {.name = "asset_1", .hash = "new_hash"};
712     for (auto &record : records) {
713         record.insert_or_assign("asset_1", asset);
714     }
715     std::string table = schema.tables.front().name;
716     EXPECT_EQ(virtualCloudDb_->BatchUpdate(table, std::move(records), extends), OK);
717     virtualAssetLoader_->ForkDownload([&](const std::string &tableName,
__anonb165af940802(const std::string &tableName, std::map<std::string, Assets> &) 718         std::map<std::string, Assets> &) {
719         std::vector<std::vector<uint8_t>> hashKey;
720         CloudDBSyncUtilsTest::GetHashKey(table, "data_key < 5", db_, hashKey); // lock half of the data
721         EXPECT_EQ(Lock(table, hashKey, db_), OK);
722     });
723     /**
724      * @tc.steps: step4. Sync and check data
725      * @tc.expected: step4. ok
726      */
727     option.asyncDownloadAssets = true;
728     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
729     std::this_thread::sleep_for(std::chrono::seconds(1));
730     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
731     EXPECT_EQ(status, OK);
732     EXPECT_EQ(downloadCount, cloudCount / 2); // half of the data was not downloaded due to being locked
733     virtualAssetLoader_->ForkDownload(nullptr);
734 }
735 
736 /**
737  * @tc.name: AsyncNormalDownload001
738  * @tc.desc: Test abnormal async download.
739  * @tc.type: FUNC
740  * @tc.require:
741  * @tc.author: zqq
742  */
743 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload001, TestSize.Level1)
744 {
745     /**
746      * @tc.steps: step1. Register observer
747      * @tc.expected: step1. ok
748      */
749     auto rdbObserver = new(std::nothrow) RelationalStoreObserverUnitTest();
750     ASSERT_NE(rdbObserver, nullptr);
__anonb165af940902() 751     ResFinalizer resFinalizer([rdbObserver, this]() {
752         delegate_->UnRegisterObserver(rdbObserver);
753         delete rdbObserver;
754     });
755     EXPECT_EQ(delegate_->RegisterObserver(rdbObserver), OK);
756     /**
757      * @tc.steps: step2. Insert cloud data
758      * @tc.expected: step2. ok
759      */
760     const int cloudCount = 1;
761     auto schema = GetSchema();
762     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
763     EXPECT_EQ(RDBDataGenerator::InsertLocalDBData(cloudCount + 1, 1, db_, GetSchema()), E_OK);
764     /**
765      * @tc.steps: step3. Block upload while async donwload asset
766      * @tc.expected: step3. Sync ok
767      */
768     auto hook = RelationalTestUtils::GetRDBStorageHook(USER_ID, APP_ID, STORE_ID_1, storePath_);
769     ASSERT_NE(hook, nullptr);
__anonb165af940a02() 770     hook->SetBeforeUploadTransaction([]() {
771         int count = 1;
772         const int maxLoop = 5;
773         do {
774             std::this_thread::sleep_for(std::chrono::seconds(1));
775             count++;
776         } while (RuntimeContext::GetInstance()->GetAssetsDownloadManager()->GetCurrentDownloadCount() > 0 &&
777             count < maxLoop);
778         LOGW("AsyncNormalDownload001 End hook");
779     });
780     CloudSyncOption option = GetAsyncCloudSyncOption();
781     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
782     EXPECT_TRUE(rdbObserver->IsAssetChange(GetTableSchema("AsyncDownloadAssetsTest").name));
783     hook->SetBeforeUploadTransaction(nullptr);
784 }
785 
786 /**
787  * @tc.name: AsyncNormalDownload002
788  * @tc.desc: Test sync download when download task pool is full
789  * @tc.type: FUNC
790  * @tc.require:
791  * @tc.author: lhy
792  */
793 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload002, TestSize.Level4)
794 {
795     /**
796      * @tc.steps: step1. Set max download task 1
797      * @tc.expected: step1. Ok
798      */
799     AsyncDownloadAssetsConfig config;
800     config.maxDownloadTask = 1;
801     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
802     /**
803      * @tc.steps: step2. Insert cloud data
804      * @tc.expected: step2. Ok
805      */
806     const int cloudCount = 1;
807     auto schema = GetSchema();
808     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
809     /**
810      * @tc.steps: step3. Fork download abnormal
811      */
812     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
813     /**
814      * @tc.steps: step4. Async cloud data, with abnormal result
815      * @tc.expected: step4. Ok
816      */
817     CloudSyncOption option = GetAsyncCloudSyncOption();
818     RelationalTestUtils::CloudBlockSync(option, delegate_);
819     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
820     EXPECT_EQ(status, OK);
821     EXPECT_EQ(downloadCount, cloudCount * GetAssetFieldCount());
822     EXPECT_FALSE(RelationalTestUtils::IsExistEmptyHashAsset(db_, GetTableSchema("AsyncDownloadAssetsTest")));
823     /**
824      * @tc.steps: step5. Wait for failed download to finish
825      * @tc.expected: step5. Download task changes from 1 to 0
826      */
827     auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
828     std::this_thread::sleep_for(std::chrono::seconds(1));
829     EXPECT_EQ(manager->GetCurrentDownloadCount(), 0u);
830     /**
831      * @tc.steps: step6. Start a new download task to reach maxDownloadTask
832      * @tc.expected: step6. 1 task is downloading
833      */
834     auto [errCode, listener] = manager->BeginDownloadWithListener(nullptr);
835     ASSERT_EQ(errCode, E_OK);
836     ASSERT_EQ(listener, nullptr);
837     ASSERT_EQ(manager->GetCurrentDownloadCount(), 1u);
838     /**
839      * @tc.steps: step7. Set download status to ok then try sync while task pool is full
840      * @tc.expected: step7. Download should be waiting instead of doing compensated sync
841      */
842     virtualAssetLoader_->SetDownloadStatus(OK);
843     LOGW("set download ok");
844     int count = 0;
845     std::mutex countMutex;
846     std::condition_variable cv;
847     virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonb165af940b02(const std::string &tableName, std::map<std::string, Assets> &) 848         std::map<std::string, Assets> &) {
849         std::lock_guard<std::mutex> autoLock(countMutex);
850         count++;
851         if (count == 1) {
852             std::this_thread::sleep_for(std::chrono::seconds(1));
853         }
854         cv.notify_all();
855     });
856     RelationalTestUtils::CloudBlockSync(option, delegate_);
857     std::this_thread::sleep_for(std::chrono::seconds(1));
858     EXPECT_EQ(count, 0);
859     /**
860      * @tc.steps: step8. Finish the download task, to let sync continue
861      * @tc.expected: step8. Only 1 actural download through virtualAssetLoader_
862      */
863     manager->FinishDownload();
864     std::this_thread::sleep_for(std::chrono::seconds(1));
865     std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonb165af940c02() 866     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
867         return count >= cloudCount;
868     });
869     EXPECT_EQ(count, 1);
870     virtualAssetLoader_->ForkDownload(nullptr);
871 }
872 
873 /**
874  * @tc.name: AsyncNormalDownload004
875  * @tc.desc: Test multiple tables and multiple batches of asset downloads
876  * @tc.type: FUNC
877  * @tc.require:
878  * @tc.author: liaoyonghuang
879  */
880 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload004, TestSize.Level1)
881 {
882     /**
883      * @tc.steps: step1. Set max download task 1
884      * @tc.expected: step1. Ok
885      */
886     AsyncDownloadAssetsConfig config;
887     config.maxDownloadAssetsCount = 25;
888     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
889     /**
890      * @tc.steps: step2. Insert cloud data
891      * @tc.expected: step2. Ok
892      */
893     const int cloudCount = 100;
894     std::string table1 = "TABLE1";
895     std::string table2 = "TABLE2";
896     DataBaseSchema schema;
897     schema.tables.push_back(GetTableSchema(table1));
898     schema.tables.push_back(GetTableSchema(table2));
899     EXPECT_EQ(RDBDataGenerator::InitDatabase(schema, *db_), SQLITE_OK);
900     auto [record1, extend1] = RDBDataGenerator::GenerateDataRecords(0, cloudCount, 0, GetTableSchema(table1).fields);
901     EXPECT_EQ(virtualCloudDb_->BatchInsertWithGid(table1, std::move(record1), extend1), OK);
902     auto [record2, extend2] = RDBDataGenerator::GenerateDataRecords(0, cloudCount, cloudCount,
903         GetTableSchema(table2).fields);
904     EXPECT_EQ(virtualCloudDb_->BatchInsertWithGid(table2, std::move(record2), extend2), OK);
905     /**
906      * @tc.steps: step3. async asset task submit
907      * @tc.expected: step3. Ok
908      */
909     int assetsDownloadTime = 0;
910     virtualAssetLoader_->ForkDownload([&table1, &table2, &assetsDownloadTime](const std::string &tableName,
__anonb165af940d02(const std::string &tableName, std::map<std::string, Assets> &) 911         std::map<std::string, Assets> &) {
912         if (assetsDownloadTime < 100) { // 100 assets
913             EXPECT_EQ(tableName, table1);
914         } else {
915             EXPECT_EQ(tableName, table2);
916         }
917         assetsDownloadTime++;
918     });
919     CloudSyncOption option;
920     option.devices = {"cloud"};
921     option.asyncDownloadAssets = true;
922     Query query = Query::Select().FromTable({table1, table2});
923     option.query = query;
924     EXPECT_NO_FATAL_FAILURE(RelationalTestUtils::CloudBlockSync(option, delegate_));
925     std::this_thread::sleep_for(std::chrono::seconds(5));
926     EXPECT_EQ(assetsDownloadTime, 200);
927     virtualAssetLoader_->ForkDownload(nullptr);
928 }
929 
930 /**
931  * @tc.name: AsyncNormalDownload005
932  * @tc.desc: Test concurrent async download of assets and sync data.
933  * @tc.type: FUNC
934  * @tc.require:
935  * @tc.author: liaoyonghuang
936  */
937 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncNormalDownload005, TestSize.Level1)
938 {
939     /**
940      * @tc.steps: step1. Set max download task 1
941      * @tc.expected: step1. ok
942      */
943     AsyncDownloadAssetsConfig config;
944     config.maxDownloadTask = 12;
945     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
946     /**
947      * @tc.steps: step2. Insert cloud data and sync concurrently
948      * @tc.expected: step2. ok
949      */
950     const int cloudCount = 200;
951     auto schema = GetSchema();
952     int threadNum = 10;
953     CloudSyncOption option = GetAsyncCloudSyncOption();
954     thread *syncThreads[threadNum];
955     for (int i = 0; i < threadNum; i++) {
__anonb165af940e02() 956         syncThreads[i] = new thread([&]() {
957             EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
958             RelationalTestUtils::CloudBlockSync(option, delegate_);
959             DeleteLocalData(db_, "AsyncDownloadAssetsTest");
960         });
961     }
962     for (auto &thread : syncThreads) {
963         thread->join();
964         delete thread;
965     }
966 }
967 
968 /**
969  * @tc.name: AsyncAbnormalDownload006
970  * @tc.desc: Test abnormal async download.
971  * @tc.type: FUNC
972  * @tc.require:
973  * @tc.author: suyue
974  */
975 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload006, TestSize.Level4)
976 {
977     /**
978      * @tc.steps: step1. Set config and insert 70 cloud data
979      * @tc.expected: step1. ok
980      */
981     AsyncDownloadAssetsConfig config;
982     config.maxDownloadTask = 5;
983     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
984     const int cloudCount = 70;
985     auto schema = GetSchema();
986     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
987 
988     /**
989      * @tc.steps: step2. Fork download abnormal for 0-10 records
990      * @tc.expected: step2. ok
991      */
992     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
993     uint32_t failNum = 10;
994     const DownloadFailRange setRange = {.isAllFail = false, .failBeginIndex = 0, .failEndIndex = failNum};
995     virtualAssetLoader_->SetDownloadFailRange(setRange);
996 
997     /**
998      * @tc.steps: step3. Async cloud data
999      * @tc.expected: step3. ok
1000      */
1001     int count = 0;
1002     std::mutex countMutex;
1003     std::condition_variable cv;
1004     virtualAssetLoader_->ForkDownload([&count, &countMutex, &cv](const std::string &tableName,
__anonb165af940f02(const std::string &tableName, std::map<std::string, Assets> &) 1005         std::map<std::string, Assets> &) {
1006         std::lock_guard<std::mutex> autoLock(countMutex);
1007         count++;
1008         if (count == 1) {
1009             std::this_thread::sleep_for(std::chrono::seconds(1));
1010         }
1011         cv.notify_all();
1012     });
1013 
1014     CloudSyncOption option = GetAsyncCloudSyncOption();
1015     RelationalTestUtils::CloudBlockSync(option, delegate_);
1016     std::unique_lock<std::mutex> uniqueLock(countMutex);
__anonb165af941002() 1017     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count]() {
1018         return count >= cloudCount;
1019     });
1020     std::this_thread::sleep_for(std::chrono::seconds(1));
1021     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
1022     EXPECT_EQ(status, OK);
1023     EXPECT_EQ(downloadCount, static_cast<int32_t>(failNum * 2)); // 1 record has 2 asset
1024 
1025     virtualAssetLoader_->SetDownloadStatus(OK);
1026     virtualAssetLoader_->Reset();
1027     virtualAssetLoader_->ForkDownload(nullptr);
1028 }
1029 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,int32_t begin,int32_t end)1030 void DistributedDBCloudAsyncDownloadAssetsTest::UpdateLocalData(
1031         sqlite3 *&db, const std::string &tableName, int32_t begin, int32_t end)
1032 {
1033     const string sql = "update " + tableName + " set int_field = int_field+1 " + "where pk>=" + std::to_string(begin) +
1034         " and pk<=" + std::to_string(end) + ";";
1035     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
1036     LOGW("update local data finished");
1037 }
1038 
DeleteLocalData(sqlite3 * & db,const std::string & tableName)1039 void DistributedDBCloudAsyncDownloadAssetsTest::DeleteLocalData(sqlite3 *&db, const std::string &tableName)
1040 {
1041     const string sql = "delete from " + tableName + " where pk >= 0;";
1042     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
1043     LOGW("delete local data finished");
1044 }
1045 
CheckLogTable(sqlite3 * & db,const std::string & tableName,int count)1046 void DistributedDBCloudAsyncDownloadAssetsTest::CheckLogTable(sqlite3 *&db, const std::string &tableName, int count)
1047 {
1048     const string sql = "select COUNT(*) from " + DBCommon::GetLogTableName(tableName) + " where data_key>0;";
1049     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1050         reinterpret_cast<void *>(count), nullptr), SQLITE_OK);
1051     LOGW("check log table finished");
1052 }
1053 
UpdateLocalAndCheckUploadCount(const bool & isAsync,const int & dataCount,const int & expectCount)1054 void DistributedDBCloudAsyncDownloadAssetsTest::UpdateLocalAndCheckUploadCount(const bool &isAsync,
1055     const int &dataCount, const int &expectCount)
1056 {
1057     /**
1058      * @tc.steps: step1. Set async config
1059      * @tc.expected: step1. ok
1060      */
1061     AsyncDownloadAssetsConfig config;
1062     config.maxDownloadTask = 12;
1063     config.maxDownloadAssetsCount = 100;
1064     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1065     /**
1066      * @tc.steps: step2. Init data, set download status false and sync
1067      * @tc.expected: step2. async download will return OK and sync download will return CLOUD_ERROR
1068      */
1069     const int cloudCount = dataCount;
1070     auto schema = GetSchema();
1071     ASSERT_TRUE(!schema.tables.empty());
1072     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1073     virtualAssetLoader_->SetDownloadStatus(DB_ERROR);
1074     CloudSyncOption option = GetAsyncCloudSyncOption();
1075     option.asyncDownloadAssets = isAsync;
1076     DBStatus expectStatus = isAsync ? OK : CLOUD_ERROR;
1077     RelationalTestUtils::CloudBlockSync(option, delegate_, OK, expectStatus);
1078     std::this_thread::sleep_for(std::chrono::seconds(1));
1079     /**
1080      * @tc.steps: step3. Modify all local data
1081      * @tc.expected: step3. OK
1082      */
1083     UpdateLocalData(db_, "AsyncDownloadAssetsTest", 0, cloudCount);
1084     std::this_thread::sleep_for(std::chrono::seconds(1));
1085     /**
1086      * @tc.steps: step4. Fork upload to count number of uploaded records
1087      * @tc.expected: step4. OK
1088      */
1089     int count = 0;
1090     std::mutex countMutex;
1091     std::condition_variable cv;
1092     virtualCloudDb_->ForkUpload([&count, &countMutex, &cv](const std::string&, VBucket&) {
1093         std::lock_guard<std::mutex> autoLock(countMutex);
1094         count++;
1095         cv.notify_all();
1096     });
1097     virtualAssetLoader_->SetDownloadStatus(OK);
1098     virtualAssetLoader_->Reset();
1099     /**
1100      * @tc.steps: step5. Sync again and check upload count
1101      * @tc.expected: step5. OK
1102      */
1103     RelationalTestUtils::CloudBlockSync(option, delegate_);
1104     std::unique_lock<std::mutex> uniqueLock(countMutex);
1105     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&count, &cloudCount, &isAsync]() {
1106         return !isAsync || count >= cloudCount;
1107     });
1108     EXPECT_EQ(count, expectCount);
1109     /**
1110      * @tc.steps: step6. Release resources
1111      * @tc.expected: step6. OK
1112      */
1113     virtualAssetLoader_->Reset();
1114     virtualAssetLoader_->ForkDownload(nullptr);
1115     virtualCloudDb_->ForkUpload(nullptr);
1116 }
1117 
1118 /**
1119  * @tc.name: AsyncAbnormalDownload007
1120  * @tc.desc: Test in async download mode and asset is not downloaded, the local data can be uploaded
1121  * @tc.type: FUNC
1122  * @tc.require:
1123  * @tc.author: tankaisheng
1124  */
1125 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload007, TestSize.Level1)
1126 {
1127     int cloudCount = 10;
1128     EXPECT_NO_FATAL_FAILURE(UpdateLocalAndCheckUploadCount(true, cloudCount, cloudCount));
1129 }
1130 
1131 /**
1132  * @tc.name: AsyncAbnormalDownload009
1133  * @tc.desc: Test in sync download mode and asset is not downloaded, the local data will be ignored when upload
1134  * @tc.type: FUNC
1135  * @tc.require:
1136  * @tc.author: liuhongyang
1137  */
1138 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload009, TestSize.Level1)
1139 {
1140     int cloudCount = 10;
1141     UpdateLocalAndCheckUploadCount(false, cloudCount, 0);
1142 }
1143 
ModifySkippedAsset(int rowIndex,std::map<std::string,Assets> & assets,DBStatus fakeStatus)1144 DBStatus ModifySkippedAsset(int rowIndex, std::map<std::string, Assets> &assets, DBStatus fakeStatus)
1145 {
1146     if (rowIndex != 1) {
1147         return OK;
1148     }
1149     for (auto &asset : assets) {
1150         for (auto &item : asset.second) {
1151             if (item.name == "asset_1" + std::to_string(rowIndex)) {
1152                 item.status = static_cast<uint32_t>(AssetStatus::ABNORMAL);
1153             }
1154         }
1155     }
1156     return fakeStatus;
1157 }
1158 
DoSkipAssetDownload(SkipAssetTestParamT param)1159 void DistributedDBCloudAsyncDownloadAssetsTest::DoSkipAssetDownload(SkipAssetTestParamT param)
1160 {
1161     /**
1162      * @tc.steps: step1 change max download task
1163      * @tc.expected: step1. Ok
1164      */
1165     AsyncDownloadAssetsConfig config;
1166     config.maxDownloadTask = CloudDbConstant::MAX_ASYNC_DOWNLOAD_TASK; // maximum of tasks
1167     config.maxDownloadAssetsCount = CloudDbConstant::MAX_ASYNC_DOWNLOAD_ASSETS; // maximum of asset counts
1168     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1169     /**
1170      * @tc.steps: step2 Insert cloud data
1171      * @tc.expected: step2. Ok
1172      */
1173     const int cloudCount = 5;
1174     auto schema = GetSchema();
1175     std::string tableName = "AsyncDownloadAssetsTest";
1176     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(param.startIndex, cloudCount, 0, schema, virtualCloudDb_), OK);
1177     CloudDBSyncUtilsTest::CheckLocalRecordNum(db_, tableName, param.startIndex);
1178     /**
1179      * @tc.steps: step3 Fork download abnormal
1180      * @tc.expected: step3. Ok
1181      */
1182     int count = 0;
1183     std::mutex countMutex;
1184     std::condition_variable cv;
1185     if (param.useBatch) {
1186         RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1187         virtualAssetLoader_->ForkBatchDownload([&count, &countMutex, &cv, param](int rowIndex,
1188             std::map<std::string, Assets> &assets) {
1189             std::lock_guard<std::mutex> autoLock(countMutex);
1190             count++;
1191             auto ret = ModifySkippedAsset(rowIndex, assets, param.downloadRes);
1192             if (count == 1) {
1193                 std::this_thread::sleep_for(std::chrono::seconds(1));
1194             }
1195             cv.notify_all();
1196             return ret;
1197         });
1198     } else {
1199         RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1200         virtualAssetLoader_->SetDownloadStatus(param.downloadRes);
1201     }
1202     /**
1203      * @tc.steps: step4. sync cloud data
1204      * @tc.expected: step4. Ok
1205      */
1206     CloudSyncOption option = GetAsyncCloudSyncOption();
1207     option.asyncDownloadAssets = param.useAsync;
1208     RelationalTestUtils::CloudBlockSync(option, delegate_, param.expectSyncRes);
1209     /**
1210      * @tc.steps: step5. wait for sync to finish
1211      * @tc.expected: step5. check local record number and inconsistent count
1212      */
1213     std::unique_lock<std::mutex> uniqueLock(countMutex);
1214     auto res = cv.wait_for(uniqueLock, std::chrono::seconds(DBConstant::MAX_SYNC_TIMEOUT),
1215         [&count, param, cloudCount] {
1216         return count >= cloudCount || !param.useBatch;
1217     });
1218     EXPECT_TRUE(res);
1219     std::this_thread::sleep_for(std::chrono::seconds(1));
1220     CloudDBSyncUtilsTest::CheckLocalRecordNum(db_, tableName, param.startIndex + cloudCount);
1221     CheckInconsistentCount(db_, param.expectInconsistentCount);
1222     /**
1223      * @tc.steps: step6. clear
1224      */
1225     virtualAssetLoader_->SetDownloadStatus(OK);
1226     virtualAssetLoader_->Reset();
1227     virtualAssetLoader_->ForkBatchDownload(nullptr);
1228 }
1229 
1230 /**
1231  * @tc.name: SkipAssetDownloadTest001
1232  * @tc.desc: Test async batch download returns Skip_Assets
1233  * @tc.type: FUNC
1234  * @tc.require:
1235  * @tc.author: liuhongyang
1236  */
1237 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, SkipAssetDownloadTest001, TestSize.Level1)
1238 {
1239     /**
1240      * @tc.expected: step1. sync return OK, and has 1 inconsistent records
1241      */
1242     SkipAssetTestParamT param = {.downloadRes = SKIP_ASSET, .useBatch = true, .useAsync = true,
1243         .startIndex = 0, .expectInconsistentCount = 1, .expectSyncRes = OK};
1244     DoSkipAssetDownload(param);
1245 }
1246 
1247 /**
1248  * @tc.name: SkipAssetDownloadTest002
1249  * @tc.desc: Test sync batch download returns Skip_Assets
1250  * @tc.type: FUNC
1251  * @tc.require:
1252  * @tc.author: liuhongyang
1253  */
1254 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, SkipAssetDownloadTest002, TestSize.Level1)
1255 {
1256     /**
1257      * @tc.expected: step1. sync return OK, and has 1 inconsistent records
1258      */
1259     SkipAssetTestParamT param = {.downloadRes = SKIP_ASSET, .useBatch = true, .useAsync = false,
1260         .startIndex = 0, .expectInconsistentCount = 1, .expectSyncRes = OK};
1261     DoSkipAssetDownload(param);
1262 }
1263 
1264 /**
1265  * @tc.name: SkipAssetDownloadTest003
1266  * @tc.desc: Test sync one-by-one download returns Skip_Assets
1267  * @tc.type: FUNC
1268  * @tc.require:
1269  * @tc.author: liuhongyang
1270  */
1271 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, SkipAssetDownloadTest003, TestSize.Level1)
1272 {
1273     /**
1274      * @tc.expected: step1. sync return OK, and has 5 inconsistent records
1275      */
1276     SkipAssetTestParamT param = {.downloadRes = SKIP_ASSET, .useBatch = false, .useAsync = false,
1277         .startIndex = 0, .expectInconsistentCount = 5, .expectSyncRes = OK};
1278     DoSkipAssetDownload(param);
1279 }
1280 
1281 /**
1282  * @tc.name: AsyncAbnormalDownload008
1283  * @tc.desc: Test the total count of download after download failure
1284  * @tc.type: FUNC
1285  * @tc.require:
1286  * @tc.author: bty
1287  */
1288 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload008, TestSize.Level1)
1289 {
1290     /**
1291      * @tc.steps: step1. Set max download task 1
1292      * @tc.expected: step1. ok
1293      */
1294     AsyncDownloadAssetsConfig config;
1295     config.maxDownloadTask = 1;
1296     config.maxDownloadAssetsCount = 2;
1297     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1298     /**
1299      * @tc.steps: step2. Insert cloud data
1300      * @tc.expected: step2. ok
1301      */
1302     const int cloudCount = 4;
1303     auto schema = GetSchema();
1304     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1305     /**
1306      * @tc.steps: step3. Fork download return cloud error
1307      * @tc.expected: step3. CLOUD_ERROR
1308      */
1309     int downloadIndex = 0;
1310     int failedCount = 2;
1311     virtualAssetLoader_->ForkBatchDownload([&downloadIndex, failedCount](
__anonb165af941502( int rowIndex, std::map<std::string, Assets> &assets) 1312         int rowIndex, std::map<std::string, Assets> &assets) {
1313         downloadIndex++;
1314         if (downloadIndex <= failedCount) {
1315             return CLOUD_ERROR;
1316         }
1317         return OK;
1318     });
1319     /**
1320      * @tc.steps: step4. Sync
1321      * @tc.expected: step4. ok
1322      */
1323     CloudSyncOption option = GetAsyncCloudSyncOption();
1324     RelationalTestUtils::CloudBlockSync(option, delegate_);
1325     std::this_thread::sleep_for(std::chrono::seconds(1));
1326     /**
1327      * @tc.steps: step5. Check download count
1328      * @tc.expected: step5. ok
1329      */
1330     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
1331     EXPECT_EQ(status, OK);
1332     EXPECT_EQ(downloadCount, cloudCount);
1333     virtualAssetLoader_->ForkBatchDownload(nullptr);
1334 }
1335 
1336 /**
1337  * @tc.name: AsyncAbnormalDownload010
1338  * @tc.desc: Test assets is async downloading when delete local data.
1339  * @tc.type: FUNC
1340  * @tc.require:
1341  * @tc.author: tankaisheng
1342  */
1343 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload010, TestSize.Level1)
1344 {
1345     /**
1346      * @tc.steps: step1. Set max download task 1
1347      * @tc.expected: step1. ok
1348      */
1349     AsyncDownloadAssetsConfig config;
1350     config.maxDownloadTask = 1;
1351     config.maxDownloadAssetsCount = 2;
1352     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1353     /**
1354      * @tc.steps: step2. Insert cloud data
1355      * @tc.expected: step2. ok
1356      */
1357     const int cloudCount = 2000;
1358     auto schema = GetSchema();
1359     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1360     /**
1361      * @tc.steps: step3. Sync
1362      * @tc.expected: step3. ok
1363      */
1364     CloudSyncOption option = GetAsyncCloudSyncOption();
1365     RelationalTestUtils::CloudBlockSync(option, delegate_);
1366     /**
1367      * @tc.steps: step4. Delete local data.
1368      * @tc.expected: step4. ok
1369      */
1370     DeleteLocalData(db_, "AsyncDownloadAssetsTest");
1371     /**
1372      * @tc.steps: step5. Check log table.
1373      * @tc.expected: step5. ok
1374      */
1375     CheckLogTable(db_, "AsyncDownloadAssetsTest", 0);
1376 }
1377 
1378 /**
1379  * @tc.name: TriggerAsyncTask001
1380  * @tc.desc: Test trigger async task.
1381  * @tc.type: FUNC
1382  * @tc.require:
1383  * @tc.author: zqq
1384  */
1385 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, TriggerAsyncTask001, TestSize.Level1)
1386 {
1387     auto storage = std::make_shared<MockICloudSyncStorageInterface>();
1388     ASSERT_NE(storage, nullptr);
1389     auto proxy = StorageProxy::GetCloudDb(storage.get());
1390     ASSERT_NE(proxy, nullptr);
1391     auto syncer = new(std::nothrow) VirtualCloudSyncer(proxy);
1392     ASSERT_NE(syncer, nullptr);
1393     /**
1394      * @tc.steps: step1. Trigger async task with not exist table contain assets.
1395      */
1396     syncer->TriggerAsyncTask();
1397     /**
1398      * @tc.steps: step2. Trigger async task with exist table contain assets.
1399      */
1400     EXPECT_CALL(*storage, IsExistTableContainAssets).WillRepeatedly(testing::Return(true));
__anonb165af941602() 1401     EXPECT_CALL(*storage, GetDownloadAssetTable).WillRepeatedly([]() {
1402         std::pair<int, std::vector<std::string>> res;
1403         return res;
1404     });
1405     syncer->TriggerAsyncTask();
1406     syncer->WaitTaskFinished();
1407     /**
1408      * @tc.steps: step3. Async trigger and wait finished.
1409      */
__anonb165af941702() 1410     std::thread t1([syncer]() {
1411         for (int i = 0; i < 1000; ++i) {
1412             syncer->TriggerAsyncTask();
1413         }
1414     });
__anonb165af941802() 1415     std::thread t2([syncer]() {
1416         for (int i = 0; i < 1000; ++i) {
1417             syncer->WaitTaskFinished();
1418         }
1419     });
1420     t1.join();
1421     t2.join();
1422     syncer->Close();
1423     RefObject::KillAndDecObjRef(syncer);
1424 }
1425 
1426 /**
1427  * @tc.name: AsyncAbnormalDownload011
1428  * @tc.desc: Test set reference when async downloading.
1429  * @tc.type: FUNC
1430  * @tc.require:
1431  * @tc.author: tankaisheng
1432  */
1433 HWTEST_F(DistributedDBCloudAsyncDownloadAssetsTest, AsyncAbnormalDownload011, TestSize.Level1)
1434 {
1435     /**
1436      * @tc.steps: step1. Set max download task 1
1437      * @tc.expected: step1. ok
1438      */
1439     AsyncDownloadAssetsConfig config;
1440     config.maxDownloadTask = 1;
1441     config.maxDownloadAssetsCount = 2;
1442     EXPECT_EQ(RuntimeConfig::SetAsyncDownloadAssetsConfig(config), OK);
1443     /**
1444      * @tc.steps: step2. Insert cloud data
1445      * @tc.expected: step2. ok
1446      */
1447     const int cloudCount = 100;
1448     auto schema = GetSchema();
1449     EXPECT_EQ(RDBDataGenerator::InsertCloudDBData(0, cloudCount, 0, schema, virtualCloudDb_), OK);
1450     /**
1451      * @tc.steps: step3. Fork download set reference
1452      * @tc.expected: step3. ok
1453      */
1454     TableReferenceProperty tableReferenceProperty;
1455     tableReferenceProperty.sourceTableName = "AsyncDownloadAssetsTest";
1456     tableReferenceProperty.targetTableName = "TABLE1";
1457     std::map<std::string, std::string> columns;
1458     columns["int_field"] = "int_field";
1459     tableReferenceProperty.columns = columns;
1460     CloudSyncOption option = GetAsyncCloudSyncOption();
1461 
1462     std::mutex mtx;
1463     std::condition_variable cv;
1464     bool t2_started = false;
1465 
__anonb165af941902() 1466     std::thread t1([this, option, &mtx, &cv, &t2_started]() {
1467         {
1468             std::unique_lock<std::mutex> lock(mtx);
1469             t2_started = true;
1470             cv.notify_one();
1471         }
1472         RelationalTestUtils::CloudBlockSync(option, delegate_);
1473     });
1474 
__anonb165af941a02null1475     std::thread t2([this, tableReferenceProperty, &mtx, &cv, &t2_started] {
1476         std::unique_lock<std::mutex> lock(mtx);
1477         cv.wait(lock, [&]{ return t2_started; });
1478         EXPECT_EQ(delegate_->SetReference({tableReferenceProperty}), OK);
1479     });
1480 
1481     t1.join();
1482     t2.join();
1483     /**
1484      * @tc.steps: step5. Check download count
1485      * @tc.expected: step5. ok
1486      */
1487     auto [status, downloadCount] = delegate_->GetDownloadingAssetsCount();
1488     EXPECT_EQ(status, OK);
1489     EXPECT_NE(downloadCount, 0);
1490     virtualAssetLoader_->ForkBatchDownload(nullptr);
1491 }
1492 }