1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_delegate_impl.h"
26 #include "relational_store_instance.h"
27 #include "relational_store_manager.h"
28 #include "runtime_config.h"
29 #include "sqlite_relational_store.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 #include <gtest/gtest.h>
37 #include <iostream>
38
39 using namespace testing::ext;
40 using namespace DistributedDB;
41 using namespace DistributedDBUnitTest;
42 using namespace std;
43
44 namespace {
45 const string STORE_ID = "Relational_Store_SYNC";
46 const string DB_SUFFIX = ".db";
47 const string ASSETS_TABLE_NAME = "student";
48 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
49 const string NO_PRIMARY_TABLE = "teacher";
50 const string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
51 const string COMPOUND_PRIMARY_TABLE = "worker1";
52 const string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";
53 const string DEVICE_CLOUD = "cloud_dev";
54 const string COL_ID = "id";
55 const string COL_NAME = "name";
56 const string COL_HEIGHT = "height";
57 const string COL_ASSET = "asset";
58 const string COL_ASSETS = "assets";
59 const string COL_AGE = "age";
60 const int64_t SYNC_WAIT_TIME = 600;
61 const int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
62 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
63 {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
64 {COL_AGE, TYPE_INDEX<int64_t>}};
65 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
66 {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
67 {COL_AGE, TYPE_INDEX<int64_t>}};
68 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
69 {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
70 {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
71 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
72 " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
73 COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
74 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
75 " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
76 " ASSETS," + COL_AGE + " INT);";
77 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
78 " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
79 COL_AGE + " INT, PRIMARY KEY (id, age));";
80 const Asset ASSET_COPY = {.version = 1,
81 .name = "Phone",
82 .assetId = "0",
83 .subpath = "/local/sync",
84 .uri = "/local/sync",
85 .modifyTime = "123456",
86 .createTime = "",
87 .size = "256",
88 .hash = "ASE"};
89 const Asset ASSET_COPY2 = {.version = 1,
90 .name = "Phone_copy_2",
91 .assetId = "0",
92 .subpath = "/local/sync",
93 .uri = "/local/sync",
94 .modifyTime = "123456",
95 .createTime = "",
96 .size = "256",
97 .hash = "ASE"};
98 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
99 const std::string QUERY_CONSISTENT_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x20=0;";
100 const std::string QUERY_COMPENSATED_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x10!=0;";
101
102 string g_storePath;
103 string g_testDir;
104 RelationalStoreObserverUnitTest *g_observer = nullptr;
105 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
106 RelationalStoreDelegate *g_delegate = nullptr;
107 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
108 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
109 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
110 SyncProcess g_syncProcess;
111 std::condition_variable g_processCondition;
112 std::mutex g_processMutex;
113 IRelationalStore *g_store = nullptr;
114 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
115 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
116
InitDatabase(sqlite3 * & db)117 void InitDatabase(sqlite3 *&db)
118 {
119 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
120 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
121 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
122 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
123 }
124
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)125 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
126 {
127 TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
128 .fields = CLOUD_FIELDS};
129 dataBaseSchema.tables.push_back(assetsTableSchema);
130 assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
131 .fields = NO_PRIMARY_FIELDS};
132 dataBaseSchema.tables.push_back(assetsTableSchema);
133 assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
134 .fields = COMPOUND_PRIMARY_FIELDS};
135 dataBaseSchema.tables.push_back(assetsTableSchema);
136 }
137
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,bool hasAsset)138 std::pair<std::vector<VBucket>, std::vector<VBucket>> GenerateDataRecords(int64_t begin, int64_t count,
139 int64_t gidStart, bool hasAsset)
140 {
141 std::pair<std::vector<VBucket>, std::vector<VBucket>> res;
142 auto &[record, extend] = res;
143 for (int64_t i = begin; i < begin + count; i++) {
144 Assets assets;
145 if (hasAsset) {
146 Asset asset = ASSET_COPY;
147 asset.name = ASSET_COPY.name + std::to_string(i);
148 assets.emplace_back(asset);
149 asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
150 assets.emplace_back(asset);
151 }
152 VBucket data;
153 data.insert_or_assign(COL_ID, i);
154 data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
155 data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
156 data.insert_or_assign(COL_ASSETS, assets);
157 data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
158 record.push_back(data);
159
160 VBucket log;
161 Timestamp now = TimeHelper::GetSysCurrentTime();
162 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
163 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
164 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
165 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
166 extend.push_back(log);
167 }
168 return res;
169 }
170
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)171 void GenerateDataRecords(
172 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
173 {
174 std::tie(record, extend) = GenerateDataRecords(begin, count, gidStart, true);
175 }
176
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)177 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
178 {
179 int errCode;
180 std::vector<VBucket> record;
181 std::vector<VBucket> extend;
182 GenerateDataRecords(begin, count, 0, record, extend);
183 const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
184 for (VBucket vBucket : record) {
185 sqlite3_stmt *stmt = nullptr;
186 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
187 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
188 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
189 ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
190 sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
191 if (isAssetNull) {
192 ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
193 } else {
194 std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
195 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
196 }
197 std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
198 std::get<Assets>(vBucket[COL_ASSETS]));
199 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
200 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
201 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
202 SQLiteUtils::ResetStatement(stmt, true, errCode);
203 }
204 }
205
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,bool isEmptyAssets=false)206 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, bool isEmptyAssets = false)
207 {
208 int errCode;
209 std::vector<uint8_t> assetBlob;
210 const string sql = "update " + tableName + " set assets=?;";
211 sqlite3_stmt *stmt = nullptr;
212 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
213 if (isEmptyAssets) {
214 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
215 } else {
216 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
217 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
218 }
219 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
220 SQLiteUtils::ResetStatement(stmt, true, errCode);
221 }
222
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,int32_t begin,int32_t end)223 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, int32_t begin, int32_t end)
224 {
225 int errCode;
226 std::vector<uint8_t> assetBlob;
227 const string sql = "update " + tableName + " set assets=? " + "where id>=" + std::to_string(begin) +
228 " and id<=" + std::to_string(end) + ";";
229 sqlite3_stmt *stmt = nullptr;
230 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
231 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
232 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
233 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
234 SQLiteUtils::ResetStatement(stmt, true, errCode);
235 }
236
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)237 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
238 {
239 ASSERT_NE(db, nullptr);
240 for (int64_t i = begin; i < begin + count; i++) {
241 string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
242 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
243 }
244 }
245
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)246 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
247 {
248 for (int64_t i = begin; i < begin + count; i++) {
249 VBucket idMap;
250 idMap.insert_or_assign("#_gid", std::to_string(i));
251 ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
252 }
253 }
254
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)255 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
256 const std::string &tableName)
257 {
258 std::this_thread::sleep_for(std::chrono::milliseconds(1));
259 std::vector<VBucket> record;
260 std::vector<VBucket> extend;
261 GenerateDataRecords(begin, count, gidStart, record, extend);
262 for (auto &entry: extend) {
263 entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
264 }
265 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
266 std::this_thread::sleep_for(std::chrono::milliseconds(1));
267 }
268
QueryStatusCallback(void * data,int count,char ** colValue,char ** colName)269 int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
270 {
271 auto status = static_cast<std::vector<int64_t> *>(data);
272 const int decimal = 10;
273 for (int i = 0; i < count; i++) {
274 status->push_back(strtol(colValue[0], nullptr, decimal));
275 }
276 return 0;
277 }
278
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)279 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
280 {
281 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
282 std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
283 " and data_key <=" + std::to_string(endId) + ";";
284 std::vector<int64_t> status;
285 char *str = NULL;
286 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
287 SQLITE_OK);
288 ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
289
290 for (auto stat : status) {
291 ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
292 }
293 }
294
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName,bool hasAsset=true)295 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName,
296 bool hasAsset = true)
297 {
298 auto [record, extend] = GenerateDataRecords(begin, count, gidStart, hasAsset);
299 if (tableName == ASSETS_TABLE_NAME_SHARED) {
300 for (auto &vBucket: record) {
301 vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
302 }
303 }
304 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
305 }
306
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)307 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
308 {
309 std::unique_lock<std::mutex> lock(g_processMutex);
310 bool result = g_processCondition.wait_for(
311 lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
312 ASSERT_EQ(result, true);
313 LOGD("-------------------sync end--------------");
314 }
315
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK,bool isMerge=false)316 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK,
317 bool isMerge = false)
318 {
319 g_syncProcess = {};
320 Query query = Query::Select().FromTable(tableNames);
321 std::vector<SyncProcess> expectProcess;
322 CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
323 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
324 std::unique_lock<std::mutex> lock(g_processMutex);
325 g_syncProcess = process.begin()->second;
326 if (g_syncProcess.process == FINISHED) {
327 g_processCondition.notify_one();
328 ASSERT_EQ(g_syncProcess.errCode, errCode);
329 }
330 };
331 CloudSyncOption option;
332 option.devices = {DEVICE_CLOUD};
333 option.mode = mode;
334 option.query = query;
335 option.waitTime = SYNC_WAIT_TIME;
336 option.lockAction = static_cast<LockAction>(0xff); // lock all
337 option.merge = isMerge;
338 ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
339
340 if (dbStatus == DBStatus::OK) {
341 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
342 }
343 }
344
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)345 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
346 {
347 for (auto &item : assets) {
348 for (auto &asset : item.second) {
349 EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
350 if (index < 4) { // 1-4 is inserted
351 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
352 }
353 LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
354 index);
355 }
356 }
357 }
358
CheckDownloadFailedForTest002(sqlite3 * & db)359 void CheckDownloadFailedForTest002(sqlite3 *&db)
360 {
361 std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
362 sqlite3_stmt *stmt = nullptr;
363 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
364 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
365 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
366 Type cloudValue;
367 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
368 std::vector<uint8_t> assetsBlob;
369 Assets assets;
370 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
371 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
372 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
373 for (size_t i = 0; i < assets.size(); ++i) {
374 EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
375 }
376 }
377 int errCode;
378 SQLiteUtils::ResetStatement(stmt, true, errCode);
379 }
380
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)381 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
382 {
383 Assets assets;
384 Asset asset = ASSET_COPY;
385 asset.name = ASSET_COPY.name + std::to_string(id);
386 asset.status = status;
387 assets.emplace_back(asset);
388 asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
389 assets.emplace_back(asset);
390 int errCode;
391 std::vector<uint8_t> assetBlob;
392 const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
393 sqlite3_stmt *stmt = nullptr;
394 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
395 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
396 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
397 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
398 SQLiteUtils::ResetStatement(stmt, true, errCode);
399 }
400
CheckConsistentCount(sqlite3 * db,int64_t expectCount)401 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
402 {
403 EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
404 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
405 }
406
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)407 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
408 {
409 EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
410 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
411 }
412
CloseDb()413 void CloseDb()
414 {
415 if (g_delegate != nullptr) {
416 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
417 g_delegate = nullptr;
418 }
419 delete g_observer;
420 g_virtualCloudDb = nullptr;
421 }
422
423 class DistributedDBCloudSyncerDownloadAssetsTest : public testing::Test {
424 public:
425 static void SetUpTestCase(void);
426 static void TearDownTestCase(void);
427 void SetUp();
428 void TearDown();
429
430 protected:
431 void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
432 const std::set<int> &failIndex);
433 void CheckLocalAssetIsEmpty(const std::string &tableName);
434 void CheckCursorData(const std::string &tableName, int begin);
435 void WaitForSync(int &syncCount);
436 const RelationalSyncAbleStorage *GetRelationalStore();
437 void InitDataStatusTest(bool needDownload);
438 void DataStatusTest001(bool needDownload);
439 void DataStatusTest003();
440 void DataStatusTest004();
441 void DataStatusTest005();
442 void DataStatusTest006();
443 void DataStatusTest007();
444 sqlite3 *db = nullptr;
445 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
446 };
447
SetUpTestCase(void)448 void DistributedDBCloudSyncerDownloadAssetsTest::SetUpTestCase(void)
449 {
450 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
451 g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
452 LOGI("The test db is:%s", g_storePath.c_str());
453 g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
454 RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
455 }
456
TearDownTestCase(void)457 void DistributedDBCloudSyncerDownloadAssetsTest::TearDownTestCase(void) {}
458
SetUp(void)459 void DistributedDBCloudSyncerDownloadAssetsTest::SetUp(void)
460 {
461 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
462 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
463 LOGE("rm test db files error.");
464 }
465 DistributedDBToolsUnitTest::PrintTestCaseInfo();
466 LOGD("Test dir is %s", g_testDir.c_str());
467 db = RelationalTestUtils::CreateDataBase(g_storePath);
468 ASSERT_NE(db, nullptr);
469 InitDatabase(db);
470 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
471 ASSERT_NE(g_observer, nullptr);
472 ASSERT_EQ(
473 g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
474 DBStatus::OK);
475 ASSERT_NE(g_delegate, nullptr);
476 ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
477 ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
478 ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
479 g_virtualCloudDb = make_shared<VirtualCloudDb>();
480 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
481 g_syncProcess = {};
482 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
483 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
484 DataBaseSchema dataBaseSchema;
485 GetCloudDbSchema(dataBaseSchema);
486 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
487 g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
488 ASSERT_NE(g_cloudStoreHook, nullptr);
489 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
490 ASSERT_TRUE(communicatorAggregator_ != nullptr);
491 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
492 }
493
TearDown(void)494 void DistributedDBCloudSyncerDownloadAssetsTest::TearDown(void)
495 {
496 RefObject::DecObjRef(g_store);
497 g_virtualCloudDb->ForkUpload(nullptr);
498 CloseDb();
499 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
500 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
501 LOGE("rm test db files error.");
502 }
503 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
504 communicatorAggregator_ = nullptr;
505 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
506 }
507
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)508 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocaLAssets(const std::string &tableName,
509 const std::string &expectAssetId, const std::set<int> &failIndex)
510 {
511 std::string sql = "SELECT assets FROM " + tableName + ";";
512 sqlite3_stmt *stmt = nullptr;
513 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
514 int index = 0;
515 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
516 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
517 Type cloudValue;
518 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
519 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
520 for (const auto &asset : assets) {
521 index++;
522 if (failIndex.find(index) != failIndex.end()) {
523 EXPECT_EQ(asset.assetId, "0");
524 } else {
525 EXPECT_EQ(asset.assetId, expectAssetId);
526 }
527 }
528 }
529 int errCode = E_OK;
530 SQLiteUtils::ResetStatement(stmt, true, errCode);
531 }
532
CheckLocalAssetIsEmpty(const std::string & tableName)533 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocalAssetIsEmpty(const std::string &tableName)
534 {
535 std::string sql = "SELECT asset FROM " + tableName + ";";
536 sqlite3_stmt *stmt = nullptr;
537 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
538 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
539 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
540 }
541 int errCode = E_OK;
542 SQLiteUtils::ResetStatement(stmt, true, errCode);
543 }
544
CheckCursorData(const std::string & tableName,int begin)545 void DistributedDBCloudSyncerDownloadAssetsTest::CheckCursorData(const std::string &tableName, int begin)
546 {
547 std::string logTableName = DBCommon::GetLogTableName(tableName);
548 std::string sql = "SELECT cursor FROM " + logTableName + ";";
549 sqlite3_stmt *stmt = nullptr;
550 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
551 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
552 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
553 Type cloudValue;
554 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
555 EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
556 begin++;
557 }
558 int errCode = E_OK;
559 SQLiteUtils::ResetStatement(stmt, true, errCode);
560 }
561
WaitForSync(int & syncCount)562 void DistributedDBCloudSyncerDownloadAssetsTest::WaitForSync(int &syncCount)
563 {
564 std::unique_lock<std::mutex> lock(g_processMutex);
565 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
566 [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
567 ASSERT_EQ(result, true);
568 }
569
GetRelationalStore()570 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsTest::GetRelationalStore()
571 {
572 RelationalDBProperties properties;
573 CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
574 int errCode = E_OK;
575 g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
576 if (g_store == nullptr) {
577 return nullptr;
578 }
579 return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
580 }
581
InitDataStatusTest(bool needDownload)582 void DistributedDBCloudSyncerDownloadAssetsTest::InitDataStatusTest(bool needDownload)
583 {
584 int cloudCount = 20;
585 int localCount = 10;
586 InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
587 if (needDownload) {
588 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
589 }
590 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
591 std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
592 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
593 sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
594 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
595 sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
596 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
597 std::this_thread::sleep_for(std::chrono::milliseconds(1));
598 InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
599 std::this_thread::sleep_for(std::chrono::milliseconds(1));
600 sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
601 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
602 sql = "update " + logName + " SET status = 1 where data_key in (4);";
603 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
604 }
605
DataStatusTest001(bool needDownload)606 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest001(bool needDownload)
607 {
608 int cloudCount = 20;
609 int count = 0;
610 g_cloudStoreHook->SetSyncFinishHook([&count, cloudCount, this]() {
611 count++;
612 if (count == 1) {
613 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
614 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (11, 4)) or (status = 0)";
615 CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
616 }
617 if (count == 2) { // 2 is compensated sync
618 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
619 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
620 CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
621 g_processCondition.notify_one();
622 }
623 });
624 InitDataStatusTest(needDownload);
625 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
626 WaitForSync(count);
627 }
628
DataStatusTest003()629 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest003()
630 {
631 int count = 0;
632 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
633 count++;
634 if (count == 1) {
635 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
636 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key = 11)";
637 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
638 }
639 if (count == 2) { // 2 is compensated sync
640 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
641 " (status = 3 and data_key in (0,2,3,12,13) or (status = 0))";
642 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
643 g_processCondition.notify_one();
644 }
645 });
646 int downLoadCount = 0;
647 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
648 std::map<std::string, Assets> &assets) {
649 downLoadCount++;
650 if (downLoadCount == 1) {
651 std::vector<std::vector<uint8_t>> hashKey;
652 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
653 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
654 }
655 });
656 InitDataStatusTest(true);
657 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
658 WaitForSync(count);
659 g_virtualAssetLoader->ForkDownload(nullptr);
660 }
661
DataStatusTest004()662 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest004()
663 {
664 int count = 0;
665 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
666 count++;
667 if (count == 1) {
668 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
669 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (-1,11))";
670 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
671 }
672 if (count == 2) { // 2 is compensated sync
673 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
674 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
675 CloudDBSyncUtilsTest::CheckCount(db, sql, 19); // 19 is match count
676 g_processCondition.notify_one();
677 }
678 });
679 int downLoadCount = 0;
680 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
681 std::map<std::string, Assets> &assets) {
682 downLoadCount++;
683 if (downLoadCount == 1) {
684 std::vector<std::vector<uint8_t>> hashKey;
685 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
686 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
687 std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
688 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
689 }
690 });
691 int queryIdx = 0;
692 g_virtualCloudDb->ForkQuery([this, &queryIdx](const std::string &, VBucket &) {
693 LOGD("query index:%d", ++queryIdx);
694 if (queryIdx == 4) { // 4 is compensated sync
695 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
696 " SET status = 1 where data_key=15;";
697 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
698 }
699 });
700 InitDataStatusTest(true);
701 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
702 WaitForSync(count);
703 g_virtualAssetLoader->ForkDownload(nullptr);
704 g_virtualCloudDb->ForkQuery(nullptr);
705 }
706
DataStatusTest005()707 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest005()
708 {
709 int count = 0;
710 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
711 count++;
712 if (count == 1) {
713 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
714 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key in (11))";
715 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
716 }
717 if (count == 2) { // 2 is compensated sync
718 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
719 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0)";
720 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
721 g_processCondition.notify_one();
722 }
723 });
724 int downLoadCount = 0;
725 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
726 std::map<std::string, Assets> &assets) {
727 downLoadCount++;
728 if (downLoadCount == 1) {
729 std::vector<std::vector<uint8_t>> hashKey;
730 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
731 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
732 std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
733 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
734 }
735 });
736 InitDataStatusTest(true);
737 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
738 WaitForSync(count);
739 g_virtualAssetLoader->ForkDownload(nullptr);
740 }
741
DataStatusTest006()742 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest006()
743 {
744 int count = 0;
745 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
746 count++;
747 if (count == 1) {
748 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
749 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (0)) or "
750 "(status = 0 and data_key in (11))";
751 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
752 }
753 if (count == 2) { // 2 is compensated sync
754 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
755 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
756 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
757 g_processCondition.notify_one();
758 }
759 });
760 int downLoadCount = 0;
761 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
762 std::map<std::string, Assets> &assets) {
763 downLoadCount++;
764 if (downLoadCount == 1) {
765 std::vector<std::vector<uint8_t>> hashKey;
766 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
767 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
768 std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
769 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
770 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
771 }
772 });
773 InitDataStatusTest(true);
774 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
775 WaitForSync(count);
776 g_virtualAssetLoader->ForkDownload(nullptr);
777 }
778
DataStatusTest007()779 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest007()
780 {
781 int count = 0;
782 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
783 count++;
784 if (count == 1) {
785 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
786 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
787 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
788 }
789 if (count == 2) { // 2 is compensated sync
790 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
791 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
792 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
793 g_processCondition.notify_one();
794 }
795 });
796 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
797 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
798 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
799 .WillRepeatedly([](const std::string &, const std::string &gid, const Type &,
800 std::map<std::string, Assets> &assets) {
801 return CLOUD_ERROR;
802 });
803 InitDataStatusTest(true);
804 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
805 WaitForSync(count);
806 }
807
808 /*
809 * @tc.name: DownloadAssetForDupDataTest001
810 * @tc.desc: Test the download interface call with duplicate data for the same primary key.
811 * @tc.type: FUNC
812 * @tc.require:
813 * @tc.author: liufuchenxing
814 */
815 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest001, TestSize.Level1)
816 {
817 /**
818 * @tc.steps:step1. Mock asset download interface.
819 * @tc.expected: step1. return OK and interface will be called 4 times. delete 1, delete 2, insert 1, insert 2
820 */
821 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
822 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
823 int index = 1;
824 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
825 .Times(2)
826 .WillRepeatedly(
__anon09c4f3ca1102(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 827 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
828 LOGD("Download GID:%s", gid.c_str());
829 CheckDownloadForTest001(index, assets);
830 index++;
831 return DBStatus::OK;
832 });
833
834 /**
835 * @tc.steps:step2. Insert local data [0, 10), sync data
836 * @tc.expected: step2. sync success.
837 */
838 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME);
839 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
840
841 /**
842 * @tc.steps:step3. delete cloud data [1, 2], then insert cloud data [1,2] with new gid. Finally sync data.
843 * @tc.expected: step3. sync success.
844 */
845 DeleteCloudDBData(1, 2, ASSETS_TABLE_NAME);
846 InsertCloudDBData(1, 2, 10, ASSETS_TABLE_NAME);
847 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
848 }
849
850 /**
851 * @tc.name: FillAssetId001
852 * @tc.desc: Test if assetId is filled in single primary key table
853 * @tc.type: FUNC
854 * @tc.require:
855 * @tc.author: chenchaohao
856 */
857 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId001, TestSize.Level1)
858 {
859 /**
860 * @tc.steps:step1. local insert assets and sync, check the local assetId.
861 * @tc.expected: step1. return OK.
862 */
863 int localCount = 50;
864 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
865 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
866 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
867
868 /**
869 * @tc.steps:step2. local update assets and sync ,check the local assetId.
870 * @tc.expected: step2. sync success.
871 */
872 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
873 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
874 CheckLocalAssetIsEmpty(ASSETS_TABLE_NAME);
875 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
876 }
877
878 /**
879 * @tc.name: FillAssetId002
880 * @tc.desc: Test if assetId is filled in no primary key table
881 * @tc.type: FUNC
882 * @tc.require:
883 * @tc.author: chenchaohao
884 */
885 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId002, TestSize.Level1)
886 {
887 /**
888 * @tc.steps:step1. local insert assets and sync, check the local assetId.
889 * @tc.expected: step1. return OK.
890 */
891 int localCount = 50;
892 InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
893 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
894 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
895
896 /**
897 * @tc.steps:step2. local update assets and sync ,check the local assetId.
898 * @tc.expected: step2. sync success.
899 */
900 UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
901 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
902 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
903 }
904
905 /**
906 * @tc.name: FillAssetId003
907 * @tc.desc: Test if assetId is filled in compound primary key table
908 * @tc.type: FUNC
909 * @tc.require:
910 * @tc.author: chenchaohao
911 */
912 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId003, TestSize.Level1)
913 {
914 /**
915 * @tc.steps:step1. local insert assets and sync, check the local assetId.
916 * @tc.expected: step1. return OK.
917 */
918 int localCount = 50;
919 InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
920 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
921 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
922
923 /**
924 * @tc.steps:step2. local update assets and sync ,check the local assetId.
925 * @tc.expected: step2. sync success.
926 */
927 UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
928 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
929 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
930 }
931
932 /**
933 * @tc.name: FillAssetId004
934 * @tc.desc: Test if assetId is filled in single primary key table when CLOUD_FORCE_PUSH
935 * @tc.type: FUNC
936 * @tc.require:
937 * @tc.author: chenchaohao
938 */
939 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId004, TestSize.Level1)
940 {
941 /**
942 * @tc.steps:step1. local insert assets and sync, check the local assetId.
943 * @tc.expected: step1. return OK.
944 */
945 int localCount = 50;
946 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
947 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
948 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
949
950 /**
951 * @tc.steps:step2. local update assets and sync ,check the local assetId.
952 * @tc.expected: step2. sync success.
953 */
954 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
955 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
956 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
957 }
958
959 /**
960 * @tc.name: FillAssetId001
961 * @tc.desc: Test if assetId is filled in no primary key table when CLOUD_FORCE_PUSH
962 * @tc.type: FUNC
963 * @tc.require:
964 * @tc.author: chenchaohao
965 */
966 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId005, TestSize.Level1)
967 {
968 /**
969 * @tc.steps:step1. local insert assets and sync, check the local assetId.
970 * @tc.expected: step1. return OK.
971 */
972 int localCount = 50;
973 InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
974 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
975 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
976
977 /**
978 * @tc.steps:step2. local update assets and sync ,check the local assetId.
979 * @tc.expected: step2. sync success.
980 */
981 UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
982 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
983 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
984 }
985
986 /**
987 * @tc.name: FillAssetId006
988 * @tc.desc: Test if assetId is filled in compound primary key table when CLOUD_FORCE_PUSH
989 * @tc.type: FUNC
990 * @tc.require:
991 * @tc.author: chenchaohao
992 */
993 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId006, TestSize.Level1)
994 {
995 /**
996 * @tc.steps:step1. local insert assets and sync, check the local assetId.
997 * @tc.expected: step1. return OK.
998 */
999 int localCount = 50;
1000 InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
1001 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1002 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
1003
1004 /**
1005 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1006 * @tc.expected: step2. sync success.
1007 */
1008 UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
1009 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1010 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
1011 }
1012
1013 /**
1014 * @tc.name: FillAssetId007
1015 * @tc.desc: Test if assetId is filled when extend lack of assets
1016 * @tc.type: FUNC
1017 * @tc.require:
1018 * @tc.author: chenchaohao
1019 */
1020 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId007, TestSize.Level1)
1021 {
1022 CloudSyncConfig config;
1023 config.maxUploadCount = 200; // max upload 200
1024 g_delegate->SetCloudSyncConfig(config);
1025 /**
1026 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1027 * @tc.expected: step1. return OK.
1028 */
1029 int localCount = 50;
1030 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1202(const std::string &tableName, VBucket &extend) 1031 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1032 extend.erase("assets");
1033 });
1034 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1035 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1036
1037 /**
1038 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1039 * @tc.expected: step2. sync success.
1040 */
1041 int addLocalCount = 10;
1042 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1302(const std::string &tableName, VBucket &extend) 1043 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1044 if (extend.find("assets") != extend.end()) {
1045 for (auto &asset : std::get<Assets>(extend["assets"])) {
1046 asset.name = "pad";
1047 }
1048 }
1049 });
1050 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1051 int beginFailFillNum = 101;
1052 int endFailFillNum = 120;
1053 std::set<int> index;
1054 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1055 index.insert(i);
1056 }
1057 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1058
1059 /**
1060 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1061 * @tc.expected: step2. sync success.
1062 */
1063 g_virtualCloudDb->ForkUpload(nullptr);
1064 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1065 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1066 }
1067
1068 /**
1069 * @tc.name: FillAssetId008
1070 * @tc.desc: Test if assetId is filled when extend lack of assetId
1071 * @tc.type: FUNC
1072 * @tc.require:
1073 * @tc.author: chenchaohao
1074 */
1075 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId008, TestSize.Level1)
1076 {
1077 /**
1078 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1079 * @tc.expected: step1. return OK.
1080 */
1081 int localCount = 50;
1082 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1402(const std::string &tableName, VBucket &extend) 1083 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1084 if (extend.find("assets") != extend.end()) {
1085 for (auto &asset : std::get<Assets>(extend["assets"])) {
1086 asset.assetId = "";
1087 }
1088 }
1089 });
1090 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1091 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1092
1093 /**
1094 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1095 * @tc.expected: step2. sync success.
1096 */
1097 g_virtualCloudDb->ForkUpload(nullptr);
1098 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1099 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1100 }
1101
1102 /**
1103 * @tc.name: FillAssetId009
1104 * @tc.desc: Test if assetId is filled when extend exists useless assets
1105 * @tc.type: FUNC
1106 * @tc.require:
1107 * @tc.author: chenchaohao
1108 */
1109 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId009, TestSize.Level1)
1110 {
1111 /**
1112 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1113 * @tc.expected: step1. return OK.
1114 */
1115 int localCount = 50;
1116 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1502(const std::string &tableName, VBucket &extend) 1117 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1118 if (extend.find("assets") != extend.end()) {
1119 Asset asset = ASSET_COPY2;
1120 Assets &assets = std::get<Assets>(extend["assets"]);
1121 assets.push_back(asset);
1122 }
1123 });
1124 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1125 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1126 }
1127
1128 /**
1129 * @tc.name: FillAssetId010
1130 * @tc.desc: Test if assetId is filled when some success and some fail
1131 * @tc.type: FUNC
1132 * @tc.require:
1133 * @tc.author: chenchaohao
1134 */
1135 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId010, TestSize.Level1)
1136 {
1137 /**
1138 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1139 * @tc.expected: step1. return OK.
1140 */
1141 int localCount = 30;
1142 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1143 g_virtualCloudDb->SetInsertFailed(1);
1144 std::atomic<int> count = 0;
__anon09c4f3ca1602(const std::string &tableName, VBucket &extend) 1145 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1146 if (extend.find("assets") != extend.end() && count == 0) {
1147 extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1148 count++;
1149 }
1150 });
1151 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1152 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1153 }
1154
1155 /**
1156 * @tc.name: FillAssetId011
1157 * @tc.desc: Test if assetId is null when removedevicedata in FLAG_ONLY
1158 * @tc.type: FUNC
1159 * @tc.require:
1160 * @tc.author: chenchaohao
1161 */
1162 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId011, TestSize.Level1)
1163 {
1164 /**
1165 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1166 * @tc.expected: step1. return OK.
1167 */
1168 int localCount = 50;
1169 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1170 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1171 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1172
1173 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1174 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1175 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1176 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1177 }
1178
1179 /**
1180 * @tc.name: FillAssetId012
1181 * @tc.desc: Test if assetid is filled when extend size is not equal to record size
1182 * @tc.type: FUNC
1183 * @tc.require:
1184 * @tc.author: chenchaohao
1185 */
1186 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId012, TestSize.Level1)
1187 {
1188 /**
1189 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1190 * @tc.expected: step1. return OK.
1191 */
1192 int localCount = 50;
1193 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1194 std::atomic<int> count = 1;
1195 g_virtualCloudDb->SetClearExtend(count);
1196 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1197 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1198
1199 /**
1200 * @tc.steps:step2. set extend size normal then sync, check the asseid.
1201 * @tc.expected: step2. return OK.
1202 */
1203 g_virtualCloudDb->SetClearExtend(0);
1204 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1205 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1206
1207 /**
1208 * @tc.steps:step3. set extend size large then sync, check the asseid.
1209 * @tc.expected: step3. return OK.
1210 */
1211 count = -1; // -1 means extend push a empty vBucket
1212 g_virtualCloudDb->SetClearExtend(count);
1213 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1214 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1215 }
1216
1217 /**
1218 * @tc.name: FillAssetId013
1219 * @tc.desc: Test fill assetId and removedevicedata when data is delete
1220 * @tc.type: FUNC
1221 * @tc.require:
1222 * @tc.author: chenchaohao
1223 */
1224 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId013, TestSize.Level1)
1225 {
1226 /**
1227 * @tc.steps:step1. local insert data and sync, then delete local data and insert new data
1228 * @tc.expected: step1. return OK.
1229 */
1230 int localCount = 20;
1231 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1232 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1233 int deleteLocalCount = 10;
1234 DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1235 int addLocalCount = 30;
1236 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1237 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1238
1239 /**
1240 * @tc.steps:step2. RemoveDeviceData.
1241 * @tc.expected: step2. return OK.
1242 */
1243 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1244 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1245 }
1246
1247 /**
1248 * @tc.name: FillAssetId014
1249 * @tc.desc: Test if asset status is reset when removedevicedata in FLAG_ONLY
1250 * @tc.type: FUNC
1251 * @tc.require:
1252 * @tc.author: bty
1253 */
1254 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId014, TestSize.Level1)
1255 {
1256 /**
1257 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1258 * @tc.expected: step1. return OK.
1259 */
1260 int localCount = 50;
1261 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1262 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1263 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1264
1265 /**
1266 * @tc.steps:step2. RemoveDeviceData
1267 * @tc.expected: step2. return OK.
1268 */
1269 Assets assets;
1270 std::vector<AssetStatus> statusVec = {
1271 AssetStatus::INSERT, AssetStatus::UPDATE, AssetStatus::DELETE, AssetStatus::NORMAL,
1272 AssetStatus::ABNORMAL, AssetStatus::DOWNLOADING, AssetStatus::DOWNLOAD_WITH_NULL
1273 };
1274 for (auto &status : statusVec) {
1275 Asset temp = ASSET_COPY;
1276 temp.name += std::to_string(status);
1277 temp.status = status | AssetStatus::UPLOADING;
1278 assets.emplace_back(temp);
1279 }
1280 UpdateLocalData(db, ASSETS_TABLE_NAME, assets);
1281 EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1282 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1283
1284 /**
1285 * @tc.steps:step3. check status
1286 * @tc.expected: step3. return OK.
1287 */
1288 std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
1289 sqlite3_stmt *stmt = nullptr;
1290 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1291 int index = 0;
1292 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1293 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1294 Type cloudValue;
1295 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1296 Assets newAssets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
1297 for (const auto &ast : newAssets) {
1298 EXPECT_EQ(ast.status, statusVec[index++ % statusVec.size()]);
1299 }
1300 }
1301 int errCode = E_OK;
1302 SQLiteUtils::ResetStatement(stmt, true, errCode);
1303 }
1304
1305 /**
1306 * @tc.name: FillAssetId015
1307 * @tc.desc: Test if fill assetId when upload return cloud network error
1308 * @tc.type: FUNC
1309 * @tc.require:
1310 * @tc.author: chenchaohao
1311 */
1312 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId015, TestSize.Level1)
1313 {
1314 /**
1315 * @tc.steps:step1. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync
1316 * @tc.expected: step1. return OK, errcode is CLOUD_NETWORK_ERROR.
1317 */
1318 int localCount = 20;
1319 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1320 g_virtualCloudDb->SetCloudNetworkError(true);
1321 std::atomic<int> count = 0;
__anon09c4f3ca1702(const std::string &tableName, VBucket &extend) 1322 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1323 if (extend.find("assets") != extend.end() && count == 0) {
1324 extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1325 count++;
1326 }
1327 });
1328 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_NETWORK_ERROR);
1329 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1330 g_virtualCloudDb->SetCloudNetworkError(false);
1331 g_virtualCloudDb->ForkUpload(nullptr);
1332 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1333 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1334
1335 /**
1336 * @tc.steps:step2. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync.
1337 * @tc.expected: step2. return OK, errcode is CLOUD_ERROR.
1338 */
1339 int addLocalCount = 10;
1340 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1341 std::atomic<int> num = 0;
__anon09c4f3ca1802(const std::string &tableName, VBucket &extend) 1342 g_virtualCloudDb->ForkUpload([&num](const std::string &tableName, VBucket &extend) {
1343 if (extend.find("assets") != extend.end() && num == 0) {
1344 for (auto &asset : std::get<Assets>(extend["assets"])) {
1345 asset.name = "pad";
1346 break;
1347 }
1348 num++;
1349 }
1350 });
1351 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1352 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {41}); // // 41th asset do not fill
1353 }
1354
1355 /**
1356 * @tc.name: FillAssetId016
1357 * @tc.desc: Test fill assetId and removedevicedata when last data is delete
1358 * @tc.type: FUNC
1359 * @tc.require:
1360 * @tc.author: chenchaohao
1361 */
1362 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId016, TestSize.Level1)
1363 {
1364 /**
1365 * @tc.steps:step1. local insert data and sync, then delete last local data
1366 * @tc.expected: step1. return OK.
1367 */
1368 int localCount = 20;
1369 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1370 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1371 int deletLocalCount = 10;
1372 DeleteLocalRecord(db, deletLocalCount, deletLocalCount, ASSETS_TABLE_NAME);
1373 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1374
1375 /**
1376 * @tc.steps:step2. RemoveDeviceData.
1377 * @tc.expected: step2. return OK.
1378 */
1379 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1380 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1381 }
1382
1383 /**
1384 * @tc.name: FillAssetId017
1385 * @tc.desc: Test cursor when download not change
1386 * @tc.type: FUNC
1387 * @tc.require:
1388 * @tc.author: chenchaohao
1389 */
1390 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId017, TestSize.Level1)
1391 {
1392 /**
1393 * @tc.steps:step1. local insert data and sync,check cursor.
1394 * @tc.expected: step1. return OK.
1395 */
1396 int localCount = 20;
1397 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1398 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1399 CheckCursorData(ASSETS_TABLE_NAME, 1);
1400
1401 /**
1402 * @tc.steps:step2. sync again and optype is not change, check cursor.
1403 * @tc.expected: step2. return OK.
1404 */
1405 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1406 CheckCursorData(ASSETS_TABLE_NAME, 1);
1407 }
1408
1409 /**
1410 * @tc.name: FillAssetId018
1411 * @tc.desc: Test if assetId is filled when contains "#_error"
1412 * @tc.type: FUNC
1413 * @tc.require:
1414 * @tc.author: zhaoliang
1415 */
1416 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId018, TestSize.Level1)
1417 {
1418 /**
1419 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1420 * @tc.expected: step1. return OK.
1421 */
1422 int localCount = 30;
1423 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1424 std::atomic<int> count = 0;
__anon09c4f3ca1902(const std::string &tableName, VBucket &extend) 1425 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1426 if (extend.find("assets") != extend.end() && count == 0) {
1427 extend["#_error"] = std::string("test");
1428 count++;
1429 }
1430 });
1431 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1432 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1433 }
1434
1435 /**
1436 * @tc.name: DownloadAssetForDupDataTest002
1437 * @tc.desc: Test download failed
1438 * @tc.type: FUNC
1439 * @tc.require:
1440 * @tc.author: bty
1441 */
1442 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest002, TestSize.Level1)
1443 {
1444 /**
1445 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1446 * @tc.expected: step1. return OK
1447 */
1448 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1449 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1450 int index = 0;
1451 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1452 .WillRepeatedly(
__anon09c4f3ca1a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1453 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1454 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1455 return DBStatus::CLOUD_ERROR;
1456 });
1457
1458 /**
1459 * @tc.steps:step2. Insert cloud data [0, 10), sync data
1460 * @tc.expected: step2. sync success.
1461 */
1462 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1463 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1464
1465 /**
1466 * @tc.steps:step3. check if the hash of assets in db is empty
1467 * @tc.expected: step3. OK
1468 */
1469 CheckDownloadFailedForTest002(db);
1470 }
1471
1472 /**
1473 * @tc.name: DownloadAssetForDupDataTest003
1474 * @tc.desc: Test download failed and flag was modified
1475 * @tc.type: FUNC
1476 * @tc.require:
1477 * @tc.author: bty
1478 */
1479 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest003, TestSize.Level1)
1480 {
1481 /**
1482 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1483 * @tc.expected: step1. return OK
1484 */
1485 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1486 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1487 int index = 0;
1488 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1489 .WillRepeatedly(
__anon09c4f3ca1b02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1490 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1491 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1492 for (auto &item : assets) {
1493 for (auto &asset : item.second) {
1494 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
1495 }
1496 }
1497 return DBStatus::CLOUD_ERROR;
1498 });
1499
1500 /**
1501 * @tc.steps:step2. Insert cloud data [0, 10), sync data
1502 * @tc.expected: step2. sync success.
1503 */
1504 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1505 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1506
1507 /**
1508 * @tc.steps:step3. check if the hash of assets in db is empty
1509 * @tc.expected: step3. OK
1510 */
1511 CheckDownloadFailedForTest002(db);
1512 }
1513
1514 /**
1515 * @tc.name: DownloadAssetForDupDataTest004
1516 * @tc.desc: test sync with deleted assets
1517 * @tc.type: FUNC
1518 * @tc.require:
1519 * @tc.author: bty
1520 */
1521 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest004, TestSize.Level1)
1522 {
1523 /**
1524 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1525 * @tc.expected: step1. return OK
1526 */
1527 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1528 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1529 int index = 0;
1530 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1531 .WillRepeatedly(
__anon09c4f3ca1c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1532 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1533 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1534 return DBStatus::OK;
1535 });
1536
1537 /**
1538 * @tc.steps:step2. insert local data, update assets status to delete, then insert cloud data
1539 * @tc.expected: step2. return OK
1540 */
1541 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1542 UpdateAssetsForLocal(db, 1, AssetStatus::DELETE); // 1 is id
1543 UpdateAssetsForLocal(db, 2, AssetStatus::DELETE | AssetStatus::UPLOADING); // 2 is id
1544 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME); // 10 is num
1545
1546 /**
1547 * @tc.steps:step3. sync, check download num
1548 * @tc.expected: step3. return OK
1549 */
1550 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1551 EXPECT_GE(index, 2); // 2 is download num
1552 }
1553
1554 /**
1555 * @tc.name: DownloadAssetForDupDataTest005
1556 * @tc.desc: test DOWNLOADING status of asset after uploading
1557 * @tc.type: FUNC
1558 * @tc.require:
1559 * @tc.author: bty
1560 */
1561 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest005, TestSize.Level1)
1562 {
1563 /**
1564 * @tc.steps:step1. init data and sync
1565 * @tc.expected: step1. return OK
1566 */
1567 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1568 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1569 UpdateAssetsForLocal(db, 6, AssetStatus::DOWNLOADING); // 6 is id
1570 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1571
1572 /**
1573 * @tc.steps:step2. check asset status
1574 * @tc.expected: step2. return OK
1575 */
1576 std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME + " where id = 6;";
1577 sqlite3_stmt *stmt = nullptr;
1578 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1579 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1580 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1581 Type cloudValue;
1582 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1583 std::vector<uint8_t> assetsBlob;
1584 Assets assets;
1585 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
1586 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
1587 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
1588 for (size_t i = 0; i < assets.size(); ++i) {
1589 EXPECT_EQ(assets[i].hash, ASSET_COPY.hash);
1590 EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
1591 }
1592 }
1593 int errCode;
1594 SQLiteUtils::ResetStatement(stmt, true, errCode);
1595 }
1596
1597 /**
1598 * @tc.name: FillAssetId019
1599 * @tc.desc: Test the stability of cleaning asset id
1600 * @tc.type: FUNC
1601 * @tc.require:
1602 * @tc.author: bty
1603 */
1604 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId019, TestSize.Level1)
1605 {
1606 /**
1607 * @tc.steps:step1. local insert assets and sync.
1608 * @tc.expected: step1. return OK.
1609 */
1610 int localCount = 20;
1611 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1612 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1613
1614 /**
1615 * @tc.steps:step2. construct multiple abnormal data_key, then RemoveDeviceData.
1616 * @tc.expected: step2. return OK.
1617 */
1618 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME)
1619 + " set data_key='999' where data_key>'10';";
1620 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
1621 EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1622 }
1623
1624 /**
1625 * @tc.name: FillAssetId020
1626 * @tc.desc: Test if assetId is filled when extend(lack of assets/assetId is empty/modify asset info)
1627 * @tc.type: FUNC
1628 * @tc.require:
1629 * @tc.author: zhangtao
1630 */
1631 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId020, TestSize.Level1)
1632 {
1633 CloudSyncConfig config;
1634 config.maxUploadCount = 200; // max upload 200
1635 g_delegate->SetCloudSyncConfig(config);
1636
1637 /**
1638 * @tc.steps:step1. local insert assets and erase assets extends
1639 * @tc.expected: step1. return OK.
1640 */
1641 int localCount = 50;
1642 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1d02(const std::string &tableName, VBucket &extend) 1643 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1644 extend.erase("assets");
1645 });
1646 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1647 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1648
1649 /**
1650 * @tc.steps:step2. local insert assets and modify assetId to empty
1651 * @tc.expected: step2. return OK.
1652 */
1653 int addLocalCount = 10;
1654 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1e02(const std::string &tableName, VBucket &extend) 1655 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1656 if (extend.find("assets") != extend.end()) {
1657 for (auto &asset : std::get<Assets>(extend["assets"])) {
1658 asset.assetId = "";
1659 }
1660 }
1661 });
1662 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1663 int beginFailFillNum = 101;
1664 int endFailFillNum = 120;
1665 std::set<int> index;
1666 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1667 index.insert(i);
1668 }
1669 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1670
1671 /**
1672 * @tc.steps:step3. local insert assets and modify assetId info such as asset.name
1673 * @tc.expected: step3. return OK.
1674 */
1675 InsertLocalData(db, localCount + addLocalCount, addLocalCount, ASSETS_TABLE_NAME);
__anon09c4f3ca1f02(const std::string &tableName, VBucket &extend) 1676 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1677 if (extend.find("assets") != extend.end()) {
1678 for (auto &asset : std::get<Assets>(extend["assets"])) {
1679 asset.name = "mod_pat";
1680 }
1681 }
1682 });
1683 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1684 beginFailFillNum = 121;
1685 endFailFillNum = 140;
1686 std::set<int> newIndex;
1687 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1688 newIndex.insert(i);
1689 }
1690 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", newIndex);
1691
1692 /**
1693 * @tc.steps:step4. local update assets and sync, check the local assetId.
1694 * @tc.expected: step4. sync success.
1695 */
1696 g_virtualCloudDb->ForkUpload(nullptr);
1697 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1698 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1699 }
1700
1701 /**
1702 * @tc.name: FillAssetId021
1703 * @tc.desc: Test if local assets missing, one records's assets missing will not mark the whole sync progress failure
1704 * @tc.type: FUNC
1705 * @tc.require:
1706 * @tc.author: zhangtao
1707 */
1708 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId021, TestSize.Level1)
1709 {
1710 CloudSyncConfig config;
1711 config.maxUploadCount = 200; // max upload 200
1712 g_delegate->SetCloudSyncConfig(config);
1713
1714 /**
1715 * @tc.steps:step1. local insert assets and erase assets extends
1716 * @tc.expected: step1. return OK.
1717 */
1718 int localCount = 50;
1719 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1720
1721 /**
1722 * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1723 * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1724 */
1725 int uploadFailId = 0;
1726 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2002(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1727 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1728 uploadFailId++;
1729 if (uploadFailId == 25) { // 25 is the middle record
1730 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1731 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1732 }
1733 return OK;
1734 });
1735
1736 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1737 int beginFailFillNum = 49;
1738 int endFailFillNum = 50;
1739 std::set<int> index;
1740 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1741 index.insert(i);
1742 }
1743 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1744 g_virtualCloudDb->ForkUpload(nullptr);
1745 }
1746
1747 /**
1748 * @tc.name: FillAssetId022
1749 * @tc.desc: Test if local assets missing, many records's assets missing will not mark the whole sync progress failure
1750 * @tc.type: FUNC
1751 * @tc.require:
1752 * @tc.author: zhangtao
1753 */
1754 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId022, TestSize.Level1)
1755 {
1756 CloudSyncConfig config;
1757 config.maxUploadCount = 200; // max upload 200
1758 g_delegate->SetCloudSyncConfig(config);
1759
1760 /**
1761 * @tc.steps:step1. local insert assets and erase assets extends
1762 * @tc.expected: step1. return OK.
1763 */
1764 int localCount = 50;
1765 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1766
1767 /**
1768 * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1769 * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1770 */
1771 int uploadFailId = 0;
1772 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2102(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1773 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1774 uploadFailId++;
1775 if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1776 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1777 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1778 }
1779 return OK;
1780 });
1781
1782 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1783 int beginFailFillNum = 49;
1784 int endFailFillNum = 54;
1785 std::set<int> index;
1786 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1787 index.insert(i);
1788 }
1789 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1790 g_virtualCloudDb->ForkUpload(nullptr);
1791 }
1792
1793 /**
1794 * @tc.name: FillAssetId023
1795 * @tc.desc: Test if BatchUpdate with local assets missing
1796 * @tc.type: FUNC
1797 * @tc.require:
1798 * @tc.author: zhangtao
1799 */
1800 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId023, TestSize.Level1)
1801 {
1802 /**
1803 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1804 * @tc.expected: step1. return OK.
1805 */
1806 int localCount = 50;
1807 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1808 std::atomic<int> count = 1;
1809 g_virtualCloudDb->SetClearExtend(count);
1810 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1811 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1812
1813 /**
1814 * @tc.steps:step2. set extend size normal and BatchUpdate with local assets missing then sync, check the asseid.
1815 * @tc.expected: step2. return OK.
1816 */
1817 g_virtualCloudDb->SetClearExtend(0);
1818
1819 int uploadFailId = 0;
1820 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2202(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1821 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1822 uploadFailId++;
1823 if (uploadFailId == 25) { // 25 is the middle record
1824 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1825 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1826 }
1827 return OK;
1828 });
1829 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1830 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1831 }
1832
1833 /**
1834 * @tc.name: FillAssetId024
1835 * @tc.desc: Test if BatchUpdate with multiple local assets missing
1836 * @tc.type: FUNC
1837 * @tc.require:
1838 * @tc.author: zhangtao
1839 */
1840 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId024, TestSize.Level1)
1841 {
1842 /**
1843 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1844 * @tc.expected: step1. return OK.
1845 */
1846 int localCount = 50;
1847 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1848 std::atomic<int> count = 1;
1849 g_virtualCloudDb->SetClearExtend(count);
1850 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1851 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1852
1853 /**
1854 * @tc.steps:step2. set extend size normal and BatchUpdate with 3 local assets missing then sync, check the asseid.
1855 * @tc.expected: step2. return OK.
1856 */
1857 g_virtualCloudDb->SetClearExtend(0);
1858
1859 int uploadFailId = 0;
1860 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2302(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1861 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1862 uploadFailId++;
1863 if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1864 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1865 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1866 }
1867 return OK;
1868 });
1869 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1870 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1871 }
1872
1873 /**
1874 * @tc.name: FillAssetId025
1875 * @tc.desc: Test if BatchInsert with local assets missing and missing record added into successCount
1876 * @tc.type: FUNC
1877 * @tc.require:
1878 * @tc.author: zhangtao
1879 */
1880 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId025, TestSize.Level1)
1881 {
1882 CloudSyncConfig config;
1883 config.maxUploadCount = 200; // max upload 200
1884 g_delegate->SetCloudSyncConfig(config);
1885 /**
1886 * @tc.steps:step1. insert local data.
1887 * @tc.expected: step1. return OK.
1888 */
1889 int localCount = 40;
1890 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1891
1892 /**
1893 * @tc.steps:step2. BatchInsert with local assets missing then sync, check the asseid.
1894 * @tc.expected: step2. return OK.
1895 */
1896 int uploadFailId = 0;
1897 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon09c4f3ca2402(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1898 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1899 uploadFailId++;
1900 if (uploadFailId == 25) { // 25 is the middle record
1901 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1902 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1903 }
1904 return OK;
1905 });
1906 g_syncProcess = {};
1907 Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
1908 std::vector<TableProcessInfo> expectProcess = {
1909 { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
1910 { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
1911 };
1912
1913 /**
1914 * @tc.steps:step3. Check if sync process consistent with exptectProcess
1915 * @tc.expected: step3. return OK.
1916 */
1917 int index = 0;
__anon09c4f3ca2502(const std::map<std::string, SyncProcess> &process) 1918 CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
1919 g_syncProcess = std::move(process.begin()->second);
1920 ASSERT_LT(index, 2);
1921 for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
1922 EXPECT_EQ(info.process, expectProcess[index].process);
1923 EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
1924 EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
1925 EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
1926 EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
1927 }
1928 index++;
1929 if (g_syncProcess.process == FINISHED) {
1930 g_processCondition.notify_one();
1931 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1932 }
1933 };
1934 ASSERT_EQ(g_delegate->Sync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
1935 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
1936
1937 /**
1938 * @tc.steps:step4. Check assets results
1939 * @tc.expected: step4. return OK.
1940 */
1941 int beginFailFillNum = 49;
1942 int endFailFillNum = 50;
1943 std::set<int> indexes;
1944 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1945 indexes.insert(i);
1946 }
1947 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", indexes);
1948 g_virtualCloudDb->ForkUpload(nullptr);
1949 }
1950
1951 /**
1952 * @tc.name: ConsistentFlagTest001
1953 * @tc.desc:Assets are the different, check the 0x20 bit of flag after sync
1954 * @tc.type: FUNC
1955 * @tc.require:
1956 * @tc.author: bty
1957 */
1958 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest001, TestSize.Level1)
1959 {
1960 /**
1961 * @tc.steps:step1. init data for the different asset, sync and check flag
1962 * @tc.expected: step1. return OK.
1963 */
1964 int localCount = 10; // 10 is num of local
1965 int cloudCount = 20; // 20 is num of cloud
1966 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1967 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1968 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1969 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1970 CheckConsistentCount(db, cloudCount);
1971
1972 /**
1973 * @tc.steps:step2. update local data, sync and check flag
1974 * @tc.expected: step2. return OK.
1975 */
1976 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1977 DeleteCloudDBData(1, 1, ASSETS_TABLE_NAME);
1978 CheckConsistentCount(db, 0L);
1979 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1980 CheckConsistentCount(db, cloudCount);
1981 }
1982
1983 /**
1984 * @tc.name: ConsistentFlagTest002
1985 * @tc.desc: Assets are the same, check the 0x20 bit of flag after sync
1986 * @tc.type: FUNC
1987 * @tc.require:
1988 * @tc.author: bty
1989 */
1990 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest002, TestSize.Level1)
1991 {
1992 /**
1993 * @tc.steps:step1. init data for the same asset, sync and check flag
1994 * @tc.expected: step1. return OK.
1995 */
1996 int cloudCount = 20; // 20 is num of cloud
1997 InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
1998 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1999 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2000 CheckConsistentCount(db, cloudCount);
2001
2002 /**
2003 * @tc.steps:step2. update local data, sync and check flag
2004 * @tc.expected: step2. return OK.
2005 */
2006 int deleteLocalCount = 5;
2007 DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
2008 CheckConsistentCount(db, cloudCount - deleteLocalCount);
2009 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2010 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2011 CheckConsistentCount(db, cloudCount);
2012 }
2013
2014 /**
2015 * @tc.name: ConsistentFlagTest003
2016 * @tc.desc: Download returns a conflict, check the 0x20 bit of flag after sync
2017 * @tc.type: FUNC
2018 * @tc.require:
2019 * @tc.author: bty
2020 */
2021 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest003, TestSize.Level1)
2022 {
2023 /**
2024 * @tc.steps:step1. init data
2025 * @tc.expected: step1. return OK.
2026 */
2027 int localCount = 20; // 20 is num of local
2028 int cloudCount = 10; // 10 is num of cloud
2029 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2030 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2031 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2032
2033 /**
2034 * @tc.steps:step2. fork download, return CLOUD_RECORD_EXIST_CONFLICT once
2035 * @tc.expected: step2. return OK.
2036 */
2037 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2038 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2039 int index = 0;
2040 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2041 .WillRepeatedly(
__anon09c4f3ca2602(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2042 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2043 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2044 if (index == 1) { // 1 is first download
2045 return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2046 }
2047 return DBStatus::OK;
2048 });
2049
2050 /**
2051 * @tc.steps:step3. fork upload, check consistent count
2052 * @tc.expected: step3. return OK.
2053 */
2054 int upIdx = 0;
__anon09c4f3ca2702(const std::string &tableName, VBucket &extend) 2055 g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2056 LOGD("upload index:%d", ++upIdx);
2057 if (upIdx == 1) { // 1 is first upload
2058 CheckConsistentCount(db, localCount - cloudCount - 1);
2059 }
2060 });
2061
2062 /**
2063 * @tc.steps:step4. fork query, check consistent count
2064 * @tc.expected: step4. return OK.
2065 */
2066 int queryIdx = 0;
__anon09c4f3ca2802(const std::string &, VBucket &) 2067 g_virtualCloudDb->ForkQuery([this, localCount, &queryIdx](const std::string &, VBucket &) {
2068 LOGD("query index:%d", ++queryIdx);
2069 if (queryIdx == 3) { // 3 is the last query
2070 CheckConsistentCount(db, localCount - 1);
2071 }
2072 });
2073 int count = 0;
__anon09c4f3ca2902() 2074 g_cloudStoreHook->SetSyncFinishHook([&count]() {
2075 count++;
2076 if (count == 2) { // 2 is compensated sync
2077 g_processCondition.notify_one();
2078 }
2079 });
2080 /**
2081 * @tc.steps:step5. sync, check consistent count
2082 * @tc.expected: step5. return OK.
2083 */
2084 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2085 WaitForSync(count);
2086 CheckConsistentCount(db, localCount);
2087 }
2088
2089 /**
2090 * @tc.name: ConsistentFlagTest004
2091 * @tc.desc: Upload returns error, check the 0x20 bit of flag after sync
2092 * @tc.type: FUNC
2093 * @tc.require:
2094 * @tc.author: bty
2095 */
2096 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest004, TestSize.Level1)
2097 {
2098 /**
2099 * @tc.steps:step1. init data
2100 * @tc.expected: step1. return OK.
2101 */
2102 int localCount = 20; // 20 is num of local
2103 int cloudCount = 10; // 10 is num of cloud
2104 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2105 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2106 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2107
2108 /**
2109 * @tc.steps:step2. fork upload, return error filed of CLOUD_NETWORK_ERROR
2110 * @tc.expected: step2. return OK.
2111 */
2112 int upIdx = 0;
__anon09c4f3ca2a02(const std::string &tableName, VBucket &extend) 2113 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2114 LOGD("upload index:%d", ++upIdx);
2115 if (upIdx == 1) {
2116 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR));
2117 }
2118 });
2119
2120 /**
2121 * @tc.steps:step3. sync, check consistent count
2122 * @tc.expected: step3. return OK.
2123 */
2124 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2125 CheckConsistentCount(db, localCount - 1);
2126
2127 /**
2128 * @tc.steps:step4. update local data, fork upload, return error filed of type int64_t
2129 * @tc.expected: step4. return OK.
2130 */
2131 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2132 upIdx = 0;
__anon09c4f3ca2b02(const std::string &tableName, VBucket &extend) 2133 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2134 LOGD("upload index:%d", ++upIdx);
2135 if (upIdx == 1) {
2136 int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2137 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2138 }
2139 if (upIdx == 2) {
2140 int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT + 1;
2141 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2142 }
2143 });
2144
2145 /**
2146 * @tc.steps:step5. sync, check consistent count
2147 * @tc.expected: step5. return OK.
2148 */
2149 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2150 CheckConsistentCount(db, localCount - 2);
2151 }
2152
2153 /**
2154 * @tc.name: ConsistentFlagTest005
2155 * @tc.desc: Local data changes during download, check the 0x20 bit of flag after sync
2156 * @tc.type: FUNC
2157 * @tc.require:
2158 * @tc.author: bty
2159 */
2160 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest005, TestSize.Level1)
2161 {
2162 /**
2163 * @tc.steps:step1. init data
2164 * @tc.expected: step1. return OK.
2165 */
2166 int localCount = 20; // 20 is num of local
2167 int cloudCount = 10; // 10 is num of cloud
2168 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2169 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2170 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2171
2172 /**
2173 * @tc.steps:step2. fork download, update local assets where id=2
2174 * @tc.expected: step2. return OK.
2175 */
2176 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2177 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2178 int index = 0;
2179 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2180 .WillRepeatedly(
2181 [this, &index](const std::string &, const std::string &gid, const Type &,
__anon09c4f3ca2c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2182 std::map<std::string, Assets> &assets) {
2183 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2184 if (index == 1) { // 1 is first download
2185 std::string sql = "UPDATE " + ASSETS_TABLE_NAME + " SET assets=NULL where id=2;";
2186 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
2187 }
2188 return DBStatus::OK;
2189 });
2190
2191 /**
2192 * @tc.steps:step3. fork upload, check consistent count
2193 * @tc.expected: step3. return OK.
2194 */
2195 int upIdx = 0;
__anon09c4f3ca2d02(const std::string &tableName, VBucket &extend) 2196 g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2197 LOGD("upload index:%d", ++upIdx);
2198 if (upIdx == 1) { // 1 is first upload
2199 CheckConsistentCount(db, localCount - cloudCount - 1);
2200 }
2201 });
2202
2203 /**
2204 * @tc.steps:step4. sync, check consistent count
2205 * @tc.expected: step4. return OK.
2206 */
2207 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2208 CheckConsistentCount(db, localCount);
2209 }
2210
2211 /**
2212 * @tc.name: ConsistentFlagTest006
2213 * @tc.desc:
2214 * @tc.type: FUNC
2215 * @tc.require:
2216 * @tc.author: bty
2217 */
2218 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest006, TestSize.Level1)
2219 {
2220 /**
2221 * @tc.steps:step1. init data
2222 * @tc.expected: step1. return OK.
2223 */
2224 int cloudCount = 10; // 10 is num of cloud
2225 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2226 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2227
2228 /**
2229 * @tc.steps:step2. fork download, update local assets where id=2
2230 * @tc.expected: step2. return OK.
2231 */
2232 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2233 std::this_thread::sleep_for(std::chrono::milliseconds(1));
2234 int delCount = 3; // 3 is num of cloud
2235 DeleteCloudDBData(1, delCount, ASSETS_TABLE_NAME);
2236 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2237 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2238 int index = 0;
2239 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2240 .WillRepeatedly(
2241 [&index](const std::string &, const std::string &gid, const Type &,
__anon09c4f3ca2e02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2242 std::map<std::string, Assets> &assets) {
2243 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2244 if (index == 1) { // 1 is first download
2245 return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2246 }
2247 return DBStatus::OK;
2248 });
2249
2250 /**
2251 * @tc.steps:step3. fork upload, check consistent count
2252 * @tc.expected: step3. return OK.
2253 */
2254 int upIdx = 0;
__anon09c4f3ca2f02(const std::string &tableName, VBucket &extend) 2255 g_virtualCloudDb->ForkUpload([this, delCount, &upIdx](const std::string &tableName, VBucket &extend) {
2256 LOGD("upload index:%d", ++upIdx);
2257 if (upIdx == 1) { // 1 is first upload
2258 CheckConsistentCount(db, delCount);
2259 CheckCompensatedCount(db, 0L);
2260 }
2261 });
2262
2263 /**
2264 * @tc.steps:step4. sync, check consistent count
2265 * @tc.expected: step4. return OK.
2266 */
2267 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2268 CheckConsistentCount(db, cloudCount);
2269 }
2270
2271 /**
2272 * @tc.name: SyncDataStatusTest001
2273 * @tc.desc: No need to download asset, check status after sync
2274 * @tc.type: FUNC
2275 * @tc.require:
2276 * @tc.author: bty
2277 */
2278 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest001, TestSize.Level1)
2279 {
2280 DataStatusTest001(false);
2281 }
2282
2283 /**
2284 * @tc.name: SyncDataStatusTest002
2285 * @tc.desc: Need to download asset, check status after sync
2286 * @tc.type: FUNC
2287 * @tc.require:
2288 * @tc.author: bty
2289 */
2290 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest002, TestSize.Level1)
2291 {
2292 DataStatusTest001(true);
2293 }
2294
2295 /**
2296 * @tc.name: SyncDataStatusTest003
2297 * @tc.desc: Lock during download and check status
2298 * @tc.type: FUNC
2299 * @tc.require:
2300 * @tc.author: bty
2301 */
2302 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest003, TestSize.Level1)
2303 {
2304 DataStatusTest003();
2305 }
2306
2307 /**
2308 * @tc.name: SyncDataStatusTest004
2309 * @tc.desc: Lock and delete during download, check status
2310 * @tc.type: FUNC
2311 * @tc.require:
2312 * @tc.author: bty
2313 */
2314 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest004, TestSize.Level1)
2315 {
2316 DataStatusTest004();
2317 }
2318
2319 /**
2320 * @tc.name: SyncDataStatusTest005
2321 * @tc.desc: Lock and update during download, check status
2322 * @tc.type: FUNC
2323 * @tc.require:
2324 * @tc.author: bty
2325 */
2326 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest005, TestSize.Level1)
2327 {
2328 DataStatusTest005();
2329 }
2330
2331 /**
2332 * @tc.name: SyncDataStatusTest006
2333 * @tc.desc: Lock and update and Unlock during download, check status
2334 * @tc.type: FUNC
2335 * @tc.require:
2336 * @tc.author: bty
2337 */
2338 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest006, TestSize.Level1)
2339 {
2340 DataStatusTest006();
2341 }
2342
2343 /**
2344 * @tc.name: SyncDataStatusTest007
2345 * @tc.desc: Download return error, check status
2346 * @tc.type: FUNC
2347 * @tc.require:
2348 * @tc.author: bty
2349 */
2350 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest007, TestSize.Level1)
2351 {
2352 DataStatusTest007();
2353 }
2354
2355 /**
2356 * @tc.name: SyncDataStatusTest008
2357 * @tc.desc: Test upload process when data locked
2358 * @tc.type: FUNC
2359 * @tc.require:
2360 * @tc.author: bty
2361 */
2362 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest008, TestSize.Level1)
2363 {
2364 /**
2365 * @tc.steps:step1. init local data
2366 * @tc.expected: step1. return OK.
2367 */
2368 int localCount = 40;
2369 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2370 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2371 std::string sql = "update " + logName + " SET status = 2 where data_key >=20;";
2372 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2373
2374 /**
2375 * @tc.steps:step2. sync and check process
2376 * @tc.expected: step2. return OK.
2377 */
2378 g_syncProcess = {};
2379 Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
2380 std::vector<TableProcessInfo> expectProcess = {
2381 { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
2382 { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
2383 };
2384 int index = 0;
2385 CloudSyncConfig config;
2386 config.maxUploadCount = 100; // max upload 100
2387 g_delegate->SetCloudSyncConfig(config);
__anon09c4f3ca3002(const std::map<std::string, SyncProcess> &process) 2388 CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
2389 g_syncProcess = std::move(process.begin()->second);
2390 ASSERT_LT(index, 2);
2391 for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
2392 EXPECT_EQ(info.process, expectProcess[index].process);
2393 EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
2394 EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
2395 EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
2396 EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
2397 }
2398 index++;
2399 if (g_syncProcess.process == FINISHED) {
2400 g_processCondition.notify_one();
2401 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2402 }
2403 };
2404 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
2405 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
2406 }
2407
2408 /**
2409 * @tc.name: DownloadAssetTest001
2410 * @tc.desc: Test the asset status after the share table sync
2411 * @tc.type: FUNC
2412 * @tc.require:
2413 * @tc.author: bty
2414 */
2415 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest001, TestSize.Level1)
2416 {
2417 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
2418 /**
2419 * @tc.steps:step1. init data and sync
2420 * @tc.expected: step1. return OK.
2421 */
2422 int cloudCount = 10; // 10 is num of cloud
2423 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME_SHARED);
2424 CallSync({ASSETS_TABLE_NAME_SHARED}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2425
2426 /**
2427 * @tc.steps:step2. check asset status
2428 * @tc.expected: step2. return OK.
2429 */
2430 SqlCondition condition;
2431 condition.sql = "select assets from " + ASSETS_TABLE_NAME_SHARED + " where _rowid_ = 1;";
2432 condition.readOnly = true;
2433 std::vector<VBucket> records;
2434 EXPECT_EQ(g_delegate->ExecuteSql(condition, records), OK);
2435 for (const auto &data: records) {
2436 Assets assets;
2437 CloudStorageUtils::GetValueFromVBucket(COL_ASSETS, data, assets);
2438 for (const auto &asset: assets) {
2439 EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2440 }
2441 }
2442 EXPECT_EQ(g_virtualAssetLoader->GetBatchDownloadCount(), 0u);
2443 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
2444 }
2445
2446 /**
2447 * @tc.name: DownloadAssetTest002
2448 * @tc.desc: Test asset download failed and re download
2449 * @tc.type: FUNC
2450 * @tc.require:
2451 * @tc.author: liaoyonghuang
2452 */
2453 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest002, TestSize.Level1)
2454 {
2455 /**
2456 * @tc.steps:step1. init data
2457 * @tc.expected: step1. return OK.
2458 */
2459 int cloudCount = 10; // 10 is num of cloud
2460 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2461
2462 /**
2463 * @tc.steps:step2. Set asset download status error and sync
2464 * @tc.expected: step2. sync successful but download assets fail.
2465 */
2466 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2467 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2468
2469 /**
2470 * @tc.steps:step3. Set asset download status OK and sync
2471 * @tc.expected: step3. return OK.
2472 */
2473 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2474 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2475
2476 /**
2477 * @tc.steps:step4. Check assets status
2478 * @tc.expected: step4. status is NORMAL.
2479 */
2480 std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
2481 sqlite3_stmt *stmt = nullptr;
2482 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2483 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2484 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
2485 Type cloudValue;
2486 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
2487 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
2488 for (const auto &asset : assets) {
2489 EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2490 }
2491 }
2492 int errCode = E_OK;
2493 SQLiteUtils::ResetStatement(stmt, true, errCode);
2494 EXPECT_EQ(errCode, E_OK);
2495 }
2496
2497 /**
2498 * @tc.name: DownloadAssetTest003
2499 * @tc.desc: Test asset download after sync task recovery
2500 * @tc.type: FUNC
2501 * @tc.require:
2502 * @tc.author: liaoyonghuang
2503 */
2504 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest003, TestSize.Level1)
2505 {
2506 /**
2507 * @tc.steps:step1. init data
2508 * @tc.expected: step1. return OK.
2509 */
2510 int cloudCount = 10; // 10 is num of cloud
2511 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2512 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2513 DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
2514 InsertCloudDBData(0, cloudCount, 0, NO_PRIMARY_TABLE);
2515 /**
2516 * @tc.steps:step2. Set task interrupted before asset download
2517 * @tc.expected: step2. return OK.
2518 */
2519 int queryTime = 0;
__anon09c4f3ca3102(const std::string &, VBucket &) 2520 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2521 queryTime++;
2522 if (queryTime != 1) {
2523 return;
2524 }
2525 Query query = Query::Select().FromTable({NO_PRIMARY_TABLE});
2526 CloudSyncOption option;
2527 option.priorityTask = true;
2528 option.devices = {DEVICE_CLOUD};
2529 option.mode = SYNC_MODE_CLOUD_MERGE;
2530 option.query = query;
2531 ASSERT_EQ(g_delegate->Sync(option, nullptr), OK);
2532 });
2533 /**
2534 * @tc.steps:step3. Sync
2535 * @tc.expected: step3. return OK.
2536 */
2537 int removeTime = 0;
__anon09c4f3ca3202(std::map<std::string, Assets> &assets) 2538 g_virtualAssetLoader->SetRemoveLocalAssetsCallback([&](std::map<std::string, Assets> &assets) {
2539 removeTime++;
2540 return OK;
2541 });
2542 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2543 /**
2544 * @tc.steps:step4. Check fork asset download time and observer
2545 * @tc.expected: step4. return OK.
2546 */
2547 EXPECT_EQ(removeTime, cloudCount);
2548 ChangedData expectedChangeData1;
2549 ChangedData expectedChangeData2;
2550 expectedChangeData1.tableName = NO_PRIMARY_TABLE;
2551 expectedChangeData2.tableName = ASSETS_TABLE_NAME;
2552 expectedChangeData1.type = ChangedDataType::ASSET;
2553 expectedChangeData2.type = ChangedDataType::ASSET;
2554 expectedChangeData1.field.push_back(std::string("rowid"));
2555 expectedChangeData2.field.push_back(std::string("id"));
2556 for (int i = 0; i < cloudCount; i++) {
2557 expectedChangeData1.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 1});
2558 expectedChangeData2.primaryData[ChangeType::OP_DELETE].push_back({(int64_t)i});
2559 }
2560 g_observer->SetExpectedResult(expectedChangeData1);
2561 g_observer->SetExpectedResult(expectedChangeData2);
2562 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
2563 g_observer->ClearChangedData();
2564
2565 g_virtualCloudDb->ForkInsertConflict(nullptr);
2566 g_virtualCloudDb->ForkQuery(nullptr);
2567 g_virtualAssetLoader->SetRemoveLocalAssetsCallback(nullptr);
2568 }
2569
2570 /**
2571 * @tc.name: DownloadAssetTest004
2572 * @tc.desc: Test asset download failed with no asset data
2573 * @tc.type: FUNC
2574 * @tc.require:
2575 * @tc.author: zqq
2576 */
2577 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest004, TestSize.Level1)
2578 {
2579 /**
2580 * @tc.steps:step1. init data
2581 * @tc.expected: step1. return OK.
2582 */
2583 int cloudCount = 10; // 10 is num of cloud
2584 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME, true);
2585 InsertCloudDBData(cloudCount, cloudCount, cloudCount, ASSETS_TABLE_NAME, false);
2586 /**
2587 * @tc.steps:step2. Set asset download status error and sync
2588 * @tc.expected: step2. sync successful but download assets fail.
2589 */
2590 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2591 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2592 /**
2593 * @tc.steps:step3. Check FLAG_DEVICE_CLOUD_INCONSISTENCY data
2594 * @tc.expected: step3. 10 data is cloud consistency because no asset.
2595 */
2596 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2597 std::string sql = "SELECT COUNT(1) FROM " + logName + " WHERE flag & 0x20 = 0";
2598 int count = 0;
2599 ASSERT_EQ(SQLiteUtils::GetCountBySql(db, sql, count), E_OK);
2600 EXPECT_EQ(count, cloudCount);
2601 }
2602
2603 /**
2604 * @tc.name: RecordLockFuncTest001
2605 * @tc.desc: UNLOCKING->UNLOCKING Synchronous download failure wholly.
2606 * @tc.type: FUNC
2607 * @tc.author: lijun
2608 */
2609 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest001, TestSize.Level1)
2610 {
2611 /**
2612 * @tc.steps:step1. init local data
2613 * @tc.expected: step1. return OK.
2614 */
2615 int localCount = 100;
2616 int cloudCount = 100;
2617 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2618 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2619 std::string sql = "update " + logName + " SET status = 2 where data_key >=70;";
2620 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2621 CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2622 CheckLockStatus(db, 70, 99, LockStatus::LOCK);
2623 DeleteLocalRecord(db, 70, 30, ASSETS_TABLE_NAME);
2624
2625 /**
2626 * @tc.steps:step2. init cloud data
2627 * @tc.expected: step2. return OK.
2628 */
2629 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2630 UpdateCloudDBData(0, 70, 0, 0, ASSETS_TABLE_NAME);
2631
2632 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2633 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2634 int index = 0;
2635 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2636 .WillRepeatedly(
__anon09c4f3ca3302(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2637 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2638 LOGD("Download GID:%s %d", gid.c_str(), index);
2639 index++;
2640 if (index <= 30) {
2641 return DBStatus::CLOUD_ERROR;
2642 } else {
2643 return DBStatus::OK;
2644 }
2645
2646 });
2647
2648 std::mutex mtx;
2649 std::condition_variable cv;
2650 int queryIdx = 0;
2651 bool ready = false;
__anon09c4f3ca3402(const std::string &, VBucket &) 2652 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2653 LOGD("query index:%d", ++queryIdx);
2654 if (queryIdx == 2) { // 2 is compensated sync
2655 std::unique_lock<std::mutex> lock(mtx);
2656 ready = true;
2657 cv.notify_one();
2658 }
2659 });
2660 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2661 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2662
2663 {
2664 std::unique_lock<std::mutex> lock(mtx);
__anon09c4f3ca3502null2665 cv.wait(lock, [&]{ return ready; });
2666 }
2667 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2668
2669 std::this_thread::sleep_for(std::chrono::seconds(6));
2670 /**
2671 * @tc.steps:step3. check after compensated sync
2672 * @tc.expected: step3. all is UNLOCKING.
2673 */
2674 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2675 }
2676
2677 /**
2678 * @tc.name: RecordLockFuncTest002
2679 * @tc.desc: Compensated synchronization, Locked data has not been synchronized. The first synchronization data is
2680 * based on the cloud, and the last synchronization data is based on the device.
2681 * @tc.type: FUNC
2682 * @tc.author: lijun
2683 */
2684 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest002, TestSize.Level1)
2685 {
2686 /**
2687 * @tc.steps:step1. init local data, modify data Status and initiate synchronization
2688 * @tc.expected: step1. return OK.
2689 */
2690 int localCount = 120;
2691 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2692 std::vector<std::vector<uint8_t>> hashKey;
2693 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key >=100 ", db, hashKey);
2694 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2695 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2696 CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2697 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
2698
2699 /**
2700 * @tc.steps:step2. Check the synchronization result and log table status
2701 * @tc.expected: step2.100-109 is LOCK_CHANGE.
2702 */
2703 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2704 CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2705 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1, 100, 109);
2706 CheckLockStatus(db, 100, 109, LockStatus::LOCK_CHANGE);
2707 CheckLockStatus(db, 110, 119, LockStatus::LOCK);
2708
2709 /**
2710 * @tc.steps:step3. Synchronize and check the lock_change data status
2711 * @tc.expected: step3.100-119 is LOCK_CHANGE.
2712 */
2713 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2714 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2715 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2716
2717 /**
2718 * @tc.steps:step4. Unlock,the lock_change data status changes to unlocking
2719 * @tc.expected: step4.100-119 is UNLOCKING.
2720 */
2721 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2722 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2723 CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2724
2725 /**
2726 * @tc.steps:step5. Lock,the unlocking data status changes to lock_change
2727 * @tc.expected: step5.100-119 is LOCK_CHANGE.
2728 */
2729 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2730 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2731 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2732
2733 /**
2734 * @tc.steps:step6. Synchronize and check the lock_change data status
2735 * @tc.expected: step6.100-119 is LOCK_CHANGE.
2736 */
2737 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2738 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2739 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2740
2741 /**
2742 * @tc.steps:step7. Unlock,the lock_change data status changes to unlocking
2743 * @tc.expected: step7.100-119 is UNLOCKING.
2744 */
2745 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2746 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2747 CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2748
2749 /**
2750 * @tc.steps:step8. Synchronize data
2751 * @tc.expected: step8.return OK.
2752 */
2753 std::mutex mtx;
2754 std::condition_variable cv;
2755 int queryIdx = 0;
2756 bool ready = false;
__anon09c4f3ca3602(const std::string &, VBucket &) 2757 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2758 LOGD("query index:%d", ++queryIdx);
2759 if (queryIdx == 5) { // 5 is compensated sync
2760 std::unique_lock<std::mutex> lock(mtx);
2761 ready = true;
2762 cv.notify_one();
2763 }
2764 });
2765 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2766 {
2767 std::unique_lock<std::mutex> lock(mtx);
__anon09c4f3ca3702null2768 cv.wait(lock, [&]{ return ready; });
2769 }
2770
2771 std::this_thread::sleep_for(std::chrono::seconds(6));
2772 /**
2773 * @tc.steps:step9. check after compensated sync
2774 * @tc.expected: step9. all is UNLOCK.
2775 */
2776 CheckLockStatus(db, 0, 119, LockStatus::UNLOCK);
2777 }
2778
2779 /**
2780 * @tc.name: CloudTaskStatusTest001
2781 * @tc.desc: Test get cloud task status
2782 * @tc.type: FUNC
2783 * @tc.require:
2784 * @tc.author: liaoyonghuang
2785 */
2786 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudTaskStatusTest001, TestSize.Level1)
2787 {
2788 /**
2789 * @tc.steps:step1. init data
2790 * @tc.expected: step1. return OK.
2791 */
2792 int cloudCount = 10; // 10 is num of cloud
2793 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2794
2795 /**
2796 * @tc.steps:step2. Sync and get cloud task status
2797 * @tc.expected: step2. OK
2798 */
2799 g_virtualCloudDb->SetBlockTime(1000);
__anon09c4f3ca3802() 2800 std::thread syncThread([&]() {
2801 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2802 });
2803 std::this_thread::sleep_for(std::chrono::milliseconds(100));
2804 SyncProcess process1 = g_delegate->GetCloudTaskStatus(UINT64_MAX);
2805 EXPECT_EQ(process1.errCode, OK);
2806 syncThread.join();
2807 /**
2808 * @tc.steps:step3. Get cloud task status after sync finish
2809 * @tc.expected: step3. NOT_FOUND
2810 */
2811 SyncProcess process2 = g_delegate->GetCloudTaskStatus(1);
2812 EXPECT_EQ(process2.errCode, NOT_FOUND);
2813
2814 /**
2815 * @tc.steps:step4. Get cloud task status after DB closed
2816 * @tc.expected: step4. DB_ERROR
2817 */
2818 auto delegateImpl = static_cast<RelationalStoreDelegateImpl *>(g_delegate);
2819 EXPECT_EQ(delegateImpl->Close(), DBStatus::OK);
2820 SyncProcess process3 = g_delegate->GetCloudTaskStatus(1);
2821 EXPECT_EQ(process3.errCode, DB_ERROR);
2822 }
2823
2824 /**
2825 * @tc.name: CloudTaskStatusTest002
2826 * @tc.desc: Test get cloud task status when task merge
2827 * @tc.type: FUNC
2828 * @tc.require:
2829 * @tc.author: suyue
2830 */
2831 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudTaskStatusTest002, TestSize.Level1)
2832 {
2833 /**
2834 * @tc.steps:step1. init data
2835 * @tc.expected: step1. return OK
2836 */
2837 int cloudCount = 100; // 100 is num of cloud
2838 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2839
2840 /**
2841 * @tc.steps:step2. sync tasks 2 and 3 that can be merged when synchronizing task 1, get status of the merge task
2842 * @tc.expected: step2. the errCode of task 2 and task 3 is OK or NOT_FOUND
2843 */
2844 g_virtualCloudDb->SetBlockTime(1000);
2845 Query query = Query::Select().FromTable({ASSETS_TABLE_NAME});
__anon09c4f3ca3902(const std::map<std::string, SyncProcess> &process) 2846 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2847 std::unique_lock<std::mutex> lock(g_processMutex);
2848 if (process.begin()->second.process == FINISHED) {
2849 g_processCondition.notify_one();
2850 }
2851 };
2852 CloudSyncOption option = {.devices = {DEVICE_CLOUD}, .mode = SYNC_MODE_CLOUD_MERGE, .query = query,
2853 .waitTime = SYNC_WAIT_TIME, .lockAction = static_cast<LockAction>(0xff)};
2854
__anon09c4f3ca3a02() 2855 std::thread syncThread1([option, callback]() {
2856 ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2857 });
2858 option.merge = true;
__anon09c4f3ca3b02() 2859 std::thread syncThread2([option, callback]() {
2860 ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2861 });
__anon09c4f3ca3c02() 2862 std::thread syncThread3([option, callback]() {
2863 ASSERT_EQ(g_delegate->Sync(option, callback), DBStatus::OK);
2864 });
2865 std::this_thread::sleep_for(std::chrono::milliseconds(100));
2866 SyncProcess process1 = g_delegate->GetCloudTaskStatus(1);
2867 SyncProcess process2 = g_delegate->GetCloudTaskStatus(2);
2868 SyncProcess process3 = g_delegate->GetCloudTaskStatus(3);
2869 syncThread1.join();
2870 syncThread2.join();
2871 syncThread3.join();
2872 // Due to the task execution sequence, task 2 may be combined into 3 or task 3 may be combined into 2.
2873 // Therefore, the errCode of task 2 and 3 may be OK or NOT_FOUND.
2874 EXPECT_TRUE(process2.errCode == OK || process2.errCode == NOT_FOUND);
2875 EXPECT_TRUE(process3.errCode == OK || process3.errCode == NOT_FOUND);
2876 }
2877
2878 /**
2879 * @tc.name: CompensatedSyncTest001
2880 * @tc.desc: test compensated count more than 100.
2881 * @tc.type: FUNC
2882 * @tc.require:
2883 * @tc.author: tankaisheng
2884 */
2885 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CompensatedSyncTest001, TestSize.Level1)
2886 {
2887 /**
2888 * @tc.steps:step1. init data
2889 * @tc.expected: step1. return OK.
2890 */
2891 int dataCount = 120;
2892 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
2893 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2894
2895 /**
2896 * @tc.steps:step2. set all data wait compensated.
2897 * @tc.expected: step2. return ok.
2898 */
2899 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " set flag=flag|0x10;";
2900 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
2901 sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " where flag&0x10=0x10;";
2902 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
2903 reinterpret_cast<void *>(120u), nullptr), SQLITE_OK);
2904
2905 /**
2906 * @tc.steps:step3. sync with compensated.
2907 * @tc.expected: step3. return ok.
2908 */
2909 std::mutex processMutex;
2910 std::vector<SyncProcess> expectProcess;
2911 std::condition_variable cv;
2912 bool finish = false;
2913 auto callback = [&cv, &finish, &processMutex]
__anon09c4f3ca3d02(const std::map<std::string, SyncProcess> &process) 2914 (const std::map<std::string, SyncProcess> &process) {
2915 for (auto &item : process) {
2916 if (item.second.process == FINISHED) {
2917 EXPECT_EQ(item.second.errCode, DBStatus::OK);
2918 std::unique_lock<std::mutex> lock(processMutex);
2919 finish = true;
2920 cv.notify_one();
2921 }
2922 }
2923 };
2924 CloudSyncOption option;
2925 option.devices = {DEVICE_CLOUD};
2926 option.priorityTask = true;
2927 option.compensatedSyncOnly = true;
2928 DBStatus syncResult = g_delegate->Sync(option, callback);
2929 EXPECT_EQ(syncResult, DBStatus::OK);
2930
2931 /**
2932 * @tc.steps:step4. wait sync finish and check data.
2933 * @tc.expected: step4. return ok.
2934 */
2935 std::unique_lock<std::mutex> lock(processMutex);
__anon09c4f3ca3e02() 2936 cv.wait(lock, [&finish]() {
2937 return finish;
2938 });
2939 std::this_thread::sleep_for(std::chrono::seconds(1));
2940 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
2941 reinterpret_cast<void *>(0u), nullptr), SQLITE_OK);
2942 }
2943
2944 /**
2945 * @tc.name: CloudCooperationTrackerTableSync001
2946 * @tc.desc: test CLOUD_COOPERATION tracker table sync.
2947 * @tc.type: FUNC
2948 * @tc.require:
2949 * @tc.author: tankaisheng
2950 */
2951 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, CloudCooperationTrackerTableSync001, TestSize.Level1)
2952 {
2953 /**
2954 * @tc.steps:step1. init data
2955 * @tc.expected: step1. return OK.
2956 */
2957 int dataCount = 120;
2958 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
2959 TrackerSchema trackerSchema = {
2960 .tableName = ASSETS_TABLE_NAME, .extendColNames = {"int_field1"}, .trackerColNames = {"int_field1"}};
2961 g_delegate->SetTrackerTable(trackerSchema);
2962 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
2963 }
2964
2965 /**
2966 * @tc.name: FillAssetId026
2967 * @tc.desc: Test if assetId is null when removedevicedata in FLAG_AND_DATA
2968 * @tc.type: FUNC
2969 * @tc.require:
2970 * @tc.author: tankaisheng
2971 */
2972 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId026, TestSize.Level0)
2973 {
2974 /**
2975 * @tc.steps:step1. local insert assets and sync, check the local assetId.
2976 * @tc.expected: step1. return OK.
2977 */
2978 int localCount = 10;
2979 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
2980 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2981 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
2982
2983 /**
2984 * @tc.steps:step2. remove device data then check record.
2985 * @tc.expected: step2. return OK.
2986 */
2987 g_delegate->RemoveDeviceData("", FLAG_AND_DATA);
2988 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
2989 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2990 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
2991 g_delegate->RemoveDeviceData("", CLEAR_SHARED_TABLE);
2992 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
2993 }
2994 } // namespace
2995 #endif // RELATIONAL_STORE
2996