1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "log_print.h"
21 #include "relational_store_delegate.h"
22 #include "relational_store_manager.h"
23 #include "runtime_config.h"
24 #include "time_helper.h"
25 #include "virtual_asset_loader.h"
26 #include "virtual_cloud_data_translate.h"
27 #include "virtual_cloud_db.h"
28 #include "virtual_communicator_aggregator.h"
29 #include "sqlite_relational_utils.h"
30 #include "cloud/cloud_storage_utils.h"
31 #include "cloud_db_sync_utils_test.h"
32
33 namespace {
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 const char *g_createSQL =
38 "CREATE TABLE IF NOT EXISTS DistributedDBCloudAssetsOperationSyncTest(" \
39 "id TEXT PRIMARY KEY," \
40 "name TEXT," \
41 "height REAL ," \
42 "photo BLOB," \
43 "asset ASSET," \
44 "assets ASSETS," \
45 "age INT);";
46 const int64_t g_syncWaitTime = 60;
47 const int g_assetsNum = 3;
48 const Asset g_localAsset = {
49 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
50 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
51 };
52 SyncProcess lastProcess_;
53
CreateUserDBAndTable(sqlite3 * & db)54 void CreateUserDBAndTable(sqlite3 *&db)
55 {
56 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
57 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
58 }
59
BlockSync(const Query & query,RelationalStoreDelegate * delegate,SyncMode syncMode=SYNC_MODE_CLOUD_MERGE)60 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, SyncMode syncMode = SYNC_MODE_CLOUD_MERGE)
61 {
62 std::mutex dataMutex;
63 std::condition_variable cv;
64 bool finish = false;
65 SyncProcess last;
66 auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
67 for (const auto &item: process) {
68 if (item.second.process == DistributedDB::FINISHED) {
69 {
70 std::lock_guard<std::mutex> autoLock(dataMutex);
71 finish = true;
72 }
73 last = item.second;
74 cv.notify_one();
75 }
76 }
77 };
78 LOGW("begin call sync");
79 ASSERT_EQ(delegate->Sync({ "CLOUD" }, syncMode, query, callback, g_syncWaitTime), OK);
80 std::unique_lock<std::mutex> uniqueLock(dataMutex);
81 cv.wait(uniqueLock, [&finish]() {
82 return finish;
83 });
84 lastProcess_ = last;
85 LOGW("end call sync");
86 }
87
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)88 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
89 {
90 if (count != 1) {
91 return 0;
92 }
93 auto expectCount = reinterpret_cast<int64_t>(data);
94 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
95 return 0;
96 }
97
98 class DistributedDBCloudAssetsOperationSyncTest : public testing::Test {
99 public:
100 static void SetUpTestCase();
101 static void TearDownTestCase();
102 void SetUp() override;
103 void TearDown() override;
104 void WriteDataWithoutCommitTransaction();
105 protected:
106 void InitTestDir();
107 DataBaseSchema GetSchema();
108 void CloseDb();
109 void InsertUserTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
110 const Assets &templateAsset = {});
111 void UpdateLocalTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
112 bool updateAssets = true);
113 void UpdateLocalAssetRecord(const std::string &tableName, int64_t begin, int64_t count);
114 void CheckAssetsCount(const std::vector<size_t> &expectCount, bool checkAsset = false);
115 void UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull);
116 void ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount, int &removeCount);
117 void InsertLocalAssetData(const std::string &assetHash);
118 void InsertCloudAssetData(const std::string &assetHash);
119 void DeleteCloudDBData(int64_t beginGid, int64_t count, const std::string &tableName);
120 void PrepareForAssetOperation010();
121 void UpdateAssetWhenSyncUpload();
122 DBStatus InsertRecordToCloud(const std::vector<VBucket> &record);
123 void PrepareDataInCloud();
124 void LocalAssetRemoveTest();
125
126 static std::vector<Asset> GetAssets(const std::string &baseName, const Assets &templateAsset, size_t assetCount);
127 static std::vector<VBucket> GenerateAssetsRecords(const std::map<std::string, int32_t> &colType,
128 const Asset &templateAsset, int assetsCount, int rowCount);
129 static Asset GenerateAsset(const Asset &templateAsset, int id);
130 static Assets GenerateAssets(const Asset &templateAsset, int id, int assetsCount);
131 std::string testDir_;
132 std::string storePath_;
133 sqlite3 *db_ = nullptr;
134 RelationalStoreDelegate *delegate_ = nullptr;
135 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
136 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
137 std::shared_ptr<VirtualCloudDataTranslate> virtualTranslator_ = nullptr;
138 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
139 std::string tableName_ = "DistributedDBCloudAssetsOperationSyncTest";
140 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
141 TrackerSchema trackerSchema = {
142 .tableName = tableName_, .extendColNames = {"name"}, .trackerColNames = {"age"}
143 };
144 };
145
SetUpTestCase()146 void DistributedDBCloudAssetsOperationSyncTest::SetUpTestCase()
147 {
148 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
149 }
150
TearDownTestCase()151 void DistributedDBCloudAssetsOperationSyncTest::TearDownTestCase()
152 {}
153
SetUp()154 void DistributedDBCloudAssetsOperationSyncTest::SetUp()
155 {
156 DistributedDBToolsUnitTest::PrintTestCaseInfo();
157 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
158 InitTestDir();
159 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
160 LOGE("rm test db files error.");
161 }
162 DistributedDBToolsUnitTest::PrintTestCaseInfo();
163 LOGD("Test dir is %s", testDir_.c_str());
164 db_ = RelationalTestUtils::CreateDataBase(storePath_);
165 ASSERT_NE(db_, nullptr);
166 CreateUserDBAndTable(db_);
167 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
168 RelationalStoreDelegate::Option option;
169 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
170 ASSERT_NE(delegate_, nullptr);
171 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
172 ASSERT_EQ(delegate_->SetTrackerTable(trackerSchema), DBStatus::OK);
173 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
174 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
175 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
176 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
177 virtualTranslator_ = std::make_shared<VirtualCloudDataTranslate>();
178 DataBaseSchema dataBaseSchema = GetSchema();
179 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
180 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
181 ASSERT_TRUE(communicatorAggregator_ != nullptr);
182 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
183 }
184
TearDown()185 void DistributedDBCloudAssetsOperationSyncTest::TearDown()
186 {
187 CloseDb();
188 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
189 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
190 LOGE("rm test db files error.");
191 }
192 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
193 communicatorAggregator_ = nullptr;
194 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
195 }
196
InitTestDir()197 void DistributedDBCloudAssetsOperationSyncTest::InitTestDir()
198 {
199 if (!testDir_.empty()) {
200 return;
201 }
202 DistributedDBToolsUnitTest::TestDirInit(testDir_);
203 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
204 LOGI("The test db is:%s", testDir_.c_str());
205 }
206
GetSchema()207 DataBaseSchema DistributedDBCloudAssetsOperationSyncTest::GetSchema()
208 {
209 DataBaseSchema schema;
210 TableSchema tableSchema;
211 tableSchema.name = tableName_;
212 tableSchema.sharedTableName = tableName_ + "_shared";
213 tableSchema.fields = {
214 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
215 {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
216 {"age", TYPE_INDEX<int64_t>}
217 };
218 schema.tables.push_back(tableSchema);
219 return schema;
220 }
221
CloseDb()222 void DistributedDBCloudAssetsOperationSyncTest::CloseDb()
223 {
224 virtualCloudDb_->ForkUpload(nullptr);
225 virtualCloudDb_ = nullptr;
226 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
227 delegate_ = nullptr;
228 mgr_ = nullptr;
229 }
230
InsertUserTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,const Assets & templateAsset)231 void DistributedDBCloudAssetsOperationSyncTest::InsertUserTableRecord(const std::string &tableName, int64_t begin,
232 int64_t count, size_t assetCount, const Assets &templateAsset)
233 {
234 std::string photo = "phone";
235 int errCode;
236 std::vector<uint8_t> assetBlob;
237 std::vector<uint8_t> assetsBlob;
238 const int64_t index2 = 2;
239 for (int64_t i = begin; i < begin + count; ++i) {
240 std::string name = g_localAsset.name + std::to_string(i);
241 Asset asset = g_localAsset;
242 asset.name = name;
243 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
244 std::vector<Asset> assets = GetAssets(name, templateAsset, assetCount);
245 string sql = "INSERT OR REPLACE INTO " + tableName +
246 " (id, name, height, photo, asset, assets, age) VALUES ('" + std::to_string(i) +
247 "', 'local', '178.0', '" + photo + "', ?, ?, '18');";
248 sqlite3_stmt *stmt = nullptr;
249 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
250 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
251 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
252 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, index2, assetsBlob, false), E_OK);
253 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
254 SQLiteUtils::ResetStatement(stmt, true, errCode);
255 }
256 }
257
UpdateLocalTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,bool updateAssets)258 void DistributedDBCloudAssetsOperationSyncTest::UpdateLocalTableRecord(const std::string &tableName, int64_t begin,
259 int64_t count, size_t assetCount, bool updateAssets)
260 {
261 int errCode;
262 std::vector<uint8_t> assetBlob;
263 std::vector<uint8_t> assetsBlob;
264 std::string hash = updateAssets ? "new_hash" : g_localAsset.hash;
265 for (int64_t i = begin; i < begin + count; ++i) {
266 std::string name = g_localAsset.name + std::to_string(i);
267 Asset asset = g_localAsset;
268 asset.name = name;
269 asset.hash = hash;
270 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
271 std::vector<Asset> assets = GetAssets(name, {}, assetCount);
272 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
273 std::string dataName = "new_name_" + std::to_string(i);
274 std::string sql = "UPDATE " + tableName + " SET name = ?, asset = ?, assets = ? where id = " +
275 std::to_string(i);
276 sqlite3_stmt *stmt = nullptr;
277 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
278 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 1, dataName), E_OK); // 1st bind
279 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetBlob, false), E_OK); // 2nd bind
280 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetsBlob, false), E_OK); // 3rd bind
281 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
282 SQLiteUtils::ResetStatement(stmt, true, errCode);
283 }
284 }
285
InsertRecordToCloud(const std::vector<VBucket> & record)286 DBStatus DistributedDBCloudAssetsOperationSyncTest::InsertRecordToCloud(const std::vector<VBucket> &record)
287 {
288 std::vector<VBucket> extend;
289 for (size_t i = 0; i < record.size(); ++i) {
290 VBucket log;
291 Timestamp now = TimeHelper::GetSysCurrentTime();
292 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
293 now / CloudDbConstant::TEN_THOUSAND));
294 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
295 now / CloudDbConstant::TEN_THOUSAND));
296 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
297 extend.push_back(log);
298 }
299 std::vector<VBucket> copyRecord = record;
300 return virtualCloudDb_->BatchInsert(tableName_, std::move(copyRecord), extend);
301 }
302
PrepareDataInCloud()303 void DistributedDBCloudAssetsOperationSyncTest::PrepareDataInCloud()
304 {
305 std::map<std::string, int32_t> colType;
306 colType["asset"] = TYPE_INDEX<Asset>;
307 colType["assets"] = TYPE_INDEX<Assets>;
308 Asset templateAsset = g_localAsset;
309 const int assetsCount = 10;
310 const int rowCount = 200;
311 auto recordAssets = GenerateAssetsRecords(colType, templateAsset, assetsCount, rowCount);
312 EXPECT_EQ(InsertRecordToCloud(recordAssets), OK);
313 }
314
GetAssets(const std::string & baseName,const Assets & templateAsset,size_t assetCount)315 std::vector<Asset> DistributedDBCloudAssetsOperationSyncTest::GetAssets(const std::string &baseName,
316 const Assets &templateAsset, size_t assetCount)
317 {
318 std::vector<Asset> assets;
319 for (size_t i = 1; i <= assetCount; ++i) {
320 Asset asset;
321 if (i - 1 < templateAsset.size()) {
322 asset = templateAsset[i - 1];
323 } else {
324 asset = g_localAsset;
325 asset.name = baseName + "_" + std::to_string(i);
326 asset.status = static_cast<uint32_t>(AssetStatus::INSERT);
327 }
328 assets.push_back(asset);
329 }
330 return assets;
331 }
332
UpdateCloudTableRecord(int64_t begin,int64_t count,bool assetIsNull)333 void DistributedDBCloudAssetsOperationSyncTest::UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull)
334 {
335 std::vector<VBucket> record;
336 std::vector<VBucket> extend;
337 Timestamp now = TimeHelper::GetSysCurrentTime();
338 const int assetCount = 2;
339 for (int64_t i = begin; i < (begin + count); ++i) {
340 VBucket data;
341 data.insert_or_assign("id", std::to_string(i));
342 data.insert_or_assign("name", "Cloud" + std::to_string(i));
343 Assets assets;
344 for (int j = 1; j <= assetCount; ++j) {
345 Asset asset;
346 asset.name = "Phone_" + std::to_string(j);
347 asset.assetId = std::to_string(j);
348 asset.status = AssetStatus::UPDATE;
349 assets.push_back(asset);
350 }
351 assetIsNull ? data.insert_or_assign("assets", Nil()) : data.insert_or_assign("assets", assets);
352 record.push_back(data);
353 VBucket log;
354 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
355 now / CloudDbConstant::TEN_THOUSAND));
356 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
357 now / CloudDbConstant::TEN_THOUSAND));
358 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
359 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
360 extend.push_back(log);
361 }
362
363 ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
364 }
365
CheckAssetsCount(const std::vector<size_t> & expectCount,bool checkAsset)366 void DistributedDBCloudAssetsOperationSyncTest::CheckAssetsCount(const std::vector<size_t> &expectCount,
367 bool checkAsset)
368 {
369 std::vector<VBucket> allData;
370 auto dbSchema = GetSchema();
371 ASSERT_GT(dbSchema.tables.size(), 0u);
372 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
373 int index = 0;
374 ASSERT_EQ(allData.size(), expectCount.size());
375 for (const auto &data : allData) {
376 auto colIter = data.find("assets");
377 EXPECT_NE(colIter, data.end());
378 if (colIter == data.end()) {
379 index++;
380 continue;
381 }
382 Type colValue = data.at("assets");
383 auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
384 auto assets = RelationalTestUtils::GetAssets(colValue, translate);
385 size_t size = assets.size();
386 if (checkAsset) {
387 Type colValue1 = data.at("asset");
388 auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
389 size += assets1.size();
390 }
391 LOGI("[DistributedDBCloudAssetsOperationSyncTest] Check data index %d", index);
392 EXPECT_EQ(static_cast<size_t>(size), expectCount[index]);
393 for (const auto &item : assets) {
394 LOGI("[DistributedDBCloudAssetsOperationSyncTest] Asset name %s status %" PRIu32, item.name.c_str(),
395 item.status);
396 }
397 index++;
398 }
399 }
400
ForkDownloadAndRemoveAsset(DBStatus removeStatus,int & downLoadCount,int & removeCount)401 void DistributedDBCloudAssetsOperationSyncTest::ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount,
402 int &removeCount)
403 {
404 virtualAssetLoader_->ForkDownload([this, &downLoadCount](const std::string &tableName,
405 std::map<std::string, Assets> &assets) {
406 downLoadCount++;
407 if (downLoadCount == 1) {
408 std::string sql = "UPDATE " + tableName_ + " SET assets = NULL WHERE id = 0;";
409 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
410 }
411 });
412 virtualAssetLoader_->ForkRemoveLocalAssets([removeStatus, &removeCount](const std::vector<Asset> &assets) {
413 EXPECT_EQ(assets.size(), 2u); // one record has 2 asset
414 removeCount++;
415 return removeStatus;
416 });
417 }
418
GenerateAssetsRecords(const std::map<std::string,int32_t> & colType,const Asset & templateAsset,int assetsCount,int rowCount)419 std::vector<VBucket> DistributedDBCloudAssetsOperationSyncTest::GenerateAssetsRecords(
420 const std::map<std::string, int32_t> &colType, const Asset &templateAsset, int assetsCount, int rowCount)
421 {
422 std::vector<VBucket> res;
423 for (int i = 0; i < rowCount; ++i) {
424 VBucket record;
425 record["id"] = std::to_string(i);
426 for (const auto &[col, type] : colType) {
427 if (type == TYPE_INDEX<Asset>) {
428 record[col] = GenerateAsset(templateAsset, i);
429 } else if (type == TYPE_INDEX<Assets>) {
430 record[col] = GenerateAssets(templateAsset, i, assetsCount);
431 }
432 }
433 res.push_back(record);
434 }
435 return res;
436 }
437
GenerateAsset(const Asset & templateAsset,int id)438 Asset DistributedDBCloudAssetsOperationSyncTest::GenerateAsset(const Asset &templateAsset, int id)
439 {
440 Asset res = templateAsset;
441 res.name.append("_").append(std::to_string(id));
442 res.hash.append("_").append(std::to_string(id));
443 return res;
444 }
445
GenerateAssets(const Asset & templateAsset,int id,int assetsCount)446 Assets DistributedDBCloudAssetsOperationSyncTest::GenerateAssets(const Asset &templateAsset, int id, int assetsCount)
447 {
448 Assets assets;
449 auto baseAsset = GenerateAsset(templateAsset, id);
450 for (int i = 0; i < assetsCount; ++i) {
451 assets.push_back(GenerateAsset(baseAsset, i));
452 }
453 return assets;
454 }
455
456 /**
457 * @tc.name: SyncWithAssetOperation001
458 * @tc.desc: Delete Assets When Download
459 * @tc.type: FUNC
460 * @tc.require:
461 * @tc.author: zhangqiquan
462 */
463 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation001, TestSize.Level1)
464 {
465 const int actualCount = 10;
466 const int deleteDataCount = 5;
467 const int deleteAssetsCount = 4;
468 InsertUserTableRecord(tableName_, 0, actualCount);
469 std::string tableName = tableName_;
__anonaa4bfccd0602(const std::string &, VBucket &) 470 virtualCloudDb_->ForkUpload([this, deleteDataCount, deleteAssetsCount](const std::string &, VBucket &) {
471 for (int64_t i = 0; i < deleteDataCount; i++) {
472 std::string sql = "DELETE FROM " + tableName_ + " WHERE id = " + std::to_string(i) + ";";
473 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
474 }
475 for (int64_t i = deleteDataCount; i < deleteDataCount + deleteAssetsCount; i++) {
476 std::string sql = "UPDATE " + tableName_ + " SET asset = NULL, assets = NULL WHERE id = " +
477 std::to_string(i) + ";";
478 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
479 }
480 });
481 Query query = Query::Select().FromTable({ tableName_ });
482 BlockSync(query, delegate_);
483 virtualCloudDb_->ForkUpload(nullptr);
484 std::vector<size_t> expectCount(actualCount - deleteDataCount, 0);
485 expectCount[expectCount.size() - 1] = 2; // default one row has 2 assets
486 CheckAssetsCount(expectCount);
487 }
488
489 /**
490 * @tc.name: SyncWithAssetOperation002
491 * @tc.desc: Download Assets When local assets was removed
492 * @tc.type: FUNC
493 * @tc.require:
494 * @tc.author: zhangqiquan
495 */
496 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation002, TestSize.Level1)
497 {
498 const int actualCount = 1;
499 InsertUserTableRecord(tableName_, 0, actualCount);
500 Query query = Query::Select().FromTable({ tableName_ });
501 BlockSync(query, delegate_);
502 int downLoadCount = 0;
503 int removeCount = 0;
504 ForkDownloadAndRemoveAsset(OK, downLoadCount, removeCount);
505 UpdateCloudTableRecord(0, actualCount, false);
506 RelationalTestUtils::CloudBlockSync(query, delegate_);
507 EXPECT_EQ(downLoadCount, 1); // local asset was removed should download 1 times
508 EXPECT_EQ(removeCount, 1);
509 virtualAssetLoader_->ForkDownload(nullptr);
510 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
511
512 std::vector<size_t> expectCount = { 0 };
513 CheckAssetsCount(expectCount);
514 }
515
516 /**
517 * @tc.name: SyncWithAssetOperation003
518 * @tc.desc: Delete Assets When Download
519 * @tc.type: FUNC
520 * @tc.require:
521 * @tc.author: bty
522 */
523 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation003, TestSize.Level1)
524 {
525 InsertUserTableRecord(tableName_, 0, 1); // 1 is count
526 int uploadCount = 0;
__anonaa4bfccd0702(const std::string &, VBucket &) 527 virtualCloudDb_->ForkUpload([this, &uploadCount](const std::string &, VBucket &) {
528 if (uploadCount > 0) {
529 return;
530 }
531 SqlCondition condition;
532 condition.sql = "UPDATE " + tableName_ + " SET age = '666' WHERE id = 0;";
533 std::vector<VBucket> records;
534 EXPECT_EQ(delegate_->ExecuteSql(condition, records), OK);
535 uploadCount++;
536 });
537 Query query = Query::Select().FromTable({ tableName_ });
538 BlockSync(query, delegate_);
539 virtualCloudDb_->ForkUpload(nullptr);
540
541 std::string sql = "SELECT assets from " + tableName_ + " where id = 0;";
542 sqlite3_stmt *stmt = nullptr;
543 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
544 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
545 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
546 Type cloudValue;
547 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
548 std::vector<uint8_t> assetsBlob;
549 Assets assets;
550 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
551 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
552 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
553 for (size_t i = 0; i < assets.size(); ++i) {
554 EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
555 }
556 }
557 int errCode;
558 SQLiteUtils::ResetStatement(stmt, true, errCode);
559 }
560
LocalAssetRemoveTest()561 void DistributedDBCloudAssetsOperationSyncTest::LocalAssetRemoveTest()
562 {
563 const int actualCount = 5; // 5 record
564 InsertUserTableRecord(tableName_, 0, actualCount);
565 Query query = Query::Select().FromTable({ tableName_ });
566 BlockSync(query, delegate_);
567 int downLoadCount = 0;
568 int removeCount = 0;
569 ForkDownloadAndRemoveAsset(DB_ERROR, downLoadCount, removeCount);
570 UpdateCloudTableRecord(0, actualCount, false);
571 RelationalTestUtils::CloudBlockSync(query, delegate_, DBStatus::OK, DBStatus::REMOVE_ASSETS_FAIL);
572 EXPECT_EQ(downLoadCount, 5); // local asset was removed should download 5 times
573 EXPECT_EQ(removeCount, 1);
574 virtualAssetLoader_->ForkDownload(nullptr);
575 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
576
577 std::vector<size_t> expectCount = { 0, 2, 2, 2, 2 };
578 CheckAssetsCount(expectCount);
579 }
580
581 /**
582 * @tc.name: SyncWithAssetOperation004
583 * @tc.desc: Download Assets When local assets was removed
584 * @tc.type: FUNC
585 * @tc.require:
586 * @tc.author: zhangqiquan
587 */
588 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation004, TestSize.Level1)
589 {
590 LocalAssetRemoveTest();
591 }
592
UpdateAssetWhenSyncUpload()593 void DistributedDBCloudAssetsOperationSyncTest::UpdateAssetWhenSyncUpload()
594 {
595 string sql = "UPDATE " + tableName_ + " SET asset = ? WHERE id = '54';";
596 Asset asset = g_localAsset;
597 asset.hash = "123";
598 const int assetId = 54;
599 asset.name = g_localAsset.name + std::to_string(assetId);
600 std::vector<uint8_t> assetBlob;
601 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
602 sqlite3_stmt *stmt = nullptr;
603 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
604 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
605 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
606 int errCode;
607 SQLiteUtils::ResetStatement(stmt, true, errCode);
608 }
609
610 /**
611 * @tc.name: SyncWithAssetOperation005
612 * @tc.desc: check asset when update in fill before upload sync process
613 * @tc.type: FUNC
614 * @tc.require:
615 * @tc.author: luoguo
616 */
617 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation005, TestSize.Level1)
618 {
619 /**
620 * @tc.steps:step1. Insert 60 records.
621 * @tc.expected: step1. ok.
622 */
623 InsertUserTableRecord(tableName_, 0, 60);
624
625 /**
626 * @tc.steps:step2. Sync to cloud and wait in upload.
627 * @tc.expected: step2. ok.
628 */
629 bool isUpload = false;
__anonaa4bfccd0802(const std::string &, VBucket &) 630 virtualCloudDb_->ForkUpload([&isUpload](const std::string &, VBucket &) {
631 if (isUpload == true) {
632 return;
633 }
634 isUpload = true;
635 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
636 });
637 Query query = Query::Select().FromTable({tableName_});
638
639 bool finish = false;
__anonaa4bfccd0902(const std::map<std::string, SyncProcess> &process) 640 auto callback = [&finish](const std::map<std::string, SyncProcess> &process) {
641 for (const auto &item : process) {
642 if (item.second.process == DistributedDB::FINISHED) {
643 {
644 finish = true;
645 }
646 }
647 }
648 };
649 ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
650
651 while (isUpload == false) {
652 std::this_thread::sleep_for(std::chrono::milliseconds(50));
653 }
654
655 /**
656 * @tc.steps:step3. update asset when sync upload.
657 * @tc.expected: step3. ok.
658 */
659 UpdateAssetWhenSyncUpload();
660
661 /**
662 * @tc.steps:step4. check asset data.
663 * @tc.expected: step4. ok.
664 */
665 while (finish == false) {
666 std::this_thread::sleep_for(std::chrono::milliseconds(50));
667 }
668 virtualCloudDb_->ForkUpload(nullptr);
669 std::vector<VBucket> allData;
670 auto dbSchema = GetSchema();
671 ASSERT_GT(dbSchema.tables.size(), 0u);
672 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
673 ASSERT_EQ(allData.size(), 60ul);
674 auto data = allData[54]; // update data
675 auto data1 = allData[55]; // no update data
676
677 Type colValue = data.at("asset");
678 auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
679 auto assets = RelationalTestUtils::GetAssets(colValue, translate, true);
680 ASSERT_EQ(assets[0].hash, std::string("123"));
681
682 Type colValue1 = data1.at("asset");
683 auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
684 ASSERT_EQ(assets1[0].hash, std::string("DEC"));
685 }
686
687 /**
688 * @tc.name: SyncWithAssetOperation006
689 * @tc.desc: Remove Local Datas When local assets was empty
690 * @tc.type: FUNC
691 * @tc.require:
692 * @tc.author: lijun
693 */
694 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation006, TestSize.Level1)
695 {
696 const int actualCount = 5;
697 InsertUserTableRecord(tableName_, 0, actualCount);
698 Query query = Query::Select().FromTable({ tableName_ });
699 BlockSync(query, delegate_);
700
701 UpdateCloudTableRecord(0, 2, true);
702 BlockSync(query, delegate_);
703
704 int removeCount = 0;
__anonaa4bfccd0a02(const std::vector<Asset> &assets) 705 virtualAssetLoader_->ForkRemoveLocalAssets([&removeCount](const std::vector<Asset> &assets) {
706 removeCount = assets.size();
707 return DBStatus::OK;
708 });
709 std::string device = "";
710 ASSERT_EQ(delegate_->RemoveDeviceData(device, FLAG_AND_DATA), DBStatus::OK);
711 ASSERT_EQ(9, removeCount);
712 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
713 }
714
715 /**
716 * @tc.name: SyncWithAssetOperation007
717 * @tc.desc: Test assetId fill when assetId changed
718 * @tc.type: FUNC
719 * @tc.require:
720 * @tc.author: wangxiangdong
721 */
722 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation007, TestSize.Level1)
723 {
724 /**
725 * @tc.steps:step1. Insert 5 records and sync.
726 * @tc.expected: step1. ok.
727 */
728 const int actualCount = 5;
729 std::string name = g_localAsset.name + std::to_string(0);
730 Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
731 expectAssets[0].hash.append("change"); // modify first asset
732 InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
733 Query query = Query::Select().FromTable({ tableName_ });
734 BlockSync(query, delegate_);
735 /**
736 * @tc.steps:step2. modify data and sync.
737 * @tc.expected: step2. ok.
738 */
739 UpdateCloudTableRecord(0, 1, true);
740 BlockSync(query, delegate_);
741 /**
742 * @tc.steps:step3. check modified data cursor.
743 * @tc.expected: step3. ok.
744 */
745 std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
746 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
747 reinterpret_cast<void *>(7), nullptr), SQLITE_OK);
748 sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=5";
749 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
750 reinterpret_cast<void *>(5), nullptr), SQLITE_OK);
751 }
752
753 /**
754 * @tc.name: SyncWithAssetOperation008
755 * @tc.desc: Test assetId fill when assetId changed
756 * @tc.type: FUNC
757 * @tc.require:
758 * @tc.author: wangxiangdong
759 */
760 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation008, TestSize.Level1)
761 {
762 /**
763 * @tc.steps:step1. Insert 5 records and sync.
764 * @tc.expected: step1. ok.
765 */
766 const int actualCount = 5;
767 InsertUserTableRecord(tableName_, 0, actualCount);
768 Query query = Query::Select().FromTable({ tableName_ });
769 BlockSync(query, delegate_);
770 /**
771 * @tc.steps:step2. modify data and sync.
772 * @tc.expected: step2. ok.
773 */
774 UpdateCloudTableRecord(0, 1, true);
775 int removeCount = 0;
__anonaa4bfccd0b02(std::map<std::string, Assets> &assets) 776 virtualAssetLoader_->SetRemoveLocalAssetsCallback([&removeCount](std::map<std::string, Assets> &assets) {
777 removeCount = assets["asset"].size() + assets["assets"].size();
778 return LOCAL_ASSET_NOT_FOUND;
779 });
780 BlockSync(query, delegate_);
781 EXPECT_EQ(removeCount, 3); // one record has 3 asset
782 virtualAssetLoader_->SetRemoveLocalAssetsCallback(nullptr);
783 /**
784 * @tc.steps:step3. check asset number.
785 * @tc.expected: step3. ok.
786 */
787 std::vector<size_t> expectCount = { 3, 3, 3, 3, 3 };
788 CheckAssetsCount(expectCount, true);
789 /**
790 * @tc.steps:step4. sync and check.
791 * @tc.expected: step4. ok.
792 */
793 BlockSync(query, delegate_);
794 expectCount = { 0, 3, 3, 3, 3 };
795 CheckAssetsCount(expectCount, true);
796 }
797
798 /**
799 * @tc.name: SyncWithAssetOperation009
800 * @tc.desc: Test asset remove local and check db asset empty finally.
801 * @tc.type: FUNC
802 * @tc.require:
803 * @tc.author: wangxiangdong
804 */
805 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation009, TestSize.Level1)
806 {
807 /**
808 * @tc.steps:step1. Insert 5 records and sync.
809 * @tc.expected: step1. ok.
810 */
811 const int actualCount = 5;
812 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
813 InsertUserTableRecord(tableName_, 0, actualCount);
814 /**
815 * @tc.steps:step2. modify data and sync.
816 * @tc.expected: step2. ok.
817 */
818 UpdateCloudTableRecord(0, 1, true);
819 Query query = Query::Select().FromTable({ tableName_ });
820 BlockSync(query, delegate_);
821 /**
822 * @tc.steps:step3. check asset number.
823 * @tc.expected: step3. ok.
824 */
825 std::vector<size_t> expectCount = { 0, 3, 3, 3, 3 };
826 CheckAssetsCount(expectCount, true);
827 }
828
InsertLocalAssetData(const std::string & assetHash)829 void DistributedDBCloudAssetsOperationSyncTest::InsertLocalAssetData(const std::string &assetHash)
830 {
831 Assets assets;
832 std::string assetNameBegin = "Phone";
833 for (int j = 1; j <= g_assetsNum; ++j) {
834 Asset asset;
835 asset.name = assetNameBegin + "_" + std::to_string(j);
836 asset.status = AssetStatus::NORMAL;
837 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
838 asset.hash = assetHash + "_" + std::to_string(j);
839 asset.assetId = std::to_string(j);
840 assets.push_back(asset);
841 }
842 string sql = "INSERT OR REPLACE INTO " + tableName_ + " (id,name,asset,assets) VALUES('0','CloudTest0',?,?);";
843 sqlite3_stmt *stmt = nullptr;
844 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
845 std::vector<uint8_t> assetBlob;
846 std::vector<uint8_t> assetsBlob;
847 RuntimeContext::GetInstance()->AssetToBlob(g_localAsset, assetBlob);
848 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
849 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
850 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2 is assetsBlob
851 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
852 int errCode;
853 SQLiteUtils::ResetStatement(stmt, true, errCode);
854 }
855
InsertCloudAssetData(const std::string & assetHash)856 void DistributedDBCloudAssetsOperationSyncTest::InsertCloudAssetData(const std::string &assetHash)
857 {
858 std::vector<VBucket> record;
859 std::vector<VBucket> extend;
860 Timestamp now = DistributedDB::TimeHelper::GetSysCurrentTime();
861 VBucket data;
862 data.insert_or_assign("id", "0");
863 data.insert_or_assign("name", "CloudTest0");
864 Asset asset = g_localAsset;
865 data.insert_or_assign("asset", asset);
866 Assets assets;
867 std::string assetNameBegin = "Phone";
868 for (int j = 1; j <= g_assetsNum; ++j) {
869 Asset assetTmp;
870 assetTmp.name = assetNameBegin + "_" + std::to_string(j);
871 assetTmp.status = AssetStatus::NORMAL;
872 assetTmp.hash = assetHash + "_" + std::to_string(j);
873 assetTmp.assetId = std::to_string(j);
874 assets.push_back(assetTmp);
875 }
876 data.insert_or_assign("assets", assets);
877 record.push_back(data);
878 VBucket log;
879 log.insert_or_assign(DistributedDB::CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
880 now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
881 log.insert_or_assign(DistributedDB::CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
882 now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
883 log.insert_or_assign(DistributedDB::CloudDbConstant::DELETE_FIELD, false);
884 extend.push_back(log);
885 virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend);
886 }
887
DeleteCloudDBData(int64_t beginGid,int64_t count,const std::string & tableName)888 void DistributedDBCloudAssetsOperationSyncTest::DeleteCloudDBData(int64_t beginGid, int64_t count,
889 const std::string &tableName)
890 {
891 Timestamp now = TimeHelper::GetSysCurrentTime();
892 std::vector<VBucket> extend;
893 for (int64_t i = 0; i < count; ++i) {
894 VBucket log;
895 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
896 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
897 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(beginGid + i));
898 extend.push_back(log);
899 }
900 ASSERT_EQ(virtualCloudDb_->BatchDelete(tableName, extend), DBStatus::OK);
901 std::this_thread::sleep_for(std::chrono::milliseconds(count));
902 }
903
PrepareForAssetOperation010()904 void DistributedDBCloudAssetsOperationSyncTest::PrepareForAssetOperation010()
905 {
906 InsertCloudAssetData("cloudAsset");
907 InsertLocalAssetData("localAsset");
908 }
909
910 /**
911 * @tc.name: SyncWithAssetOperation010
912 * @tc.desc: Test check status of asset, when the hash of asset is different.
913 * @tc.type: FUNC
914 * @tc.require:
915 * @tc.author: liufuchenxing
916 */
917 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation010, TestSize.Level1)
918 {
919 /**
920 * @tc.steps:step1. prepare local and cloud asset data.
921 * @tc.expected: step1. ok.
922 */
923 PrepareForAssetOperation010();
924
925 /**
926 * @tc.steps:step2. sync and check the status of assets.
927 * @tc.expected: step2. ok.
928 */
929 virtualCloudDb_->ForkBeforeBatchUpdate([](const std::string &, std::vector<VBucket> &record,
__anonaa4bfccd0c02(const std::string &, std::vector<VBucket> &record, std::vector<VBucket> &extend, bool) 930 std::vector<VBucket> &extend, bool) {
931 ASSERT_EQ(static_cast<int>(record.size()), 1);
932 VBucket &bucket = record[0];
933 ASSERT_TRUE(bucket.find("assets") != bucket.end());
934 Assets assets = std::get<Assets>(bucket["assets"]);
935 ASSERT_EQ(static_cast<int>(assets.size()), 3);
936 for (size_t i = 0; i < assets.size(); i++) {
937 ASSERT_EQ(assets[i].status, AssetStatus::UPDATE);
938 }
939 });
940
941 Query query = Query::Select().FromTable({ tableName_ });
942 BlockSync(query, delegate_, SYNC_MODE_CLOUD_FORCE_PUSH);
943 }
944
WriteDataWithoutCommitTransaction()945 void DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction()
946 {
947 ASSERT_NE(db_, nullptr);
948 SQLiteUtils::BeginTransaction(db_);
949 InsertLocalAssetData("localAsset");
950 constexpr int kSleepDurationSeconds = 3;
951 std::this_thread::sleep_for(std::chrono::seconds(kSleepDurationSeconds));
952 }
953
954 /**
955 * @tc.name: TestOpenDatabaseBusy001
956 * @tc.desc: Test open database when the database is busy.
957 * @tc.type: FUNC
958 * @tc.require:
959 * @tc.author: liufuchenxing
960 */
961 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, TestOpenDatabaseBusy001, TestSize.Level2)
962 {
963 /**
964 * @tc.steps:step1. close store.
965 * @tc.expected:step1. check ok.
966 */
967 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
968 delegate_ = nullptr;
969 /**
970 * @tc.steps:step2. Another thread write data into database into database without commit.
971 * @tc.expected:step2. check ok.
972 */
973 std::thread thread(&DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction, this);
974 std::this_thread::sleep_for(std::chrono::seconds(1));
975 /**
976 * @tc.steps:step3. open relational delegate.
977 * @tc.expected:step3. open success.
978 */
979 RelationalStoreDelegate::Option option;
980 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
981 thread.join();
982 }
983
984 /**
985 * @tc.name: SyncWithAssetOperation011
986 * @tc.desc: Test change assets between download and remove
987 * @tc.type: FUNC
988 * @tc.require:
989 * @tc.author: liaoyonghuang
990 */
991 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation011, TestSize.Level1)
992 {
993 /**
994 * @tc.steps:step1. Insert 5 records and sync.
995 * @tc.expected: step1. ok.
996 */
997 const int actualCount = 5;
998 InsertUserTableRecord(tableName_, 0, actualCount);
999 Query query = Query::Select().FromTable({ tableName_ });
1000 BlockSync(query, delegate_);
1001 /**
1002 * @tc.steps:step2. modify assets of cloud data. update 1st asset and delete 2nd asset.
1003 * @tc.expected: step2. ok.
1004 */
1005 std::vector<VBucket> record;
1006 std::vector<VBucket> extend;
1007 int dataNum = 0;
1008 Timestamp now = TimeHelper::GetSysCurrentTime();
1009 VBucket data;
1010 data.insert_or_assign("id", std::to_string(dataNum));
1011 data.insert_or_assign("name", "Cloud" + std::to_string(dataNum));
1012 Asset cloudAsset = g_localAsset;
1013 cloudAsset.name += std::to_string(dataNum);
1014 cloudAsset.hash = "new_hash";
1015 Assets cloudAssets = {cloudAsset};
1016 data.insert_or_assign("assets", cloudAssets);
1017 record.push_back(data);
1018 VBucket log;
1019 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
1020 now / CloudDbConstant::TEN_THOUSAND));
1021 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
1022 now / CloudDbConstant::TEN_THOUSAND));
1023 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1024 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(dataNum));
1025 extend.push_back(log);
1026 ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
1027 /**
1028 * @tc.steps:step3. Update local assets between remove and download, sync and check whether download is invoked.
1029 * @tc.expected: step3. ok.
1030 */
__anonaa4bfccd0d02(std::map<std::string, Assets> &assets) 1031 virtualAssetLoader_->SetRemoveLocalAssetsCallback([&](std::map<std::string, Assets> &assets) {
1032 UpdateLocalTableRecord(tableName_, 0, 1);
1033 return OK;
1034 });
__anonaa4bfccd0e02(const std::string &tableName, std::map<std::string, Assets> &assets) 1035 virtualAssetLoader_->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &assets) {
1036 EXPECT_TRUE(false);
1037 });
1038 BlockSync(query, delegate_);
1039
1040 virtualAssetLoader_->SetRemoveLocalAssetsCallback(nullptr);
1041 virtualAssetLoader_->ForkDownload(nullptr);
1042 }
1043
1044 /**
1045 * @tc.name: SyncWithAssetOperation012
1046 * @tc.desc: Batch download Assets When local assets was removed
1047 * @tc.type: FUNC
1048 * @tc.require:
1049 * @tc.author: liaoyonghuang
1050 */
1051 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation012, TestSize.Level1)
1052 {
1053 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1054 LocalAssetRemoveTest();
1055 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1056 }
1057
UpdateLocalAssetRecord(const std::string & tableName,int64_t begin,int64_t count)1058 void DistributedDBCloudAssetsOperationSyncTest::UpdateLocalAssetRecord(const std::string &tableName, int64_t begin,
1059 int64_t count)
1060 {
1061 int errCode;
1062 std::vector<uint8_t> assetBlob;
1063 std::vector<uint8_t> assetsBlob;
1064 for (int64_t i = begin; i < begin + count; ++i) {
1065 Asset asset = g_localAsset;
1066 asset.hash = "new_hash";
1067 asset.status = static_cast<uint32_t>(AssetStatus::UPDATE);
1068 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
1069 std::vector<Asset> assets;
1070 assets.push_back(asset);
1071 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
1072 std::string sql = "UPDATE " + tableName + " SET height = '175.0', asset = ?, assets = ? where id = " +
1073 std::to_string(i);
1074 sqlite3_stmt *stmt = nullptr;
1075 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
1076 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK); // 1st bind
1077 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2nd bind
1078 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
1079 SQLiteUtils::ResetStatement(stmt, true, errCode);
1080 }
1081 }
1082
1083 /**
1084 * @tc.name: SyncWithAssetOperation013
1085 * @tc.desc: Test device modify data and then sync cursor will not changes
1086 * @tc.type: FUNC
1087 * @tc.require:
1088 * @tc.author: caihaoting
1089 */
1090 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation013, TestSize.Level1)
1091 {
1092 /**
1093 * @tc.steps:step1. Insert local and cloud asset data and sync.
1094 * @tc.expected: step1. ok.
1095 */
1096 InsertCloudAssetData("assetHash");
1097 InsertLocalAssetData("assetHash");
1098 Query query = Query::Select().FromTable({ tableName_ });
1099 BlockSync(query, delegate_);
1100 /**
1101 * @tc.steps:step2. modify data of asset and sync.
1102 * @tc.expected: step2. ok.
1103 */
1104 UpdateLocalAssetRecord(tableName_, 0, 1);
1105 const int cursor = 2;
1106 std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
1107 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1108 reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1109 BlockSync(query, delegate_);
1110 /**
1111 * @tc.steps:step3. check modified data cursor and cursor is not changed.
1112 * @tc.expected: step3. ok.
1113 */
1114 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1115 reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1116 }
1117
1118 /**
1119 * @tc.name: SyncWithAssetOperation014
1120 * @tc.desc: Test device data does not change while sync and cursor will not changes
1121 * @tc.type: FUNC
1122 * @tc.require:
1123 * @tc.author: caihaoting
1124 */
1125 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation014, TestSize.Level1)
1126 {
1127 /**
1128 * @tc.steps:step1. Insert 5 records and sync.
1129 * @tc.expected: step1. ok.
1130 */
1131 const int actualCount = 5;
1132 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1133 InsertUserTableRecord(tableName_, 0, actualCount);
1134 Query query = Query::Select().FromTable({ tableName_ });
1135 BlockSync(query, delegate_);
1136 /**
1137 * @tc.steps:step2. modify data and sync.
1138 * @tc.expected: step2. ok.
1139 */
1140 UpdateLocalAssetRecord(tableName_, 0, 1);
1141 const int cursor = 6;
1142 std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
1143 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1144 reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1145 BlockSync(query, delegate_);
1146 /**
1147 * @tc.steps:step3. check modified data cursor and cursor is not changed.
1148 * @tc.expected: step3. ok.
1149 */
1150 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1151 reinterpret_cast<void *>(cursor), nullptr), SQLITE_OK);
1152 }
1153
1154 /**
1155 * @tc.name: IgnoreRecord001
1156 * @tc.desc: Download Assets When local assets was removed
1157 * @tc.type: FUNC
1158 * @tc.require:
1159 * @tc.author: zhangqiquan
1160 */
1161 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord001, TestSize.Level1)
1162 {
1163 const int actualCount = 1;
1164 InsertUserTableRecord(tableName_, 0, actualCount);
1165 Query query = Query::Select().FromTable({ tableName_ });
1166 BlockSync(query, delegate_);
1167 std::vector<size_t> expectCount = { 2 };
1168 CheckAssetsCount(expectCount);
1169
1170 VBucket record;
1171 record["id"] = std::to_string(0);
1172 record["assets"] = Assets();
1173 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1174 record["id"] = std::to_string(1);
1175 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1176 expectCount = { 0, 0 };
1177 CheckAssetsCount(expectCount);
1178
1179 std::vector<VBucket> logs;
1180 EXPECT_EQ(RelationalTestUtils::GetRecordLog(db_, tableName_, logs), E_OK);
1181 for (const auto &log : logs) {
1182 int64_t cursor = std::get<int64_t>(log.at("cursor"));
1183 EXPECT_GE(cursor, 0);
1184 }
1185 }
1186
1187 /**
1188 * @tc.name: IgnoreRecord002
1189 * @tc.desc: Ignore Assets When Download
1190 * @tc.type: FUNC
1191 * @tc.require:
1192 * @tc.author: zhangqiquan
1193 */
1194 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord002, TestSize.Level1)
1195 {
1196 const int actualCount = 1;
1197 InsertUserTableRecord(tableName_, 0, actualCount);
1198 Query query = Query::Select().FromTable({ tableName_ });
1199 RelationalTestUtils::CloudBlockSync(query, delegate_);
1200 UpdateCloudTableRecord(0, actualCount, false);
1201
1202 virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1203 RelationalTestUtils::CloudBlockSync(query, delegate_);
1204 virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
1205 std::vector<size_t> expectCount = { 4 };
1206 CheckAssetsCount(expectCount);
1207 RelationalTestUtils::CloudBlockSync(query, delegate_);
1208 }
1209
1210 /**
1211 * @tc.name: IgnoreRecord003
1212 * @tc.desc: Ignore Assets When Upload
1213 * @tc.type: FUNC
1214 * @tc.require:
1215 * @tc.author: zhangqiquan
1216 */
1217 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord003, TestSize.Level1)
1218 {
1219 const int actualCount = 1;
1220 InsertUserTableRecord(tableName_, 0, actualCount);
1221 Query query = Query::Select().FromTable({ tableName_ });
1222 virtualCloudDb_->SetConflictInUpload(true);
1223 RelationalTestUtils::CloudBlockSync(query, delegate_);
1224 virtualCloudDb_->SetConflictInUpload(false);
1225 std::vector<size_t> expectCount = { 2 };
1226 CheckAssetsCount(expectCount);
1227 RelationalTestUtils::CloudBlockSync(query, delegate_);
1228 }
1229
1230 /**
1231 * @tc.name: IgnoreRecord004
1232 * @tc.desc: Ignore Assets When Btch Download
1233 * @tc.type: FUNC
1234 * @tc.require:
1235 * @tc.author: luoguo
1236 */
1237 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord004, TestSize.Level1)
1238 {
1239 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1240 const int actualCount = 10;
1241 InsertUserTableRecord(tableName_, 0, actualCount);
1242 Query query = Query::Select().FromTable({ tableName_ });
1243 RelationalTestUtils::CloudBlockSync(query, delegate_);
1244 UpdateCloudTableRecord(0, actualCount, false);
1245
1246 virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1247 RelationalTestUtils::CloudBlockSync(query, delegate_);
1248 virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
1249 std::vector<size_t> expectCount(10, 4);
1250 CheckAssetsCount(expectCount);
1251 RelationalTestUtils::CloudBlockSync(query, delegate_);
1252 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1253 }
1254
1255 /**
1256 * @tc.name: UpsertData001
1257 * @tc.desc: Upsert data after delete it
1258 * @tc.type: FUNC
1259 * @tc.require:
1260 * @tc.author: zhangqiquan
1261 */
1262 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData001, TestSize.Level1)
1263 {
1264 // insert id 0 to local
1265 const int actualCount = 1;
1266 InsertUserTableRecord(tableName_, 0, actualCount); // 10 is phone size
1267 std::vector<std::map<std::string, std::string>> conditions;
1268 std::map<std::string, std::string> entries;
1269 entries["id"] = "0";
1270 conditions.push_back(entries);
1271 // delete id 0 in local
1272 RelationalTestUtils::DeleteRecord(db_, tableName_, conditions);
1273 // upsert id 0 to local
1274 VBucket record;
1275 record["id"] = std::to_string(0);
1276 record["assets"] = Assets();
1277 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
1278 // check id 0 exist
1279 CheckAssetsCount({ 0 });
1280 }
1281
1282 /**
1283 * @tc.name: UpsertData002
1284 * @tc.desc: Test sync after Upsert.
1285 * @tc.type: FUNC
1286 * @tc.require:
1287 * @tc.author: liaoyonghuang
1288 */
1289 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData002, TestSize.Level1)
1290 {
1291 /**
1292 * @tc.steps:step1. Insert 5 records and sync.
1293 * @tc.expected: step1. ok.
1294 */
1295 const int actualCount = 5;
1296 InsertUserTableRecord(tableName_, 0, actualCount);
1297 Query query = Query::Select().FromTable({ tableName_ });
1298 BlockSync(query, delegate_);
1299
1300 /**
1301 * @tc.steps:step2. UpsertData and sync.
1302 * @tc.expected: step2. ok.
1303 */
1304 int dataCnt = -1;
1305 std::string checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 5";
__anonaa4bfccd0f02(sqlite3_stmt *stmt) 1306 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
1307 dataCnt = sqlite3_column_int(stmt, 0);
1308 return E_OK;
1309 });
1310 EXPECT_EQ(dataCnt, 1);
1311 vector<VBucket> records;
1312 for (int i = 0; i < actualCount; i++) {
1313 VBucket record;
1314 record["id"] = std::to_string(i);
1315 record["name"] = std::string("UpsertName");
1316 records.push_back(record);
1317 }
1318 EXPECT_EQ(delegate_->UpsertData(tableName_, records), OK);
1319 // check cursor has been increased
1320 checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 10";
__anonaa4bfccd1002(sqlite3_stmt *stmt) 1321 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
1322 dataCnt = sqlite3_column_int(stmt, 0);
1323 return E_OK;
1324 });
1325 EXPECT_EQ(dataCnt, 1);
1326 BlockSync(query, delegate_);
1327
1328 /**
1329 * @tc.steps:step3. Check local data.
1330 * @tc.expected: step3. All local data has been merged by the cloud.
1331 */
1332 std::vector<VBucket> allData;
1333 auto dbSchema = GetSchema();
1334 ASSERT_GT(dbSchema.tables.size(), 0u);
1335 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
1336 for (const auto &data : allData) {
1337 ASSERT_EQ(std::get<std::string>(data.at("name")), "local");
1338 }
1339 }
1340
1341 /**
1342 * @tc.name: SyncWithAssetConflict001
1343 * @tc.desc: Upload with asset no change
1344 * @tc.type: FUNC
1345 * @tc.require:
1346 * @tc.author: zhangqiquan
1347 */
1348 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetConflict001, TestSize.Level1)
1349 {
1350 // cloud and local insert same data
1351 const int actualCount = 1;
1352 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1353 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
1354 InsertUserTableRecord(tableName_, 0, actualCount);
1355 // sync and local asset's status are normal
1356 Query query = Query::Select().FromTable({ tableName_ });
1357 RelationalTestUtils::CloudBlockSync(query, delegate_);
1358 auto dbSchema = GetSchema();
1359 ASSERT_GT(dbSchema.tables.size(), 0u);
1360 auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
1361 for (const auto &oneRow : assets) {
1362 for (const auto &asset : oneRow) {
1363 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
1364 }
1365 }
1366 }
1367
1368 /**
1369 * @tc.name: UpsertDataInvalid001
1370 * @tc.desc: Upsert invalid data
1371 * @tc.type: FUNC
1372 * @tc.require:
1373 * @tc.author: wangxiangdong
1374 */
1375 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid001, TestSize.Level0)
1376 {
1377 VBucket record;
1378 record["id"] = std::to_string(0);
1379 record["assets"] = Assets();
1380 /**
1381 * @tc.steps:step1. UpsertData to empty table.
1382 * @tc.expected: step1. INVALID_ARGS.
1383 */
1384 EXPECT_EQ(delegate_->UpsertData("", { record }), INVALID_ARGS);
1385 /**
1386 * @tc.steps:step2. UpsertData to shared table.
1387 * @tc.expected: step2. INVALID_ARGS.
1388 */
1389 EXPECT_EQ(delegate_->UpsertData(tableName_ + "_shared", { record }), NOT_SUPPORT);
1390 /**
1391 * @tc.steps:step3. UpsertData to not device table and shared table.
1392 * @tc.expected: step3. NOT_FOUND.
1393 */
1394 const char *createSQL =
1395 "CREATE TABLE IF NOT EXISTS testing(" \
1396 "id TEXT PRIMARY KEY," \
1397 "name TEXT," \
1398 "height REAL ," \
1399 "photo BLOB," \
1400 "asset ASSET," \
1401 "assets ASSETS," \
1402 "age INT);";
1403 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
1404 EXPECT_EQ(delegate_->UpsertData("testing", { record }), NOT_FOUND);
1405 /**
1406 * @tc.steps:step4. UpsertData to not exist table.
1407 * @tc.expected: step4. NOT_FOUND.
1408 */
1409 EXPECT_EQ(delegate_->UpsertData("TABLE_NOT_EXIST", { record }), NOT_FOUND);
1410 }
1411
1412 /**
1413 * @tc.name: UpsertDataInvalid002
1414 * @tc.desc: Upsert device data
1415 * @tc.type: FUNC
1416 * @tc.require:
1417 * @tc.author: wangxiangdong
1418 */
1419 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid002, TestSize.Level1)
1420 {
1421 VBucket record;
1422 record["id"] = std::to_string(0);
1423 record["assets"] = Assets();
1424 /**
1425 * @tc.steps:step1. create user table.
1426 * @tc.expected: step1. INVALID_ARGS.
1427 */
1428 const char *createSQL =
1429 "CREATE TABLE IF NOT EXISTS deviceTable(" \
1430 "id TEXT PRIMARY KEY," \
1431 "name TEXT," \
1432 "height REAL ," \
1433 "photo BLOB," \
1434 "asset ASSET," \
1435 "assets ASSETS," \
1436 "age INT);";
1437 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
1438 /**
1439 * @tc.steps:step2. create device table.
1440 * @tc.expected: step2. OK.
1441 */
1442 RelationalStoreDelegate *delegate1 = nullptr;
1443 std::shared_ptr<RelationalStoreManager> mgr1 = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
1444 RelationalStoreDelegate::Option option;
1445 ASSERT_EQ(mgr1->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
1446 ASSERT_NE(delegate1, nullptr);
1447 std::string deviceTableName = "deviceTable";
1448 ASSERT_EQ(delegate1->CreateDistributedTable(deviceTableName, DEVICE_COOPERATION), DBStatus::OK);
1449 DataBaseSchema dataBaseSchema;
1450 TableSchema tableSchema;
1451 tableSchema.name = deviceTableName;
1452 tableSchema.sharedTableName = deviceTableName + "_shared";
1453 tableSchema.fields = {
1454 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
1455 {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
1456 {"age", TYPE_INDEX<int64_t>}
1457 };
1458 dataBaseSchema.tables.push_back(tableSchema);
1459 ASSERT_EQ(delegate1->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1460 /**
1461 * @tc.steps:step3. UpsertData to device table.
1462 * @tc.expected: step3. NOT_FOUND.
1463 */
1464 EXPECT_EQ(delegate1->UpsertData(deviceTableName, { record }), NOT_FOUND);
1465 EXPECT_EQ(mgr1->CloseStore(delegate1), DBStatus::OK);
1466 delegate1 = nullptr;
1467 mgr1 = nullptr;
1468 }
1469
1470 /**
1471 * @tc.name: DownloadAssetStatusTest004
1472 * @tc.desc: Test upload asset status
1473 * @tc.type: FUNC
1474 * @tc.require:
1475 * @tc.author: zhangqiquan
1476 */
1477 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, DownloadAssetStatusTest004, TestSize.Level1)
1478 {
1479 /**
1480 * @tc.steps:step1. cloud assets {0, 1}
1481 * @tc.expected: step1. OK.
1482 */
1483 // cloud and local insert same data
1484 // cloud assets {0, 1} local assets {0, 1, 2}
1485 const int actualCount = 1;
1486 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_, 2);
1487 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms for data conflict
1488 /**
1489 * @tc.steps:step2. local assets {0, 1, 2}, and change assert {0}
1490 * @tc.expected: step2. OK.
1491 */
1492 std::string name = g_localAsset.name + std::to_string(0);
1493 Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
1494 expectAssets[0].hash.append("change"); // modify first asset
1495 InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
1496 /**
1497 * @tc.steps:step3. sync
1498 * @tc.expected: step3. upload status is {UPDATE, NORMAL, INSERT}
1499 */
1500 std::vector<AssetStatus> expectStatus = {
1501 AssetStatus::UPDATE, AssetStatus::NORMAL, AssetStatus::INSERT
1502 };
1503 // sync and local asset's status are normal
1504 Query query = Query::Select().FromTable({ tableName_ });
1505 RelationalTestUtils::CloudBlockSync(query, delegate_);
1506 auto dbSchema = GetSchema();
1507 ASSERT_GT(dbSchema.tables.size(), 0u);
1508 // cloud asset status is update normal insert
1509 VBucket extend;
1510 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
1511 std::vector<VBucket> data;
1512 ASSERT_EQ(virtualCloudDb_->Query(tableName_, extend, data), QUERY_END);
1513 ASSERT_EQ(data.size(), static_cast<size_t>(actualCount));
1514 Assets actualAssets;
1515 ASSERT_EQ(CloudStorageUtils::GetValueFromType(data[0]["assets"], actualAssets), E_OK);
1516 ASSERT_EQ(actualAssets.size(), expectStatus.size());
1517 for (size_t i = 0; i < actualAssets.size(); ++i) {
1518 EXPECT_EQ(actualAssets[i].status, expectStatus[i]);
1519 }
1520 /**
1521 * @tc.steps:step4. check local assets status.
1522 * @tc.expected: step4. all assets status is NORMAL.
1523 */
1524 auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
1525 for (const auto &oneRow : assets) {
1526 for (const auto &asset : oneRow) {
1527 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
1528 }
1529 }
1530 }
1531
1532 /**
1533 * @tc.name: UploadAssetsTest001
1534 * @tc.desc: Test upload asset with error.
1535 * @tc.type: FUNC
1536 * @tc.require:
1537 * @tc.author: liaoyonghuang
1538 */
1539 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest001, TestSize.Level1)
1540 {
1541 /**
1542 * @tc.steps:step1. Insert 10 records.
1543 * @tc.expected: step1. ok.
1544 */
1545 const int actualCount = 10;
1546 InsertUserTableRecord(tableName_, 0, actualCount);
1547 /**
1548 * @tc.steps:step2. Set callback function to cause some upstream data to fail.
1549 * @tc.expected: step2. ok.
1550 */
1551 int recordIndex = 0;
1552 Asset tempAsset = {
1553 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
1554 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
1555 };
__anonaa4bfccd1102(const std::string &tableName, VBucket &extend) 1556 virtualCloudDb_->ForkUpload([&tempAsset, &recordIndex](const std::string &tableName, VBucket &extend) {
1557 Asset asset;
1558 Assets assets;
1559 switch (recordIndex) {
1560 case 0: // record[0] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1561 extend[std::string(CloudDbConstant::ERROR_FIELD)] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1562 break;
1563 case 1: // record[1] is considered successful because it is a conflict.
1564 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1565 static_cast<int64_t>(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1566 break;
1567 case 2: // record[2] fail because of empty gid.
1568 extend[std::string(CloudDbConstant::GID_FIELD)] = std::string("");
1569 break;
1570 case 3: // record[3] fail because of empty assetId.
1571 asset = tempAsset;
1572 asset.assetId = "";
1573 extend[std::string(CloudDbConstant::ASSET)] = asset;
1574 break;
1575 case 4: // record[4] fail because of empty assetId.
1576 assets.push_back(tempAsset);
1577 assets[0].assetId = "";
1578 extend[std::string(CloudDbConstant::ASSETS)] = assets;
1579 break;
1580 case 5: // record[5] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1581 extend[std::string(CloudDbConstant::ERROR_FIELD)] = std::string("");
1582 break;
1583 default:
1584 break;
1585 }
1586 recordIndex++;
1587 });
1588 /**
1589 * @tc.steps:step3. Sync and check upLoadInfo.
1590 * @tc.expected: step3. failCount is 5 and successCount is 5.
1591 */
1592 Query query = Query::Select().FromTable({ tableName_ });
1593 BlockSync(query, delegate_);
1594 for (const auto &table : lastProcess_.tableProcess) {
1595 EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1596 EXPECT_EQ(table.second.upLoadInfo.failCount, 3u);
1597 EXPECT_EQ(table.second.upLoadInfo.successCount, 7u);
1598 }
1599 virtualCloudDb_->ForkUpload(nullptr);
1600 }
1601
1602 /**
1603 * @tc.name: UploadAssetsTest002
1604 * @tc.desc: Test upload asset with error.
1605 * @tc.type: FUNC
1606 * @tc.require:
1607 * @tc.author: liaoyonghuang
1608 */
1609 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest002, TestSize.Level1)
1610 {
1611 /**
1612 * @tc.steps:step1. Insert 10 records.
1613 * @tc.expected: step1. ok.
1614 */
1615 const int actualCount = 10;
1616 InsertUserTableRecord(tableName_, 0, actualCount);
1617 Query query = Query::Select().FromTable({ tableName_ });
1618 BlockSync(query, delegate_);
1619 /**
1620 * @tc.steps:step2. Delete local data.
1621 * @tc.expected: step2. OK.
1622 */
1623 std::string sql = "delete from " + tableName_ + " where id >= " + std::to_string(actualCount / 2);
1624 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1625 /**
1626 * @tc.steps:step3. Set callback function to cause some upstream data to fail.
1627 * @tc.expected: step3. ok.
1628 */
__anonaa4bfccd1202(const std::string &tableName, VBucket &extend) 1629 virtualCloudDb_->ForkUpload([](const std::string &tableName, VBucket &extend) {
1630 extend[std::string(CloudDbConstant::GID_FIELD)] = "";
1631 });
1632 BlockSync(query, delegate_);
1633 for (const auto &table : lastProcess_.tableProcess) {
1634 EXPECT_EQ(table.second.upLoadInfo.total, 5u);
1635 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1636 EXPECT_EQ(table.second.upLoadInfo.successCount, 5u);
1637 }
1638 virtualCloudDb_->ForkUpload(nullptr);
1639 }
1640
1641 /**
1642 * @tc.name: UploadAssetsTest003
1643 * @tc.desc: Test upload asset with error CLOUD_RECORD_ALREADY_EXISTED.
1644 * @tc.type: FUNC
1645 * @tc.require:
1646 * @tc.author: liaoyonghuang
1647 */
1648 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest003, TestSize.Level1)
1649 {
1650 /**
1651 * @tc.steps:step1. Insert 100 records.
1652 * @tc.expected: step1. ok.
1653 */
1654 const int actualCount = 100;
1655 InsertUserTableRecord(tableName_, 0, actualCount);
1656 /**
1657 * @tc.steps:step2. Set callback function to return CLOUD_RECORD_ALREADY_EXISTED in 1st batch.
1658 * @tc.expected: step2. ok.
1659 */
1660 int uploadCount = 0;
__anonaa4bfccd1302(const std::string &tableName, VBucket &extend) 1661 virtualCloudDb_->ForkUpload([&uploadCount](const std::string &tableName, VBucket &extend) {
1662 if (uploadCount < 30) { // There are a total of 30 pieces of data in one batch of upstream data
1663 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1664 static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
1665 }
1666 uploadCount++;
1667 });
1668 Query query = Query::Select().FromTable({ tableName_ });
1669 BlockSync(query, delegate_);
1670 for (const auto &table : lastProcess_.tableProcess) {
1671 EXPECT_EQ(table.second.upLoadInfo.batchIndex, 4u);
1672 EXPECT_EQ(table.second.upLoadInfo.total, 100u);
1673 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1674 EXPECT_EQ(table.second.upLoadInfo.successCount, 100u);
1675 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
1676 }
1677 virtualCloudDb_->ForkUpload(nullptr);
1678 }
1679
1680 /**
1681 * @tc.name: UploadAssetsTest004
1682 * @tc.desc: Test batch delete return error CLOUD_RECORD_NOT_FOUND.
1683 * @tc.type: FUNC
1684 * @tc.require:
1685 * @tc.author: liaoyonghuang
1686 */
1687 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest004, TestSize.Level1)
1688 {
1689 /**
1690 * @tc.steps:step1. Insert 100 records and sync to cloud.
1691 * @tc.expected: step1. ok.
1692 */
1693 const int actualCount = 100;
1694 InsertUserTableRecord(tableName_, 0, actualCount);
1695 Query query = Query::Select().FromTable({ tableName_ });
1696 BlockSync(query, delegate_);
1697 /**
1698 * @tc.steps:step2. delete 50 records in local.
1699 * @tc.expected: step2. ok.
1700 */
1701 std::string sql = "delete from " + tableName_ + " where CAST(id AS INTEGER) >= " + std::to_string(actualCount / 2);
1702 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1703 /**
1704 * @tc.steps:step3. set return error CLOUD_RECORD_NOT_FOUND in batch delete.
1705 * @tc.expected: step3. ok.
1706 */
1707 int index = 0;
__anonaa4bfccd1402(const std::string &tableName, VBucket &extend) 1708 virtualCloudDb_->ForkUpload([&index](const std::string &tableName, VBucket &extend) {
1709 if (extend.count(CloudDbConstant::DELETE_FIELD) != 0 && index % 2 == 0 &&
1710 std::get<bool>(extend.at(CloudDbConstant::DELETE_FIELD))) {
1711 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
1712 }
1713 index++;
1714 });
1715 /**
1716 * @tc.steps:step4. sync and check result.
1717 * @tc.expected: step4. ok.
1718 */
1719 BlockSync(query, delegate_);
1720 for (const auto &table : lastProcess_.tableProcess) {
1721 EXPECT_EQ(table.second.upLoadInfo.total, 50u);
1722 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1723 EXPECT_EQ(table.second.upLoadInfo.successCount, 50u);
1724 }
1725 virtualCloudDb_->ForkUpload(nullptr);
1726 }
1727
1728 /**
1729 * @tc.name: BatchNormalDownloadAsset001
1730 * @tc.desc: Test batch download asset in two batch.
1731 * @tc.type: FUNC
1732 * @tc.require:
1733 * @tc.author: zqq
1734 */
1735 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, BatchNormalDownloadAsset001, TestSize.Level1)
1736 {
1737 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1738 PrepareDataInCloud();
1739
1740 Query query = Query::Select().FromTable({ tableName_ });
1741 BlockSync(query, delegate_);
1742 EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 2u); // download 2 times
1743 EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u); // remove 0 times
1744 virtualAssetLoader_->Reset();
1745 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1746 }
1747
1748 /**
1749 * @tc.name: BatchAbnormalDownloadAsset001
1750 * @tc.desc: Test batch download asset failed.
1751 * @tc.type: FUNC
1752 * @tc.require:
1753 * @tc.author: zqq
1754 */
1755 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, BatchAbnormalDownloadAsset001, TestSize.Level1)
1756 {
1757 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1758 PrepareDataInCloud();
__anonaa4bfccd1502(int rowIndex, std::map<std::string, Assets> &assets) 1759 virtualAssetLoader_->ForkBatchDownload([](int rowIndex, std::map<std::string, Assets> &assets) {
1760 if (rowIndex > 50) { // 50 record failed
1761 for (auto &asset : assets) {
1762 for (auto &item : asset.second) {
1763 item.status = AssetStatus::ABNORMAL;
1764 }
1765 }
1766 return DB_ERROR;
1767 }
1768 return OK;
1769 });
1770
1771 Query query = Query::Select().FromTable({ tableName_ });
1772 BlockSync(query, delegate_);
1773 EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 1u); // download 1 times
1774 EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u); // remove 0 times
1775 virtualAssetLoader_->Reset();
1776
1777 virtualAssetLoader_->ForkBatchDownload(nullptr);
1778 BlockSync(query, delegate_);
1779 EXPECT_EQ(virtualAssetLoader_->GetBatchDownloadCount(), 2u); // download 2 times
1780 EXPECT_EQ(virtualAssetLoader_->GetBatchRemoveCount(), 0u); // remove 0 times
1781 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1782 }
1783
1784 /**
1785 * @tc.name: SyncWithLogFlag001
1786 * @tc.desc: Test cloud update to local, flag will change.
1787 * @tc.type: FUNC
1788 * @tc.require:
1789 * @tc.author: lg
1790 */
1791 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithLogFlag001, TestSize.Level1)
1792 {
1793 /**
1794 * @tc.steps:step1. Insert 5 records and sync.
1795 * @tc.expected: step1. ok.
1796 */
1797 const int actualCount = 5;
1798 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
1799 InsertUserTableRecord(tableName_, 0, actualCount);
1800 /**
1801 * @tc.steps:step2. modify data and sync, check flag count.
1802 * @tc.expected: step2. ok.
1803 */
1804 UpdateCloudTableRecord(0, 3, true);
1805 Query query = Query::Select().FromTable({ tableName_ });
1806 BlockSync(query, delegate_);
1807 string checkSql = "select count(*) from naturalbase_rdb_aux_" + tableName_ + "_log where flag&0x4000=0x4000;";
1808 EXPECT_EQ(sqlite3_exec(db_, checkSql.c_str(), QueryCountCallback,
1809 reinterpret_cast<void *>(3u), nullptr), SQLITE_OK);
1810 /**
1811 * @tc.steps:step3. delete local data and sync, check flag count.
1812 * @tc.expected: step3. ok.
1813 */
1814 std::string sql = "delete from " + tableName_ + " where id = 0;" ;
1815 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1816 BlockSync(query, delegate_);
1817 EXPECT_EQ(sqlite3_exec(db_, checkSql.c_str(), QueryCountCallback,
1818 reinterpret_cast<void *>(2u), nullptr), SQLITE_OK);
1819
1820 /**
1821 * @tc.steps:step4. delete cloud data and sync, check flag count.
1822 * @tc.expected: step4. ok.
1823 */
1824 DeleteCloudDBData(1, 1, tableName_);
1825 BlockSync(query, delegate_);
1826 EXPECT_EQ(sqlite3_exec(db_, checkSql.c_str(), QueryCountCallback,
1827 reinterpret_cast<void *>(1u), nullptr), SQLITE_OK);
1828 }
1829 }
1830 #endif
1831