1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_instance.h"
24 #include "relational_store_manager.h"
25 #include "runtime_config.h"
26 #include "sqlite_relational_store.h"
27 #include "sqlite_relational_utils.h"
28 #include "store_observer.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "mock_asset_loader.h"
34
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38 using namespace std;
39
40 namespace {
// ---- Shared fixtures for the cloud-sync relational tests in this file ----
string g_storeID = "Relational_Store_SYNC";
// User table names. See the CREATE statements below for the column layout of each table:
// worker1 has a TEXT primary key, worker2/worker4 an INTEGER primary key, worker3 no primary key.
const string g_tableName1 = "worker1";
const string g_tableName2 = "worker2";
const string g_tableName3 = "worker3";
const string g_tableName4 = "worker4";
// Key expected in the process map delivered to the sync callback (checked in GetCallback).
const string DEVICE_CLOUD = "cloud_dev";
const string DB_SUFFIX = ".db";
// NOTE(review): the unit is not visible in this part of the file — presumably seconds; confirm at the wait site.
const int64_t g_syncWaitTime = 60;
// Stride used by the InitProcess* helpers: two consecutive TableProcessInfo entries
// (one per table in g_tables) make up one expected notification.
const int g_arrayHalfSub = 2;
// Reset to 0 by GetCallback each time a new callback is installed.
int g_syncIndex = 0;
string g_testDir;
string g_storePath;
std::mutex g_processMutex;
std::condition_variable g_processCondition;
// Cloud-side test double; the helpers below insert, update and query cloud rows through it.
std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
RelationalStoreObserverUnitTest *g_observer = nullptr;
RelationalStoreDelegate *g_delegate = nullptr;
SyncProcess g_syncProcess;
// Signature of the sync progress callback: a map of SyncProcess values keyed by string.
using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
// worker1: TEXT primary key "name", single-asset column "assert".
const std::string CREATE_LOCAL_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \
    "name TEXT PRIMARY KEY," \
    "height REAL ," \
    "married BOOLEAN ," \
    "photo BLOB NOT NULL," \
    "assert BLOB," \
    "age INT);";
// worker2: INTEGER primary key "id", multi-asset column "asserts".
const std::string INTEGER_PRIMARY_KEY_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \
    "id INTEGER PRIMARY KEY," \
    "name TEXT ," \
    "height REAL ," \
    "photo BLOB ," \
    "asserts BLOB," \
    "age INT);";
const std::string DROP_INTEGER_PRIMARY_KEY_TABLE_SQL = "DROP TABLE " + g_tableName2 + ";";
// worker3: same columns as worker1 but without any primary key.
const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL =
    "CREATE TABLE IF NOT EXISTS " + g_tableName3 + "(" \
    "name TEXT," \
    "height REAL ," \
    "married BOOLEAN ," \
    "photo BLOB NOT NULL," \
    "assert BLOB," \
    "age INT);";
// worker4: same columns as worker2; not created by CreateUserDBAndTable.
const std::string INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE =
    "CREATE TABLE IF NOT EXISTS " + g_tableName4 + "(" \
    "id INTEGER PRIMARY KEY," \
    "name TEXT ," \
    "height REAL ," \
    "photo BLOB ," \
    "asserts BLOB," \
    "age INT);";
// Cloud schema fields for worker1. The first initializer of each Field is the column name (colName);
// the trailing booleans are presumably the primary-key and nullable flags — TODO confirm against Field.
// Some names differ from the local columns only in letter case ("Name", "MArried", "Assert").
const std::vector<Field> g_cloudFiled1 = {
    {"Name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<double>},
    {"MArried", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
    {"Assert", TYPE_INDEX<Asset>}, {"age", TYPE_INDEX<int64_t>}
};
// Variant of the worker1 schema used by GetInvalidCloudDbSchema: "height" is declared as int and
// "assert" as Bytes, unlike g_cloudFiled1.
const std::vector<Field> g_invalidCloudFiled1 = {
    {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<int>},
    {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
    {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
};
// Cloud schema fields for worker2 (also reused for worker4 in GetCloudDbSchema).
const std::vector<Field> g_cloudFiled2 = {
    {"id", TYPE_INDEX<int64_t>, true}, {"name", TYPE_INDEX<std::string>},
    {"height", TYPE_INDEX<double>}, {"photo", TYPE_INDEX<Bytes>},
    {"asserts", TYPE_INDEX<Assets>}, {"age", TYPE_INDEX<int64_t>}
};
// Cloud schema fields for worker3, the table without a primary key.
const std::vector<Field> g_cloudFiledWithOutPrimaryKey3 = {
    {"name", TYPE_INDEX<std::string>, false, true}, {"height", TYPE_INDEX<double>},
    {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
    {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
};
// Parallel arrays indexed by table: table name, primary-key column name and the prefix that the
// update/delete helpers put in front of the numeric key ("Local<n>" for worker1, "<n>" for worker2).
const std::vector<std::string> g_tables = {g_tableName1, g_tableName2};
const std::vector<std::string> g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName};
const std::vector<string> g_prefix = {"Local", ""};
// Template asset for locally inserted rows; helpers copy it and append the row number to the name.
const Asset g_localAsset = {
    .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
    .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
};
// Template asset for cloud rows; differs from g_localAsset in version, uri, createTime, size and hash.
const Asset g_cloudAsset = {
    .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
    .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
};
126
CreateUserDBAndTable(sqlite3 * & db)127 void CreateUserDBAndTable(sqlite3 *&db)
128 {
129 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
130 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK);
131 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL), SQLITE_OK);
132 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK);
133 }
134
InsertUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)135 void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
136 {
137 std::string photo(photoSize, 'v');
138 int errCode;
139 std::vector<uint8_t> assetBlob;
140 for (int64_t i = begin; i < begin + count; ++i) {
141 Asset asset = g_localAsset;
142 asset.name = asset.name + std::to_string(i);
143 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
144 string sql = "INSERT OR REPLACE INTO " + g_tableName1
145 + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
146 "', '175.8', '0', '" + photo + "', ? , '18');";
147 sqlite3_stmt *stmt = nullptr;
148 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
149 if (assetIsNull) {
150 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
151 } else {
152 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
153 }
154 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
155 SQLiteUtils::ResetStatement(stmt, true, errCode);
156 }
157 for (int64_t i = begin; i < begin + count; ++i) {
158 std::vector<Asset> assets;
159 Asset asset = g_localAsset;
160 asset.name = g_localAsset.name + std::to_string(i);
161 assets.push_back(asset);
162 asset.name = g_localAsset.name + std::to_string(i + 1);
163 assets.push_back(asset);
164 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetBlob);
165 string sql = "INSERT OR REPLACE INTO " + g_tableName2
166 + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local"
167 + std::to_string(i) + "', '155.10', '"+ photo + "', ? , '21');";
168 sqlite3_stmt *stmt = nullptr;
169 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
170 if (assetIsNull) {
171 ASSERT_EQ(sqlite3_bind_null(stmt, 1), E_OK);
172 } else {
173 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
174 }
175 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
176 SQLiteUtils::ResetStatement(stmt, true, errCode);
177 }
178 LOGD("insert user record worker1[primary key]:[Local%" PRId64 " - Local%" PRId64
179 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
180 }
181
UpdateUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)182 void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
183 {
184 for (size_t i = 0; i < g_tables.size(); i++) {
185 string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + " in (";
186 for (int64_t j = begin; j < begin + count; ++j) {
187 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
188 }
189 updateAge.pop_back();
190 updateAge += ");";
191 ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
192 }
193 LOGD("update local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
194 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
195 }
196
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)197 void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
198 {
199 for (size_t i = 0; i < g_tables.size(); i++) {
200 string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + " in (";
201 for (int64_t j = begin; j < count; ++j) {
202 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
203 }
204 updateAge.pop_back();
205 updateAge += ");";
206 ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
207 }
208 LOGD("delete local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
209 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
210 }
211
InsertRecordWithoutPk2LocalAndCloud(sqlite3 * & db,int64_t begin,int64_t count,int photoSize)212 void InsertRecordWithoutPk2LocalAndCloud(sqlite3 *&db, int64_t begin, int64_t count, int photoSize)
213 {
214 std::vector<uint8_t> photo(photoSize, 'v');
215 std::string photoLocal(photoSize, 'v');
216 Asset asset = { .version = 1, .name = "Phone" };
217 std::vector<uint8_t> assetBlob;
218 RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset);
219 std::string assetStr(assetBlob.begin(), assetBlob.end());
220 std::vector<VBucket> record1;
221 std::vector<VBucket> extend1;
222 for (int64_t i = begin; i < count; ++i) {
223 Timestamp now = TimeHelper::GetSysCurrentTime();
224 VBucket data;
225 data.insert_or_assign("name", "Cloud" + std::to_string(i));
226 data.insert_or_assign("height", 166.0); // 166.0 is random double value
227 data.insert_or_assign("married", (bool)0);
228 data.insert_or_assign("photo", photo);
229 data.insert_or_assign("assert", KEY_1);
230 data.insert_or_assign("age", 13L);
231 record1.push_back(data);
232 VBucket log;
233 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
234 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
235 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
236 extend1.push_back(log);
237 std::this_thread::sleep_for(std::chrono::milliseconds(1)); // wait for 1 ms
238 }
239 int errCode = g_virtualCloudDb->BatchInsert(g_tableName3, std::move(record1), extend1);
240 ASSERT_EQ(errCode, DBStatus::OK);
241 for (int64_t i = begin; i < count; ++i) {
242 string sql = "INSERT OR REPLACE INTO " + g_tableName3
243 + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
244 "', '175.8', '0', '" + photoLocal + "', '" + assetStr + "', '18');";
245 ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
246 }
247 }
248
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)249 void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
250 {
251 std::vector<uint8_t> photo(photoSize, 'v');
252 std::vector<VBucket> record1;
253 std::vector<VBucket> extend1;
254 std::vector<VBucket> record2;
255 std::vector<VBucket> extend2;
256 Timestamp now = TimeHelper::GetSysCurrentTime();
257 for (int64_t i = begin; i < begin + count; ++i) {
258 VBucket data;
259 data.insert_or_assign("name", "Cloud" + std::to_string(i));
260 data.insert_or_assign("height", 166.0); // 166.0 is random double value
261 data.insert_or_assign("married", false);
262 data.insert_or_assign("photo", photo);
263 data.insert_or_assign("AGE", 13L);
264 Asset asset = g_cloudAsset;
265 asset.name = asset.name + std::to_string(i);
266 assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
267 record1.push_back(data);
268 VBucket log;
269 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
270 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
271 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
272 extend1.push_back(log);
273
274 std::vector<Asset> assets;
275 data.insert_or_assign("id", i);
276 data.insert_or_assign("height", 180.3); // 180.3 is random double value
277 for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
278 asset.name = g_cloudAsset.name + std::to_string(j);
279 assets.push_back(asset);
280 }
281 data.erase("assert");
282 data.erase("married");
283 assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
284 record2.push_back(data);
285 extend2.push_back(log);
286 }
287 ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1), DBStatus::OK);
288 ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2), DBStatus::OK);
289 LOGD("insert cloud record worker1[primary key]:[cloud%" PRId64 " - cloud%" PRId64
290 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
291 std::this_thread::sleep_for(std::chrono::milliseconds(count));
292 }
293
UpdateAssetForTest(sqlite3 * & db,AssetOpType opType,int64_t cloudCount,int64_t rowid)294 void UpdateAssetForTest(sqlite3 *&db, AssetOpType opType, int64_t cloudCount, int64_t rowid)
295 {
296 string sql = "UPDATE " + g_tables[0] + " SET assert = ? where rowid = '" + std::to_string(rowid) + "';";
297 std::vector<uint8_t> assetBlob;
298 int errCode;
299 Asset asset = g_cloudAsset;
300 asset.name = "Phone" + std::to_string(rowid - cloudCount - 1);
301 if (opType == AssetOpType::UPDATE) {
302 asset.uri = "/data/test";
303 asset.hash = "";
304 } else if (opType == AssetOpType::INSERT) {
305 asset.name = "Test10";
306 }
307 asset.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
308 sqlite3_stmt *stmt = nullptr;
309 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
310 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
311 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false) == E_OK) {
312 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
313 }
314 SQLiteUtils::ResetStatement(stmt, true, errCode);
315 }
316
UpdateAssetsForTest(sqlite3 * & db,AssetOpType opType,int64_t rowid)317 void UpdateAssetsForTest(sqlite3 *&db, AssetOpType opType, int64_t rowid)
318 {
319 string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
320 Asset asset1 = g_localAsset;
321 Asset asset2 = g_localAsset;
322 Assets assets;
323 asset1.name = g_localAsset.name + std::to_string(rowid);
324 asset1.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
325 asset2.name = g_localAsset.name + std::to_string(rowid + 1);
326 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
327 if (opType == AssetOpType::UPDATE) {
328 assets.push_back(asset1);
329 asset2.uri = "/data/test";
330 asset2.hash = "";
331 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
332 assets.push_back(asset2);
333 } else if (opType == AssetOpType::INSERT) {
334 assets.push_back(asset1);
335 assets.push_back(asset2);
336 Asset asset3;
337 asset3.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
338 asset3.name = "Test10";
339 assets.push_back(asset3);
340 } else if (opType == AssetOpType::DELETE) {
341 assets.push_back(asset1);
342 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
343 assets.push_back(asset2);
344 } else {
345 assets.push_back(asset1);
346 assets.push_back(asset2);
347 }
348 sqlite3_stmt *stmt = nullptr;
349 std::vector<uint8_t> assetsBlob;
350 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
351 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
352 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
353 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
354 }
355 int errCode;
356 SQLiteUtils::ResetStatement(stmt, true, errCode);
357 }
358
UpdateLocalAssets(sqlite3 * & db,Assets & assets,int64_t rowid)359 void UpdateLocalAssets(sqlite3 *&db, Assets &assets, int64_t rowid)
360 {
361 string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
362 std::vector<uint8_t> assetsBlob;
363 int errCode;
364 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
365 sqlite3_stmt *stmt = nullptr;
366 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
367 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
368 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
369 }
370 SQLiteUtils::ResetStatement(stmt, true, errCode);
371 }
372
UpdateDiffType(int64_t begin)373 void UpdateDiffType(int64_t begin)
374 {
375 std::vector<std::string> hash = {"DEC", "update_", "insert_"};
376 std::vector<std::string> name = {
377 g_cloudAsset.name + std::to_string(0),
378 g_cloudAsset.name + std::to_string(1),
379 g_cloudAsset.name + std::to_string(3) // 3 is insert id
380 };
381 std::vector<VBucket> record;
382 std::vector<VBucket> extend;
383 Assets assets;
384 for (int i = 0; i < 3; i ++) { // 3 is type num
385 Asset asset = g_cloudAsset;
386 asset.name = name[i];
387 asset.hash = hash[i];
388 assets.push_back(asset);
389 }
390 VBucket data;
391 data.insert_or_assign("name", "Cloud" + std::to_string(0));
392 data.insert_or_assign("id", 0L);
393 data.insert_or_assign("asserts", assets);
394 Timestamp now = TimeHelper::GetSysCurrentTime();
395 VBucket log;
396 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
397 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(begin));
398 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
399 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
400 record.push_back(data);
401 extend.push_back(log);
402 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName2, std::move(record), extend), DBStatus::OK);
403 }
404
CheckDiffTypeAsset(sqlite3 * & db)405 void CheckDiffTypeAsset(sqlite3 *&db)
406 {
407 std::vector<std::string> names = {
408 g_cloudAsset.name + std::to_string(0),
409 g_cloudAsset.name + std::to_string(1),
410 g_cloudAsset.name + std::to_string(3) // 3 is insert id
411 };
412 std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid = 0;";
413 sqlite3_stmt *stmt = nullptr;
414 int index = 0;
415 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
416 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
417 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
418 Type cloudValue;
419 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
420 std::vector<uint8_t> assetsBlob;
421 Assets assets;
422 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
423 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
424 for (const Asset &asset: assets) {
425 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
426 ASSERT_EQ(asset.name, names[index]);
427 LOGE("lyh_test: name: %s", names[index].c_str());
428 index++;
429 }
430 }
431 int errCode;
432 SQLiteUtils::ResetStatement(stmt, true, errCode);
433 }
434
CheckAssetForAssetTest006()435 void CheckAssetForAssetTest006()
436 {
437 VBucket extend;
438 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
439 std::vector<VBucket> data;
440 g_virtualCloudDb->Query(g_tables[1], extend, data);
441 for (size_t j = 0; j < data.size(); ++j) {
442 ASSERT_NE(data[j].find("asserts"), data[j].end());
443 ASSERT_TRUE((data[j]["asserts"]).index() == TYPE_INDEX<Assets>);
444 Assets &assets = std::get<Assets>(data[j]["asserts"]);
445 ASSERT_TRUE(assets.size() > 0);
446 Asset &asset = assets[0];
447 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::DELETE));
448 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::DELETE));
449 }
450 }
451
CheckFillAssetForTest10(sqlite3 * & db)452 void CheckFillAssetForTest10(sqlite3 *&db)
453 {
454 std::string sql = "SELECT assert from " + g_tables[0] + " WHERE rowid in ('27','28','29','30');";
455 sqlite3_stmt *stmt = nullptr;
456 int index = 0;
457 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
458 int suffixId = 6;
459 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
460 if (index == 0 || index == 1 || index == 3) { // 3 is rowid index of 29
461 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
462 Type cloudValue;
463 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, cloudValue), E_OK);
464 std::vector<uint8_t> assetBlob;
465 Asset asset;
466 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetBlob), E_OK);
467 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset), E_OK);
468 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
469 if (index == 0) {
470 ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
471 } else if (index == 1) {
472 ASSERT_EQ(asset.name, "Test10");
473 } else {
474 ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
475 ASSERT_EQ(asset.uri, "/data/test");
476 }
477 } else {
478 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
479 }
480 index++;
481 }
482 int errCode;
483 SQLiteUtils::ResetStatement(stmt, true, errCode);
484 }
485
CheckFillAssetsForTest10(sqlite3 * & db)486 void CheckFillAssetsForTest10(sqlite3 *&db)
487 {
488 std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid in ('0','1','2','3');";
489 sqlite3_stmt *stmt = nullptr;
490 int index = 0;
491 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
492 int insertIndex = 2;
493 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
494 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
495 Type cloudValue;
496 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
497 std::vector<uint8_t> assetsBlob;
498 Assets assets;
499 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
500 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
501 if (index == 0) {
502 ASSERT_EQ(assets.size(), 2u);
503 ASSERT_EQ(assets[0].name, g_localAsset.name + std::to_string(index));
504 ASSERT_EQ(assets[1].name, g_localAsset.name + std::to_string(index + 1));
505 } else if (index == 1) {
506 ASSERT_EQ(assets.size(), 3u);
507 ASSERT_EQ(assets[insertIndex].name, "Test10");
508 ASSERT_EQ(assets[insertIndex].status, static_cast<uint32_t>(AssetStatus::NORMAL));
509 } else if (index == 2) { // 2 is the third element
510 ASSERT_EQ(assets.size(), 1u);
511 ASSERT_EQ(assets[0].name, g_cloudAsset.name + std::to_string(index));
512 } else {
513 ASSERT_EQ(assets.size(), 2u);
514 ASSERT_EQ(assets[1].uri, "/data/test");
515 ASSERT_EQ(assets[1].status, static_cast<uint32_t>(AssetStatus::NORMAL));
516 }
517 index++;
518 }
519 int errCode;
520 SQLiteUtils::ResetStatement(stmt, true, errCode);
521 }
522
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)523 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
524 {
525 if (count != 1) {
526 return 0;
527 }
528 auto expectCount = reinterpret_cast<int64_t>(data);
529 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
530 return 0;
531 }
532
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,std::string keyStr="Cloud")533 void CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, std::string keyStr = "Cloud")
534 {
535 for (size_t i = 0; i < g_tables.size(); ++i) {
536 string queryDownload = "select count(*) from " + g_tables[i] + " where name "
537 + " like '" + keyStr + "%'";
538 EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
539 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
540 }
541 }
542
CheckCloudTotalCount(std::vector<int64_t> expectCounts)543 void CheckCloudTotalCount(std::vector<int64_t> expectCounts)
544 {
545 VBucket extend;
546 for (size_t i = 0; i < g_tables.size(); ++i) {
547 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
548 int64_t realCount = 0;
549 std::vector<VBucket> data;
550 g_virtualCloudDb->Query(g_tables[i], extend, data);
551 for (size_t j = 0; j < data.size(); ++j) {
552 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
553 if (entry != data[j].end() && std::get<bool>(entry->second)) {
554 continue;
555 }
556 realCount++;
557 }
558 EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
559 }
560 }
561
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)562 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
563 {
564 TableSchema tableSchema1 = {
565 .name = g_tableName1,
566 .sharedTableName = g_tableName1 + "_shared",
567 .fields = g_cloudFiled1
568 };
569 TableSchema tableSchema2 = {
570 .name = g_tableName2,
571 .sharedTableName = g_tableName2 + "_shared",
572 .fields = g_cloudFiled2
573 };
574 TableSchema tableSchemaWithOutPrimaryKey = {
575 .name = g_tableName3,
576 .sharedTableName = g_tableName3 + "_shared",
577 .fields = g_cloudFiledWithOutPrimaryKey3
578 };
579 TableSchema tableSchema4 = {
580 .name = g_tableName4,
581 .sharedTableName = g_tableName4 + "_shared",
582 .fields = g_cloudFiled2
583 };
584 dataBaseSchema.tables.push_back(tableSchema1);
585 dataBaseSchema.tables.push_back(tableSchema2);
586 dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey);
587 dataBaseSchema.tables.push_back(tableSchema4);
588 }
589
590
GetInvalidCloudDbSchema(DataBaseSchema & dataBaseSchema)591 void GetInvalidCloudDbSchema(DataBaseSchema &dataBaseSchema)
592 {
593 TableSchema tableSchema1 = {
594 .name = g_tableName1,
595 .sharedTableName = "",
596 .fields = g_invalidCloudFiled1
597 };
598 TableSchema tableSchema2 = {
599 .name = g_tableName2,
600 .sharedTableName = "",
601 .fields = g_cloudFiled2
602 };
603 dataBaseSchema.tables.push_back(tableSchema1);
604 dataBaseSchema.tables.push_back(tableSchema2);
605 }
606
InitProcessForTest1(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)607 void InitProcessForTest1(const uint32_t &cloudCount, const uint32_t &localCount,
608 std::vector<SyncProcess> &expectProcess)
609 {
610 expectProcess.clear();
611 std::vector<TableProcessInfo> infos;
612 uint32_t index = 1;
613 infos.push_back(TableProcessInfo{
614 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
615 });
616 infos.push_back(TableProcessInfo{
617 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
618 });
619
620 infos.push_back(TableProcessInfo{
621 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
622 });
623 infos.push_back(TableProcessInfo{
624 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
625 });
626
627 infos.push_back(TableProcessInfo{
628 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
629 });
630 infos.push_back(TableProcessInfo{
631 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
632 });
633
634 infos.push_back(TableProcessInfo{
635 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
636 });
637 infos.push_back(TableProcessInfo{
638 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
639 });
640
641 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
642 SyncProcess syncProcess;
643 syncProcess.errCode = OK;
644 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
645 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
646 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
647 expectProcess.push_back(syncProcess);
648 }
649 }
650
InitProcessForMannualSync1(std::vector<SyncProcess> & expectProcess)651 void InitProcessForMannualSync1(std::vector<SyncProcess> &expectProcess)
652 {
653 expectProcess.clear();
654 std::vector<TableProcessInfo> infos;
655 // first notify, first table
656 infos.push_back(TableProcessInfo{
657 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
658 });
659 // first notify, second table
660 infos.push_back(TableProcessInfo{
661 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
662 });
663 // second notify, first table
664 infos.push_back(TableProcessInfo{
665 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
666 });
667 // second notify, second table
668 infos.push_back(TableProcessInfo{
669 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
670 });
671
672 infos.push_back(TableProcessInfo{
673 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
674 });
675 // second notify, second table
676 infos.push_back(TableProcessInfo{
677 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
678 });
679 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
680 SyncProcess syncProcess;
681 syncProcess.errCode = OK;
682 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
683 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
684 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
685 expectProcess.push_back(syncProcess);
686 }
687 }
688
InitProcessForTest2(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)689 void InitProcessForTest2(const uint32_t &cloudCount, const uint32_t &localCount,
690 std::vector<SyncProcess> &expectProcess)
691 {
692 expectProcess.clear();
693 std::vector<TableProcessInfo> infos;
694 uint32_t index = 1;
695 infos.push_back(TableProcessInfo{
696 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
697 });
698 infos.push_back(TableProcessInfo{
699 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
700 });
701
702 infos.push_back(TableProcessInfo{
703 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
704 });
705 infos.push_back(TableProcessInfo{
706 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
707 });
708
709 infos.push_back(TableProcessInfo{
710 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
711 });
712 infos.push_back(TableProcessInfo{
713 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
714 });
715
716 infos.push_back(TableProcessInfo{
717 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
718 });
719 infos.push_back(TableProcessInfo{
720 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount - cloudCount, localCount - cloudCount, 0}
721 });
722
723 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
724 SyncProcess syncProcess;
725 syncProcess.errCode = OK;
726 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
727 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
728 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
729 expectProcess.push_back(syncProcess);
730 }
731 }
732
InitProcessForTest9(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)733 void InitProcessForTest9(const uint32_t &cloudCount, const uint32_t &localCount,
734 std::vector<SyncProcess> &expectProcess)
735 {
736 expectProcess.clear();
737 std::vector<TableProcessInfo> infos;
738 uint32_t index = 1;
739 infos.push_back(TableProcessInfo{
740 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
741 });
742 infos.push_back(TableProcessInfo{
743 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
744 });
745
746 infos.push_back(TableProcessInfo{
747 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
748 });
749 infos.push_back(TableProcessInfo{
750 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
751 });
752
753 infos.push_back(TableProcessInfo{
754 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
755 });
756 infos.push_back(TableProcessInfo{
757 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
758 });
759
760 infos.push_back(TableProcessInfo{
761 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
762 });
763 infos.push_back(TableProcessInfo{
764 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
765 });
766
767 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
768 SyncProcess syncProcess;
769 syncProcess.errCode = OK;
770 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
771 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
772 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
773 expectProcess.push_back(syncProcess);
774 }
775 }
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)776 void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
777 std::vector<SyncProcess> &expectProcess)
778 {
779 g_syncIndex = 0;
780 callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
781 LOGI("devices size = %d", process.size());
782 ASSERT_EQ(process.size(), 1u);
783 syncProcess = std::move(process.begin()->second);
784 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
785 ASSERT_NE(syncProcess.tableProcess.empty(), true);
786 LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
787 std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
788 auto table1 = syncProcess.tableProcess.find(item);
789 if (table1 != syncProcess.tableProcess.end()) {
790 LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
791 "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
792 item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
793 table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
794 table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
795 table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
796 table1->second.upLoadInfo.failCount);
797 }
798 });
799 if (expectProcess.empty()) {
800 if (syncProcess.process == FINISHED) {
801 g_processCondition.notify_one();
802 }
803 return;
804 }
805 ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
806 for (size_t i = 0; i < g_tables.size() && static_cast<size_t>(g_syncIndex) < expectProcess.size(); ++i) {
807 SyncProcess head = expectProcess[g_syncIndex];
808 for (auto &expect : head.tableProcess) {
809 auto real = syncProcess.tableProcess.find(expect.first);
810 ASSERT_NE(real, syncProcess.tableProcess.end());
811 EXPECT_EQ(expect.second.process, real->second.process);
812 EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
813 EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
814 EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
815 EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
816 EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
817 EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
818 EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
819 EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
820 }
821 }
822 g_syncIndex++;
823 if (syncProcess.process == FINISHED) {
824 g_processCondition.notify_one();
825 }
826 };
827 }
828
CheckAllAssetAfterUpload(int64_t localCount)829 void CheckAllAssetAfterUpload(int64_t localCount)
830 {
831 VBucket extend;
832 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
833 std::vector<VBucket> data1;
834 g_virtualCloudDb->Query(g_tables[0], extend, data1);
835 for (size_t j = 0; j < data1.size(); ++j) {
836 Type entry;
837 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("assert", data1[j], entry);
838 ASSERT_TRUE(isExisted);
839 Asset asset = std::get<Asset>(entry);
840 bool isLocal = j >= (size_t)(localCount / g_arrayHalfSub);
841 Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
842 EXPECT_EQ(asset.version, baseAsset.version);
843 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(isLocal ? j - localCount / g_arrayHalfSub : j));
844 EXPECT_EQ(asset.uri, baseAsset.uri);
845 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
846 EXPECT_EQ(asset.createTime, baseAsset.createTime);
847 EXPECT_EQ(asset.size, baseAsset.size);
848 EXPECT_EQ(asset.hash, baseAsset.hash);
849 }
850
851 std::vector<VBucket> data2;
852 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
853 g_virtualCloudDb->Query(g_tables[1], extend, data2);
854 for (size_t j = 0; j < data2.size(); ++j) {
855 Type entry;
856 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("asserts", data2[j], entry);
857 ASSERT_TRUE(isExisted);
858 Assets assets = std::get<Assets>(entry);
859 Asset baseAsset = j >= (size_t)(localCount / g_arrayHalfSub) ? g_localAsset : g_cloudAsset;
860 int index = static_cast<int>(j);
861 for (const auto &asset: assets) {
862 EXPECT_EQ(asset.version, baseAsset.version);
863 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(index++));
864 EXPECT_EQ(asset.uri, baseAsset.uri);
865 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
866 EXPECT_EQ(asset.createTime, baseAsset.createTime);
867 EXPECT_EQ(asset.size, baseAsset.size);
868 EXPECT_EQ(asset.hash, baseAsset.hash);
869 }
870 }
871 }
872
CheckAssetsAfterDownload(sqlite3 * & db,int64_t localCount)873 void CheckAssetsAfterDownload(sqlite3 *&db, int64_t localCount)
874 {
875 string queryDownload = "select asserts from " + g_tables[1] + " where rowid in (";
876 for (int64_t i = 0; i < localCount; ++i) {
877 queryDownload += "'" + std::to_string(i) + "',";
878 }
879 queryDownload.pop_back();
880 queryDownload += ");";
881 sqlite3_stmt *stmt = nullptr;
882 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
883 int index = 0;
884 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
885 std::vector<uint8_t> blobValue;
886 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
887 Assets assets;
888 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets), E_OK);
889 bool isLocal = index >= localCount / g_arrayHalfSub;
890 Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
891 int nameIndex = index;
892 for (const auto &asset: assets) {
893 EXPECT_EQ(asset.version, baseAsset.version);
894 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(nameIndex));
895 EXPECT_EQ(asset.uri, baseAsset.uri);
896 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
897 EXPECT_EQ(asset.createTime, baseAsset.createTime);
898 EXPECT_EQ(asset.size, baseAsset.size);
899 EXPECT_EQ(asset.hash, baseAsset.hash);
900 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
901 nameIndex++;
902 }
903 index++;
904 }
905 int errCode;
906 SQLiteUtils::ResetStatement(stmt, true, errCode);
907 }
908
CheckAssetAfterDownload(sqlite3 * & db,int64_t localCount)909 void CheckAssetAfterDownload(sqlite3 *&db, int64_t localCount)
910 {
911 string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
912 for (int64_t i = 0; i < localCount; ++i) {
913 queryDownload += "'" + std::to_string(i) + "',";
914 }
915 queryDownload.pop_back();
916 queryDownload += ");";
917 sqlite3_stmt *stmt = nullptr;
918 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
919 int index = 0;
920 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
921 std::vector<uint8_t> blobValue;
922 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
923 Asset asset;
924 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
925 bool isCloud = index >= localCount;
926 Asset baseAsset = isCloud ? g_cloudAsset : g_localAsset;
927 EXPECT_EQ(asset.version, baseAsset.version);
928 EXPECT_EQ(asset.name,
929 baseAsset.name + std::to_string(isCloud ? index - localCount / g_arrayHalfSub : index));
930 EXPECT_EQ(asset.uri, baseAsset.uri);
931 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
932 EXPECT_EQ(asset.createTime, baseAsset.createTime);
933 EXPECT_EQ(asset.size, baseAsset.size);
934 EXPECT_EQ(asset.hash, baseAsset.hash);
935 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
936 index++;
937 }
938 int errCode;
939 SQLiteUtils::ResetStatement(stmt, true, errCode);
940 }
941
UpdateCloudAssetForDownloadAssetTest003()942 void UpdateCloudAssetForDownloadAssetTest003()
943 {
944 VBucket data;
945 std::vector<uint8_t> photo(1, 'x');
946 data.insert_or_assign("name", "Cloud" + std::to_string(0));
947 data.insert_or_assign("photo", photo);
948 data.insert_or_assign("assert", g_cloudAsset);
949 Timestamp now = TimeHelper::GetSysCurrentTime();
950 VBucket log;
951 std::vector<VBucket> record;
952 std::vector<VBucket> extend;
953 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
954 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
955 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
956 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
957 record.push_back(data);
958 extend.push_back(log);
959 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName1, std::move(record), extend), DBStatus::OK);
960 }
961
CheckAssetForDownloadAssetTest003(sqlite3 * & db)962 void CheckAssetForDownloadAssetTest003(sqlite3 *&db)
963 {
964 string queryDownload = "select assert from " + g_tables[0] + " where rowid = '11';";
965 sqlite3_stmt *stmt = nullptr;
966 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
967 int index = 0;
968 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
969 std::vector<uint8_t> blobValue;
970 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
971 Asset asset;
972 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
973 EXPECT_EQ(asset.name, g_cloudAsset.name);
974 EXPECT_EQ(asset.hash, g_cloudAsset.hash);
975 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
976 index++;
977 }
978 int errCode;
979 SQLiteUtils::ResetStatement(stmt, true, errCode);
980 }
981
CheckAssetAfterDownload2(sqlite3 * & db,int64_t localCount)982 void CheckAssetAfterDownload2(sqlite3 *&db, int64_t localCount)
983 {
984 string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
985 for (int64_t i = localCount + 1; i < localCount + localCount; ++i) {
986 queryDownload += "'" + std::to_string(i) + "',";
987 }
988 queryDownload.pop_back();
989 queryDownload += ");";
990 sqlite3_stmt *stmt = nullptr;
991 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
992 int index = 0;
993 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
994 std::vector<uint8_t> blobValue;
995 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
996 Asset asset;
997 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
998 EXPECT_EQ(asset.version, g_cloudAsset.version);
999 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::ABNORMAL));
1000 index++;
1001 }
1002 int errCode;
1003 SQLiteUtils::ResetStatement(stmt, true, errCode);
1004 }
1005
InsertCloudForCloudProcessNotify001(std::vector<VBucket> & record,std::vector<VBucket> & extend)1006 void InsertCloudForCloudProcessNotify001(std::vector<VBucket> &record, std::vector<VBucket> &extend)
1007 {
1008 VBucket data;
1009 std::vector<uint8_t> photo(1, 'v');
1010 data.insert_or_assign("name", "Local" + std::to_string(0));
1011 data.insert_or_assign("height", 166.0); // 166.0 is random double value
1012 data.insert_or_assign("married", false);
1013 data.insert_or_assign("age", 13L);
1014 data.insert_or_assign("photo", photo);
1015 Asset asset = g_cloudAsset;
1016 asset.name = asset.name + std::to_string(0);
1017 data.insert_or_assign("assert", asset);
1018 record.push_back(data);
1019 VBucket log;
1020 Timestamp now = TimeHelper::GetSysCurrentTime();
1021 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1022 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1023 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1024 log.insert_or_assign("#_gid", std::to_string(2)); // 2 is gid
1025 extend.push_back(log);
1026 }
1027
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)1028 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
1029 {
1030 std::unique_lock<std::mutex> lock(g_processMutex);
1031 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
1032 return syncProcess.process == FINISHED;
1033 });
1034 ASSERT_EQ(result, true);
1035 LOGD("-------------------sync end--------------");
1036 }
1037
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus)1038 void callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus)
1039 {
1040 g_syncProcess = {};
1041 Query query = Query::Select().FromTable(tableNames);
1042 std::vector<SyncProcess> expectProcess;
1043 CloudSyncStatusCallback callback;
1044 GetCallback(g_syncProcess, callback, expectProcess);
1045 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus);
1046 if (dbStatus == DBStatus::OK) {
1047 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1048 }
1049 }
1050
CloseDb()1051 void CloseDb()
1052 {
1053 delete g_observer;
1054 g_virtualCloudDb = nullptr;
1055 if (g_delegate != nullptr) {
1056 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
1057 g_delegate = nullptr;
1058 }
1059 }
1060
InitMockAssetLoader(DBStatus & status,int & index)1061 void InitMockAssetLoader(DBStatus &status, int &index)
1062 {
1063 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1064 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1065 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1066 .WillRepeatedly([&status, &index](const std::string &, const std::string &gid, const Type &,
1067 std::map<std::string, Assets> &assets) {
1068 LOGD("Download GID:%s", gid.c_str());
1069 for (auto &item: assets) {
1070 for (auto &asset: item.second) {
1071 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset.status);
1072 EXPECT_TRUE(lowBitStatus == static_cast<uint32_t>(AssetStatus::INSERT) ||
1073 lowBitStatus == static_cast<uint32_t>(AssetStatus::UPDATE));
1074 LOGD("asset [name]:%s, [status]:%u, [flag]:%u", asset.name.c_str(), asset.status, asset.flag);
1075 asset.status = (index++) % 5u + 1; // 6 is AssetStatus type num, include invalid type
1076 }
1077 }
1078 return status;
1079 });
1080 }
1081
1082 class DistributedDBCloudInterfacesRelationalSyncTest : public testing::Test {
1083 public:
1084 static void SetUpTestCase(void);
1085 static void TearDownTestCase(void);
1086 void SetUp();
1087 void TearDown();
1088 protected:
1089 sqlite3 *db = nullptr;
1090 };
1091
1092
SetUpTestCase(void)1093 void DistributedDBCloudInterfacesRelationalSyncTest::SetUpTestCase(void)
1094 {
1095 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
1096 g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX;
1097 LOGI("The test db is:%s", g_testDir.c_str());
1098 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
1099 }
1100
TearDownTestCase(void)1101 void DistributedDBCloudInterfacesRelationalSyncTest::TearDownTestCase(void)
1102 {}
1103
SetUp(void)1104 void DistributedDBCloudInterfacesRelationalSyncTest::SetUp(void)
1105 {
1106 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1107 LOGE("rm test db files error.");
1108 }
1109 DistributedDBToolsUnitTest::PrintTestCaseInfo();
1110 LOGD("Test dir is %s", g_testDir.c_str());
1111 db = RelationalTestUtils::CreateDataBase(g_storePath);
1112 ASSERT_NE(db, nullptr);
1113 CreateUserDBAndTable(db);
1114 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1115 ASSERT_NE(g_observer, nullptr);
1116 ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer },
1117 g_delegate), DBStatus::OK);
1118 ASSERT_NE(g_delegate, nullptr);
1119 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK);
1120 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK);
1121 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName3, CLOUD_COOPERATION), DBStatus::OK);
1122 g_virtualCloudDb = make_shared<VirtualCloudDb>();
1123 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
1124 g_syncProcess = {};
1125 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1126 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
1127 // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
1128 Query query = Query::Select().FromTable(g_tables);
1129 CloudSyncStatusCallback callback;
1130 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1131 DBStatus::SCHEMA_MISMATCH);
1132 DataBaseSchema dataBaseSchema;
1133 GetCloudDbSchema(dataBaseSchema);
1134 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1135 }
1136
TearDown(void)1137 void DistributedDBCloudInterfacesRelationalSyncTest::TearDown(void)
1138 {
1139 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
1140 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1141 LOGE("rm test db files error.");
1142 }
1143 }
1144
1145 /**
1146 * @tc.name: CloudSyncTest001
1147 * @tc.desc: Cloud data is older than local data.
1148 * @tc.type: FUNC
1149 * @tc.require:
1150 * @tc.author: bty
1151 */
1152 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest001, TestSize.Level0)
1153 {
1154 int64_t paddingSize = 10;
1155 int64_t cloudCount = 20;
1156 int64_t localCount = cloudCount / g_arrayHalfSub;
1157 ChangedData changedDataForTable1;
1158 ChangedData changedDataForTable2;
1159 changedDataForTable1.tableName = g_tableName1;
1160 changedDataForTable2.tableName = g_tableName2;
1161 changedDataForTable1.field.push_back(std::string("name"));
1162 changedDataForTable2.field.push_back(std::string("id"));
1163 for (int i = 0; i < cloudCount; i++) {
1164 changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1165 changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1166 }
1167 g_observer->SetExpectedResult(changedDataForTable1);
1168 g_observer->SetExpectedResult(changedDataForTable2);
1169 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1170 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1171 Query query = Query::Select().FromTable(g_tables);
1172 std::vector<SyncProcess> expectProcess;
1173 InitProcessForTest1(cloudCount, localCount, expectProcess);
1174 CloudSyncStatusCallback callback;
1175 GetCallback(g_syncProcess, callback, expectProcess);
1176 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1177 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1178 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1179 g_observer->ClearChangedData();
1180 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1181 CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1182 LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1183 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1184 CloseDb();
1185 }
1186
1187 /**
1188 * @tc.name: CloudSyncTest002
1189 * @tc.desc: Local data is older than cloud data.
1190 * @tc.type: FUNC
1191 * @tc.require:
1192 * @tc.author: bty
1193 */
1194 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest002, TestSize.Level0)
1195 {
1196 int64_t localCount = 20;
1197 int64_t cloudCount = 10;
1198 int64_t paddingSize = 100;
1199 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1200 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1201 Query query = Query::Select().FromTable(g_tables);
1202 std::vector<SyncProcess> expectProcess;
1203 InitProcessForTest2(cloudCount, localCount, expectProcess);
1204 CloudSyncStatusCallback callback;
1205 GetCallback(g_syncProcess, callback, expectProcess);
1206 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1207 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1208 LOGD("expect download:worker1[primary key]:[cloud0 - cloud10), worker2[primary key]:[0 - 10)");
1209 CheckDownloadResult(db, {10L, 10L}); // 10 and 10 means the num of downloads from cloud db by worker1 and worker2
1210 LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[10 - 20)");
1211 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1212 CloseDb();
1213 }
1214
1215 /**
1216 * @tc.name: CloudSyncTest003
1217 * @tc.desc: test with update and delete operator
1218 * @tc.type: FUNC
1219 * @tc.require:
1220 * @tc.author: bty
1221 */
1222 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest003, TestSize.Level0)
1223 {
1224 int64_t paddingSize = 10;
1225 int cloudCount = 20;
1226 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1227 InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1228 Query query = Query::Select().FromTable(g_tables);
1229 std::vector<SyncProcess> expectProcess;
1230 InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1231 CloudSyncStatusCallback callback;
1232 GetCallback(g_syncProcess, callback, expectProcess);
1233 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1234 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1235 CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1236 CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1237
1238 int updateCount = 10;
1239 UpdateUserTableRecord(db, 5, updateCount); // 5 is start id to be updated
1240 g_syncProcess = {};
1241 InitProcessForTest1(cloudCount, updateCount, expectProcess);
1242 GetCallback(g_syncProcess, callback, expectProcess);
1243 LOGD("-------------------sync after update--------------");
1244 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1245 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1246
1247 VBucket extend;
1248 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1249 std::vector<VBucket> data1;
1250 g_virtualCloudDb->Query(g_tables[0], extend, data1);
1251 for (int j = 25; j < 35; ++j) { // index[25, 35) in cloud db expected to be updated
1252 EXPECT_EQ(std::get<int64_t>(data1[j]["age"]), 99); // 99 is the updated age field of cloud db
1253 }
1254
1255 std::vector<VBucket> data2;
1256 g_virtualCloudDb->Query(g_tables[1], extend, data2);
1257 for (int j = 5; j < 15; ++j) { // index[5, 15) in cloud db expected to be updated
1258 EXPECT_EQ(std::get<int64_t>(data2[j]["age"]), 99); // 99 is the updated age field of cloud db
1259 }
1260
1261 int deleteCount = 3;
1262 DeleteUserTableRecord(db, 0, deleteCount);
1263 g_syncProcess = {};
1264 InitProcessForTest1(updateCount, deleteCount, expectProcess);
1265 GetCallback(g_syncProcess, callback, expectProcess);
1266 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1267 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1268
1269 CheckCloudTotalCount({37L, 17L}); // 37 and 17 means the total num of worker1 and worker2 from the cloud db
1270 CloseDb();
1271 }
1272
1273 /**
1274 * @tc.name: CloudSyncTest004
1275 * @tc.desc: Random write of local and cloud data
1276 * @tc.type: FUNC
1277 * @tc.require:
1278 * @tc.author: bty
1279 */
1280 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest004, TestSize.Level0)
1281 {
1282 int64_t paddingSize = 1024 * 8;
1283 vector<thread> threads;
1284 int cloudCount = 1024;
1285 threads.emplace_back(InsertCloudTableRecord, 0, cloudCount, paddingSize, false);
1286 threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, cloudCount, paddingSize, false);
1287 for (auto &thread: threads) {
1288 thread.join();
1289 }
1290 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1291 CloseDb();
1292 }
1293
1294 /**
1295 * @tc.name: CloudSyncTest005
1296 * @tc.desc: sync with device sync query
1297 * @tc.type: FUNC
1298 * @tc.require:
1299 * @tc.author: bty
1300 */
1301 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest005, TestSize.Level0)
1302 {
1303 Query query = Query::Select().FromTable(g_tables).OrderBy("123", true);
1304 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1305 DBStatus::NOT_SUPPORT);
1306
1307 query = Query::Select();
1308 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1309 DBStatus::INVALID_ARGS);
1310 CloseDb();
1311 }
1312
1313 /**
1314 * @tc.name: CloudSyncTest006
1315 * @tc.desc: Firstly set a correct schema, and then null or invalid schema
1316 * @tc.type: FUNC
1317 * @tc.require:
1318 * @tc.author: wanyi
1319 */
1320 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest006, TestSize.Level0)
1321 {
1322 int64_t paddingSize = 10;
1323 int cloudCount = 20;
1324 ChangedData changedDataForTable1;
1325 ChangedData changedDataForTable2;
1326 changedDataForTable1.tableName = g_tableName1;
1327 changedDataForTable2.tableName = g_tableName2;
1328 changedDataForTable1.field.push_back(std::string("name"));
1329 changedDataForTable2.field.push_back(std::string("id"));
1330 for (int i = 0; i < cloudCount; i++) {
1331 changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1332 changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1333 }
1334 g_observer->SetExpectedResult(changedDataForTable1);
1335 g_observer->SetExpectedResult(changedDataForTable2);
1336 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1337 InsertUserTableRecord(db, 0, cloudCount / g_arrayHalfSub, paddingSize, false);
1338 // Set correct cloudDbSchema (correct version)
1339 DataBaseSchema correctSchema;
1340 GetCloudDbSchema(correctSchema);
1341 ASSERT_EQ(g_delegate->SetCloudDbSchema(correctSchema), DBStatus::OK);
1342 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1343 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1344 g_observer->ClearChangedData();
1345 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1346 CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1347 LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1348 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1349
1350 // Reset cloudDbSchema (invalid version - null)
1351 DataBaseSchema nullSchema;
1352 ASSERT_EQ(g_delegate->SetCloudDbSchema(nullSchema), DBStatus::OK);
1353 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1354
1355 // Reset cloudDbSchema (invalid version - field mismatch)
1356 DataBaseSchema invalidSchema;
1357 GetInvalidCloudDbSchema(invalidSchema);
1358 ASSERT_EQ(g_delegate->SetCloudDbSchema(invalidSchema), DBStatus::OK);
1359 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1360 CloseDb();
1361 }
1362
1363 /**
1364 * @tc.name: CloudSyncTest007
1365 * @tc.desc: Check the asset types after sync
1366 * @tc.type: FUNC
1367 * @tc.require:
1368 * @tc.author: bty
1369 */
1370 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest007, TestSize.Level1)
1371 {
1372 int64_t paddingSize = 100;
1373 int localCount = 20;
1374 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1375 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1376 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1377
1378 CheckAssetAfterDownload(db, localCount);
1379 CheckAllAssetAfterUpload(localCount);
1380 CheckAssetsAfterDownload(db, localCount);
1381 CloseDb();
1382 }
1383
1384 /*
1385 * @tc.name: CloudSyncTest008
1386 * @tc.desc: Test sync with invalid param
1387 * @tc.type: FUNC
1388 * @tc.require:
1389 * @tc.author: zhangqiquan
1390 */
1391 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest008, TestSize.Level0)
1392 {
1393 ASSERT_EQ(g_delegate->SetCloudDB(nullptr), OK); // it will not happen because cloudb has been set in SetUp()
1394 Query query = Query::Select().FromTable({g_tableName3});
1395 // clouddb has been set in SetUp() and it's not null
1396 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
1397 CloseDb();
1398 }
1399
1400 /**
1401 * @tc.name: CloudSyncTest009
1402 * @tc.desc: The second time there was no data change and sync was called.
1403 * @tc.type: FUNC
1404 * @tc.require:
1405 * @tc.author: bty
1406 */
1407 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest009, TestSize.Level0)
1408 {
1409 int64_t paddingSize = 10;
1410 int cloudCount = 20;
1411 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1412 InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1413 Query query = Query::Select().FromTable(g_tables);
1414 std::vector<SyncProcess> expectProcess;
1415 InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1416 CloudSyncStatusCallback callback;
1417 GetCallback(g_syncProcess, callback, expectProcess);
1418 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1419 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1420 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:none");
1421 CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1422 LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[0 - 20)");
1423 CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1424
1425 g_syncProcess = {};
1426 InitProcessForTest9(cloudCount, 0, expectProcess);
1427 GetCallback(g_syncProcess, callback, expectProcess);
1428 LOGD("--------------the second sync-------------");
1429 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1430 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1431 CloseDb();
1432 }
1433
1434 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest0010, TestSize.Level0)
1435 {
1436 int64_t paddingSize = 10;
1437 int cloudCount = 20;
1438 int localCount = 10;
1439 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1440 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1441 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1442
1443 int rowid = 27;
1444 UpdateAssetForTest(db, AssetOpType::NO_CHANGE, cloudCount, rowid++);
1445 UpdateAssetForTest(db, AssetOpType::INSERT, cloudCount, rowid++);
1446 UpdateAssetForTest(db, AssetOpType::DELETE, cloudCount, rowid++);
1447 UpdateAssetForTest(db, AssetOpType::UPDATE, cloudCount, rowid++);
1448
1449 int id = 0;
1450 UpdateAssetsForTest(db, AssetOpType::NO_CHANGE, id++);
1451 UpdateAssetsForTest(db, AssetOpType::INSERT, id++);
1452 UpdateAssetsForTest(db, AssetOpType::DELETE, id++);
1453 UpdateAssetsForTest(db, AssetOpType::UPDATE, id++);
1454
1455 LOGD("--------------the second sync-------------");
1456 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1457
1458 CheckFillAssetForTest10(db);
1459 CheckFillAssetsForTest10(db);
1460 CloseDb();
1461 }
1462
1463 /**
1464 * @tc.name: CloudSyncTest011
1465 * @tc.desc: Test sync with same table name.
1466 * @tc.type: FUNC
1467 * @tc.require:
1468 * @tc.author: zhangqiquan
1469 */
1470 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest011, TestSize.Level0)
1471 {
1472 Query query = Query::Select().FromTable({g_tableName1, g_tableName1});
1473 bool syncFinish = false;
1474 std::mutex syncMutex;
1475 std::condition_variable cv;
1476 std::atomic<int> callCount = 0;
1477 CloudSyncStatusCallback callback = [&callCount, &cv, &syncFinish, &syncMutex](
__anona8df80760602( const std::map<std::string, SyncProcess> &onProcess) 1478 const std::map<std::string, SyncProcess> &onProcess) {
1479 ASSERT_NE(onProcess.find(DEVICE_CLOUD), onProcess.end());
1480 SyncProcess syncProcess = onProcess.at(DEVICE_CLOUD);
1481 callCount++;
1482 if (syncProcess.process == FINISHED) {
1483 std::lock_guard<std::mutex> autoLock(syncMutex);
1484 syncFinish = true;
1485 }
1486 cv.notify_all();
1487 };
1488 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1489 std::unique_lock<std::mutex> uniqueLock(syncMutex);
__anona8df80760702() 1490 cv.wait(uniqueLock, [&syncFinish]() {
1491 return syncFinish;
1492 });
1493 RuntimeContext::GetInstance()->StopTaskPool();
1494 EXPECT_EQ(callCount, 2); // 2 is onProcess count
1495 CloseDb();
1496 }
1497
1498 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest012, TestSize.Level0)
1499 {
1500 int64_t localCount = 20;
1501 int64_t cloudCount = 10;
1502 int64_t paddingSize = 10;
1503 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1504 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1505 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1506
1507 InsertCloudTableRecord(localCount + cloudCount, cloudCount, paddingSize, false);
1508 InsertUserTableRecord(db, localCount + cloudCount, localCount, paddingSize, true);
1509 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1510
1511 InsertCloudTableRecord(2 * (localCount + cloudCount), cloudCount, paddingSize, false); // 2 is offset
1512 InsertUserTableRecord(db, 2 * (localCount + cloudCount), localCount, paddingSize, false); // 2 is offset
1513 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1514
1515
1516 InsertCloudTableRecord(3 * (localCount + cloudCount), cloudCount, paddingSize, true); // 3 is offset
1517 InsertUserTableRecord(db, 3 * (localCount + cloudCount), localCount, paddingSize, true); // 3 is offset
1518 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1519 CloseDb();
1520 }
1521
1522 /*
1523 * @tc.name: CloudSyncTest013
1524 * @tc.desc: test increment watermark when cloud db query data size is 0
1525 * @tc.type: FUNC
1526 * @tc.require:
1527 * @tc.author: zhuwentao
1528 */
1529 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest013, TestSize.Level0)
1530 {
1531 /**
1532 * @tc.steps: insert some data into cloud db
1533 * @tc.expected: return ok.
1534 */
1535 int64_t paddingSize = 10;
1536 int64_t cloudCount = 10;
1537 SyncProcess syncProcess;
1538 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1539 /**
1540 * @tc.steps: try to cloud sync
1541 * @tc.expected: return ok.
1542 */
1543 Query query = Query::Select().FromTable(g_tables);
__anona8df80760802(const std::map<std::string, SyncProcess> &process) 1544 CloudSyncStatusCallback callback = [&syncProcess](const std::map<std::string, SyncProcess> &process) {
1545 LOGI("devices size = %d", process.size());
1546 ASSERT_EQ(process.size(), 1u);
1547 syncProcess = std::move(process.begin()->second);
1548 if (syncProcess.process == FINISHED) {
1549 g_processCondition.notify_one();
1550 }
1551 };
1552 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1553 WaitForSyncFinish(syncProcess, g_syncWaitTime);
1554 uint32_t queryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1555 /**
1556 * @tc.steps: insert some increment data into cloud db
1557 * @tc.expected: return ok.
1558 */
1559 VBucket data;
1560 Timestamp now = TimeHelper::GetSysCurrentTime();
1561 data.insert_or_assign("name", "Cloud" + std::to_string(0));
1562 data.insert_or_assign("height", 166.0); // 166.0 is random double value
1563 data.insert_or_assign("married", false);
1564 data.insert_or_assign("age", 13L);
1565 VBucket log;
1566 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1567 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1568 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1569 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1570 log.insert_or_assign(CloudDbConstant::CURSOR_FIELD, "0123");
1571 g_virtualCloudDb->SetIncrementData(g_tableName1, data, log);
1572 syncProcess.process = PREPARED;
1573 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1574 WaitForSyncFinish(syncProcess, g_syncWaitTime);
1575 uint32_t lastQueryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1576 ASSERT_EQ(lastQueryTimes - queryTimes, 2u);
1577 CloseDb();
1578 }
1579
TestSyncForStatus(RelationalStoreDelegate * delegate,DBStatus expectStatus)1580 void TestSyncForStatus(RelationalStoreDelegate *delegate, DBStatus expectStatus)
1581 {
1582 std::mutex dataMutex;
1583 std::condition_variable cv;
1584 bool finish = false;
1585 DBStatus res = OK;
1586 CloudSyncStatusCallback callback = [&dataMutex, &cv, &finish, &res](
1587 const std::map<std::string, SyncProcess> &process) {
1588 std::map<std::string, SyncProcess> syncProcess;
1589 {
1590 std::lock_guard<std::mutex> autoLock(dataMutex);
1591 syncProcess = process;
1592 if (syncProcess[DEVICE_CLOUD].process == FINISHED) {
1593 finish = true;
1594 }
1595 res = syncProcess[DEVICE_CLOUD].errCode;
1596 }
1597 cv.notify_one();
1598 };
1599 Query query = Query::Select().FromTable({g_tableName3});
1600 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1601 {
1602 std::unique_lock<std::mutex> uniqueLock(dataMutex);
1603 cv.wait(uniqueLock, [&finish] {
1604 return finish;
1605 });
1606 }
1607 EXPECT_EQ(res, expectStatus);
1608 }
1609
1610 /*
1611 * @tc.name: CloudSyncTest015
1612 * @tc.desc: Test sync with cloud error
1613 * @tc.type: FUNC
1614 * @tc.require:
1615 * @tc.author: zhangqiquan
1616 */
1617 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest015, TestSize.Level0)
1618 {
1619 g_virtualCloudDb->SetActionStatus(CLOUD_NETWORK_ERROR);
1620 TestSyncForStatus(g_delegate, CLOUD_NETWORK_ERROR);
1621
1622 g_virtualCloudDb->SetActionStatus(CLOUD_SYNC_UNSET);
1623 TestSyncForStatus(g_delegate, CLOUD_SYNC_UNSET);
1624
1625 g_virtualCloudDb->SetActionStatus(CLOUD_FULL_RECORDS);
1626 TestSyncForStatus(g_delegate, CLOUD_FULL_RECORDS);
1627
1628 g_virtualCloudDb->SetActionStatus(CLOUD_LOCK_ERROR);
1629 TestSyncForStatus(g_delegate, CLOUD_LOCK_ERROR);
1630
1631 g_virtualCloudDb->SetActionStatus(DB_ERROR);
1632 TestSyncForStatus(g_delegate, CLOUD_ERROR);
1633
1634 g_virtualCloudDb->SetActionStatus(OK);
1635 CloseDb();
1636 }
1637
1638 /*
1639 * @tc.name: CloudSyncTest014
1640 * @tc.desc: Test sync with s4
1641 * @tc.type: FUNC
1642 * @tc.require:
1643 * @tc.author: zhangqiquan
1644 */
1645 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest014, TestSize.Level0)
1646 {
1647 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
1648 RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
1649
1650 // sync failed because get security option failed
__anona8df80760b02(const std::string&, SecurityOption &option) 1651 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1652 option.securityLabel = S0;
1653 return DB_ERROR;
1654 });
1655 Query query = Query::Select().FromTable({g_tableName3});
1656 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1657 SECURITY_OPTION_CHECK_ERROR);
1658
1659 // sync failed because get S4
__anona8df80760c02(const std::string&, SecurityOption &option) 1660 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1661 option.securityLabel = S4;
1662 return NOT_SUPPORT;
1663 });
1664 Query invalidQuery = Query::Select().FromTable({g_tableName3}).PrefixKey({'k'});
1665 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, invalidQuery, nullptr, g_syncWaitTime),
1666 NOT_SUPPORT);
1667
1668 // sync failed because get S4
__anona8df80760d02(const std::string&, SecurityOption &option) 1669 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1670 option.securityLabel = S4;
1671 return OK;
1672 });
1673 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1674 SECURITY_OPTION_CHECK_ERROR);
1675
1676 // sync failed because S4 has been cached
__anona8df80760e02(const std::string&, SecurityOption &option) 1677 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1678 option.securityLabel = S0;
1679 return OK;
1680 });
1681 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1682 SECURITY_OPTION_CHECK_ERROR);
1683 RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
1684 CloseDb();
1685 }
1686
1687 /*
1688 * @tc.name: CloudSyncTest016
1689 * @tc.desc: Test sync when push before merge
1690 * @tc.type: FUNC
1691 * @tc.require:
1692 * @tc.author: chenchaohao
1693 */
1694 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest016, TestSize.Level0)
1695 {
1696 int64_t localCount = 10;
1697 int64_t paddingSize = 10;
1698 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1699 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1700 CheckCloudTotalCount({10L, 10L});
1701 UpdateUserTableRecord(db, 0, localCount);
1702 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1703
1704 VBucket extend;
1705 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1706 std::vector<VBucket> data1;
1707 g_virtualCloudDb->Query(g_tables[0], extend, data1);
1708 for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1709 EXPECT_EQ(std::get<int64_t>(data1[i]["age"]), 99); // 99 is the updated age field of cloud db
1710 }
1711
1712 std::vector<VBucket> data2;
1713 g_virtualCloudDb->Query(g_tables[1], extend, data2);
1714 for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1715 EXPECT_EQ(std::get<int64_t>(data2[i]["age"]), 99); // 99 is the updated age field of cloud db
1716 }
1717
1718 CloseDb();
1719 }
1720
1721 /*
1722 * @tc.name: DataNotifier001
1723 * @tc.desc: Notify data without primary key
1724 * @tc.type: FUNC
1725 * @tc.require:
1726 * @tc.author: wanyi
1727 */
1728 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DataNotifier001, TestSize.Level0)
1729 {
1730 int64_t paddingSize = 10;
1731 int localCount = 20;
1732 InsertRecordWithoutPk2LocalAndCloud(db, 0, localCount, paddingSize);
1733 callSync({g_tableName3}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1734 CloseDb();
1735 }
1736
1737 /**
1738 * @tc.name: CloudSyncAssetTest001
1739 * @tc.desc:
1740 * @tc.type: FUNC
1741 * @tc.require:
1742 * @tc.author: wanyi
1743 */
1744 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest001, TestSize.Level1)
1745 {
1746 int64_t paddingSize = 100;
1747 int localCount = 20;
1748 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1749 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1750 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1751
1752 CheckAssetAfterDownload(db, localCount);
1753 CheckAllAssetAfterUpload(localCount);
1754 CloseDb();
1755 }
1756
1757 /*
1758 * @tc.name: MannualNotify001
1759 * @tc.desc: Test FLAG_ONLY mode of RemoveDeviceData
1760 * @tc.type: FUNC
1761 * @tc.require:
1762 * @tc.author: huangboxin
1763 */
1764 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, MannualNotify001, TestSize.Level0)
1765 {
1766 int64_t paddingSize = 10;
1767 int localCount = 10;
1768 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1769 Query query = Query::Select().FromTable(g_tables);
1770 std::vector<SyncProcess> expectProcess;
1771 InitProcessForMannualSync1(expectProcess);
1772 CloudSyncStatusCallback callback;
1773 GetCallback(g_syncProcess, callback, expectProcess);
1774 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime),
1775 DBStatus::OK);
1776 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1777 CloseDb();
1778 }
1779
1780 /**
1781 * @tc.name: CloudProcessNotify001
1782 * @tc.desc: Test duplicate cloud records. SYNC_MODE_CLOUD_MERGE
1783 * @tc.type: FUNC
1784 * @tc.require:
1785 * @tc.author: liufuchenxing
1786 */
1787 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudProcessNotify001, TestSize.Level1)
1788 {
1789 /**
1790 * @tc.steps: step1. table work1 and work2 insert 1 record which name is local0, then sync().
1791 * @tc.expected: step 1. table work1 and work2 download result is 0. table work1 and work2 upload 1 record.
1792 */
1793 int64_t paddingSize = 10;
1794 int64_t localCount = 1;
1795 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1796 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1797 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1798 g_observer->ClearChangedData();
1799 LOGD("expect download:worker1[primary key]:[], worker2[primary key]:[]");
1800 CheckDownloadResult(db, {0L, 0L}); // 0 and 0 means the num of downloads from cloud db by worker1 and worker2
1801 LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1802 CheckCloudTotalCount({1L, 1L}); // 1 and 1 means the total num of worker1 and worker2 from the cloud db
1803
1804 /**
1805 * @tc.steps: step2. reset data
1806 * @tc.expected: step2. return ok.
1807 */
1808 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1809 g_syncProcess = {};
1810 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1811
1812 /**
1813 * @tc.steps: step3. table work1 delete record which gid is 0 and name is local0 on cloud.
1814 * @tc.expected: step3. return ok.
1815 */
1816 VBucket idMap;
1817 idMap.insert_or_assign("#_gid", std::to_string(0));
1818 ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName1, idMap), DBStatus::OK);
1819
1820 /**
1821 * @tc.steps: step4. table work1 insert record which gid is 0 and name is local0 on cloud.
1822 * @tc.expected: step4. return ok.
1823 */
1824 std::vector<VBucket> record1;
1825 std::vector<VBucket> extend1;
1826 InsertCloudForCloudProcessNotify001(record1, extend1);
1827 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(g_tableName1, std::move(record1), extend1), DBStatus::OK);
1828
1829 /**
1830 * @tc.steps: step5. sync() and check local data.
1831 * @tc.expected: step5. return ok.
1832 */
1833 ChangedData changedDataForTable1;
1834 changedDataForTable1.tableName = g_tableName1;
1835 changedDataForTable1.field.push_back(std::string("name"));
1836 changedDataForTable1.primaryData[ChangeType::OP_UPDATE].push_back({"Local" + std::to_string(0)});
1837 g_observer->SetExpectedResult(changedDataForTable1);
1838
1839 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1840 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1841 g_observer->ClearChangedData();
1842 LOGD("expect download:worker1[primary key]:[Local0], worker2[primary key]:[0]");
1843 // 1 and 1 means the num of downloads from cloud db by worker1 and worker2
1844 CheckDownloadResult(db, {1L, 1L}, "Local");
1845 LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1846 CheckCloudTotalCount({1L, 1L}); // 0 and 0 means the total num of worker1 and worker2 from the cloud db
1847
1848 /**
1849 * @tc.steps: step6. CloseDb().
1850 * @tc.expected: step6. return ok.
1851 */
1852 CloseDb();
1853 }
1854
1855 /*
1856 * @tc.name: CloudSyncAssetTest002
1857 * @tc.desc:
1858 * @tc.type: FUNC
1859 * @tc.require:
1860 * @tc.author: huangboxin
1861 */
1862 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest002, TestSize.Level0)
1863 {
1864 int64_t paddingSize = 10;
1865 int localCount = 3;
1866 int cloudCount = 3;
1867 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1868 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1869 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1870 CloseDb();
1871 }
1872
1873 /*
1874 * @tc.name: CloudSyncAssetTest003
1875 * @tc.desc:
1876 * @tc.type: FUNC
1877 * @tc.require:
1878 * @tc.author: bty
1879 */
1880 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest003, TestSize.Level0)
1881 {
1882 int64_t paddingSize = 10;
1883 int localCount = 3;
1884 int cloudCount = 3;
1885 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1886 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1887 Assets assets;
1888 assets.push_back(g_localAsset);
1889 assets.push_back(g_localAsset);
1890 UpdateLocalAssets(db, assets, 1);
1891 Query query = Query::Select().FromTable(g_tables);
1892 std::vector<SyncProcess> expectProcess;
__anona8df80760f02(const std::map<std::string, SyncProcess> &process) 1893 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1894 ASSERT_EQ(process.size(), 1u);
1895 g_syncProcess = std::move(process.begin()->second);
1896
1897 if (g_syncProcess.process == FINISHED) {
1898 g_processCondition.notify_one();
1899 }
1900 };
1901 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1902 DBStatus::OK);
1903 {
1904 std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761002() 1905 g_processCondition.wait(lock, []() {
1906 return g_syncProcess.process == FINISHED;
1907 });
1908 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1909 }
1910 CloseDb();
1911 }
1912
1913 /*
1914 * @tc.name: CloudSyncAssetTest004
1915 * @tc.desc:
1916 * @tc.type: FUNC
1917 * @tc.require:
1918 * @tc.author: bty
1919 */
1920 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest004, TestSize.Level0)
1921 {
1922 int64_t paddingSize = 10;
1923 int localCount = 3;
1924 int cloudCount = 3;
1925 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1926 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1927 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1928
1929 UpdateDiffType(localCount);
1930 g_syncProcess = {};
__anona8df80761102(const std::map<std::string, SyncProcess> &process) 1931 CloudSyncStatusCallback callback1 = [](const std::map<std::string, SyncProcess> &process) {
1932 ASSERT_EQ(process.size(), 1u);
1933 g_syncProcess = std::move(process.begin()->second);
1934 if (g_syncProcess.process == FINISHED) {
1935 g_processCondition.notify_one();
1936 }
1937 };
1938 Query query = Query::Select().FromTable(g_tables);
1939 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback1, g_syncWaitTime),
1940 DBStatus::OK);
1941 {
1942 std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761202() 1943 g_processCondition.wait(lock, []() {
1944 return g_syncProcess.process == FINISHED;
1945 });
1946 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1947 }
1948 CheckDiffTypeAsset(db);
1949 CloseDb();
1950 }
1951
1952 /*
1953 * @tc.name: CloudSyncAssetTest005
1954 * @tc.desc: Test erase all no change Asset
1955 * @tc.type: FUNC
1956 * @tc.require:
1957 * @tc.author: bty
1958 */
1959 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest005, TestSize.Level0)
1960 {
1961 /**
1962 * @tc.steps:step1. Construct local data with asset names and hashes consistent with the cloud
1963 * @tc.expected: step1. return ok.
1964 */
1965 int64_t paddingSize = 10;
1966 int localCount = 3;
1967 int cloudCount = 3;
1968 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1969 Assets assets;
1970 for (int64_t j = 0; j < cloudCount; j++) {
1971 Asset asset = g_cloudAsset;
1972 asset.name = g_cloudAsset.name + std::to_string(j);
1973 assets.push_back(asset);
1974 }
1975 UpdateLocalAssets(db, assets, 0);
1976 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
1977
1978 /**
1979 * @tc.steps:step2. Construct cloud data
1980 * @tc.expected: step2. return ok.
1981 */
1982 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1983
1984 /**
1985 * @tc.steps:step3. sync, expect EraseNoChangeAsset to erase all Nochange assets
1986 * @tc.expected: step3. return ok.
1987 */
1988 Query query = Query::Select().FromTable(g_tables);
1989 std::vector<SyncProcess> expectProcess;
__anona8df80761302(const std::map<std::string, SyncProcess> &process) 1990 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1991 ASSERT_EQ(process.size(), 1u);
1992 g_syncProcess = std::move(process.begin()->second);
1993
1994 if (g_syncProcess.process == FINISHED) {
1995 g_processCondition.notify_one();
1996 }
1997 };
1998 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1999 DBStatus::OK);
2000 {
2001 std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761402() 2002 g_processCondition.wait(lock, []() {
2003 return g_syncProcess.process == FINISHED;
2004 });
2005 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2006 }
2007 CloseDb();
2008 }
2009
2010 /*
2011 * @tc.name: CloudSyncAssetTest006
2012 * @tc.desc: Test upload new data without assets
2013 * @tc.type: FUNC
2014 * @tc.require:
2015 * @tc.author: bty
2016 */
2017 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest006, TestSize.Level0)
2018 {
2019 /**
2020 * @tc.steps:step1. Construct local data with NULL asset and the local count is greater than the cloud
2021 * @tc.expected: step1. return ok.
2022 */
2023 int64_t paddingSize = 10;
2024 int localCount = 6;
2025 int cloudCount = 3;
2026 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2027 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2028 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2029
2030 /**
2031 * @tc.steps:step2. sync, upload new data without assets,
2032 * @tc.expected: step2. return ok.
2033 */
2034 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2035 CloseDb();
2036 }
2037
2038 /*
2039 * @tc.name: CloudSyncAssetTest007
2040 * @tc.desc: for expilictly set not-change assets. If an asset is deleted, and its hash is not set to empty, it will be
2041 * regarded as NO-CHANGE, rather than delete
2042 * @tc.type: FUNC
2043 * @tc.require:
2044 * @tc.author: wanyi
2045 */
2046 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest007, TestSize.Level0)
2047 {
2048 /**
2049 * @tc.steps:step1. local asset contain an asset which has a corresponding asset in cloud
2050 * @tc.expected: step1. return ok.
2051 */
2052 int64_t paddingSize = 10;
2053 int localCount = 1;
2054 int cloudCount = 1;
2055 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2056 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2057 /**
2058 * @tc.steps:step2. local asset is set to delete, but hash is not set to empty
2059 * @tc.expected: step2. return ok.
2060 */
2061 Assets assets;
2062 for (int64_t j = 0; j < cloudCount; j++) {
2063 Asset asset = g_cloudAsset;
2064 asset.name = g_cloudAsset.name + std::to_string(j);
2065 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
2066 assets.push_back(asset);
2067 }
2068 UpdateLocalAssets(db, assets, 0);
2069 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2070 /**
2071 * @tc.steps:step3. Do sync
2072 * @tc.expected: step3. return ok.
2073 */
2074 Query query = Query::Select().FromTable(g_tables);
2075 std::vector<SyncProcess> expectProcess;
__anona8df80761502(const std::map<std::string, SyncProcess> &process) 2076 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2077 ASSERT_EQ(process.size(), 1u);
2078 g_syncProcess = std::move(process.begin()->second);
2079
2080 if (g_syncProcess.process == FINISHED) {
2081 g_processCondition.notify_one();
2082 }
2083 };
2084 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2085 DBStatus::OK);
2086 {
2087 std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761602() 2088 g_processCondition.wait(lock, []() {
2089 return g_syncProcess.process == FINISHED;
2090 });
2091 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2092 }
2093 /**
2094 * @tc.steps:step4. Check result. Cloud db should not contain asset.
2095 * @tc.expected: step4. return ok.
2096 */
2097 CheckAssetForAssetTest006();
2098 CloseDb();
2099 }
2100
2101 /**
2102 * @tc.name: DownloadAssetTest001
2103 * @tc.desc: Test the sync of different Asset status out of parameters when the download is successful
2104 * @tc.type: FUNC
2105 * @tc.require:
2106 * @tc.author: bty
2107 */
2108 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest001, TestSize.Level0)
2109 {
2110 /**
2111 * @tc.steps:step1. Set different status out of parameters, and the code returns OK
2112 * @tc.expected: step1. return ok.
2113 */
2114 DBStatus expectStatus = DBStatus::OK;
2115 int index = 0;
2116 InitMockAssetLoader(expectStatus, index);
2117
2118 /**
2119 * @tc.steps:step2. init download data
2120 * @tc.expected: step2. return ok.
2121 */
2122 int64_t paddingSize = 1;
2123 int localCount = 120;
2124 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2125 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
2126
2127 /**
2128 * @tc.steps:step3. sync
2129 * @tc.expected: step3. return ok.
2130 */
2131 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2132
2133 /**
2134 * @tc.steps:step4. Expect all states to be normal
2135 * @tc.expected: step4. return ok.
2136 */
2137 CheckAssetAfterDownload(db, localCount);
2138 CloseDb();
2139 }
2140
2141 /*
2142 * @tc.name: CloudSyncAssetTest008
2143 * @tc.desc: sync failed with download asset
2144 * @tc.type: FUNC
2145 * @tc.require:
2146 * @tc.author: zhangqiquan
2147 */
2148 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest008, TestSize.Level0)
2149 {
2150 /**
2151 * @tc.steps:step1. prepare asset data
2152 */
2153 int64_t paddingSize = 10;
2154 int localCount = 1;
2155 int cloudCount = 1;
2156 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2157 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2158 /**
2159 * @tc.steps:step2. set download asset status failed
2160 */
2161 g_virtualAssetLoader->SetDownloadStatus(CLOUD_ASSET_SPACE_INSUFFICIENT);
2162 Query query = Query::Select().FromTable(g_tables);
2163 std::vector<SyncProcess> expectProcess;
__anona8df80761702(const std::map<std::string, SyncProcess> &process) 2164 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2165 for (const auto &item: process) {
2166 g_syncProcess = item.second;
2167 }
2168 if (g_syncProcess.process == FINISHED) {
2169 g_processCondition.notify_one();
2170 }
2171 };
2172 /**
2173 * @tc.steps:step3. sync and wait sync finished.
2174 * @tc.expected: step3. sync return CLOUD_ASSET_SPACE_INSUFFICIENT.
2175 */
2176 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2177 DBStatus::OK);
2178 {
2179 std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761802() 2180 g_processCondition.wait(lock, []() {
2181 return g_syncProcess.process == FINISHED;
2182 });
2183 ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT);
2184 }
2185 /**
2186 * @tc.steps:step4. clear data.
2187 */
2188 g_virtualAssetLoader->SetDownloadStatus(OK);
2189 CloseDb();
2190 }
2191
2192 /*
2193 * @tc.name: CloudSyncAssetTest009
2194 * @tc.desc:
2195 * @tc.type: FUNC
2196 * @tc.require:
2197 * @tc.author: zhangqiquan
2198 */
2199 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest009, TestSize.Level0)
2200 {
2201 // insert 3 data with asset 3 data without asset into local
2202 // sync them to cloud
2203 int64_t paddingSize = 10;
2204 int localCount = 3;
2205 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2206 InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2207 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2208 // update these data and sync again
2209 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2210 InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2211 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2212 EXPECT_EQ(g_syncProcess.errCode, DBStatus::OK);
2213 CloseDb();
2214 }
2215
2216 /**
2217 * @tc.name: DownloadAssetTest002
2218 * @tc.desc: Test the sync of different Asset status out of parameters when the download is failed
2219 * @tc.type: FUNC
2220 * @tc.require:
2221 * @tc.author: bty
2222 */
2223 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest002, TestSize.Level0)
2224 {
2225 /**
2226 * @tc.steps:step1. Set different status out of parameters, and the code returns CLOUD_ERROR
2227 * @tc.expected: step1. return ok.
2228 */
2229 DBStatus expectStatus = DBStatus::CLOUD_ERROR;
2230 int index = 0;
2231 InitMockAssetLoader(expectStatus, index);
2232 int64_t paddingSize = 1;
2233 int localCount = 100;
2234
2235 /**
2236 * @tc.steps:step2. init download data
2237 * @tc.expected: step2. return ok.
2238 */
2239 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2240 InsertCloudTableRecord(0, localCount, paddingSize, false);
2241
2242 /**
2243 * @tc.steps:step3. sync
2244 * @tc.expected: step3. return ok.
2245 */
2246 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2247
2248 /**
2249 * @tc.steps:step4. Those status that are not normal are all be abnormal after sync.
2250 * @tc.expected: step4. return ok.
2251 */
2252 CheckAssetAfterDownload2(db, localCount);
2253 CloseDb();
2254 }
2255
2256 /**
2257 * @tc.name: DownloadAssetTest003
2258 * @tc.desc: Init different asset name between local and cloud, then sync to test download
2259 * @tc.type: FUNC
2260 * @tc.require:
2261 * @tc.author: bty
2262 */
2263 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest003, TestSize.Level0)
2264 {
2265 /**
2266 * @tc.steps:step1. Init data and sync
2267 * @tc.expected: step1. return ok.
2268 */
2269 int64_t paddingSize = 1;
2270 int localCount = 10;
2271 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2272 InsertCloudTableRecord(0, localCount, paddingSize, false);
2273 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2274
2275 /**
2276 * @tc.steps:step2. update cloud Asset where gid = 0
2277 * @tc.expected: step2. return ok.
2278 */
2279 UpdateCloudAssetForDownloadAssetTest003();
2280
2281 /**
2282 * @tc.steps:step3. sync again
2283 * @tc.expected: step3. return ok.
2284 */
2285 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2286
2287 /**
2288 * @tc.steps:step4. check asset after download where gid = 0
2289 * @tc.expected: step4. return ok.
2290 */
2291 CheckAssetForDownloadAssetTest003(db);
2292 CloseDb();
2293 }
2294
2295 /**
2296 * @tc.name: DownloadAssetTest004
2297 * @tc.desc: Test total count, fail count and success count when drop table
2298 * @tc.type: FUNC
2299 * @tc.require:
2300 * @tc.author: liufuchenxing
2301 */
2302 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest004, TestSize.Level0)
2303 {
2304 /**
2305 * @tc.steps:step1. Init data and sync
2306 * @tc.expected: step1. return ok.
2307 */
2308 int64_t paddingSize = 1;
2309 int count = 10;
2310 InsertUserTableRecord(db, 0, count, paddingSize, false);
2311 g_syncProcess = {};
__anona8df80761902(const std::map<std::string, SyncProcess> &process) 2312 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2313 for (const auto &item : process) {
2314 g_syncProcess = item.second;
2315 }
2316 if (g_syncProcess.process == FINISHED) {
2317 g_processCondition.notify_one();
2318 }
2319 };
2320 Query query = Query::Select().FromTable(g_tables);
2321 EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2322 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2323
2324 /**
2325 * @tc.steps:step2. drop table work2. sync failed, check total, success and fail count.
2326 * @tc.expected: step2. total = 20, success=0, fail=20
2327 */
2328 g_syncProcess = {};
2329 InsertCloudTableRecord(0, count, paddingSize, false);
2330 EXPECT_EQ(RelationalTestUtils::ExecSql(db, DROP_INTEGER_PRIMARY_KEY_TABLE_SQL), E_OK);
2331 EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2332 DBStatus::NOT_FOUND);
2333
2334 /**
2335 * @tc.steps:step3. close db.
2336 * @tc.expected: step3. close success.
2337 */
2338 CloseDb();
2339 }
2340
2341 /**
2342 * @tc.name: SchemaTest001
2343 * @tc.desc: Create table with Cloud cooperation mode and do sync
2344 * @tc.type: FUNC
2345 * @tc.require:
2346 * @tc.author: wanyi
2347 */
2348 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest001, TestSize.Level0)
2349 {
2350 /**
2351 * @tc.steps:step1. Create table with Cloud cooperation mode
2352 * @tc.expected: step1. return ok.
2353 */
2354 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2355 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, CLOUD_COOPERATION), DBStatus::OK);
2356 /**
2357 * @tc.steps:step1. do sync
2358 * @tc.expected: step1. return ok.
2359 */
2360 callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2361 CloseDb();
2362 }
2363
2364 /**
2365 * @tc.name: SchemaTest002
2366 * @tc.desc: Create table with DEVICE_COOPERATION mode and do sync
2367 * @tc.type: FUNC
2368 * @tc.require:
2369 * @tc.author: wanyi
2370 */
2371 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest002, TestSize.Level0)
2372 {
2373 /**
2374 * @tc.steps:step1. Create table with DEVICE_COOPERATION mode
2375 * @tc.expected: step1. return ok.
2376 */
2377 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2378 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, DEVICE_COOPERATION), DBStatus::OK);
2379 /**
2380 * @tc.steps:step1. do sync
2381 * @tc.expected: step1. return NOT_SUPPORT.
2382 */
2383 callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::NOT_SUPPORT);
2384 CloseDb();
2385 }
2386
2387 /**
2388 * @tc.name: CloudCursorTest001
2389 * @tc.desc: Init different asset name between local and cloud, then sync to test download
2390 * @tc.type: FUNC
2391 * @tc.require:
2392 * @tc.author: bty
2393 */
2394 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudCursorTest001, TestSize.Level0)
2395 {
2396 /**
2397 * @tc.steps:step1. Init data and sync
2398 * @tc.expected: step1. return ok.
2399 */
2400 int64_t paddingSize = 1;
2401 int localCount = 10;
2402 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2403 InsertCloudTableRecord(0, localCount, paddingSize, true);
2404 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2405
2406 /**
2407 * @tc.steps:step2. the cursor does not increase during upload, the cursor will increase during download
2408 * although it is unTrackerTable
2409 * @tc.expected: step2. return ok.
2410 */
2411 string sql = "select cursor from " + DBConstant::RELATIONAL_PREFIX + g_tableName1 + "_log";
2412 sqlite3_stmt *stmt = nullptr;
2413 EXPECT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2414 int64_t index = 0;
2415 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2416 EXPECT_EQ(static_cast<int64_t>(sqlite3_column_int64(stmt, 0)), ++index);
2417 }
2418 int errCode;
2419 SQLiteUtils::ResetStatement(stmt, true, errCode);
2420 CloseDb();
2421 }
2422 }
2423 #endif // RELATIONAL_STORE
2424