1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_delegate_impl.h"
26 #include "relational_store_instance.h"
27 #include "relational_store_manager.h"
28 #include "runtime_config.h"
29 #include "sqlite_relational_store.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 #include <gtest/gtest.h>
37 #include <iostream>
38
39 using namespace testing::ext;
40 using namespace DistributedDB;
41 using namespace DistributedDBUnitTest;
42 using namespace std;
43
44 namespace {
45 const string STORE_ID = "Relational_Store_SYNC";
46 const string DB_SUFFIX = ".db";
47 const string ASSETS_TABLE_NAME = "student";
48 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
49 const string NO_PRIMARY_TABLE = "teacher";
50 const string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
51 const string COMPOUND_PRIMARY_TABLE = "worker1";
52 const string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";
53 const string DEVICE_CLOUD = "cloud_dev";
54 const string COL_ID = "id";
55 const string COL_NAME = "name";
56 const string COL_HEIGHT = "height";
57 const string COL_ASSET = "asset";
58 const string COL_ASSETS = "assets";
59 const string COL_AGE = "age";
60 const int64_t SYNC_WAIT_TIME = 600;
61 const int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
62 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
63 {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
64 {COL_AGE, TYPE_INDEX<int64_t>}};
65 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
66 {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
67 {COL_AGE, TYPE_INDEX<int64_t>}};
68 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
69 {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
70 {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
71 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
72 " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
73 COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
74 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
75 " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
76 " ASSETS," + COL_AGE + " INT);";
77 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
78 " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
79 COL_AGE + " INT, PRIMARY KEY (id, age));";
80 const Asset ASSET_COPY = {.version = 1,
81 .name = "Phone",
82 .assetId = "0",
83 .subpath = "/local/sync",
84 .uri = "/local/sync",
85 .modifyTime = "123456",
86 .createTime = "",
87 .size = "256",
88 .hash = "ASE"};
89 const Asset ASSET_COPY2 = {.version = 1,
90 .name = "Phone_copy_2",
91 .assetId = "0",
92 .subpath = "/local/sync",
93 .uri = "/local/sync",
94 .modifyTime = "123456",
95 .createTime = "",
96 .size = "256",
97 .hash = "ASE"};
98 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
99 const std::string QUERY_CONSISTENT_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x20=0;";
100 const std::string QUERY_COMPENSATED_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x10!=0;";
101
102 string g_storePath;
103 string g_testDir;
104 RelationalStoreObserverUnitTest *g_observer = nullptr;
105 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
106 RelationalStoreDelegate *g_delegate = nullptr;
107 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
108 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
109 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
110 SyncProcess g_syncProcess;
111 std::condition_variable g_processCondition;
112 std::mutex g_processMutex;
113 IRelationalStore *g_store = nullptr;
114 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
115 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
116
InitDatabase(sqlite3 * & db)117 void InitDatabase(sqlite3 *&db)
118 {
119 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
120 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
121 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
122 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
123 }
124
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)125 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
126 {
127 TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
128 .fields = CLOUD_FIELDS};
129 dataBaseSchema.tables.push_back(assetsTableSchema);
130 assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
131 .fields = NO_PRIMARY_FIELDS};
132 dataBaseSchema.tables.push_back(assetsTableSchema);
133 assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
134 .fields = COMPOUND_PRIMARY_FIELDS};
135 dataBaseSchema.tables.push_back(assetsTableSchema);
136 }
137
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)138 void GenerateDataRecords(
139 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
140 {
141 for (int64_t i = begin; i < begin + count; i++) {
142 Assets assets;
143 Asset asset = ASSET_COPY;
144 asset.name = ASSET_COPY.name + std::to_string(i);
145 assets.emplace_back(asset);
146 asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
147 assets.emplace_back(asset);
148 VBucket data;
149 data.insert_or_assign(COL_ID, i);
150 data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
151 data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
152 data.insert_or_assign(COL_ASSETS, assets);
153 data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
154 record.push_back(data);
155
156 VBucket log;
157 Timestamp now = TimeHelper::GetSysCurrentTime();
158 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
159 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
160 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
161 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
162 extend.push_back(log);
163 }
164 }
165
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)166 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
167 {
168 int errCode;
169 std::vector<VBucket> record;
170 std::vector<VBucket> extend;
171 GenerateDataRecords(begin, count, 0, record, extend);
172 const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
173 for (VBucket vBucket : record) {
174 sqlite3_stmt *stmt = nullptr;
175 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
176 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
177 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
178 ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
179 sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
180 if (isAssetNull) {
181 ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
182 } else {
183 std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
184 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
185 }
186 std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
187 std::get<Assets>(vBucket[COL_ASSETS]));
188 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
189 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
190 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
191 SQLiteUtils::ResetStatement(stmt, true, errCode);
192 }
193 }
194
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,bool isEmptyAssets=false)195 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, bool isEmptyAssets = false)
196 {
197 int errCode;
198 std::vector<uint8_t> assetBlob;
199 const string sql = "update " + tableName + " set assets=?;";
200 sqlite3_stmt *stmt = nullptr;
201 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
202 if (isEmptyAssets) {
203 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
204 } else {
205 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
206 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
207 }
208 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
209 SQLiteUtils::ResetStatement(stmt, true, errCode);
210 }
211
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,int32_t begin,int32_t end)212 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, int32_t begin, int32_t end)
213 {
214 int errCode;
215 std::vector<uint8_t> assetBlob;
216 const string sql = "update " + tableName + " set assets=? " + "where id>=" + std::to_string(begin) +
217 " and id<=" + std::to_string(end) + ";";
218 sqlite3_stmt *stmt = nullptr;
219 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
220 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
221 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
222 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
223 SQLiteUtils::ResetStatement(stmt, true, errCode);
224 }
225
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)226 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
227 {
228 ASSERT_NE(db, nullptr);
229 for (int64_t i = begin; i < begin + count; i++) {
230 string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
231 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
232 }
233 }
234
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)235 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
236 {
237 for (int64_t i = begin; i < begin + count; i++) {
238 VBucket idMap;
239 idMap.insert_or_assign("#_gid", std::to_string(i));
240 ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
241 }
242 }
243
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)244 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
245 const std::string &tableName)
246 {
247 std::this_thread::sleep_for(std::chrono::milliseconds(1));
248 std::vector<VBucket> record;
249 std::vector<VBucket> extend;
250 GenerateDataRecords(begin, count, gidStart, record, extend);
251 for (auto &entry: extend) {
252 entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
253 }
254 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
255 std::this_thread::sleep_for(std::chrono::milliseconds(1));
256 }
257
QueryStatusCallback(void * data,int count,char ** colValue,char ** colName)258 int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
259 {
260 auto status = static_cast<std::vector<int64_t> *>(data);
261 const int decimal = 10;
262 for (int i = 0; i < count; i++) {
263 status->push_back(strtol(colValue[0], nullptr, decimal));
264 }
265 return 0;
266 }
267
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)268 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
269 {
270 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
271 std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
272 " and data_key <=" + std::to_string(endId) + ";";
273 std::vector<int64_t> status;
274 char *str = NULL;
275 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
276 SQLITE_OK);
277 ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
278
279 for (auto stat : status) {
280 ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
281 }
282 }
283
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)284 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName)
285 {
286 std::vector<VBucket> record;
287 std::vector<VBucket> extend;
288 GenerateDataRecords(begin, count, gidStart, record, extend);
289 if (tableName == ASSETS_TABLE_NAME_SHARED) {
290 for (auto &vBucket: record) {
291 vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
292 }
293 }
294 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
295 }
296
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)297 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
298 {
299 std::unique_lock<std::mutex> lock(g_processMutex);
300 bool result = g_processCondition.wait_for(
301 lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
302 ASSERT_EQ(result, true);
303 LOGD("-------------------sync end--------------");
304 }
305
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK,bool isMerge=false)306 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK,
307 bool isMerge = false)
308 {
309 g_syncProcess = {};
310 Query query = Query::Select().FromTable(tableNames);
311 std::vector<SyncProcess> expectProcess;
312 CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
313 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
314 std::unique_lock<std::mutex> lock(g_processMutex);
315 g_syncProcess = process.begin()->second;
316 if (g_syncProcess.process == FINISHED) {
317 g_processCondition.notify_one();
318 ASSERT_EQ(g_syncProcess.errCode, errCode);
319 }
320 };
321 CloudSyncOption option;
322 option.devices = {DEVICE_CLOUD};
323 option.mode = mode;
324 option.query = query;
325 option.waitTime = SYNC_WAIT_TIME;
326 option.lockAction = static_cast<LockAction>(0xff); // lock all
327 option.merge = isMerge;
328 ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
329
330 if (dbStatus == DBStatus::OK) {
331 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
332 }
333 }
334
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)335 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
336 {
337 for (auto &item : assets) {
338 for (auto &asset : item.second) {
339 EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
340 if (index < 4) { // 1-4 is inserted
341 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
342 }
343 LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
344 index);
345 }
346 }
347 }
348
CheckDownloadFailedForTest002(sqlite3 * & db)349 void CheckDownloadFailedForTest002(sqlite3 *&db)
350 {
351 std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
352 sqlite3_stmt *stmt = nullptr;
353 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
354 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
355 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
356 Type cloudValue;
357 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
358 std::vector<uint8_t> assetsBlob;
359 Assets assets;
360 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
361 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
362 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
363 for (size_t i = 0; i < assets.size(); ++i) {
364 EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
365 }
366 }
367 int errCode;
368 SQLiteUtils::ResetStatement(stmt, true, errCode);
369 }
370
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)371 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
372 {
373 Assets assets;
374 Asset asset = ASSET_COPY;
375 asset.name = ASSET_COPY.name + std::to_string(id);
376 asset.status = status;
377 assets.emplace_back(asset);
378 asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
379 assets.emplace_back(asset);
380 int errCode;
381 std::vector<uint8_t> assetBlob;
382 const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
383 sqlite3_stmt *stmt = nullptr;
384 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
385 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
386 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
387 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
388 SQLiteUtils::ResetStatement(stmt, true, errCode);
389 }
390
CheckConsistentCount(sqlite3 * db,int64_t expectCount)391 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
392 {
393 EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
394 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
395 }
396
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)397 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
398 {
399 EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
400 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
401 }
402
CloseDb()403 void CloseDb()
404 {
405 if (g_delegate != nullptr) {
406 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
407 g_delegate = nullptr;
408 }
409 delete g_observer;
410 g_virtualCloudDb = nullptr;
411 }
412
413 class DistributedDBCloudSyncerDownloadAssetsTest : public testing::Test {
414 public:
415 static void SetUpTestCase(void);
416 static void TearDownTestCase(void);
417 void SetUp();
418 void TearDown();
419
420 protected:
421 void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
422 const std::set<int> &failIndex);
423 void CheckLocalAssetIsEmpty(const std::string &tableName);
424 void CheckCursorData(const std::string &tableName, int begin);
425 void WaitForSync(int &syncCount);
426 const RelationalSyncAbleStorage *GetRelationalStore();
427 void InitDataStatusTest(bool needDownload);
428 void DataStatusTest001(bool needDownload);
429 void DataStatusTest003();
430 void DataStatusTest004();
431 void DataStatusTest005();
432 void DataStatusTest006();
433 void DataStatusTest007();
434 sqlite3 *db = nullptr;
435 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
436 };
437
SetUpTestCase(void)438 void DistributedDBCloudSyncerDownloadAssetsTest::SetUpTestCase(void)
439 {
440 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
441 g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
442 LOGI("The test db is:%s", g_storePath.c_str());
443 g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
444 RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
445 }
446
TearDownTestCase(void)447 void DistributedDBCloudSyncerDownloadAssetsTest::TearDownTestCase(void) {}
448
SetUp(void)449 void DistributedDBCloudSyncerDownloadAssetsTest::SetUp(void)
450 {
451 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
452 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
453 LOGE("rm test db files error.");
454 }
455 DistributedDBToolsUnitTest::PrintTestCaseInfo();
456 LOGD("Test dir is %s", g_testDir.c_str());
457 db = RelationalTestUtils::CreateDataBase(g_storePath);
458 ASSERT_NE(db, nullptr);
459 InitDatabase(db);
460 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
461 ASSERT_NE(g_observer, nullptr);
462 ASSERT_EQ(
463 g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
464 DBStatus::OK);
465 ASSERT_NE(g_delegate, nullptr);
466 ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
467 ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
468 ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
469 g_virtualCloudDb = make_shared<VirtualCloudDb>();
470 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
471 g_syncProcess = {};
472 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
473 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
474 DataBaseSchema dataBaseSchema;
475 GetCloudDbSchema(dataBaseSchema);
476 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
477 g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
478 ASSERT_NE(g_cloudStoreHook, nullptr);
479 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
480 ASSERT_TRUE(communicatorAggregator_ != nullptr);
481 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
482 }
483
TearDown(void)484 void DistributedDBCloudSyncerDownloadAssetsTest::TearDown(void)
485 {
486 RefObject::DecObjRef(g_store);
487 g_virtualCloudDb->ForkUpload(nullptr);
488 CloseDb();
489 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
490 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
491 LOGE("rm test db files error.");
492 }
493 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
494 communicatorAggregator_ = nullptr;
495 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
496 }
497
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)498 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocaLAssets(const std::string &tableName,
499 const std::string &expectAssetId, const std::set<int> &failIndex)
500 {
501 std::string sql = "SELECT assets FROM " + tableName + ";";
502 sqlite3_stmt *stmt = nullptr;
503 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
504 int index = 0;
505 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
506 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
507 Type cloudValue;
508 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
509 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
510 for (const auto &asset : assets) {
511 index++;
512 if (failIndex.find(index) != failIndex.end()) {
513 EXPECT_EQ(asset.assetId, "0");
514 } else {
515 EXPECT_EQ(asset.assetId, expectAssetId);
516 }
517 }
518 }
519 int errCode = E_OK;
520 SQLiteUtils::ResetStatement(stmt, true, errCode);
521 }
522
CheckLocalAssetIsEmpty(const std::string & tableName)523 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocalAssetIsEmpty(const std::string &tableName)
524 {
525 std::string sql = "SELECT asset FROM " + tableName + ";";
526 sqlite3_stmt *stmt = nullptr;
527 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
528 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
529 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
530 }
531 int errCode = E_OK;
532 SQLiteUtils::ResetStatement(stmt, true, errCode);
533 }
534
CheckCursorData(const std::string & tableName,int begin)535 void DistributedDBCloudSyncerDownloadAssetsTest::CheckCursorData(const std::string &tableName, int begin)
536 {
537 std::string logTableName = DBCommon::GetLogTableName(tableName);
538 std::string sql = "SELECT cursor FROM " + logTableName + ";";
539 sqlite3_stmt *stmt = nullptr;
540 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
541 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
542 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
543 Type cloudValue;
544 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
545 EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
546 begin++;
547 }
548 int errCode = E_OK;
549 SQLiteUtils::ResetStatement(stmt, true, errCode);
550 }
551
WaitForSync(int & syncCount)552 void DistributedDBCloudSyncerDownloadAssetsTest::WaitForSync(int &syncCount)
553 {
554 std::unique_lock<std::mutex> lock(g_processMutex);
555 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
556 [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
557 ASSERT_EQ(result, true);
558 }
559
GetRelationalStore()560 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsTest::GetRelationalStore()
561 {
562 RelationalDBProperties properties;
563 CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
564 int errCode = E_OK;
565 g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
566 if (g_store == nullptr) {
567 return nullptr;
568 }
569 return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
570 }
571
InitDataStatusTest(bool needDownload)572 void DistributedDBCloudSyncerDownloadAssetsTest::InitDataStatusTest(bool needDownload)
573 {
574 int cloudCount = 20;
575 int localCount = 10;
576 InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
577 if (needDownload) {
578 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
579 }
580 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
581 std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
582 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
583 sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
584 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
585 sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
586 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
587 std::this_thread::sleep_for(std::chrono::milliseconds(1));
588 InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
589 std::this_thread::sleep_for(std::chrono::milliseconds(1));
590 sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
591 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
592 sql = "update " + logName + " SET status = 1 where data_key in (4);";
593 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
594 }
595
DataStatusTest001(bool needDownload)596 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest001(bool needDownload)
597 {
598 int cloudCount = 20;
599 int count = 0;
600 g_cloudStoreHook->SetSyncFinishHook([&count, cloudCount, this]() {
601 count++;
602 if (count == 1) {
603 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
604 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (11, 4)) or (status = 0)";
605 CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
606 }
607 if (count == 2) { // 2 is compensated sync
608 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
609 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
610 CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
611 g_processCondition.notify_one();
612 }
613 });
614 InitDataStatusTest(needDownload);
615 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
616 WaitForSync(count);
617 }
618
DataStatusTest003()619 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest003()
620 {
621 int count = 0;
622 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
623 count++;
624 if (count == 1) {
625 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
626 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key = 11)";
627 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
628 }
629 if (count == 2) { // 2 is compensated sync
630 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
631 " (status = 3 and data_key in (0,2,3,12,13) or (status = 0))";
632 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
633 g_processCondition.notify_one();
634 }
635 });
636 int downLoadCount = 0;
637 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
638 std::map<std::string, Assets> &assets) {
639 downLoadCount++;
640 if (downLoadCount == 1) {
641 std::vector<std::vector<uint8_t>> hashKey;
642 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
643 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
644 }
645 });
646 InitDataStatusTest(true);
647 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
648 WaitForSync(count);
649 g_virtualAssetLoader->ForkDownload(nullptr);
650 }
651
DataStatusTest004()652 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest004()
653 {
654 int count = 0;
655 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
656 count++;
657 if (count == 1) {
658 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
659 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (-1,11))";
660 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
661 }
662 if (count == 2) { // 2 is compensated sync
663 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
664 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
665 CloudDBSyncUtilsTest::CheckCount(db, sql, 19); // 19 is match count
666 g_processCondition.notify_one();
667 }
668 });
669 int downLoadCount = 0;
670 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
671 std::map<std::string, Assets> &assets) {
672 downLoadCount++;
673 if (downLoadCount == 1) {
674 std::vector<std::vector<uint8_t>> hashKey;
675 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
676 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
677 std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
678 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
679 }
680 });
681 int queryIdx = 0;
682 g_virtualCloudDb->ForkQuery([this, &queryIdx](const std::string &, VBucket &) {
683 LOGD("query index:%d", ++queryIdx);
684 if (queryIdx == 4) { // 4 is compensated sync
685 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
686 " SET status = 1 where data_key=15;";
687 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
688 }
689 });
690 InitDataStatusTest(true);
691 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
692 WaitForSync(count);
693 g_virtualAssetLoader->ForkDownload(nullptr);
694 g_virtualCloudDb->ForkQuery(nullptr);
695 }
696
DataStatusTest005()697 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest005()
698 {
699 int count = 0;
700 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
701 count++;
702 if (count == 1) {
703 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
704 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key in (11))";
705 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
706 }
707 if (count == 2) { // 2 is compensated sync
708 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
709 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0)";
710 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
711 g_processCondition.notify_one();
712 }
713 });
714 int downLoadCount = 0;
715 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
716 std::map<std::string, Assets> &assets) {
717 downLoadCount++;
718 if (downLoadCount == 1) {
719 std::vector<std::vector<uint8_t>> hashKey;
720 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
721 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
722 std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
723 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
724 }
725 });
726 InitDataStatusTest(true);
727 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
728 WaitForSync(count);
729 g_virtualAssetLoader->ForkDownload(nullptr);
730 }
731
DataStatusTest006()732 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest006()
733 {
734 int count = 0;
735 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
736 count++;
737 if (count == 1) {
738 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
739 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (0)) or "
740 "(status = 0 and data_key in (11))";
741 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
742 }
743 if (count == 2) { // 2 is compensated sync
744 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
745 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
746 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
747 g_processCondition.notify_one();
748 }
749 });
750 int downLoadCount = 0;
751 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
752 std::map<std::string, Assets> &assets) {
753 downLoadCount++;
754 if (downLoadCount == 1) {
755 std::vector<std::vector<uint8_t>> hashKey;
756 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
757 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
758 std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
759 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
760 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
761 }
762 });
763 InitDataStatusTest(true);
764 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
765 WaitForSync(count);
766 g_virtualAssetLoader->ForkDownload(nullptr);
767 }
768
DataStatusTest007()769 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest007()
770 {
771 int count = 0;
772 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
773 count++;
774 if (count == 1) {
775 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
776 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
777 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
778 }
779 if (count == 2) { // 2 is compensated sync
780 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
781 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
782 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
783 g_processCondition.notify_one();
784 }
785 });
786 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
787 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
788 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
789 .WillRepeatedly([](const std::string &, const std::string &gid, const Type &,
790 std::map<std::string, Assets> &assets) {
791 return CLOUD_ERROR;
792 });
793 InitDataStatusTest(true);
794 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
795 WaitForSync(count);
796 }
797
798 /*
799 * @tc.name: DownloadAssetForDupDataTest001
800 * @tc.desc: Test the download interface call with duplicate data for the same primary key.
801 * @tc.type: FUNC
802 * @tc.require:
803 * @tc.author: liufuchenxing
804 */
805 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest001, TestSize.Level0)
806 {
807 /**
808 * @tc.steps:step1. Mock asset download interface.
809 * @tc.expected: step1. return OK and interface will be called 4 times. delete 1, delete 2, insert 1, insert 2
810 */
811 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
812 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
813 int index = 1;
814 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
815 .Times(2)
816 .WillRepeatedly(
__anon3db76c681102(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 817 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
818 LOGD("Download GID:%s", gid.c_str());
819 CheckDownloadForTest001(index, assets);
820 index++;
821 return DBStatus::OK;
822 });
823
824 /**
825 * @tc.steps:step2. Insert local data [0, 10), sync data
826 * @tc.expected: step2. sync success.
827 */
828 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME);
829 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
830
831 /**
832 * @tc.steps:step3. delete cloud data [1, 2], then insert cloud data [1,2] with new gid. Finally sync data.
833 * @tc.expected: step3. sync success.
834 */
835 DeleteCloudDBData(1, 2, ASSETS_TABLE_NAME);
836 InsertCloudDBData(1, 2, 10, ASSETS_TABLE_NAME);
837 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
838 }
839
840 /**
841 * @tc.name: FillAssetId001
842 * @tc.desc: Test if assetId is filled in single primary key table
843 * @tc.type: FUNC
844 * @tc.require:
845 * @tc.author: chenchaohao
846 */
847 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId001, TestSize.Level0)
848 {
849 /**
850 * @tc.steps:step1. local insert assets and sync, check the local assetId.
851 * @tc.expected: step1. return OK.
852 */
853 int localCount = 50;
854 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
855 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
856 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
857
858 /**
859 * @tc.steps:step2. local update assets and sync ,check the local assetId.
860 * @tc.expected: step2. sync success.
861 */
862 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
863 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
864 CheckLocalAssetIsEmpty(ASSETS_TABLE_NAME);
865 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
866 }
867
868 /**
869 * @tc.name: FillAssetId002
870 * @tc.desc: Test if assetId is filled in no primary key table
871 * @tc.type: FUNC
872 * @tc.require:
873 * @tc.author: chenchaohao
874 */
875 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId002, TestSize.Level0)
876 {
877 /**
878 * @tc.steps:step1. local insert assets and sync, check the local assetId.
879 * @tc.expected: step1. return OK.
880 */
881 int localCount = 50;
882 InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
883 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
884 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
885
886 /**
887 * @tc.steps:step2. local update assets and sync ,check the local assetId.
888 * @tc.expected: step2. sync success.
889 */
890 UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
891 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
892 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
893 }
894
895 /**
896 * @tc.name: FillAssetId003
897 * @tc.desc: Test if assetId is filled in compound primary key table
898 * @tc.type: FUNC
899 * @tc.require:
900 * @tc.author: chenchaohao
901 */
902 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId003, TestSize.Level0)
903 {
904 /**
905 * @tc.steps:step1. local insert assets and sync, check the local assetId.
906 * @tc.expected: step1. return OK.
907 */
908 int localCount = 50;
909 InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
910 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
911 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
912
913 /**
914 * @tc.steps:step2. local update assets and sync ,check the local assetId.
915 * @tc.expected: step2. sync success.
916 */
917 UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
918 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
919 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
920 }
921
922 /**
923 * @tc.name: FillAssetId004
924 * @tc.desc: Test if assetId is filled in single primary key table when CLOUD_FORCE_PUSH
925 * @tc.type: FUNC
926 * @tc.require:
927 * @tc.author: chenchaohao
928 */
929 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId004, TestSize.Level0)
930 {
931 /**
932 * @tc.steps:step1. local insert assets and sync, check the local assetId.
933 * @tc.expected: step1. return OK.
934 */
935 int localCount = 50;
936 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
937 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
938 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
939
940 /**
941 * @tc.steps:step2. local update assets and sync ,check the local assetId.
942 * @tc.expected: step2. sync success.
943 */
944 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
945 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
946 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
947 }
948
949 /**
950 * @tc.name: FillAssetId001
951 * @tc.desc: Test if assetId is filled in no primary key table when CLOUD_FORCE_PUSH
952 * @tc.type: FUNC
953 * @tc.require:
954 * @tc.author: chenchaohao
955 */
956 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId005, TestSize.Level0)
957 {
958 /**
959 * @tc.steps:step1. local insert assets and sync, check the local assetId.
960 * @tc.expected: step1. return OK.
961 */
962 int localCount = 50;
963 InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
964 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
965 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
966
967 /**
968 * @tc.steps:step2. local update assets and sync ,check the local assetId.
969 * @tc.expected: step2. sync success.
970 */
971 UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
972 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
973 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
974 }
975
976 /**
977 * @tc.name: FillAssetId006
978 * @tc.desc: Test if assetId is filled in compound primary key table when CLOUD_FORCE_PUSH
979 * @tc.type: FUNC
980 * @tc.require:
981 * @tc.author: chenchaohao
982 */
983 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId006, TestSize.Level0)
984 {
985 /**
986 * @tc.steps:step1. local insert assets and sync, check the local assetId.
987 * @tc.expected: step1. return OK.
988 */
989 int localCount = 50;
990 InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
991 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
992 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
993
994 /**
995 * @tc.steps:step2. local update assets and sync ,check the local assetId.
996 * @tc.expected: step2. sync success.
997 */
998 UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
999 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1000 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
1001 }
1002
1003 /**
1004 * @tc.name: FillAssetId007
1005 * @tc.desc: Test if assetId is filled when extend lack of assets
1006 * @tc.type: FUNC
1007 * @tc.require:
1008 * @tc.author: chenchaohao
1009 */
1010 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId007, TestSize.Level0)
1011 {
1012 CloudSyncConfig config;
1013 config.maxUploadCount = 200; // max upload 200
1014 g_delegate->SetCloudSyncConfig(config);
1015 /**
1016 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1017 * @tc.expected: step1. return OK.
1018 */
1019 int localCount = 50;
1020 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon3db76c681202(const std::string &tableName, VBucket &extend) 1021 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1022 extend.erase("assets");
1023 });
1024 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1025 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1026
1027 /**
1028 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1029 * @tc.expected: step2. sync success.
1030 */
1031 int addLocalCount = 10;
1032 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon3db76c681302(const std::string &tableName, VBucket &extend) 1033 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1034 if (extend.find("assets") != extend.end()) {
1035 for (auto &asset : std::get<Assets>(extend["assets"])) {
1036 asset.name = "pad";
1037 }
1038 }
1039 });
1040 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1041 int beginFailFillNum = 101;
1042 int endFailFillNum = 120;
1043 std::set<int> index;
1044 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1045 index.insert(i);
1046 }
1047 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1048
1049 /**
1050 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1051 * @tc.expected: step2. sync success.
1052 */
1053 g_virtualCloudDb->ForkUpload(nullptr);
1054 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1055 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1056 }
1057
1058 /**
1059 * @tc.name: FillAssetId008
1060 * @tc.desc: Test if assetId is filled when extend lack of assetId
1061 * @tc.type: FUNC
1062 * @tc.require:
1063 * @tc.author: chenchaohao
1064 */
1065 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId008, TestSize.Level0)
1066 {
1067 /**
1068 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1069 * @tc.expected: step1. return OK.
1070 */
1071 int localCount = 50;
1072 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon3db76c681402(const std::string &tableName, VBucket &extend) 1073 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1074 if (extend.find("assets") != extend.end()) {
1075 for (auto &asset : std::get<Assets>(extend["assets"])) {
1076 asset.assetId = "";
1077 }
1078 }
1079 });
1080 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1081 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1082
1083 /**
1084 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1085 * @tc.expected: step2. sync success.
1086 */
1087 g_virtualCloudDb->ForkUpload(nullptr);
1088 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1089 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1090 }
1091
1092 /**
1093 * @tc.name: FillAssetId009
1094 * @tc.desc: Test if assetId is filled when extend exists useless assets
1095 * @tc.type: FUNC
1096 * @tc.require:
1097 * @tc.author: chenchaohao
1098 */
1099 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId009, TestSize.Level0)
1100 {
1101 /**
1102 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1103 * @tc.expected: step1. return OK.
1104 */
1105 int localCount = 50;
1106 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon3db76c681502(const std::string &tableName, VBucket &extend) 1107 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1108 if (extend.find("assets") != extend.end()) {
1109 Asset asset = ASSET_COPY2;
1110 Assets &assets = std::get<Assets>(extend["assets"]);
1111 assets.push_back(asset);
1112 }
1113 });
1114 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1115 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1116 }
1117
1118 /**
1119 * @tc.name: FillAssetId010
1120 * @tc.desc: Test if assetId is filled when some success and some fail
1121 * @tc.type: FUNC
1122 * @tc.require:
1123 * @tc.author: chenchaohao
1124 */
1125 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId010, TestSize.Level0)
1126 {
1127 /**
1128 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1129 * @tc.expected: step1. return OK.
1130 */
1131 int localCount = 30;
1132 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1133 g_virtualCloudDb->SetInsertFailed(1);
1134 std::atomic<int> count = 0;
__anon3db76c681602(const std::string &tableName, VBucket &extend) 1135 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1136 if (extend.find("assets") != extend.end() && count == 0) {
1137 extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1138 count++;
1139 }
1140 });
1141 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1142 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1143 }
1144
1145 /**
1146 * @tc.name: FillAssetId011
1147 * @tc.desc: Test if assetId is null when removedevicedata in FLAG_ONLY
1148 * @tc.type: FUNC
1149 * @tc.require:
1150 * @tc.author: chenchaohao
1151 */
1152 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId011, TestSize.Level0)
1153 {
1154 /**
1155 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1156 * @tc.expected: step1. return OK.
1157 */
1158 int localCount = 50;
1159 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1160 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1161 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1162
1163 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1164 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1165 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1166 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1167 }
1168
1169 /**
1170 * @tc.name: FillAssetId012
1171 * @tc.desc: Test if assetid is filled when extend size is not equal to record size
1172 * @tc.type: FUNC
1173 * @tc.require:
1174 * @tc.author: chenchaohao
1175 */
1176 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId012, TestSize.Level0)
1177 {
1178 /**
1179 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1180 * @tc.expected: step1. return OK.
1181 */
1182 int localCount = 50;
1183 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1184 std::atomic<int> count = 1;
1185 g_virtualCloudDb->SetClearExtend(count);
1186 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1187 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1188
1189 /**
1190 * @tc.steps:step2. set extend size normal then sync, check the asseid.
1191 * @tc.expected: step2. return OK.
1192 */
1193 g_virtualCloudDb->SetClearExtend(0);
1194 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1195 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1196
1197 /**
1198 * @tc.steps:step3. set extend size large then sync, check the asseid.
1199 * @tc.expected: step3. return OK.
1200 */
1201 count = -1; // -1 means extend push a empty vBucket
1202 g_virtualCloudDb->SetClearExtend(count);
1203 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1204 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1205 }
1206
1207 /**
1208 * @tc.name: FillAssetId013
1209 * @tc.desc: Test fill assetId and removedevicedata when data is delete
1210 * @tc.type: FUNC
1211 * @tc.require:
1212 * @tc.author: chenchaohao
1213 */
1214 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId013, TestSize.Level0)
1215 {
1216 /**
1217 * @tc.steps:step1. local insert data and sync, then delete local data and insert new data
1218 * @tc.expected: step1. return OK.
1219 */
1220 int localCount = 20;
1221 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1222 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1223 int deleteLocalCount = 10;
1224 DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1225 int addLocalCount = 30;
1226 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1227 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1228
1229 /**
1230 * @tc.steps:step2. RemoveDeviceData.
1231 * @tc.expected: step2. return OK.
1232 */
1233 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1234 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1235 }
1236
1237 /**
1238 * @tc.name: FillAssetId014
1239 * @tc.desc: Test if asset status is reset when removedevicedata in FLAG_ONLY
1240 * @tc.type: FUNC
1241 * @tc.require:
1242 * @tc.author: bty
1243 */
1244 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId014, TestSize.Level0)
1245 {
1246 /**
1247 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1248 * @tc.expected: step1. return OK.
1249 */
1250 int localCount = 50;
1251 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1252 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1253 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1254
1255 /**
1256 * @tc.steps:step2. RemoveDeviceData
1257 * @tc.expected: step2. return OK.
1258 */
1259 Assets assets;
1260 std::vector<AssetStatus> statusVec = {
1261 AssetStatus::INSERT, AssetStatus::UPDATE, AssetStatus::DELETE, AssetStatus::NORMAL,
1262 AssetStatus::ABNORMAL, AssetStatus::DOWNLOADING, AssetStatus::DOWNLOAD_WITH_NULL
1263 };
1264 for (auto &status : statusVec) {
1265 Asset temp = ASSET_COPY;
1266 temp.name += std::to_string(status);
1267 temp.status = status | AssetStatus::UPLOADING;
1268 assets.emplace_back(temp);
1269 }
1270 UpdateLocalData(db, ASSETS_TABLE_NAME, assets);
1271 EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1272 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1273
1274 /**
1275 * @tc.steps:step3. check status
1276 * @tc.expected: step3. return OK.
1277 */
1278 std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
1279 sqlite3_stmt *stmt = nullptr;
1280 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1281 int index = 0;
1282 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1283 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1284 Type cloudValue;
1285 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1286 Assets newAssets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
1287 for (const auto &ast : newAssets) {
1288 EXPECT_EQ(ast.status, statusVec[index++ % statusVec.size()]);
1289 }
1290 }
1291 int errCode = E_OK;
1292 SQLiteUtils::ResetStatement(stmt, true, errCode);
1293 }
1294
1295 /**
1296 * @tc.name: FillAssetId015
1297 * @tc.desc: Test if fill assetId when upload return cloud network error
1298 * @tc.type: FUNC
1299 * @tc.require:
1300 * @tc.author: chenchaohao
1301 */
1302 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId015, TestSize.Level0)
1303 {
1304 /**
1305 * @tc.steps:step1. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync
1306 * @tc.expected: step1. return OK, errcode is CLOUD_NETWORK_ERROR.
1307 */
1308 int localCount = 20;
1309 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1310 g_virtualCloudDb->SetCloudNetworkError(true);
1311 std::atomic<int> count = 0;
__anon3db76c681702(const std::string &tableName, VBucket &extend) 1312 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1313 if (extend.find("assets") != extend.end() && count == 0) {
1314 extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1315 count++;
1316 }
1317 });
1318 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_NETWORK_ERROR);
1319 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1320 g_virtualCloudDb->SetCloudNetworkError(false);
1321 g_virtualCloudDb->ForkUpload(nullptr);
1322 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1323 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1324
1325 /**
1326 * @tc.steps:step2. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync.
1327 * @tc.expected: step2. return OK, errcode is CLOUD_ERROR.
1328 */
1329 int addLocalCount = 10;
1330 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1331 std::atomic<int> num = 0;
__anon3db76c681802(const std::string &tableName, VBucket &extend) 1332 g_virtualCloudDb->ForkUpload([&num](const std::string &tableName, VBucket &extend) {
1333 if (extend.find("assets") != extend.end() && num == 0) {
1334 for (auto &asset : std::get<Assets>(extend["assets"])) {
1335 asset.name = "pad";
1336 break;
1337 }
1338 num++;
1339 }
1340 });
1341 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1342 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {41}); // // 41th asset do not fill
1343 }
1344
1345 /**
1346 * @tc.name: FillAssetId016
1347 * @tc.desc: Test fill assetId and removedevicedata when last data is delete
1348 * @tc.type: FUNC
1349 * @tc.require:
1350 * @tc.author: chenchaohao
1351 */
1352 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId016, TestSize.Level0)
1353 {
1354 /**
1355 * @tc.steps:step1. local insert data and sync, then delete last local data
1356 * @tc.expected: step1. return OK.
1357 */
1358 int localCount = 20;
1359 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1360 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1361 int deletLocalCount = 10;
1362 DeleteLocalRecord(db, deletLocalCount, deletLocalCount, ASSETS_TABLE_NAME);
1363 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1364
1365 /**
1366 * @tc.steps:step2. RemoveDeviceData.
1367 * @tc.expected: step2. return OK.
1368 */
1369 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1370 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1371 }
1372
1373 /**
1374 * @tc.name: FillAssetId017
1375 * @tc.desc: Test cursor when download not change
1376 * @tc.type: FUNC
1377 * @tc.require:
1378 * @tc.author: chenchaohao
1379 */
1380 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId017, TestSize.Level0)
1381 {
1382 /**
1383 * @tc.steps:step1. local insert data and sync,check cursor.
1384 * @tc.expected: step1. return OK.
1385 */
1386 int localCount = 20;
1387 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1388 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1389 CheckCursorData(ASSETS_TABLE_NAME, 1);
1390
1391 /**
1392 * @tc.steps:step2. sync again and optype is not change, check cursor.
1393 * @tc.expected: step2. return OK.
1394 */
1395 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1396 CheckCursorData(ASSETS_TABLE_NAME, 1);
1397 }
1398
1399 /**
1400 * @tc.name: FillAssetId018
1401 * @tc.desc: Test if assetId is filled when contains "#_error"
1402 * @tc.type: FUNC
1403 * @tc.require:
1404 * @tc.author: zhaoliang
1405 */
1406 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId018, TestSize.Level0)
1407 {
1408 /**
1409 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1410 * @tc.expected: step1. return OK.
1411 */
1412 int localCount = 30;
1413 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1414 std::atomic<int> count = 0;
__anon3db76c681902(const std::string &tableName, VBucket &extend) 1415 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1416 if (extend.find("assets") != extend.end() && count == 0) {
1417 extend["#_error"] = std::string("test");
1418 count++;
1419 }
1420 });
1421 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1422 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1423 }
1424
1425 /**
1426 * @tc.name: DownloadAssetForDupDataTest002
1427 * @tc.desc: Test download failed
1428 * @tc.type: FUNC
1429 * @tc.require:
1430 * @tc.author: bty
1431 */
1432 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest002, TestSize.Level0)
1433 {
1434 /**
1435 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1436 * @tc.expected: step1. return OK
1437 */
1438 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1439 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1440 int index = 0;
1441 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1442 .WillRepeatedly(
__anon3db76c681a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1443 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1444 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1445 return DBStatus::CLOUD_ERROR;
1446 });
1447
1448 /**
1449 * @tc.steps:step2. Insert cloud data [0, 10), sync data
1450 * @tc.expected: step2. sync success.
1451 */
1452 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1453 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1454
1455 /**
1456 * @tc.steps:step3. check if the hash of assets in db is empty
1457 * @tc.expected: step3. OK
1458 */
1459 CheckDownloadFailedForTest002(db);
1460 }
1461
1462 /**
1463 * @tc.name: DownloadAssetForDupDataTest003
1464 * @tc.desc: Test download failed and flag was modified
1465 * @tc.type: FUNC
1466 * @tc.require:
1467 * @tc.author: bty
1468 */
1469 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest003, TestSize.Level0)
1470 {
1471 /**
1472 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1473 * @tc.expected: step1. return OK
1474 */
1475 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1476 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1477 int index = 0;
1478 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1479 .WillRepeatedly(
__anon3db76c681b02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1480 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1481 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1482 for (auto &item : assets) {
1483 for (auto &asset : item.second) {
1484 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
1485 }
1486 }
1487 return DBStatus::CLOUD_ERROR;
1488 });
1489
1490 /**
1491 * @tc.steps:step2. Insert cloud data [0, 10), sync data
1492 * @tc.expected: step2. sync success.
1493 */
1494 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1495 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1496
1497 /**
1498 * @tc.steps:step3. check if the hash of assets in db is empty
1499 * @tc.expected: step3. OK
1500 */
1501 CheckDownloadFailedForTest002(db);
1502 }
1503
1504 /**
1505 * @tc.name: DownloadAssetForDupDataTest004
1506 * @tc.desc: test sync with deleted assets
1507 * @tc.type: FUNC
1508 * @tc.require:
1509 * @tc.author: bty
1510 */
1511 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest004, TestSize.Level0)
1512 {
1513 /**
1514 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1515 * @tc.expected: step1. return OK
1516 */
1517 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1518 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1519 int index = 0;
1520 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1521 .WillRepeatedly(
__anon3db76c681c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1522 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1523 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1524 return DBStatus::OK;
1525 });
1526
1527 /**
1528 * @tc.steps:step2. insert local data, update assets status to delete, then insert cloud data
1529 * @tc.expected: step2. return OK
1530 */
1531 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1532 UpdateAssetsForLocal(db, 1, AssetStatus::DELETE); // 1 is id
1533 UpdateAssetsForLocal(db, 2, AssetStatus::DELETE | AssetStatus::UPLOADING); // 2 is id
1534 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME); // 10 is num
1535
1536 /**
1537 * @tc.steps:step3. sync, check download num
1538 * @tc.expected: step3. return OK
1539 */
1540 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1541 EXPECT_GE(index, 2); // 2 is download num
1542 }
1543
1544 /**
1545 * @tc.name: DownloadAssetForDupDataTest005
1546 * @tc.desc: test DOWNLOADING status of asset after uploading
1547 * @tc.type: FUNC
1548 * @tc.require:
1549 * @tc.author: bty
1550 */
1551 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest005, TestSize.Level0)
1552 {
1553 /**
1554 * @tc.steps:step1. init data and sync
1555 * @tc.expected: step1. return OK
1556 */
1557 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1558 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1559 UpdateAssetsForLocal(db, 6, AssetStatus::DOWNLOADING); // 6 is id
1560 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1561
1562 /**
1563 * @tc.steps:step2. check asset status
1564 * @tc.expected: step2. return OK
1565 */
1566 std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME + " where id = 6;";
1567 sqlite3_stmt *stmt = nullptr;
1568 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1569 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1570 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1571 Type cloudValue;
1572 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1573 std::vector<uint8_t> assetsBlob;
1574 Assets assets;
1575 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
1576 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
1577 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
1578 for (size_t i = 0; i < assets.size(); ++i) {
1579 EXPECT_EQ(assets[i].hash, ASSET_COPY.hash);
1580 EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
1581 }
1582 }
1583 int errCode;
1584 SQLiteUtils::ResetStatement(stmt, true, errCode);
1585 }
1586
1587 /**
1588 * @tc.name: FillAssetId019
1589 * @tc.desc: Test the stability of cleaning asset id
1590 * @tc.type: FUNC
1591 * @tc.require:
1592 * @tc.author: bty
1593 */
1594 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId019, TestSize.Level0)
1595 {
1596 /**
1597 * @tc.steps:step1. local insert assets and sync.
1598 * @tc.expected: step1. return OK.
1599 */
1600 int localCount = 20;
1601 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1602 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1603
1604 /**
1605 * @tc.steps:step2. construct multiple abnormal data_key, then RemoveDeviceData.
1606 * @tc.expected: step2. return OK.
1607 */
1608 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME)
1609 + " set data_key='999' where data_key>'10';";
1610 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
1611 EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1612 }
1613
1614 /**
1615 * @tc.name: FillAssetId020
1616 * @tc.desc: Test if assetId is filled when extend(lack of assets/assetId is empty/modify asset info)
1617 * @tc.type: FUNC
1618 * @tc.require:
1619 * @tc.author: zhangtao
1620 */
1621 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId020, TestSize.Level0)
1622 {
1623 CloudSyncConfig config;
1624 config.maxUploadCount = 200; // max upload 200
1625 g_delegate->SetCloudSyncConfig(config);
1626
1627 /**
1628 * @tc.steps:step1. local insert assets and erase assets extends
1629 * @tc.expected: step1. return OK.
1630 */
1631 int localCount = 50;
1632 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon3db76c681d02(const std::string &tableName, VBucket &extend) 1633 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1634 extend.erase("assets");
1635 });
1636 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1637 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1638
1639 /**
1640 * @tc.steps:step2. local insert assets and modify assetId to empty
1641 * @tc.expected: step2. return OK.
1642 */
1643 int addLocalCount = 10;
1644 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon3db76c681e02(const std::string &tableName, VBucket &extend) 1645 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1646 if (extend.find("assets") != extend.end()) {
1647 for (auto &asset : std::get<Assets>(extend["assets"])) {
1648 asset.assetId = "";
1649 }
1650 }
1651 });
1652 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1653 int beginFailFillNum = 101;
1654 int endFailFillNum = 120;
1655 std::set<int> index;
1656 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1657 index.insert(i);
1658 }
1659 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1660
1661 /**
1662 * @tc.steps:step3. local insert assets and modify assetId info such as asset.name
1663 * @tc.expected: step3. return OK.
1664 */
1665 InsertLocalData(db, localCount + addLocalCount, addLocalCount, ASSETS_TABLE_NAME);
__anon3db76c681f02(const std::string &tableName, VBucket &extend) 1666 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1667 if (extend.find("assets") != extend.end()) {
1668 for (auto &asset : std::get<Assets>(extend["assets"])) {
1669 asset.name = "mod_pat";
1670 }
1671 }
1672 });
1673 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1674 beginFailFillNum = 121;
1675 endFailFillNum = 140;
1676 std::set<int> newIndex;
1677 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1678 newIndex.insert(i);
1679 }
1680 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", newIndex);
1681
1682 /**
1683 * @tc.steps:step4. local update assets and sync, check the local assetId.
1684 * @tc.expected: step4. sync success.
1685 */
1686 g_virtualCloudDb->ForkUpload(nullptr);
1687 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1688 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1689 }
1690
1691 /**
1692 * @tc.name: FillAssetId021
1693 * @tc.desc: Test if local assets missing, one records's assets missing will not mark the whole sync progress failure
1694 * @tc.type: FUNC
1695 * @tc.require:
1696 * @tc.author: zhangtao
1697 */
1698 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId021, TestSize.Level0)
1699 {
1700 CloudSyncConfig config;
1701 config.maxUploadCount = 200; // max upload 200
1702 g_delegate->SetCloudSyncConfig(config);
1703
1704 /**
1705 * @tc.steps:step1. local insert assets and erase assets extends
1706 * @tc.expected: step1. return OK.
1707 */
1708 int localCount = 50;
1709 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1710
1711 /**
1712 * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1713 * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1714 */
1715 int uploadFailId = 0;
1716 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682002(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1717 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1718 uploadFailId++;
1719 if (uploadFailId == 25) { // 25 is the middle record
1720 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1721 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1722 }
1723 return OK;
1724 });
1725
1726 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1727 int beginFailFillNum = 49;
1728 int endFailFillNum = 50;
1729 std::set<int> index;
1730 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1731 index.insert(i);
1732 }
1733 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1734 g_virtualCloudDb->ForkUpload(nullptr);
1735 }
1736
1737 /**
1738 * @tc.name: FillAssetId022
1739 * @tc.desc: Test if local assets missing, many records's assets missing will not mark the whole sync progress failure
1740 * @tc.type: FUNC
1741 * @tc.require:
1742 * @tc.author: zhangtao
1743 */
1744 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId022, TestSize.Level0)
1745 {
1746 CloudSyncConfig config;
1747 config.maxUploadCount = 200; // max upload 200
1748 g_delegate->SetCloudSyncConfig(config);
1749
1750 /**
1751 * @tc.steps:step1. local insert assets and erase assets extends
1752 * @tc.expected: step1. return OK.
1753 */
1754 int localCount = 50;
1755 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1756
1757 /**
1758 * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1759 * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1760 */
1761 int uploadFailId = 0;
1762 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682102(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1763 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1764 uploadFailId++;
1765 if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1766 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1767 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1768 }
1769 return OK;
1770 });
1771
1772 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1773 int beginFailFillNum = 49;
1774 int endFailFillNum = 54;
1775 std::set<int> index;
1776 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1777 index.insert(i);
1778 }
1779 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1780 g_virtualCloudDb->ForkUpload(nullptr);
1781 }
1782
1783 /**
1784 * @tc.name: FillAssetId023
1785 * @tc.desc: Test if BatchUpdate with local assets missing
1786 * @tc.type: FUNC
1787 * @tc.require:
1788 * @tc.author: zhangtao
1789 */
1790 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId023, TestSize.Level0)
1791 {
1792 /**
1793 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1794 * @tc.expected: step1. return OK.
1795 */
1796 int localCount = 50;
1797 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1798 std::atomic<int> count = 1;
1799 g_virtualCloudDb->SetClearExtend(count);
1800 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1801 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1802
1803 /**
1804 * @tc.steps:step2. set extend size normal and BatchUpdate with local assets missing then sync, check the asseid.
1805 * @tc.expected: step2. return OK.
1806 */
1807 g_virtualCloudDb->SetClearExtend(0);
1808
1809 int uploadFailId = 0;
1810 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682202(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1811 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1812 uploadFailId++;
1813 if (uploadFailId == 25) { // 25 is the middle record
1814 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1815 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1816 }
1817 return OK;
1818 });
1819 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1820 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1821 }
1822
1823 /**
1824 * @tc.name: FillAssetId024
1825 * @tc.desc: Test if BatchUpdate with multiple local assets missing
1826 * @tc.type: FUNC
1827 * @tc.require:
1828 * @tc.author: zhangtao
1829 */
1830 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId024, TestSize.Level0)
1831 {
1832 /**
1833 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1834 * @tc.expected: step1. return OK.
1835 */
1836 int localCount = 50;
1837 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1838 std::atomic<int> count = 1;
1839 g_virtualCloudDb->SetClearExtend(count);
1840 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1841 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1842
1843 /**
1844 * @tc.steps:step2. set extend size normal and BatchUpdate with 3 local assets missing then sync, check the asseid.
1845 * @tc.expected: step2. return OK.
1846 */
1847 g_virtualCloudDb->SetClearExtend(0);
1848
1849 int uploadFailId = 0;
1850 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682302(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1851 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1852 uploadFailId++;
1853 if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1854 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1855 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1856 }
1857 return OK;
1858 });
1859 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1860 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1861 }
1862
1863 /**
1864 * @tc.name: FillAssetId025
1865 * @tc.desc: Test if BatchInsert with local assets missing and missing record added into successCount
1866 * @tc.type: FUNC
1867 * @tc.require:
1868 * @tc.author: zhangtao
1869 */
1870 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId025, TestSize.Level0)
1871 {
1872 CloudSyncConfig config;
1873 config.maxUploadCount = 200; // max upload 200
1874 g_delegate->SetCloudSyncConfig(config);
1875 /**
1876 * @tc.steps:step1. insert local data.
1877 * @tc.expected: step1. return OK.
1878 */
1879 int localCount = 40;
1880 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1881
1882 /**
1883 * @tc.steps:step2. BatchInsert with local assets missing then sync, check the asseid.
1884 * @tc.expected: step2. return OK.
1885 */
1886 int uploadFailId = 0;
1887 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon3db76c682402(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1888 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1889 uploadFailId++;
1890 if (uploadFailId == 25) { // 25 is the middle record
1891 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1892 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1893 }
1894 return OK;
1895 });
1896 g_syncProcess = {};
1897 Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
1898 std::vector<TableProcessInfo> expectProcess = {
1899 { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
1900 { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
1901 };
1902
1903 /**
1904 * @tc.steps:step3. Check if sync process consistent with exptectProcess
1905 * @tc.expected: step3. return OK.
1906 */
1907 int index = 0;
__anon3db76c682502(const std::map<std::string, SyncProcess> &process) 1908 CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
1909 g_syncProcess = std::move(process.begin()->second);
1910 ASSERT_LT(index, 2);
1911 for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
1912 EXPECT_EQ(info.process, expectProcess[index].process);
1913 EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
1914 EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
1915 EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
1916 EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
1917 }
1918 index++;
1919 if (g_syncProcess.process == FINISHED) {
1920 g_processCondition.notify_one();
1921 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1922 }
1923 };
1924 ASSERT_EQ(g_delegate->Sync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
1925 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
1926
1927 /**
1928 * @tc.steps:step4. Check assets results
1929 * @tc.expected: step4. return OK.
1930 */
1931 int beginFailFillNum = 49;
1932 int endFailFillNum = 50;
1933 std::set<int> indexes;
1934 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1935 indexes.insert(i);
1936 }
1937 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", indexes);
1938 g_virtualCloudDb->ForkUpload(nullptr);
1939 }
1940
1941 /**
1942 * @tc.name: ConsistentFlagTest001
1943 * @tc.desc:Assets are the different, check the 0x20 bit of flag after sync
1944 * @tc.type: FUNC
1945 * @tc.require:
1946 * @tc.author: bty
1947 */
1948 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest001, TestSize.Level0)
1949 {
1950 /**
1951 * @tc.steps:step1. init data for the different asset, sync and check flag
1952 * @tc.expected: step1. return OK.
1953 */
1954 int localCount = 10; // 10 is num of local
1955 int cloudCount = 20; // 20 is num of cloud
1956 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1957 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1958 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1959 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1960 CheckConsistentCount(db, cloudCount);
1961
1962 /**
1963 * @tc.steps:step2. update local data, sync and check flag
1964 * @tc.expected: step2. return OK.
1965 */
1966 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1967 DeleteCloudDBData(1, 1, ASSETS_TABLE_NAME);
1968 CheckConsistentCount(db, 0L);
1969 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1970 CheckConsistentCount(db, cloudCount);
1971 }
1972
1973 /**
1974 * @tc.name: ConsistentFlagTest002
1975 * @tc.desc: Assets are the same, check the 0x20 bit of flag after sync
1976 * @tc.type: FUNC
1977 * @tc.require:
1978 * @tc.author: bty
1979 */
1980 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest002, TestSize.Level0)
1981 {
1982 /**
1983 * @tc.steps:step1. init data for the same asset, sync and check flag
1984 * @tc.expected: step1. return OK.
1985 */
1986 int cloudCount = 20; // 20 is num of cloud
1987 InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
1988 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1989 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1990 CheckConsistentCount(db, cloudCount);
1991
1992 /**
1993 * @tc.steps:step2. update local data, sync and check flag
1994 * @tc.expected: step2. return OK.
1995 */
1996 int deleteLocalCount = 5;
1997 DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1998 CheckConsistentCount(db, cloudCount - deleteLocalCount);
1999 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2000 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2001 CheckConsistentCount(db, cloudCount);
2002 }
2003
2004 /**
2005 * @tc.name: ConsistentFlagTest003
2006 * @tc.desc: Download returns a conflict, check the 0x20 bit of flag after sync
2007 * @tc.type: FUNC
2008 * @tc.require:
2009 * @tc.author: bty
2010 */
2011 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest003, TestSize.Level0)
2012 {
2013 /**
2014 * @tc.steps:step1. init data
2015 * @tc.expected: step1. return OK.
2016 */
2017 int localCount = 20; // 20 is num of local
2018 int cloudCount = 10; // 10 is num of cloud
2019 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2020 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2021 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2022
2023 /**
2024 * @tc.steps:step2. fork download, return CLOUD_RECORD_EXIST_CONFLICT once
2025 * @tc.expected: step2. return OK.
2026 */
2027 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2028 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2029 int index = 0;
2030 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2031 .WillRepeatedly(
__anon3db76c682602(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2032 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2033 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2034 if (index == 1) { // 1 is first download
2035 return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2036 }
2037 return DBStatus::OK;
2038 });
2039
2040 /**
2041 * @tc.steps:step3. fork upload, check consistent count
2042 * @tc.expected: step3. return OK.
2043 */
2044 int upIdx = 0;
__anon3db76c682702(const std::string &tableName, VBucket &extend) 2045 g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2046 LOGD("upload index:%d", ++upIdx);
2047 if (upIdx == 1) { // 1 is first upload
2048 CheckConsistentCount(db, localCount - cloudCount - 1);
2049 }
2050 });
2051
2052 /**
2053 * @tc.steps:step4. fork query, check consistent count
2054 * @tc.expected: step4. return OK.
2055 */
2056 int queryIdx = 0;
__anon3db76c682802(const std::string &, VBucket &) 2057 g_virtualCloudDb->ForkQuery([this, localCount, &queryIdx](const std::string &, VBucket &) {
2058 LOGD("query index:%d", ++queryIdx);
2059 if (queryIdx == 3) { // 3 is the last query
2060 CheckConsistentCount(db, localCount - 1);
2061 }
2062 });
2063 int count = 0;
__anon3db76c682902() 2064 g_cloudStoreHook->SetSyncFinishHook([&count]() {
2065 count++;
2066 if (count == 2) { // 2 is compensated sync
2067 g_processCondition.notify_one();
2068 }
2069 });
2070 /**
2071 * @tc.steps:step5. sync, check consistent count
2072 * @tc.expected: step5. return OK.
2073 */
2074 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2075 WaitForSync(count);
2076 CheckConsistentCount(db, localCount);
2077 }
2078
2079 /**
2080 * @tc.name: ConsistentFlagTest004
2081 * @tc.desc: Upload returns error, check the 0x20 bit of flag after sync
2082 * @tc.type: FUNC
2083 * @tc.require:
2084 * @tc.author: bty
2085 */
2086 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest004, TestSize.Level0)
2087 {
2088 /**
2089 * @tc.steps:step1. init data
2090 * @tc.expected: step1. return OK.
2091 */
2092 int localCount = 20; // 20 is num of local
2093 int cloudCount = 10; // 10 is num of cloud
2094 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2095 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2096 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2097
2098 /**
2099 * @tc.steps:step2. fork upload, return error filed of CLOUD_NETWORK_ERROR
2100 * @tc.expected: step2. return OK.
2101 */
2102 int upIdx = 0;
__anon3db76c682a02(const std::string &tableName, VBucket &extend) 2103 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2104 LOGD("upload index:%d", ++upIdx);
2105 if (upIdx == 1) {
2106 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR));
2107 }
2108 });
2109
2110 /**
2111 * @tc.steps:step3. sync, check consistent count
2112 * @tc.expected: step3. return OK.
2113 */
2114 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2115 CheckConsistentCount(db, localCount - 1);
2116
2117 /**
2118 * @tc.steps:step4. update local data, fork upload, return error filed of type int64_t
2119 * @tc.expected: step4. return OK.
2120 */
2121 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2122 upIdx = 0;
__anon3db76c682b02(const std::string &tableName, VBucket &extend) 2123 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2124 LOGD("upload index:%d", ++upIdx);
2125 if (upIdx == 1) {
2126 int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2127 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2128 }
2129 if (upIdx == 2) {
2130 int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT + 1;
2131 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2132 }
2133 });
2134
2135 /**
2136 * @tc.steps:step5. sync, check consistent count
2137 * @tc.expected: step5. return OK.
2138 */
2139 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2140 CheckConsistentCount(db, localCount - 2);
2141 }
2142
2143 /**
2144 * @tc.name: ConsistentFlagTest005
2145 * @tc.desc: Local data changes during download, check the 0x20 bit of flag after sync
2146 * @tc.type: FUNC
2147 * @tc.require:
2148 * @tc.author: bty
2149 */
2150 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest005, TestSize.Level0)
2151 {
2152 /**
2153 * @tc.steps:step1. init data
2154 * @tc.expected: step1. return OK.
2155 */
2156 int localCount = 20; // 20 is num of local
2157 int cloudCount = 10; // 10 is num of cloud
2158 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2159 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2160 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2161
2162 /**
2163 * @tc.steps:step2. fork download, update local assets where id=2
2164 * @tc.expected: step2. return OK.
2165 */
2166 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2167 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2168 int index = 0;
2169 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2170 .WillRepeatedly(
2171 [this, &index](const std::string &, const std::string &gid, const Type &,
__anon3db76c682c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2172 std::map<std::string, Assets> &assets) {
2173 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2174 if (index == 1) { // 1 is first download
2175 std::string sql = "UPDATE " + ASSETS_TABLE_NAME + " SET assets=NULL where id=2;";
2176 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
2177 }
2178 return DBStatus::OK;
2179 });
2180
2181 /**
2182 * @tc.steps:step3. fork upload, check consistent count
2183 * @tc.expected: step3. return OK.
2184 */
2185 int upIdx = 0;
__anon3db76c682d02(const std::string &tableName, VBucket &extend) 2186 g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2187 LOGD("upload index:%d", ++upIdx);
2188 if (upIdx == 1) { // 1 is first upload
2189 CheckConsistentCount(db, localCount - cloudCount - 1);
2190 }
2191 });
2192
2193 /**
2194 * @tc.steps:step4. sync, check consistent count
2195 * @tc.expected: step4. return OK.
2196 */
2197 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2198 CheckConsistentCount(db, localCount);
2199 }
2200
2201 /**
2202 * @tc.name: ConsistentFlagTest006
2203 * @tc.desc:
2204 * @tc.type: FUNC
2205 * @tc.require:
2206 * @tc.author: bty
2207 */
2208 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest006, TestSize.Level0)
2209 {
2210 /**
2211 * @tc.steps:step1. init data
2212 * @tc.expected: step1. return OK.
2213 */
2214 int cloudCount = 10; // 10 is num of cloud
2215 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2216 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2217
2218 /**
2219 * @tc.steps:step2. fork download, update local assets where id=2
2220 * @tc.expected: step2. return OK.
2221 */
2222 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2223 std::this_thread::sleep_for(std::chrono::milliseconds(1));
2224 int delCount = 3; // 3 is num of cloud
2225 DeleteCloudDBData(1, delCount, ASSETS_TABLE_NAME);
2226 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2227 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2228 int index = 0;
2229 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2230 .WillRepeatedly(
2231 [&index](const std::string &, const std::string &gid, const Type &,
__anon3db76c682e02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2232 std::map<std::string, Assets> &assets) {
2233 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2234 if (index == 1) { // 1 is first download
2235 return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2236 }
2237 return DBStatus::OK;
2238 });
2239
2240 /**
2241 * @tc.steps:step3. fork upload, check consistent count
2242 * @tc.expected: step3. return OK.
2243 */
2244 int upIdx = 0;
__anon3db76c682f02(const std::string &tableName, VBucket &extend) 2245 g_virtualCloudDb->ForkUpload([this, delCount, &upIdx](const std::string &tableName, VBucket &extend) {
2246 LOGD("upload index:%d", ++upIdx);
2247 if (upIdx == 1) { // 1 is first upload
2248 CheckConsistentCount(db, delCount);
2249 CheckCompensatedCount(db, 0L);
2250 }
2251 });
2252
2253 /**
2254 * @tc.steps:step4. sync, check consistent count
2255 * @tc.expected: step4. return OK.
2256 */
2257 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2258 CheckConsistentCount(db, cloudCount);
2259 }
2260
2261 /**
2262 * @tc.name: SyncDataStatusTest001
2263 * @tc.desc: No need to download asset, check status after sync
2264 * @tc.type: FUNC
2265 * @tc.require:
2266 * @tc.author: bty
2267 */
2268 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest001, TestSize.Level0)
2269 {
2270 DataStatusTest001(false);
2271 }
2272
2273 /**
2274 * @tc.name: SyncDataStatusTest002
2275 * @tc.desc: Need to download asset, check status after sync
2276 * @tc.type: FUNC
2277 * @tc.require:
2278 * @tc.author: bty
2279 */
2280 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest002, TestSize.Level0)
2281 {
2282 DataStatusTest001(true);
2283 }
2284
2285 /**
2286 * @tc.name: SyncDataStatusTest003
2287 * @tc.desc: Lock during download and check status
2288 * @tc.type: FUNC
2289 * @tc.require:
2290 * @tc.author: bty
2291 */
2292 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest003, TestSize.Level0)
2293 {
2294 DataStatusTest003();
2295 }
2296
2297 /**
2298 * @tc.name: SyncDataStatusTest004
2299 * @tc.desc: Lock and delete during download, check status
2300 * @tc.type: FUNC
2301 * @tc.require:
2302 * @tc.author: bty
2303 */
2304 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest004, TestSize.Level0)
2305 {
2306 DataStatusTest004();
2307 }
2308
2309 /**
2310 * @tc.name: SyncDataStatusTest005
2311 * @tc.desc: Lock and update during download, check status
2312 * @tc.type: FUNC
2313 * @tc.require:
2314 * @tc.author: bty
2315 */
2316 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest005, TestSize.Level0)
2317 {
2318 DataStatusTest005();
2319 }
2320
2321 /**
2322 * @tc.name: SyncDataStatusTest006
2323 * @tc.desc: Lock and update and Unlock during download, check status
2324 * @tc.type: FUNC
2325 * @tc.require:
2326 * @tc.author: bty
2327 */
2328 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest006, TestSize.Level0)
2329 {
2330 DataStatusTest006();
2331 }
2332
2333 /**
2334 * @tc.name: SyncDataStatusTest007
2335 * @tc.desc: Download return error, check status
2336 * @tc.type: FUNC
2337 * @tc.require:
2338 * @tc.author: bty
2339 */
2340 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest007, TestSize.Level0)
2341 {
2342 DataStatusTest007();
2343 }
2344
2345 /**
2346 * @tc.name: SyncDataStatusTest008
2347 * @tc.desc: Test upload process when data locked
2348 * @tc.type: FUNC
2349 * @tc.require:
2350 * @tc.author: bty
2351 */
2352 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest008, TestSize.Level0)
2353 {
2354 /**
2355 * @tc.steps:step1. init local data
2356 * @tc.expected: step1. return OK.
2357 */
2358 int localCount = 40;
2359 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2360 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2361 std::string sql = "update " + logName + " SET status = 2 where data_key >=20;";
2362 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2363
2364 /**
2365 * @tc.steps:step2. sync and check process
2366 * @tc.expected: step2. return OK.
2367 */
2368 g_syncProcess = {};
2369 Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
2370 std::vector<TableProcessInfo> expectProcess = {
2371 { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
2372 { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
2373 };
2374 int index = 0;
2375 CloudSyncConfig config;
2376 config.maxUploadCount = 100; // max upload 100
2377 g_delegate->SetCloudSyncConfig(config);
__anon3db76c683002(const std::map<std::string, SyncProcess> &process) 2378 CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
2379 g_syncProcess = std::move(process.begin()->second);
2380 ASSERT_LT(index, 2);
2381 for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
2382 EXPECT_EQ(info.process, expectProcess[index].process);
2383 EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
2384 EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
2385 EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
2386 EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
2387 }
2388 index++;
2389 if (g_syncProcess.process == FINISHED) {
2390 g_processCondition.notify_one();
2391 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2392 }
2393 };
2394 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
2395 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
2396 }
2397
2398 /**
2399 * @tc.name: DownloadAssetTest001
2400 * @tc.desc: Test the asset status after the share table sync
2401 * @tc.type: FUNC
2402 * @tc.require:
2403 * @tc.author: bty
2404 */
2405 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest001, TestSize.Level0)
2406 {
2407 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
2408 /**
2409 * @tc.steps:step1. init data and sync
2410 * @tc.expected: step1. return OK.
2411 */
2412 int cloudCount = 10; // 10 is num of cloud
2413 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME_SHARED);
2414 CallSync({ASSETS_TABLE_NAME_SHARED}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2415
2416 /**
2417 * @tc.steps:step2. check asset status
2418 * @tc.expected: step2. return OK.
2419 */
2420 SqlCondition condition;
2421 condition.sql = "select assets from " + ASSETS_TABLE_NAME_SHARED + " where _rowid_ = 1;";
2422 condition.readOnly = true;
2423 std::vector<VBucket> records;
2424 EXPECT_EQ(g_delegate->ExecuteSql(condition, records), OK);
2425 for (const auto &data: records) {
2426 Assets assets;
2427 CloudStorageUtils::GetValueFromVBucket(COL_ASSETS, data, assets);
2428 for (const auto &asset: assets) {
2429 EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2430 }
2431 }
2432 EXPECT_EQ(g_virtualAssetLoader->GetBatchDownloadCount(), 0u);
2433 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
2434 }
2435
2436 /**
2437 * @tc.name: DownloadAssetTest002
2438 * @tc.desc: Test asset download failed and re download
2439 * @tc.type: FUNC
2440 * @tc.require:
2441 * @tc.author: liaoyonghuang
2442 */
2443 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest002, TestSize.Level0)
2444 {
2445 /**
2446 * @tc.steps:step1. init data
2447 * @tc.expected: step1. return OK.
2448 */
2449 int cloudCount = 10; // 10 is num of cloud
2450 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2451
2452 /**
2453 * @tc.steps:step2. Set asset download status error and sync
2454 * @tc.expected: step2. sync successful but download assets fail.
2455 */
2456 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2457 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2458
2459 /**
2460 * @tc.steps:step3. Set asset download status OK and sync
2461 * @tc.expected: step3. return OK.
2462 */
2463 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2464 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2465
2466 /**
2467 * @tc.steps:step4. Check assets status
2468 * @tc.expected: step4. status is NORMAL.
2469 */
2470 std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
2471 sqlite3_stmt *stmt = nullptr;
2472 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2473 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2474 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
2475 Type cloudValue;
2476 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
2477 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
2478 for (const auto &asset : assets) {
2479 EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2480 }
2481 }
2482 int errCode = E_OK;
2483 SQLiteUtils::ResetStatement(stmt, true, errCode);
2484 EXPECT_EQ(errCode, E_OK);
2485 }
2486
2487 /**
2488 * @tc.name: DownloadAssetTest003
2489 * @tc.desc: Test asset download after sync task recovery
2490 * @tc.type: FUNC
2491 * @tc.require:
2492 * @tc.author: liaoyonghuang
2493 */
2494 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest003, TestSize.Level0)
2495 {
2496 /**
2497 * @tc.steps:step1. init data
2498 * @tc.expected: step1. return OK.
2499 */
2500 int cloudCount = 10; // 10 is num of cloud
2501 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2502 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2503 DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
2504 InsertCloudDBData(0, cloudCount, 0, NO_PRIMARY_TABLE);
2505 /**
2506 * @tc.steps:step2. Set task interrupted before asset download
2507 * @tc.expected: step2. return OK.
2508 */
2509 int queryTime = 0;
__anon3db76c683102(const std::string &, VBucket &) 2510 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2511 queryTime++;
2512 if (queryTime != 1) {
2513 return;
2514 }
2515 Query query = Query::Select().FromTable({NO_PRIMARY_TABLE});
2516 CloudSyncOption option;
2517 option.priorityTask = true;
2518 option.devices = {DEVICE_CLOUD};
2519 option.mode = SYNC_MODE_CLOUD_MERGE;
2520 option.query = query;
2521 ASSERT_EQ(g_delegate->Sync(option, nullptr), OK);
2522 });
2523 /**
2524 * @tc.steps:step3. Sync
2525 * @tc.expected: step3. return OK.
2526 */
2527 int removeTime = 0;
__anon3db76c683202(std::map<std::string, Assets> &assets) 2528 g_virtualAssetLoader->SetRemoveLocalAssetsCallback([&](std::map<std::string, Assets> &assets) {
2529 removeTime++;
2530 return OK;
2531 });
2532 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2533 /**
2534 * @tc.steps:step4. Check fork asset download time and observer
2535 * @tc.expected: step4. return OK.
2536 */
2537 EXPECT_EQ(removeTime, cloudCount);
2538 ChangedData expectedChangeData1;
2539 ChangedData expectedChangeData2;
2540 expectedChangeData1.tableName = NO_PRIMARY_TABLE;
2541 expectedChangeData2.tableName = ASSETS_TABLE_NAME;
2542 expectedChangeData1.type = ChangedDataType::ASSET;
2543 expectedChangeData2.type = ChangedDataType::ASSET;
2544 expectedChangeData1.field.push_back(std::string("rowid"));
2545 expectedChangeData2.field.push_back(std::string("id"));
2546 for (int i = 0; i < cloudCount; i++) {
2547 expectedChangeData1.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 1});
2548 expectedChangeData2.primaryData[ChangeType::OP_DELETE].push_back({(int64_t)i});
2549 }
2550 g_observer->SetExpectedResult(expectedChangeData1);
2551 g_observer->SetExpectedResult(expectedChangeData2);
2552 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
2553 g_observer->ClearChangedData();
2554
2555 g_virtualCloudDb->ForkInsertConflict(nullptr);
2556 g_virtualCloudDb->ForkQuery(nullptr);
2557 g_virtualAssetLoader->SetRemoveLocalAssetsCallback(nullptr);
2558 }
2559
2560 /**
2561 * @tc.name: RecordLockFuncTest001
2562 * @tc.desc: UNLOCKING->UNLOCKING Synchronous download failure wholly.
2563 * @tc.type: FUNC
2564 * @tc.author: lijun
2565 */
2566 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest001, TestSize.Level0)
2567 {
2568 /**
2569 * @tc.steps:step1. init local data
2570 * @tc.expected: step1. return OK.
2571 */
2572 int localCount = 100;
2573 int cloudCount = 100;
2574 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2575 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2576 std::string sql = "update " + logName + " SET status = 2 where data_key >=70;";
2577 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2578 CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2579 CheckLockStatus(db, 70, 99, LockStatus::LOCK);
2580 DeleteLocalRecord(db, 70, 30, ASSETS_TABLE_NAME);
2581
2582 /**
2583 * @tc.steps:step2. init cloud data
2584 * @tc.expected: step2. return OK.
2585 */
2586 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2587 UpdateCloudDBData(0, 70, 0, 0, ASSETS_TABLE_NAME);
2588
2589 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2590 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2591 int index = 0;
2592 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2593 .WillRepeatedly(
__anon3db76c683302(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2594 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2595 LOGD("Download GID:%s %d", gid.c_str(), index);
2596 index++;
2597 if (index <= 30) {
2598 return DBStatus::CLOUD_ERROR;
2599 } else {
2600 return DBStatus::OK;
2601 }
2602
2603 });
2604
2605 std::mutex mtx;
2606 std::condition_variable cv;
2607 int queryIdx = 0;
2608 bool ready = false;
__anon3db76c683402(const std::string &, VBucket &) 2609 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2610 LOGD("query index:%d", ++queryIdx);
2611 if (queryIdx == 2) { // 2 is compensated sync
2612 std::unique_lock<std::mutex> lock(mtx);
2613 ready = true;
2614 cv.notify_one();
2615 }
2616 });
2617 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2618 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2619
2620 {
2621 std::unique_lock<std::mutex> lock(mtx);
__anon3db76c683502null2622 cv.wait(lock, [&]{ return ready; });
2623 }
2624 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2625
2626 std::this_thread::sleep_for(std::chrono::seconds(6));
2627 /**
2628 * @tc.steps:step3. check after compensated sync
2629 * @tc.expected: step3. all is UNLOCKING.
2630 */
2631 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2632 }
2633
2634 /**
2635 * @tc.name: RecordLockFuncTest002
2636 * @tc.desc: Compensated synchronization, Locked data has not been synchronized. The first synchronization data is
2637 * based on the cloud, and the last synchronization data is based on the device.
2638 * @tc.type: FUNC
2639 * @tc.author: lijun
2640 */
2641 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest002, TestSize.Level0)
2642 {
2643 /**
2644 * @tc.steps:step1. init local data, modify data Status and initiate synchronization
2645 * @tc.expected: step1. return OK.
2646 */
2647 int localCount = 120;
2648 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2649 std::vector<std::vector<uint8_t>> hashKey;
2650 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key >=100 ", db, hashKey);
2651 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2652 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2653 CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2654 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
2655
2656 /**
2657 * @tc.steps:step2. Check the synchronization result and log table status
2658 * @tc.expected: step2.100-109 is LOCK_CHANGE.
2659 */
2660 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2661 CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2662 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1, 100, 109);
2663 CheckLockStatus(db, 100, 109, LockStatus::LOCK_CHANGE);
2664 CheckLockStatus(db, 110, 119, LockStatus::LOCK);
2665
2666 /**
2667 * @tc.steps:step3. Synchronize and check the lock_change data status
2668 * @tc.expected: step3.100-119 is LOCK_CHANGE.
2669 */
2670 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2671 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2672 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2673
2674 /**
2675 * @tc.steps:step4. Unlock,the lock_change data status changes to unlocking
2676 * @tc.expected: step4.100-119 is UNLOCKING.
2677 */
2678 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2679 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2680 CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2681
2682 /**
2683 * @tc.steps:step5. Lock,the unlocking data status changes to lock_change
2684 * @tc.expected: step5.100-119 is LOCK_CHANGE.
2685 */
2686 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2687 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2688 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2689
2690 /**
2691 * @tc.steps:step6. Synchronize and check the lock_change data status
2692 * @tc.expected: step6.100-119 is LOCK_CHANGE.
2693 */
2694 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2695 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2696 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2697
2698 /**
2699 * @tc.steps:step7. Unlock,the lock_change data status changes to unlocking
2700 * @tc.expected: step7.100-119 is UNLOCKING.
2701 */
2702 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2703 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2704 CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2705
2706 /**
2707 * @tc.steps:step8. Synchronize data
2708 * @tc.expected: step8.return OK.
2709 */
2710 std::mutex mtx;
2711 std::condition_variable cv;
2712 int queryIdx = 0;
2713 bool ready = false;
__anon3db76c683602(const std::string &, VBucket &) 2714 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2715 LOGD("query index:%d", ++queryIdx);
2716 if (queryIdx == 5) { // 5 is compensated sync
2717 std::unique_lock<std::mutex> lock(mtx);
2718 ready = true;
2719 cv.notify_one();
2720 }
2721 });
2722 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2723 {
2724 std::unique_lock<std::mutex> lock(mtx);
__anon3db76c683702null2725 cv.wait(lock, [&]{ return ready; });
2726 }
2727
2728 std::this_thread::sleep_for(std::chrono::seconds(6));
2729 /**
2730 * @tc.steps:step9. check after compensated sync
2731 * @tc.expected: step9. all is UNLOCK.
2732 */
2733 CheckLockStatus(db, 0, 119, LockStatus::UNLOCK);
2734 }
2735
2736 /**
2737 * @tc.name: CloudTaskStatusTest001
2738 * @tc.desc: Test get cloud task status
2739 * @tc.type: FUNC
2740 * @tc.require:
2741 * @tc.author: liaoyonghuang
2742 */
2743 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudTaskStatusTest001, TestSize.Level1)
2744 {
2745 /**
2746 * @tc.steps:step1. init data
2747 * @tc.expected: step1. return OK.
2748 */
2749 int cloudCount = 10; // 10 is num of cloud
2750 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2751
2752 /**
2753 * @tc.steps:step2. Sync and get cloud task status
2754 * @tc.expected: step2. OK
2755 */
2756 g_virtualCloudDb->SetBlockTime(1000);
__anon3db76c683802() 2757 std::thread syncThread([&]() {
2758 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2759 });
2760 std::this_thread::sleep_for(std::chrono::milliseconds(100));
2761 SyncProcess process1 = g_delegate->GetCloudTaskStatus(UINT64_MAX);
2762 EXPECT_EQ(process1.errCode, OK);
2763 syncThread.join();
2764 /**
2765 * @tc.steps:step3. Get cloud task status after sync finish
2766 * @tc.expected: step3. NOT_FOUND
2767 */
2768 SyncProcess process2 = g_delegate->GetCloudTaskStatus(1);
2769 EXPECT_EQ(process2.errCode, NOT_FOUND);
2770
2771 /**
2772 * @tc.steps:step4. Get cloud task status after DB closed
2773 * @tc.expected: step4. DB_ERROR
2774 */
2775 auto delegateImpl = static_cast<RelationalStoreDelegateImpl *>(g_delegate);
2776 EXPECT_EQ(delegateImpl->Close(), DBStatus::OK);
2777 SyncProcess process3 = g_delegate->GetCloudTaskStatus(1);
2778 EXPECT_EQ(process3.errCode, DB_ERROR);
2779 }
2780
2781 /**
2782 * @tc.name: CloudTaskStatusTest002
2783 * @tc.desc: Test get cloud task status when task merge
2784 * @tc.type: FUNC
2785 * @tc.require:
2786 * @tc.author: suyue
2787 */
2788 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudTaskStatusTest002, TestSize.Level1)
2789 {
2790 /**
2791 * @tc.steps:step1. init data
2792 * @tc.expected: step1. return OK
2793 */
2794 int cloudCount = 100; // 100 is num of cloud
2795 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2796
2797 /**
2798 * @tc.steps:step2. sync tasks 2 and 3 that can be merged when synchronizing task 1, get status of the merge task
2799 * @tc.expected: step2. the errCode of task 2 and task 3 is OK or NOT_FOUND
2800 */
2801 g_virtualCloudDb->SetBlockTime(1000);
2802 Query query = Query::Select().FromTable({ASSETS_TABLE_NAME});
__anon3db76c683902(const std::map<std::string, SyncProcess> &process) 2803 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2804 std::unique_lock<std::mutex> lock(g_processMutex);
2805 if (process.begin()->second.process == FINISHED) {
2806 g_processCondition.notify_one();
2807 }
2808 };
2809 CloudSyncOption option = {.devices = {DEVICE_CLOUD}, .mode = SYNC_MODE_CLOUD_MERGE, .query = query,
2810 .waitTime = SYNC_WAIT_TIME, .lockAction = static_cast<LockAction>(0xff)};
2811
__anon3db76c683a02() 2812 std::thread syncThread1([option, callback]() {
2813 ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2814 });
2815 option.merge = true;
__anon3db76c683b02() 2816 std::thread syncThread2([option, callback]() {
2817 ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2818 });
__anon3db76c683c02() 2819 std::thread syncThread3([option, callback]() {
2820 ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2821 });
2822 std::this_thread::sleep_for(std::chrono::milliseconds(100));
2823 SyncProcess process1 = g_delegate->GetCloudTaskStatus(1);
2824 SyncProcess process2 = g_delegate->GetCloudTaskStatus(2);
2825 SyncProcess process3 = g_delegate->GetCloudTaskStatus(3);
2826 syncThread1.join();
2827 syncThread2.join();
2828 syncThread3.join();
2829 // Due to the task execution sequence, task 2 may be combined into 3 or task 3 may be combined into 2.
2830 // Therefore, the errCode of task 2 and 3 may be OK or NOT_FOUND.
2831 EXPECT_TRUE(process2.errCode == OK || process2.errCode == NOT_FOUND);
2832 EXPECT_TRUE(process3.errCode == OK || process3.errCode == NOT_FOUND);
2833 }
2834
2835 /**
2836 * @tc.name: CompensatedSyncTest001
2837 * @tc.desc: test compensated count more than 100.
2838 * @tc.type: FUNC
2839 * @tc.require:
2840 * @tc.author: tankaisheng
2841 */
2842 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CompensatedSyncTest001, TestSize.Level0)
2843 {
2844 /**
2845 * @tc.steps:step1. init data
2846 * @tc.expected: step1. return OK.
2847 */
2848 int dataCount = 120;
2849 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
2850 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2851
2852 /**
2853 * @tc.steps:step2. set all data wait compensated.
2854 * @tc.expected: step2. return ok.
2855 */
2856 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " set flag=flag|0x10;";
2857 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
2858 sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " where flag&0x10=0x10;";
2859 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
2860 reinterpret_cast<void *>(120u), nullptr), SQLITE_OK);
2861
2862 /**
2863 * @tc.steps:step3. sync with compensated.
2864 * @tc.expected: step3. return ok.
2865 */
2866 std::mutex processMutex;
2867 std::vector<SyncProcess> expectProcess;
2868 std::condition_variable cv;
2869 bool finish = false;
2870 auto callback = [&cv, &finish, &processMutex]
__anon3db76c683d02(const std::map<std::string, SyncProcess> &process) 2871 (const std::map<std::string, SyncProcess> &process) {
2872 for (auto &item : process) {
2873 if (item.second.process == FINISHED) {
2874 EXPECT_EQ(item.second.errCode, DBStatus::OK);
2875 std::unique_lock<std::mutex> lock(processMutex);
2876 finish = true;
2877 cv.notify_one();
2878 }
2879 }
2880 };
2881 CloudSyncOption option;
2882 option.devices = {DEVICE_CLOUD};
2883 option.priorityTask = true;
2884 option.compensatedSyncOnly = true;
2885 DBStatus syncResult = g_delegate->Sync(option, callback);
2886 EXPECT_EQ(syncResult, DBStatus::OK);
2887
2888 /**
2889 * @tc.steps:step4. wait sync finish and check data.
2890 * @tc.expected: step4. return ok.
2891 */
2892 std::unique_lock<std::mutex> lock(processMutex);
__anon3db76c683e02() 2893 cv.wait(lock, [&finish]() {
2894 return finish;
2895 });
2896 std::this_thread::sleep_for(std::chrono::seconds(1));
2897 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
2898 reinterpret_cast<void *>(0u), nullptr), SQLITE_OK);
2899 }
2900 } // namespace
2901 #endif // RELATIONAL_STORE
2902