1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_instance.h"
24 #include "relational_store_manager.h"
25 #include "runtime_config.h"
26 #include "sqlite_relational_store.h"
27 #include "sqlite_relational_utils.h"
28 #include "store_observer.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "mock_asset_loader.h"
35
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40
41 namespace {
42 string g_storeID = "Relational_Store_SYNC";
43 const string g_tableName1 = "worker1";
44 const string g_tableName2 = "worker2";
45 const string g_tableName3 = "worker3";
46 const string g_tableName4 = "worker4";
47 const string DEVICE_CLOUD = "cloud_dev";
48 const string DB_SUFFIX = ".db";
49 const int64_t g_syncWaitTime = 60;
50 const int g_arrayHalfSub = 2;
51 int g_syncIndex = 0;
52 string g_testDir;
53 string g_storePath;
54 std::mutex g_processMutex;
55 std::condition_variable g_processCondition;
56 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
57 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
58 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
59 RelationalStoreObserverUnitTest *g_observer = nullptr;
60 RelationalStoreDelegate *g_delegate = nullptr;
61 SyncProcess g_syncProcess;
62 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
63 const std::string CREATE_LOCAL_TABLE_SQL =
64 "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \
65 "name TEXT PRIMARY KEY," \
66 "height REAL ," \
67 "married BOOLEAN ," \
68 "photo BLOB NOT NULL," \
69 "assert BLOB," \
70 "age INT);";
71 const std::string INTEGER_PRIMARY_KEY_TABLE_SQL =
72 "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \
73 "id INTEGER PRIMARY KEY," \
74 "name TEXT ," \
75 "height REAL ," \
76 "photo BLOB ," \
77 "asserts BLOB," \
78 "age INT);";
79 const std::string DROP_INTEGER_PRIMARY_KEY_TABLE_SQL = "DROP TABLE " + g_tableName2 + ";";
80 const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL =
81 "CREATE TABLE IF NOT EXISTS " + g_tableName3 + "(" \
82 "name TEXT," \
83 "height REAL ," \
84 "married BOOLEAN ," \
85 "photo BLOB NOT NULL," \
86 "assert BLOB," \
87 "age INT);";
88 const std::string INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE =
89 "CREATE TABLE IF NOT EXISTS " + g_tableName4 + "(" \
90 "id INTEGER PRIMARY KEY," \
91 "name TEXT ," \
92 "height REAL ," \
93 "photo BLOB ," \
94 "asserts BLOB," \
95 "age INT);";
96 const std::vector<Field> g_cloudFiled1 = {
97 {"Name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<double>},
98 {"MArried", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
99 {"Assert", TYPE_INDEX<Asset>}, {"age", TYPE_INDEX<int64_t>}
100 };
101 const std::vector<Field> g_invalidCloudFiled1 = {
102 {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<int>},
103 {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
104 {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
105 };
106 const std::vector<Field> g_cloudFiled2 = {
107 {"id", TYPE_INDEX<int64_t>, true}, {"name", TYPE_INDEX<std::string>},
108 {"height", TYPE_INDEX<double>}, {"photo", TYPE_INDEX<Bytes>},
109 {"asserts", TYPE_INDEX<Assets>}, {"age", TYPE_INDEX<int64_t>}
110 };
111 const std::vector<Field> g_cloudFiledWithOutPrimaryKey3 = {
112 {"name", TYPE_INDEX<std::string>, false, true}, {"height", TYPE_INDEX<double>},
113 {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
114 {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
115 };
116 const std::vector<std::string> g_tables = {g_tableName1, g_tableName2};
117 const std::vector<std::string> g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName};
118 const std::vector<string> g_prefix = {"Local", ""};
119 const Asset g_localAsset = {
120 .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
121 .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
122 };
123 const Asset g_cloudAsset = {
124 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
125 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
126 };
127
CreateUserDBAndTable(sqlite3 * & db)128 void CreateUserDBAndTable(sqlite3 *&db)
129 {
130 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
131 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK);
132 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL), SQLITE_OK);
133 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK);
134 }
135
InsertUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)136 void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
137 {
138 std::string photo(photoSize, 'v');
139 int errCode;
140 std::vector<uint8_t> assetBlob;
141 for (int64_t i = begin; i < begin + count; ++i) {
142 Asset asset = g_localAsset;
143 asset.name = asset.name + std::to_string(i);
144 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
145 string sql = "INSERT OR REPLACE INTO " + g_tableName1
146 + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
147 "', '175.8', '0', '" + photo + "', ? , '18');";
148 sqlite3_stmt *stmt = nullptr;
149 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
150 if (assetIsNull) {
151 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
152 } else {
153 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
154 }
155 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
156 SQLiteUtils::ResetStatement(stmt, true, errCode);
157 }
158 for (int64_t i = begin; i < begin + count; ++i) {
159 std::vector<Asset> assets;
160 Asset asset = g_localAsset;
161 asset.name = g_localAsset.name + std::to_string(i);
162 assets.push_back(asset);
163 asset.name = g_localAsset.name + std::to_string(i + 1);
164 assets.push_back(asset);
165 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetBlob);
166 string sql = "INSERT OR REPLACE INTO " + g_tableName2
167 + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local"
168 + std::to_string(i) + "', '155.10', '"+ photo + "', ? , '21');";
169 sqlite3_stmt *stmt = nullptr;
170 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
171 if (assetIsNull) {
172 ASSERT_EQ(sqlite3_bind_null(stmt, 1), E_OK);
173 } else {
174 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
175 }
176 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
177 SQLiteUtils::ResetStatement(stmt, true, errCode);
178 }
179 LOGD("insert user record worker1[primary key]:[Local%" PRId64 " - Local%" PRId64
180 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
181 }
182
UpdateUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)183 void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
184 {
185 for (size_t i = 0; i < g_tables.size(); i++) {
186 string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + " in (";
187 for (int64_t j = begin; j < begin + count; ++j) {
188 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
189 }
190 updateAge.pop_back();
191 updateAge += ");";
192 ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
193 }
194 LOGD("update local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
195 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
196 }
197
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)198 void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
199 {
200 for (size_t i = 0; i < g_tables.size(); i++) {
201 string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + " in (";
202 for (int64_t j = begin; j < count; ++j) {
203 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
204 }
205 updateAge.pop_back();
206 updateAge += ");";
207 ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
208 }
209 LOGD("delete local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
210 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
211 }
212
InsertRecordWithoutPk2LocalAndCloud(sqlite3 * & db,int64_t begin,int64_t count,int photoSize)213 void InsertRecordWithoutPk2LocalAndCloud(sqlite3 *&db, int64_t begin, int64_t count, int photoSize)
214 {
215 std::vector<uint8_t> photo(photoSize, 'v');
216 std::string photoLocal(photoSize, 'v');
217 Asset asset = { .version = 1, .name = "Phone" };
218 std::vector<uint8_t> assetBlob;
219 RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset);
220 std::string assetStr(assetBlob.begin(), assetBlob.end());
221 std::vector<VBucket> record1;
222 std::vector<VBucket> extend1;
223 for (int64_t i = begin; i < count; ++i) {
224 Timestamp now = TimeHelper::GetSysCurrentTime();
225 VBucket data;
226 data.insert_or_assign("name", "Cloud" + std::to_string(i));
227 data.insert_or_assign("height", 166.0); // 166.0 is random double value
228 data.insert_or_assign("married", (bool)0);
229 data.insert_or_assign("photo", photo);
230 data.insert_or_assign("assert", KEY_1);
231 data.insert_or_assign("age", 13L);
232 record1.push_back(data);
233 VBucket log;
234 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
235 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
236 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
237 extend1.push_back(log);
238 std::this_thread::sleep_for(std::chrono::milliseconds(1)); // wait for 1 ms
239 }
240 int errCode = g_virtualCloudDb->BatchInsert(g_tableName3, std::move(record1), extend1);
241 ASSERT_EQ(errCode, DBStatus::OK);
242 for (int64_t i = begin; i < count; ++i) {
243 string sql = "INSERT OR REPLACE INTO " + g_tableName3
244 + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
245 "', '175.8', '0', '" + photoLocal + "', '" + assetStr + "', '18');";
246 ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
247 }
248 }
249
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)250 void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
251 {
252 std::vector<uint8_t> photo(photoSize, 'v');
253 std::vector<VBucket> record1;
254 std::vector<VBucket> extend1;
255 std::vector<VBucket> record2;
256 std::vector<VBucket> extend2;
257 Timestamp now = TimeHelper::GetSysCurrentTime();
258 for (int64_t i = begin; i < begin + count; ++i) {
259 VBucket data;
260 data.insert_or_assign("name", "Cloud" + std::to_string(i));
261 data.insert_or_assign("height", 166.0); // 166.0 is random double value
262 data.insert_or_assign("married", false);
263 data.insert_or_assign("photo", photo);
264 data.insert_or_assign("AGE", 13L);
265 Asset asset = g_cloudAsset;
266 asset.name = asset.name + std::to_string(i);
267 assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
268 record1.push_back(data);
269 VBucket log;
270 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
271 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
272 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
273 extend1.push_back(log);
274
275 std::vector<Asset> assets;
276 data.insert_or_assign("id", i);
277 data.insert_or_assign("height", 180.3); // 180.3 is random double value
278 for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
279 asset.name = g_cloudAsset.name + std::to_string(j);
280 assets.push_back(asset);
281 }
282 data.erase("assert");
283 data.erase("married");
284 assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
285 record2.push_back(data);
286 extend2.push_back(log);
287 }
288 ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1), DBStatus::OK);
289 ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2), DBStatus::OK);
290 LOGD("insert cloud record worker1[primary key]:[cloud%" PRId64 " - cloud%" PRId64
291 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
292 std::this_thread::sleep_for(std::chrono::milliseconds(count));
293 }
294
UpdateAssetForTest(sqlite3 * & db,AssetOpType opType,int64_t cloudCount,int64_t rowid)295 void UpdateAssetForTest(sqlite3 *&db, AssetOpType opType, int64_t cloudCount, int64_t rowid)
296 {
297 string sql = "UPDATE " + g_tables[0] + " SET assert = ? where rowid = '" + std::to_string(rowid) + "';";
298 std::vector<uint8_t> assetBlob;
299 int errCode;
300 Asset asset = g_cloudAsset;
301 asset.name = "Phone" + std::to_string(rowid - cloudCount - 1);
302 if (opType == AssetOpType::UPDATE) {
303 asset.uri = "/data/test";
304 asset.hash = "";
305 } else if (opType == AssetOpType::INSERT) {
306 asset.name = "Test10";
307 }
308 asset.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
309 sqlite3_stmt *stmt = nullptr;
310 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
311 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
312 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false) == E_OK) {
313 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
314 }
315 SQLiteUtils::ResetStatement(stmt, true, errCode);
316 }
317
UpdateAssetsForTest(sqlite3 * & db,AssetOpType opType,int64_t rowid)318 void UpdateAssetsForTest(sqlite3 *&db, AssetOpType opType, int64_t rowid)
319 {
320 string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
321 Asset asset1 = g_localAsset;
322 Asset asset2 = g_localAsset;
323 Assets assets;
324 asset1.name = g_localAsset.name + std::to_string(rowid);
325 asset1.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
326 asset2.name = g_localAsset.name + std::to_string(rowid + 1);
327 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
328 if (opType == AssetOpType::UPDATE) {
329 assets.push_back(asset1);
330 asset2.uri = "/data/test";
331 asset2.hash = "";
332 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
333 assets.push_back(asset2);
334 } else if (opType == AssetOpType::INSERT) {
335 assets.push_back(asset1);
336 assets.push_back(asset2);
337 Asset asset3;
338 asset3.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
339 asset3.name = "Test10";
340 assets.push_back(asset3);
341 } else if (opType == AssetOpType::DELETE) {
342 assets.push_back(asset1);
343 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
344 assets.push_back(asset2);
345 } else {
346 assets.push_back(asset1);
347 assets.push_back(asset2);
348 }
349 sqlite3_stmt *stmt = nullptr;
350 std::vector<uint8_t> assetsBlob;
351 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
352 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
353 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
354 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
355 }
356 int errCode;
357 SQLiteUtils::ResetStatement(stmt, true, errCode);
358 }
359
UpdateLocalAssets(sqlite3 * & db,Assets & assets,int64_t rowid)360 void UpdateLocalAssets(sqlite3 *&db, Assets &assets, int64_t rowid)
361 {
362 string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
363 std::vector<uint8_t> assetsBlob;
364 int errCode;
365 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
366 sqlite3_stmt *stmt = nullptr;
367 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
368 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
369 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
370 }
371 SQLiteUtils::ResetStatement(stmt, true, errCode);
372 }
373
UpdateDiffType(int64_t begin)374 void UpdateDiffType(int64_t begin)
375 {
376 std::vector<std::string> hash = {"DEC", "update_", "insert_"};
377 std::vector<std::string> name = {
378 g_cloudAsset.name + std::to_string(0),
379 g_cloudAsset.name + std::to_string(1),
380 g_cloudAsset.name + std::to_string(3) // 3 is insert id
381 };
382 std::vector<VBucket> record;
383 std::vector<VBucket> extend;
384 Assets assets;
385 for (int i = 0; i < 3; i ++) { // 3 is type num
386 Asset asset = g_cloudAsset;
387 asset.name = name[i];
388 asset.hash = hash[i];
389 assets.push_back(asset);
390 }
391 VBucket data;
392 data.insert_or_assign("name", "Cloud" + std::to_string(0));
393 data.insert_or_assign("id", 0L);
394 data.insert_or_assign("asserts", assets);
395 Timestamp now = TimeHelper::GetSysCurrentTime();
396 VBucket log;
397 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
398 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(begin));
399 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
400 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
401 record.push_back(data);
402 extend.push_back(log);
403 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName2, std::move(record), extend), DBStatus::OK);
404 }
405
CheckDiffTypeAsset(sqlite3 * & db)406 void CheckDiffTypeAsset(sqlite3 *&db)
407 {
408 std::vector<std::string> names = {
409 g_cloudAsset.name + std::to_string(0),
410 g_cloudAsset.name + std::to_string(1),
411 g_cloudAsset.name + std::to_string(3) // 3 is insert id
412 };
413 std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid = 0;";
414 sqlite3_stmt *stmt = nullptr;
415 int index = 0;
416 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
417 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
418 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
419 Type cloudValue;
420 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
421 std::vector<uint8_t> assetsBlob;
422 Assets assets;
423 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
424 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
425 for (const Asset &asset: assets) {
426 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
427 ASSERT_EQ(asset.name, names[index]);
428 index++;
429 }
430 }
431 int errCode;
432 SQLiteUtils::ResetStatement(stmt, true, errCode);
433 }
434
CheckAssetForAssetTest006()435 void CheckAssetForAssetTest006()
436 {
437 VBucket extend;
438 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
439 std::vector<VBucket> data;
440 g_virtualCloudDb->Query(g_tables[1], extend, data);
441 for (size_t j = 0; j < data.size(); ++j) {
442 ASSERT_NE(data[j].find("asserts"), data[j].end());
443 ASSERT_TRUE((data[j]["asserts"]).index() == TYPE_INDEX<Assets>);
444 Assets &assets = std::get<Assets>(data[j]["asserts"]);
445 ASSERT_TRUE(assets.size() > 0);
446 Asset &asset = assets[0];
447 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::DELETE));
448 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::DELETE));
449 }
450 }
451
CheckFillAssetForTest10(sqlite3 * & db)452 void CheckFillAssetForTest10(sqlite3 *&db)
453 {
454 std::string sql = "SELECT assert from " + g_tables[0] + " WHERE rowid in ('27','28','29','30');";
455 sqlite3_stmt *stmt = nullptr;
456 int index = 0;
457 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
458 int suffixId = 6;
459 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
460 if (index == 0 || index == 1 || index == 3) { // 3 is rowid index of 29
461 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
462 Type cloudValue;
463 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, cloudValue), E_OK);
464 std::vector<uint8_t> assetBlob;
465 Asset asset;
466 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetBlob), E_OK);
467 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset), E_OK);
468 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
469 if (index == 0) {
470 ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
471 } else if (index == 1) {
472 ASSERT_EQ(asset.name, "Test10");
473 } else {
474 ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
475 ASSERT_EQ(asset.uri, "/data/test");
476 }
477 } else {
478 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
479 }
480 index++;
481 }
482 int errCode;
483 SQLiteUtils::ResetStatement(stmt, true, errCode);
484 }
485
CheckFillAssetsForTest10(sqlite3 * & db)486 void CheckFillAssetsForTest10(sqlite3 *&db)
487 {
488 std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid in ('0','1','2','3');";
489 sqlite3_stmt *stmt = nullptr;
490 int index = 0;
491 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
492 int insertIndex = 2;
493 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
494 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
495 Type cloudValue;
496 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
497 std::vector<uint8_t> assetsBlob;
498 Assets assets;
499 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
500 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
501 if (index == 0) {
502 ASSERT_EQ(assets.size(), 2u);
503 ASSERT_EQ(assets[0].name, g_localAsset.name + std::to_string(index));
504 ASSERT_EQ(assets[1].name, g_localAsset.name + std::to_string(index + 1));
505 } else if (index == 1) {
506 ASSERT_EQ(assets.size(), 3u);
507 ASSERT_EQ(assets[insertIndex].name, "Test10");
508 ASSERT_EQ(assets[insertIndex].status, static_cast<uint32_t>(AssetStatus::NORMAL));
509 } else if (index == 2) { // 2 is the third element
510 ASSERT_EQ(assets.size(), 1u);
511 ASSERT_EQ(assets[0].name, g_cloudAsset.name + std::to_string(index));
512 } else {
513 ASSERT_EQ(assets.size(), 2u);
514 ASSERT_EQ(assets[1].uri, "/data/test");
515 ASSERT_EQ(assets[1].status, static_cast<uint32_t>(AssetStatus::NORMAL));
516 }
517 index++;
518 }
519 int errCode;
520 SQLiteUtils::ResetStatement(stmt, true, errCode);
521 }
522
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)523 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
524 {
525 if (count != 1) {
526 return 0;
527 }
528 auto expectCount = reinterpret_cast<int64_t>(data);
529 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
530 return 0;
531 }
532
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,std::string keyStr="Cloud")533 void CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, std::string keyStr = "Cloud")
534 {
535 for (size_t i = 0; i < g_tables.size(); ++i) {
536 string queryDownload = "select count(*) from " + g_tables[i] + " where name "
537 + " like '" + keyStr + "%'";
538 EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
539 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
540 }
541 }
542
CheckCloudTotalCount(std::vector<int64_t> expectCounts)543 void CheckCloudTotalCount(std::vector<int64_t> expectCounts)
544 {
545 VBucket extend;
546 for (size_t i = 0; i < g_tables.size(); ++i) {
547 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
548 int64_t realCount = 0;
549 std::vector<VBucket> data;
550 g_virtualCloudDb->Query(g_tables[i], extend, data);
551 for (size_t j = 0; j < data.size(); ++j) {
552 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
553 if (entry != data[j].end() && std::get<bool>(entry->second)) {
554 continue;
555 }
556 realCount++;
557 }
558 EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
559 }
560 }
561
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)562 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
563 {
564 TableSchema tableSchema1 = {
565 .name = g_tableName1,
566 .sharedTableName = g_tableName1 + "_shared",
567 .fields = g_cloudFiled1
568 };
569 TableSchema tableSchema2 = {
570 .name = g_tableName2,
571 .sharedTableName = g_tableName2 + "_shared",
572 .fields = g_cloudFiled2
573 };
574 TableSchema tableSchemaWithOutPrimaryKey = {
575 .name = g_tableName3,
576 .sharedTableName = g_tableName3 + "_shared",
577 .fields = g_cloudFiledWithOutPrimaryKey3
578 };
579 TableSchema tableSchema4 = {
580 .name = g_tableName4,
581 .sharedTableName = g_tableName4 + "_shared",
582 .fields = g_cloudFiled2
583 };
584 dataBaseSchema.tables.push_back(tableSchema1);
585 dataBaseSchema.tables.push_back(tableSchema2);
586 dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey);
587 dataBaseSchema.tables.push_back(tableSchema4);
588 }
589
590
GetInvalidCloudDbSchema(DataBaseSchema & dataBaseSchema)591 void GetInvalidCloudDbSchema(DataBaseSchema &dataBaseSchema)
592 {
593 TableSchema tableSchema1 = {
594 .name = g_tableName1,
595 .sharedTableName = "",
596 .fields = g_invalidCloudFiled1
597 };
598 TableSchema tableSchema2 = {
599 .name = g_tableName2,
600 .sharedTableName = "",
601 .fields = g_cloudFiled2
602 };
603 dataBaseSchema.tables.push_back(tableSchema1);
604 dataBaseSchema.tables.push_back(tableSchema2);
605 }
606
InitProcessForTest1(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)607 void InitProcessForTest1(const uint32_t &cloudCount, const uint32_t &localCount,
608 std::vector<SyncProcess> &expectProcess)
609 {
610 expectProcess.clear();
611 std::vector<TableProcessInfo> infos;
612 uint32_t index = 1;
613 infos.push_back(TableProcessInfo{
614 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
615 });
616 infos.push_back(TableProcessInfo{
617 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
618 });
619
620 infos.push_back(TableProcessInfo{
621 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
622 });
623 infos.push_back(TableProcessInfo{
624 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
625 });
626
627 infos.push_back(TableProcessInfo{
628 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
629 });
630 infos.push_back(TableProcessInfo{
631 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
632 });
633
634 infos.push_back(TableProcessInfo{
635 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
636 });
637 infos.push_back(TableProcessInfo{
638 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
639 });
640
641 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
642 SyncProcess syncProcess;
643 syncProcess.errCode = OK;
644 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
645 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
646 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
647 expectProcess.push_back(syncProcess);
648 }
649 }
650
InitProcessForMannualSync1(std::vector<SyncProcess> & expectProcess)651 void InitProcessForMannualSync1(std::vector<SyncProcess> &expectProcess)
652 {
653 expectProcess.clear();
654 std::vector<TableProcessInfo> infos;
655 // first notify, first table
656 infos.push_back(TableProcessInfo{
657 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
658 });
659 // first notify, second table
660 infos.push_back(TableProcessInfo{
661 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
662 });
663 // second notify, first table
664 infos.push_back(TableProcessInfo{
665 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
666 });
667 // second notify, second table
668 infos.push_back(TableProcessInfo{
669 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
670 });
671
672 infos.push_back(TableProcessInfo{
673 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
674 });
675 // second notify, second table
676 infos.push_back(TableProcessInfo{
677 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
678 });
679 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
680 SyncProcess syncProcess;
681 syncProcess.errCode = OK;
682 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
683 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
684 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
685 expectProcess.push_back(syncProcess);
686 }
687 }
688
InitProcessForTest2(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)689 void InitProcessForTest2(const uint32_t &cloudCount, const uint32_t &localCount,
690 std::vector<SyncProcess> &expectProcess)
691 {
692 expectProcess.clear();
693 std::vector<TableProcessInfo> infos;
694 uint32_t index = 1;
695 infos.push_back(TableProcessInfo{
696 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
697 });
698 infos.push_back(TableProcessInfo{
699 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
700 });
701
702 infos.push_back(TableProcessInfo{
703 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
704 });
705 infos.push_back(TableProcessInfo{
706 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
707 });
708
709 infos.push_back(TableProcessInfo{
710 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
711 });
712 infos.push_back(TableProcessInfo{
713 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
714 });
715
716 infos.push_back(TableProcessInfo{
717 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
718 });
719 infos.push_back(TableProcessInfo{
720 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount - cloudCount, localCount - cloudCount, 0}
721 });
722
723 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
724 SyncProcess syncProcess;
725 syncProcess.errCode = OK;
726 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
727 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
728 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
729 expectProcess.push_back(syncProcess);
730 }
731 }
732
InitProcessForTest9(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)733 void InitProcessForTest9(const uint32_t &cloudCount, const uint32_t &localCount,
734 std::vector<SyncProcess> &expectProcess)
735 {
736 expectProcess.clear();
737 std::vector<TableProcessInfo> infos;
738 uint32_t index = 1;
739 infos.push_back(TableProcessInfo{
740 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
741 });
742 infos.push_back(TableProcessInfo{
743 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
744 });
745
746 infos.push_back(TableProcessInfo{
747 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
748 });
749 infos.push_back(TableProcessInfo{
750 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
751 });
752
753 infos.push_back(TableProcessInfo{
754 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
755 });
756 infos.push_back(TableProcessInfo{
757 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
758 });
759
760 infos.push_back(TableProcessInfo{
761 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
762 });
763 infos.push_back(TableProcessInfo{
764 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
765 });
766
767 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
768 SyncProcess syncProcess;
769 syncProcess.errCode = OK;
770 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
771 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
772 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
773 expectProcess.push_back(syncProcess);
774 }
775 }
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)776 void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
777 std::vector<SyncProcess> &expectProcess)
778 {
779 g_syncIndex = 0;
780 callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
781 LOGI("devices size = %d", process.size());
782 ASSERT_EQ(process.size(), 1u);
783 syncProcess = std::move(process.begin()->second);
784 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
785 ASSERT_NE(syncProcess.tableProcess.empty(), true);
786 LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
787 std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
788 auto table1 = syncProcess.tableProcess.find(item);
789 if (table1 != syncProcess.tableProcess.end()) {
790 LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
791 "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
792 item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
793 table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
794 table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
795 table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
796 table1->second.upLoadInfo.failCount);
797 }
798 });
799 if (expectProcess.empty()) {
800 if (syncProcess.process == FINISHED) {
801 g_processCondition.notify_one();
802 }
803 return;
804 }
805 ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
806 for (size_t i = 0; i < g_tables.size() && static_cast<size_t>(g_syncIndex) < expectProcess.size(); ++i) {
807 SyncProcess head = expectProcess[g_syncIndex];
808 for (auto &expect : head.tableProcess) {
809 auto real = syncProcess.tableProcess.find(expect.first);
810 ASSERT_NE(real, syncProcess.tableProcess.end());
811 EXPECT_EQ(expect.second.process, real->second.process);
812 EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
813 EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
814 EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
815 EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
816 EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
817 EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
818 EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
819 EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
820 }
821 }
822 g_syncIndex++;
823 if (syncProcess.process == FINISHED) {
824 g_processCondition.notify_one();
825 }
826 };
827 }
828
CheckAllAssetAfterUpload(int64_t localCount)829 void CheckAllAssetAfterUpload(int64_t localCount)
830 {
831 VBucket extend;
832 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
833 std::vector<VBucket> data1;
834 g_virtualCloudDb->Query(g_tables[0], extend, data1);
835 for (size_t j = 0; j < data1.size(); ++j) {
836 Type entry;
837 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("assert", data1[j], entry);
838 ASSERT_TRUE(isExisted);
839 Asset asset = std::get<Asset>(entry);
840 bool isLocal = j >= (size_t)(localCount / g_arrayHalfSub);
841 Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
842 EXPECT_EQ(asset.version, baseAsset.version);
843 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(isLocal ? j - localCount / g_arrayHalfSub : j));
844 EXPECT_EQ(asset.uri, baseAsset.uri);
845 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
846 EXPECT_EQ(asset.createTime, baseAsset.createTime);
847 EXPECT_EQ(asset.size, baseAsset.size);
848 EXPECT_EQ(asset.hash, baseAsset.hash);
849 }
850
851 std::vector<VBucket> data2;
852 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
853 g_virtualCloudDb->Query(g_tables[1], extend, data2);
854 for (size_t j = 0; j < data2.size(); ++j) {
855 Type entry;
856 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("asserts", data2[j], entry);
857 ASSERT_TRUE(isExisted);
858 Assets assets = std::get<Assets>(entry);
859 Asset baseAsset = j >= (size_t)(localCount / g_arrayHalfSub) ? g_localAsset : g_cloudAsset;
860 int index = static_cast<int>(j);
861 for (const auto &asset: assets) {
862 EXPECT_EQ(asset.version, baseAsset.version);
863 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(index++));
864 EXPECT_EQ(asset.uri, baseAsset.uri);
865 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
866 EXPECT_EQ(asset.createTime, baseAsset.createTime);
867 EXPECT_EQ(asset.size, baseAsset.size);
868 EXPECT_EQ(asset.hash, baseAsset.hash);
869 }
870 }
871 }
872
CheckAssetsAfterDownload(sqlite3 * & db,int64_t localCount)873 void CheckAssetsAfterDownload(sqlite3 *&db, int64_t localCount)
874 {
875 string queryDownload = "select asserts from " + g_tables[1] + " where rowid in (";
876 for (int64_t i = 0; i < localCount; ++i) {
877 queryDownload += "'" + std::to_string(i) + "',";
878 }
879 queryDownload.pop_back();
880 queryDownload += ");";
881 sqlite3_stmt *stmt = nullptr;
882 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
883 int index = 0;
884 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
885 std::vector<uint8_t> blobValue;
886 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
887 Assets assets;
888 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets), E_OK);
889 bool isLocal = index >= localCount / g_arrayHalfSub;
890 Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
891 int nameIndex = index;
892 for (const auto &asset: assets) {
893 EXPECT_EQ(asset.version, baseAsset.version);
894 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(nameIndex));
895 EXPECT_EQ(asset.uri, baseAsset.uri);
896 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
897 EXPECT_EQ(asset.createTime, baseAsset.createTime);
898 EXPECT_EQ(asset.size, baseAsset.size);
899 EXPECT_EQ(asset.hash, baseAsset.hash);
900 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
901 nameIndex++;
902 }
903 index++;
904 }
905 int errCode;
906 SQLiteUtils::ResetStatement(stmt, true, errCode);
907 }
908
CheckAssetAfterDownload(sqlite3 * & db,int64_t localCount)909 void CheckAssetAfterDownload(sqlite3 *&db, int64_t localCount)
910 {
911 string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
912 for (int64_t i = 0; i < localCount; ++i) {
913 queryDownload += "'" + std::to_string(i) + "',";
914 }
915 queryDownload.pop_back();
916 queryDownload += ");";
917 sqlite3_stmt *stmt = nullptr;
918 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
919 int index = 0;
920 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
921 std::vector<uint8_t> blobValue;
922 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
923 Asset asset;
924 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
925 bool isCloud = index >= localCount;
926 Asset baseAsset = isCloud ? g_cloudAsset : g_localAsset;
927 EXPECT_EQ(asset.version, baseAsset.version);
928 EXPECT_EQ(asset.name,
929 baseAsset.name + std::to_string(isCloud ? index - localCount / g_arrayHalfSub : index));
930 EXPECT_EQ(asset.uri, baseAsset.uri);
931 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
932 EXPECT_EQ(asset.createTime, baseAsset.createTime);
933 EXPECT_EQ(asset.size, baseAsset.size);
934 EXPECT_EQ(asset.hash, baseAsset.hash);
935 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
936 index++;
937 }
938 int errCode;
939 SQLiteUtils::ResetStatement(stmt, true, errCode);
940 }
941
UpdateCloudAssetForDownloadAssetTest003()942 void UpdateCloudAssetForDownloadAssetTest003()
943 {
944 VBucket data;
945 std::vector<uint8_t> photo(1, 'x');
946 data.insert_or_assign("name", "Cloud" + std::to_string(0));
947 data.insert_or_assign("photo", photo);
948 data.insert_or_assign("assert", g_cloudAsset);
949 Timestamp now = TimeHelper::GetSysCurrentTime();
950 VBucket log;
951 std::vector<VBucket> record;
952 std::vector<VBucket> extend;
953 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
954 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
955 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
956 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
957 record.push_back(data);
958 extend.push_back(log);
959 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName1, std::move(record), extend), DBStatus::OK);
960 }
961
CheckAssetForDownloadAssetTest003(sqlite3 * & db)962 void CheckAssetForDownloadAssetTest003(sqlite3 *&db)
963 {
964 string queryDownload = "select assert from " + g_tables[0] + " where rowid = '11';";
965 sqlite3_stmt *stmt = nullptr;
966 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
967 int index = 0;
968 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
969 std::vector<uint8_t> blobValue;
970 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
971 Asset asset;
972 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
973 EXPECT_EQ(asset.name, g_cloudAsset.name);
974 EXPECT_EQ(asset.hash, g_cloudAsset.hash);
975 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
976 index++;
977 }
978 int errCode;
979 SQLiteUtils::ResetStatement(stmt, true, errCode);
980 }
981
CheckAssetAfterDownload2(sqlite3 * & db,int64_t localCount)982 void CheckAssetAfterDownload2(sqlite3 *&db, int64_t localCount)
983 {
984 string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
985 for (int64_t i = localCount + 1; i < localCount + localCount; ++i) {
986 queryDownload += "'" + std::to_string(i) + "',";
987 }
988 queryDownload.pop_back();
989 queryDownload += ");";
990 sqlite3_stmt *stmt = nullptr;
991 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
992 int index = 0;
993 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
994 std::vector<uint8_t> blobValue;
995 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
996 Asset asset;
997 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
998 EXPECT_EQ(asset.version, g_cloudAsset.version);
999 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::ABNORMAL));
1000 index++;
1001 }
1002 int errCode;
1003 SQLiteUtils::ResetStatement(stmt, true, errCode);
1004 }
1005
InsertCloudForCloudProcessNotify001(std::vector<VBucket> & record,std::vector<VBucket> & extend)1006 void InsertCloudForCloudProcessNotify001(std::vector<VBucket> &record, std::vector<VBucket> &extend)
1007 {
1008 VBucket data;
1009 std::vector<uint8_t> photo(1, 'v');
1010 data.insert_or_assign("name", "Local" + std::to_string(0));
1011 data.insert_or_assign("height", 166.0); // 166.0 is random double value
1012 data.insert_or_assign("married", false);
1013 data.insert_or_assign("age", 13L);
1014 data.insert_or_assign("photo", photo);
1015 Asset asset = g_cloudAsset;
1016 asset.name = asset.name + std::to_string(0);
1017 data.insert_or_assign("assert", asset);
1018 record.push_back(data);
1019 VBucket log;
1020 Timestamp now = TimeHelper::GetSysCurrentTime();
1021 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1022 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1023 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1024 log.insert_or_assign("#_gid", std::to_string(2)); // 2 is gid
1025 extend.push_back(log);
1026 }
1027
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)1028 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
1029 {
1030 std::unique_lock<std::mutex> lock(g_processMutex);
1031 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
1032 return syncProcess.process == FINISHED;
1033 });
1034 ASSERT_EQ(result, true);
1035 LOGD("-------------------sync end--------------");
1036 }
1037
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus)1038 void callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus)
1039 {
1040 g_syncProcess = {};
1041 Query query = Query::Select().FromTable(tableNames);
1042 std::vector<SyncProcess> expectProcess;
1043 CloudSyncStatusCallback callback;
1044 GetCallback(g_syncProcess, callback, expectProcess);
1045 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus);
1046 if (dbStatus == DBStatus::OK) {
1047 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1048 }
1049 }
1050
CloseDb()1051 void CloseDb()
1052 {
1053 if (g_delegate != nullptr) {
1054 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
1055 g_delegate = nullptr;
1056 }
1057 if (g_observer != nullptr) {
1058 delete g_observer;
1059 g_observer = nullptr;
1060 }
1061 g_virtualCloudDb = nullptr;
1062 }
1063
InitMockAssetLoader(DBStatus & status,int & index)1064 void InitMockAssetLoader(DBStatus &status, int &index)
1065 {
1066 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1067 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1068 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1069 .WillRepeatedly([&status, &index](const std::string &, const std::string &gid, const Type &,
1070 std::map<std::string, Assets> &assets) {
1071 LOGD("Download GID:%s", gid.c_str());
1072 for (auto &item: assets) {
1073 for (auto &asset: item.second) {
1074 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset.status);
1075 EXPECT_TRUE(lowBitStatus == static_cast<uint32_t>(AssetStatus::INSERT) ||
1076 lowBitStatus == static_cast<uint32_t>(AssetStatus::UPDATE));
1077 LOGD("asset [name]:%s, [status]:%u, [flag]:%u", asset.name.c_str(), asset.status, asset.flag);
1078 asset.status = (index++) % 5u + 1; // 6 is AssetStatus type num, include invalid type
1079 }
1080 }
1081 return status;
1082 });
1083 }
1084
1085 class DistributedDBCloudInterfacesRelationalSyncTest : public testing::Test {
1086 public:
1087 static void SetUpTestCase(void);
1088 static void TearDownTestCase(void);
1089 void SetUp();
1090 void TearDown();
1091 protected:
1092 sqlite3 *db = nullptr;
1093 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
1094 };
1095
1096
SetUpTestCase(void)1097 void DistributedDBCloudInterfacesRelationalSyncTest::SetUpTestCase(void)
1098 {
1099 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
1100 g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX;
1101 LOGI("The test db is:%s", g_testDir.c_str());
1102 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
1103 }
1104
TearDownTestCase(void)1105 void DistributedDBCloudInterfacesRelationalSyncTest::TearDownTestCase(void)
1106 {}
1107
SetUp(void)1108 void DistributedDBCloudInterfacesRelationalSyncTest::SetUp(void)
1109 {
1110 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1111 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1112 LOGE("rm test db files error.");
1113 }
1114 DistributedDBToolsUnitTest::PrintTestCaseInfo();
1115 LOGD("Test dir is %s", g_testDir.c_str());
1116 db = RelationalTestUtils::CreateDataBase(g_storePath);
1117 ASSERT_NE(db, nullptr);
1118 CreateUserDBAndTable(db);
1119 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1120 ASSERT_NE(g_observer, nullptr);
1121 ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer },
1122 g_delegate), DBStatus::OK);
1123 ASSERT_NE(g_delegate, nullptr);
1124 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK);
1125 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK);
1126 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName3, CLOUD_COOPERATION), DBStatus::OK);
1127 g_virtualCloudDb = make_shared<VirtualCloudDb>();
1128 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
1129 g_syncProcess = {};
1130 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1131 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
1132 // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
1133 Query query = Query::Select().FromTable(g_tables);
1134 CloudSyncStatusCallback callback;
1135 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1136 DBStatus::SCHEMA_MISMATCH);
1137 DataBaseSchema dataBaseSchema;
1138 GetCloudDbSchema(dataBaseSchema);
1139 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1140 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1141 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1142 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1143 }
1144
TearDown(void)1145 void DistributedDBCloudInterfacesRelationalSyncTest::TearDown(void)
1146 {
1147 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
1148 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1149 LOGE("rm test db files error.");
1150 }
1151 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1152 communicatorAggregator_ = nullptr;
1153 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
1154 }
1155
1156 /**
1157 * @tc.name: CloudSyncTest001
1158 * @tc.desc: Cloud data is older than local data.
1159 * @tc.type: FUNC
1160 * @tc.require:
1161 * @tc.author: bty
1162 */
1163 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest001, TestSize.Level1)
1164 {
1165 int64_t paddingSize = 10;
1166 int64_t cloudCount = 20;
1167 int64_t localCount = cloudCount / g_arrayHalfSub;
1168 ChangedData changedDataForTable1;
1169 ChangedData changedDataForTable2;
1170 changedDataForTable1.tableName = g_tableName1;
1171 changedDataForTable2.tableName = g_tableName2;
1172 changedDataForTable1.field.push_back(std::string("name"));
1173 changedDataForTable2.field.push_back(std::string("id"));
1174 for (int i = 0; i < cloudCount; i++) {
1175 changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1176 changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1177 }
1178 g_observer->SetExpectedResult(changedDataForTable1);
1179 g_observer->SetExpectedResult(changedDataForTable2);
1180 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1181 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1182 Query query = Query::Select().FromTable(g_tables);
1183 std::vector<SyncProcess> expectProcess;
1184 InitProcessForTest1(cloudCount, localCount, expectProcess);
1185 CloudSyncStatusCallback callback;
1186 GetCallback(g_syncProcess, callback, expectProcess);
1187 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1188 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1189 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1190 g_observer->ClearChangedData();
1191 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1192 CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1193 LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1194 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1195 CloseDb();
1196 }
1197
1198 /**
1199 * @tc.name: CloudSyncTest002
1200 * @tc.desc: Local data is older than cloud data.
1201 * @tc.type: FUNC
1202 * @tc.require:
1203 * @tc.author: bty
1204 */
1205 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest002, TestSize.Level1)
1206 {
1207 int64_t localCount = 20;
1208 int64_t cloudCount = 10;
1209 int64_t paddingSize = 100;
1210 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1211 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1212 Query query = Query::Select().FromTable(g_tables);
1213 std::vector<SyncProcess> expectProcess;
1214 InitProcessForTest2(cloudCount, localCount, expectProcess);
1215 CloudSyncStatusCallback callback;
1216 GetCallback(g_syncProcess, callback, expectProcess);
1217 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1218 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1219 LOGD("expect download:worker1[primary key]:[cloud0 - cloud10), worker2[primary key]:[0 - 10)");
1220 CheckDownloadResult(db, {10L, 10L}); // 10 and 10 means the num of downloads from cloud db by worker1 and worker2
1221 LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[10 - 20)");
1222 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1223 CloseDb();
1224 }
1225
1226 /**
1227 * @tc.name: CloudSyncTest003
1228 * @tc.desc: test with update and delete operator
1229 * @tc.type: FUNC
1230 * @tc.require:
1231 * @tc.author: bty
1232 */
1233 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest003, TestSize.Level1)
1234 {
1235 int64_t paddingSize = 10;
1236 int cloudCount = 20;
1237 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1238 InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1239 Query query = Query::Select().FromTable(g_tables);
1240 std::vector<SyncProcess> expectProcess;
1241 InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1242 CloudSyncStatusCallback callback;
1243 GetCallback(g_syncProcess, callback, expectProcess);
1244 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1245 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1246 CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1247 CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1248
1249 int updateCount = 10;
1250 UpdateUserTableRecord(db, 5, updateCount); // 5 is start id to be updated
1251 g_syncProcess = {};
1252 InitProcessForTest1(cloudCount, updateCount, expectProcess);
1253 GetCallback(g_syncProcess, callback, expectProcess);
1254 LOGD("-------------------sync after update--------------");
1255 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1256 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1257
1258 VBucket extend;
1259 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1260 std::vector<VBucket> data1;
1261 g_virtualCloudDb->Query(g_tables[0], extend, data1);
1262 for (int j = 25; j < 35; ++j) { // index[25, 35) in cloud db expected to be updated
1263 EXPECT_EQ(std::get<int64_t>(data1[j]["age"]), 99); // 99 is the updated age field of cloud db
1264 }
1265
1266 std::vector<VBucket> data2;
1267 g_virtualCloudDb->Query(g_tables[1], extend, data2);
1268 for (int j = 5; j < 15; ++j) { // index[5, 15) in cloud db expected to be updated
1269 EXPECT_EQ(std::get<int64_t>(data2[j]["age"]), 99); // 99 is the updated age field of cloud db
1270 }
1271
1272 int deleteCount = 3;
1273 DeleteUserTableRecord(db, 0, deleteCount);
1274 g_syncProcess = {};
1275 InitProcessForTest1(updateCount, deleteCount, expectProcess);
1276 GetCallback(g_syncProcess, callback, expectProcess);
1277 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1278 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1279
1280 CheckCloudTotalCount({37L, 17L}); // 37 and 17 means the total num of worker1 and worker2 from the cloud db
1281 CloseDb();
1282 }
1283
1284 /**
1285 * @tc.name: CloudSyncTest004
1286 * @tc.desc: Random write of local and cloud data
1287 * @tc.type: FUNC
1288 * @tc.require:
1289 * @tc.author: bty
1290 */
1291 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest004, TestSize.Level1)
1292 {
1293 int64_t paddingSize = 1024 * 8;
1294 vector<thread> threads;
1295 int cloudCount = 1024;
1296 threads.emplace_back(InsertCloudTableRecord, 0, cloudCount, paddingSize, false);
1297 threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, cloudCount, paddingSize, false);
1298 for (auto &thread: threads) {
1299 thread.join();
1300 }
1301 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1302 CloseDb();
1303 }
1304
1305 /**
1306 * @tc.name: CloudSyncTest005
1307 * @tc.desc: sync with device sync query
1308 * @tc.type: FUNC
1309 * @tc.require:
1310 * @tc.author: bty
1311 */
1312 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest005, TestSize.Level1)
1313 {
1314 Query query = Query::Select().FromTable(g_tables).OrderBy("123", true);
1315 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1316 DBStatus::NOT_SUPPORT);
1317
1318 query = Query::Select();
1319 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1320 DBStatus::INVALID_ARGS);
1321 CloseDb();
1322 }
1323
1324 /**
1325 * @tc.name: CloudSyncTest006
1326 * @tc.desc: Firstly set a correct schema, and then null or invalid schema
1327 * @tc.type: FUNC
1328 * @tc.require:
1329 * @tc.author: wanyi
1330 */
1331 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest006, TestSize.Level1)
1332 {
1333 int64_t paddingSize = 10;
1334 int cloudCount = 20;
1335 ChangedData changedDataForTable1;
1336 ChangedData changedDataForTable2;
1337 changedDataForTable1.tableName = g_tableName1;
1338 changedDataForTable2.tableName = g_tableName2;
1339 changedDataForTable1.field.push_back(std::string("name"));
1340 changedDataForTable2.field.push_back(std::string("id"));
1341 for (int i = 0; i < cloudCount; i++) {
1342 changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1343 changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1344 }
1345 g_observer->SetExpectedResult(changedDataForTable1);
1346 g_observer->SetExpectedResult(changedDataForTable2);
1347 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1348 InsertUserTableRecord(db, 0, cloudCount / g_arrayHalfSub, paddingSize, false);
1349 // Set correct cloudDbSchema (correct version)
1350 DataBaseSchema correctSchema;
1351 GetCloudDbSchema(correctSchema);
1352 ASSERT_EQ(g_delegate->SetCloudDbSchema(correctSchema), DBStatus::OK);
1353 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1354 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1355 g_observer->ClearChangedData();
1356 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1357 CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1358 LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1359 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1360
1361 // Reset cloudDbSchema (invalid version - null)
1362 DataBaseSchema nullSchema;
1363 ASSERT_EQ(g_delegate->SetCloudDbSchema(nullSchema), DBStatus::OK);
1364 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1365
1366 // Reset cloudDbSchema (invalid version - field mismatch)
1367 DataBaseSchema invalidSchema;
1368 GetInvalidCloudDbSchema(invalidSchema);
1369 ASSERT_EQ(g_delegate->SetCloudDbSchema(invalidSchema), DBStatus::OK);
1370 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1371 CloseDb();
1372 }
1373
1374 /**
1375 * @tc.name: CloudSyncTest007
1376 * @tc.desc: Check the asset types after sync
1377 * @tc.type: FUNC
1378 * @tc.require:
1379 * @tc.author: bty
1380 */
1381 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest007, TestSize.Level1)
1382 {
1383 int64_t paddingSize = 100;
1384 int localCount = 20;
1385 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1386 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1387 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1388
1389 CheckAssetAfterDownload(db, localCount);
1390 CheckAllAssetAfterUpload(localCount);
1391 CheckAssetsAfterDownload(db, localCount);
1392 CloseDb();
1393 }
1394
1395 /*
1396 * @tc.name: CloudSyncTest008
1397 * @tc.desc: Test sync with invalid param
1398 * @tc.type: FUNC
1399 * @tc.require:
1400 * @tc.author: zhangqiquan
1401 */
1402 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest008, TestSize.Level1)
1403 {
1404 ASSERT_EQ(g_delegate->SetCloudDB(nullptr), OK); // it will not happen because cloudb has been set in SetUp()
1405 Query query = Query::Select().FromTable({g_tableName3});
1406 // clouddb has been set in SetUp() and it's not null
1407 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
1408 CloseDb();
1409 }
1410
1411 /**
1412 * @tc.name: CloudSyncTest009
1413 * @tc.desc: The second time there was no data change and sync was called.
1414 * @tc.type: FUNC
1415 * @tc.require:
1416 * @tc.author: bty
1417 */
1418 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest009, TestSize.Level1)
1419 {
1420 int64_t paddingSize = 10;
1421 int cloudCount = 20;
1422 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1423 InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1424 Query query = Query::Select().FromTable(g_tables);
1425 std::vector<SyncProcess> expectProcess;
1426 InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1427 CloudSyncStatusCallback callback;
1428 GetCallback(g_syncProcess, callback, expectProcess);
1429 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1430 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1431 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:none");
1432 CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1433 LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[0 - 20)");
1434 CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1435
1436 g_syncProcess = {};
1437 InitProcessForTest9(cloudCount, 0, expectProcess);
1438 GetCallback(g_syncProcess, callback, expectProcess);
1439 LOGD("--------------the second sync-------------");
1440 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1441 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1442 CloseDb();
1443 }
1444
1445 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest0010, TestSize.Level1)
1446 {
1447 int64_t paddingSize = 10;
1448 int cloudCount = 20;
1449 int localCount = 10;
1450 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1451 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1452 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1453
1454 int rowid = 27;
1455 UpdateAssetForTest(db, AssetOpType::NO_CHANGE, cloudCount, rowid++);
1456 UpdateAssetForTest(db, AssetOpType::INSERT, cloudCount, rowid++);
1457 UpdateAssetForTest(db, AssetOpType::DELETE, cloudCount, rowid++);
1458 UpdateAssetForTest(db, AssetOpType::UPDATE, cloudCount, rowid++);
1459
1460 int id = 0;
1461 UpdateAssetsForTest(db, AssetOpType::NO_CHANGE, id++);
1462 UpdateAssetsForTest(db, AssetOpType::INSERT, id++);
1463 UpdateAssetsForTest(db, AssetOpType::DELETE, id++);
1464 UpdateAssetsForTest(db, AssetOpType::UPDATE, id++);
1465
1466 LOGD("--------------the second sync-------------");
1467 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1468
1469 CheckFillAssetForTest10(db);
1470 CheckFillAssetsForTest10(db);
1471 CloseDb();
1472 }
1473
1474 /**
1475 * @tc.name: CloudSyncTest011
1476 * @tc.desc: Test sync with same table name.
1477 * @tc.type: FUNC
1478 * @tc.require:
1479 * @tc.author: zhangqiquan
1480 */
1481 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest011, TestSize.Level1)
1482 {
1483 Query query = Query::Select().FromTable({g_tableName1, g_tableName1});
1484 bool syncFinish = false;
1485 std::mutex syncMutex;
1486 std::condition_variable cv;
1487 std::atomic<int> callCount = 0;
1488 CloudSyncStatusCallback callback = [&callCount, &cv, &syncFinish, &syncMutex](
__anona0f839580602( const std::map<std::string, SyncProcess> &onProcess) 1489 const std::map<std::string, SyncProcess> &onProcess) {
1490 ASSERT_NE(onProcess.find(DEVICE_CLOUD), onProcess.end());
1491 SyncProcess syncProcess = onProcess.at(DEVICE_CLOUD);
1492 callCount++;
1493 if (syncProcess.process == FINISHED) {
1494 std::lock_guard<std::mutex> autoLock(syncMutex);
1495 syncFinish = true;
1496 }
1497 cv.notify_all();
1498 };
1499 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1500 std::unique_lock<std::mutex> uniqueLock(syncMutex);
__anona0f839580702() 1501 cv.wait(uniqueLock, [&syncFinish]() {
1502 return syncFinish;
1503 });
1504 EXPECT_EQ(callCount, 2); // 2 is onProcess count
1505 CloseDb();
1506 }
1507
1508 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest012, TestSize.Level1)
1509 {
1510 int64_t localCount = 20;
1511 int64_t cloudCount = 10;
1512 int64_t paddingSize = 10;
1513 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1514 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1515 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1516
1517 InsertCloudTableRecord(localCount + cloudCount, cloudCount, paddingSize, false);
1518 InsertUserTableRecord(db, localCount + cloudCount, localCount, paddingSize, true);
1519 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1520
1521 InsertCloudTableRecord(2 * (localCount + cloudCount), cloudCount, paddingSize, false); // 2 is offset
1522 InsertUserTableRecord(db, 2 * (localCount + cloudCount), localCount, paddingSize, false); // 2 is offset
1523 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1524
1525
1526 InsertCloudTableRecord(3 * (localCount + cloudCount), cloudCount, paddingSize, true); // 3 is offset
1527 InsertUserTableRecord(db, 3 * (localCount + cloudCount), localCount, paddingSize, true); // 3 is offset
1528 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1529 CloseDb();
1530 }
1531
1532 /*
1533 * @tc.name: CloudSyncTest013
1534 * @tc.desc: test increment watermark when cloud db query data size is 0
1535 * @tc.type: FUNC
1536 * @tc.require:
1537 * @tc.author: zhuwentao
1538 */
1539 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest013, TestSize.Level1)
1540 {
1541 /**
1542 * @tc.steps: insert some data into cloud db
1543 * @tc.expected: return ok.
1544 */
1545 int64_t paddingSize = 10;
1546 int64_t cloudCount = 10;
1547 SyncProcess syncProcess;
1548 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1549 /**
1550 * @tc.steps: try to cloud sync
1551 * @tc.expected: return ok.
1552 */
1553 Query query = Query::Select().FromTable(g_tables);
__anona0f839580802(const std::map<std::string, SyncProcess> &process) 1554 CloudSyncStatusCallback callback = [&syncProcess](const std::map<std::string, SyncProcess> &process) {
1555 LOGI("devices size = %d", process.size());
1556 ASSERT_EQ(process.size(), 1u);
1557 syncProcess = std::move(process.begin()->second);
1558 if (syncProcess.process == FINISHED) {
1559 g_processCondition.notify_one();
1560 }
1561 };
1562 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1563 WaitForSyncFinish(syncProcess, g_syncWaitTime);
1564 uint32_t queryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1565 /**
1566 * @tc.steps: insert some increment data into cloud db
1567 * @tc.expected: return ok.
1568 */
1569 VBucket data;
1570 Timestamp now = TimeHelper::GetSysCurrentTime();
1571 data.insert_or_assign("name", "Cloud" + std::to_string(0));
1572 data.insert_or_assign("height", 166.0); // 166.0 is random double value
1573 data.insert_or_assign("married", false);
1574 data.insert_or_assign("age", 13L);
1575 VBucket log;
1576 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1577 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1578 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1579 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1580 log.insert_or_assign(CloudDbConstant::CURSOR_FIELD, "0123");
1581 g_virtualCloudDb->SetIncrementData(g_tableName1, data, log);
1582 syncProcess.process = PREPARED;
1583 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1584 WaitForSyncFinish(syncProcess, g_syncWaitTime);
1585 uint32_t lastQueryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1586 ASSERT_EQ(lastQueryTimes - queryTimes, 2u);
1587 CloseDb();
1588 }
1589
TestSyncForStatus(RelationalStoreDelegate * delegate,DBStatus expectStatus)1590 void TestSyncForStatus(RelationalStoreDelegate *delegate, DBStatus expectStatus)
1591 {
1592 std::mutex dataMutex;
1593 std::condition_variable cv;
1594 bool finish = false;
1595 DBStatus res = OK;
1596 CloudSyncStatusCallback callback = [&dataMutex, &cv, &finish, &res](
1597 const std::map<std::string, SyncProcess> &process) {
1598 std::map<std::string, SyncProcess> syncProcess;
1599 {
1600 std::lock_guard<std::mutex> autoLock(dataMutex);
1601 syncProcess = process;
1602 if (syncProcess[DEVICE_CLOUD].process == FINISHED) {
1603 finish = true;
1604 }
1605 res = syncProcess[DEVICE_CLOUD].errCode;
1606 }
1607 cv.notify_one();
1608 };
1609 Query query = Query::Select().FromTable({g_tableName3});
1610 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1611 {
1612 std::unique_lock<std::mutex> uniqueLock(dataMutex);
1613 cv.wait(uniqueLock, [&finish] {
1614 return finish;
1615 });
1616 }
1617 EXPECT_EQ(res, expectStatus);
1618 }
1619
1620 /*
1621 * @tc.name: CloudSyncTest015
1622 * @tc.desc: Test sync with cloud error
1623 * @tc.type: FUNC
1624 * @tc.require:
1625 * @tc.author: zhangqiquan
1626 */
1627 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest015, TestSize.Level1)
1628 {
1629 g_virtualCloudDb->SetActionStatus(CLOUD_NETWORK_ERROR);
1630 TestSyncForStatus(g_delegate, CLOUD_NETWORK_ERROR);
1631
1632 g_virtualCloudDb->SetActionStatus(CLOUD_SYNC_UNSET);
1633 TestSyncForStatus(g_delegate, CLOUD_SYNC_UNSET);
1634
1635 g_virtualCloudDb->SetActionStatus(CLOUD_FULL_RECORDS);
1636 TestSyncForStatus(g_delegate, CLOUD_FULL_RECORDS);
1637
1638 g_virtualCloudDb->SetActionStatus(CLOUD_LOCK_ERROR);
1639 TestSyncForStatus(g_delegate, CLOUD_LOCK_ERROR);
1640
1641 g_virtualCloudDb->SetActionStatus(DB_ERROR);
1642 TestSyncForStatus(g_delegate, CLOUD_ERROR);
1643
1644 g_virtualCloudDb->SetActionStatus(OK);
1645 CloseDb();
1646 }
1647
1648 /*
1649 * @tc.name: CloudSyncTest014
1650 * @tc.desc: Test sync with s4
1651 * @tc.type: FUNC
1652 * @tc.require:
1653 * @tc.author: zhangqiquan
1654 */
1655 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest014, TestSize.Level1)
1656 {
1657 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
1658 RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
1659
1660 // sync failed because get security option failed
__anona0f839580b02(const std::string&, SecurityOption &option) 1661 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1662 option.securityLabel = S0;
1663 return DB_ERROR;
1664 });
1665 Query query = Query::Select().FromTable({g_tableName3});
1666 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1667 SECURITY_OPTION_CHECK_ERROR);
1668
1669 // sync failed because get S4
__anona0f839580c02(const std::string&, SecurityOption &option) 1670 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1671 option.securityLabel = S4;
1672 return NOT_SUPPORT;
1673 });
1674 Query invalidQuery = Query::Select().FromTable({g_tableName3}).PrefixKey({'k'});
1675 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, invalidQuery, nullptr, g_syncWaitTime),
1676 NOT_SUPPORT);
1677
1678 // sync failed because get S4
__anona0f839580d02(const std::string&, SecurityOption &option) 1679 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1680 option.securityLabel = S4;
1681 return OK;
1682 });
1683 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1684 SECURITY_OPTION_CHECK_ERROR);
1685
1686 // sync failed because S4 has been cached
__anona0f839580e02(const std::string&, SecurityOption &option) 1687 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1688 option.securityLabel = S0;
1689 return OK;
1690 });
1691 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1692 SECURITY_OPTION_CHECK_ERROR);
1693 RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
1694 CloseDb();
1695 }
1696
1697 /*
1698 * @tc.name: CloudSyncTest016
1699 * @tc.desc: Test sync when push before merge
1700 * @tc.type: FUNC
1701 * @tc.require:
1702 * @tc.author: chenchaohao
1703 */
1704 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest016, TestSize.Level1)
1705 {
1706 int64_t localCount = 10;
1707 int64_t paddingSize = 10;
1708 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1709 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1710 CheckCloudTotalCount({10L, 10L});
1711 UpdateUserTableRecord(db, 0, localCount);
1712 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1713
1714 VBucket extend;
1715 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1716 std::vector<VBucket> data1;
1717 g_virtualCloudDb->Query(g_tables[0], extend, data1);
1718 for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1719 EXPECT_EQ(std::get<int64_t>(data1[i]["age"]), 99); // 99 is the updated age field of cloud db
1720 }
1721
1722 std::vector<VBucket> data2;
1723 g_virtualCloudDb->Query(g_tables[1], extend, data2);
1724 for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1725 EXPECT_EQ(std::get<int64_t>(data2[i]["age"]), 99); // 99 is the updated age field of cloud db
1726 }
1727
1728 CloseDb();
1729 }
1730
1731 /*
1732 * @tc.name: CloudSyncTest017
1733 * @tc.desc: Test sync to push when local data deleted and not upload to cloud
1734 * @tc.type: FUNC
1735 * @tc.require:
1736 * @tc.author: wangxiangdong
1737 */
1738 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest017, TestSize.Level1)
1739 {
1740 /**
1741 * @tc.steps: step1. make data: 20 records on local, 20 records on cloud
1742 */
1743 int64_t localCount = 20;
1744 int64_t paddingSize = 20;
1745 InsertCloudTableRecord(0, localCount, paddingSize, true);
1746 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1747 localCount = 10;
1748 /**
1749 * @tc.steps: step2. delete 10 local record before sync
1750 */
1751 DeleteUserTableRecord(db, 0, localCount);
1752 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1753 /**
1754 * @tc.steps: step3. check local and cloud num
1755 */
1756 CheckCloudTotalCount({30L, 20L});
1757 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(g_tables[0]) +
1758 " where data_key=-1 and cloud_gid='';";
1759 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback,
1760 reinterpret_cast<void *>(10), nullptr), SQLITE_OK);
1761 CloseDb();
1762 }
1763
1764 /*
1765 * @tc.name: DataNotifier001
1766 * @tc.desc: Notify data without primary key
1767 * @tc.type: FUNC
1768 * @tc.require:
1769 * @tc.author: wanyi
1770 */
1771 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DataNotifier001, TestSize.Level1)
1772 {
1773 int64_t paddingSize = 10;
1774 int localCount = 20;
1775 InsertRecordWithoutPk2LocalAndCloud(db, 0, localCount, paddingSize);
1776 callSync({g_tableName3}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1777 CloseDb();
1778 }
1779
1780 /**
1781 * @tc.name: CloudSyncAssetTest001
1782 * @tc.desc:
1783 * @tc.type: FUNC
1784 * @tc.require:
1785 * @tc.author: wanyi
1786 */
1787 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest001, TestSize.Level1)
1788 {
1789 int64_t paddingSize = 100;
1790 int localCount = 20;
1791 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1792 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1793 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1794
1795 CheckAssetAfterDownload(db, localCount);
1796 CheckAllAssetAfterUpload(localCount);
1797 CloseDb();
1798 }
1799
1800 /*
1801 * @tc.name: MannualNotify001
1802 * @tc.desc: Test FLAG_ONLY mode of RemoveDeviceData
1803 * @tc.type: FUNC
1804 * @tc.require:
1805 * @tc.author: huangboxin
1806 */
1807 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, MannualNotify001, TestSize.Level1)
1808 {
1809 int64_t paddingSize = 10;
1810 int localCount = 10;
1811 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1812 Query query = Query::Select().FromTable(g_tables);
1813 std::vector<SyncProcess> expectProcess;
1814 InitProcessForMannualSync1(expectProcess);
1815 CloudSyncStatusCallback callback;
1816 GetCallback(g_syncProcess, callback, expectProcess);
1817 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime),
1818 DBStatus::OK);
1819 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1820 CloseDb();
1821 }
1822
1823 /**
1824 * @tc.name: CloudProcessNotify001
1825 * @tc.desc: Test duplicate cloud records. SYNC_MODE_CLOUD_MERGE
1826 * @tc.type: FUNC
1827 * @tc.require:
1828 * @tc.author: liufuchenxing
1829 */
1830 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudProcessNotify001, TestSize.Level1)
1831 {
1832 /**
1833 * @tc.steps: step1. table work1 and work2 insert 1 record which name is local0, then sync().
1834 * @tc.expected: step 1. table work1 and work2 download result is 0. table work1 and work2 upload 1 record.
1835 */
1836 int64_t paddingSize = 10;
1837 int64_t localCount = 1;
1838 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1839 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1840 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1841 g_observer->ClearChangedData();
1842 LOGD("expect download:worker1[primary key]:[], worker2[primary key]:[]");
1843 CheckDownloadResult(db, {0L, 0L}); // 0 and 0 means the num of downloads from cloud db by worker1 and worker2
1844 LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1845 CheckCloudTotalCount({1L, 1L}); // 1 and 1 means the total num of worker1 and worker2 from the cloud db
1846
1847 /**
1848 * @tc.steps: step2. reset data
1849 * @tc.expected: step2. return ok.
1850 */
1851 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1852 g_syncProcess = {};
1853 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1854
1855 /**
1856 * @tc.steps: step3. table work1 delete record which gid is 0 and name is local0 on cloud.
1857 * @tc.expected: step3. return ok.
1858 */
1859 VBucket idMap;
1860 idMap.insert_or_assign("#_gid", std::to_string(0));
1861 ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName1, idMap), DBStatus::OK);
1862
1863 /**
1864 * @tc.steps: step4. table work1 insert record which gid is 0 and name is local0 on cloud.
1865 * @tc.expected: step4. return ok.
1866 */
1867 std::vector<VBucket> record1;
1868 std::vector<VBucket> extend1;
1869 InsertCloudForCloudProcessNotify001(record1, extend1);
1870 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(g_tableName1, std::move(record1), extend1), DBStatus::OK);
1871
1872 /**
1873 * @tc.steps: step5. sync() and check local data.
1874 * @tc.expected: step5. return ok.
1875 */
1876 ChangedData changedDataForTable1;
1877 changedDataForTable1.tableName = g_tableName1;
1878 changedDataForTable1.field.push_back(std::string("name"));
1879 changedDataForTable1.primaryData[ChangeType::OP_UPDATE].push_back({"Local" + std::to_string(0)});
1880 g_observer->SetExpectedResult(changedDataForTable1);
1881
1882 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1883 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1884 g_observer->ClearChangedData();
1885 LOGD("expect download:worker1[primary key]:[Local0], worker2[primary key]:[0]");
1886 // 1 and 1 means the num of downloads from cloud db by worker1 and worker2
1887 CheckDownloadResult(db, {1L, 1L}, "Local");
1888 LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1889 CheckCloudTotalCount({1L, 1L}); // 0 and 0 means the total num of worker1 and worker2 from the cloud db
1890
1891 /**
1892 * @tc.steps: step6. CloseDb().
1893 * @tc.expected: step6. return ok.
1894 */
1895 CloseDb();
1896 }
1897
1898 /*
1899 * @tc.name: CloudSyncAssetTest002
1900 * @tc.desc:
1901 * @tc.type: FUNC
1902 * @tc.require:
1903 * @tc.author: huangboxin
1904 */
1905 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest002, TestSize.Level1)
1906 {
1907 int64_t paddingSize = 10;
1908 int localCount = 3;
1909 int cloudCount = 3;
1910 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1911 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1912 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1913 CloseDb();
1914 }
1915
1916 /*
1917 * @tc.name: CloudSyncAssetTest003
1918 * @tc.desc:
1919 * @tc.type: FUNC
1920 * @tc.require:
1921 * @tc.author: bty
1922 */
1923 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest003, TestSize.Level1)
1924 {
1925 int64_t paddingSize = 10;
1926 int localCount = 3;
1927 int cloudCount = 3;
1928 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1929 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1930 Assets assets;
1931 assets.push_back(g_localAsset);
1932 assets.push_back(g_localAsset);
1933 UpdateLocalAssets(db, assets, 1);
1934 Query query = Query::Select().FromTable(g_tables);
1935 std::vector<SyncProcess> expectProcess;
__anona0f839580f02(const std::map<std::string, SyncProcess> &process) 1936 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1937 ASSERT_EQ(process.size(), 1u);
1938 g_syncProcess = std::move(process.begin()->second);
1939
1940 if (g_syncProcess.process == FINISHED) {
1941 g_processCondition.notify_one();
1942 }
1943 };
1944 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1945 DBStatus::OK);
1946 {
1947 std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581002() 1948 g_processCondition.wait(lock, []() {
1949 return g_syncProcess.process == FINISHED;
1950 });
1951 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1952 }
1953 CloseDb();
1954 }
1955
1956 /*
1957 * @tc.name: CloudSyncAssetTest004
1958 * @tc.desc:
1959 * @tc.type: FUNC
1960 * @tc.require:
1961 * @tc.author: bty
1962 */
1963 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest004, TestSize.Level1)
1964 {
1965 int64_t paddingSize = 10;
1966 int localCount = 3;
1967 int cloudCount = 3;
1968 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1969 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1970 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1971
1972 UpdateDiffType(localCount);
1973 g_syncProcess = {};
__anona0f839581102(const std::map<std::string, SyncProcess> &process) 1974 CloudSyncStatusCallback callback1 = [](const std::map<std::string, SyncProcess> &process) {
1975 ASSERT_EQ(process.size(), 1u);
1976 g_syncProcess = std::move(process.begin()->second);
1977 if (g_syncProcess.process == FINISHED) {
1978 g_processCondition.notify_one();
1979 }
1980 };
1981 Query query = Query::Select().FromTable(g_tables);
1982 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback1, g_syncWaitTime),
1983 DBStatus::OK);
1984 {
1985 std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581202() 1986 g_processCondition.wait(lock, []() {
1987 return g_syncProcess.process == FINISHED;
1988 });
1989 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1990 }
1991 CheckDiffTypeAsset(db);
1992 CloseDb();
1993 }
1994
1995 /*
1996 * @tc.name: CloudSyncAssetTest005
1997 * @tc.desc: Test erase all no change Asset
1998 * @tc.type: FUNC
1999 * @tc.require:
2000 * @tc.author: bty
2001 */
2002 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest005, TestSize.Level1)
2003 {
2004 /**
2005 * @tc.steps:step1. Construct local data with asset names and hashes consistent with the cloud
2006 * @tc.expected: step1. return ok.
2007 */
2008 int64_t paddingSize = 10;
2009 int localCount = 3;
2010 int cloudCount = 3;
2011 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2012 Assets assets;
2013 for (int64_t j = 0; j < cloudCount; j++) {
2014 Asset asset = g_cloudAsset;
2015 asset.name = g_cloudAsset.name + std::to_string(j);
2016 assets.push_back(asset);
2017 }
2018 UpdateLocalAssets(db, assets, 0);
2019 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2020
2021 /**
2022 * @tc.steps:step2. Construct cloud data
2023 * @tc.expected: step2. return ok.
2024 */
2025 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2026
2027 /**
2028 * @tc.steps:step3. sync, expect EraseNoChangeAsset to erase all Nochange assets
2029 * @tc.expected: step3. return ok.
2030 */
2031 Query query = Query::Select().FromTable(g_tables);
2032 std::vector<SyncProcess> expectProcess;
__anona0f839581302(const std::map<std::string, SyncProcess> &process) 2033 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2034 ASSERT_EQ(process.size(), 1u);
2035 g_syncProcess = std::move(process.begin()->second);
2036
2037 if (g_syncProcess.process == FINISHED) {
2038 g_processCondition.notify_one();
2039 }
2040 };
2041 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2042 DBStatus::OK);
2043 {
2044 std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581402() 2045 g_processCondition.wait(lock, []() {
2046 return g_syncProcess.process == FINISHED;
2047 });
2048 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2049 }
2050 CloseDb();
2051 }
2052
2053 /*
2054 * @tc.name: CloudSyncAssetTest006
2055 * @tc.desc: Test upload new data without assets
2056 * @tc.type: FUNC
2057 * @tc.require:
2058 * @tc.author: bty
2059 */
2060 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest006, TestSize.Level1)
2061 {
2062 /**
2063 * @tc.steps:step1. Construct local data with NULL asset and the local count is greater than the cloud
2064 * @tc.expected: step1. return ok.
2065 */
2066 int64_t paddingSize = 10;
2067 int localCount = 6;
2068 int cloudCount = 3;
2069 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2070 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2071 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2072
2073 /**
2074 * @tc.steps:step2. sync, upload new data without assets,
2075 * @tc.expected: step2. return ok.
2076 */
2077 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2078 CloseDb();
2079 }
2080
2081 /*
2082 * @tc.name: CloudSyncAssetTest007
2083 * @tc.desc: for expilictly set not-change assets. If an asset is deleted, and its hash is not set to empty, it will be
2084 * regarded as NO-CHANGE, rather than delete
2085 * @tc.type: FUNC
2086 * @tc.require:
2087 * @tc.author: wanyi
2088 */
2089 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest007, TestSize.Level1)
2090 {
2091 /**
2092 * @tc.steps:step1. local asset contain an asset which has a corresponding asset in cloud
2093 * @tc.expected: step1. return ok.
2094 */
2095 int64_t paddingSize = 10;
2096 int localCount = 1;
2097 int cloudCount = 1;
2098 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2099 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2100 /**
2101 * @tc.steps:step2. local asset is set to delete, but hash is not set to empty
2102 * @tc.expected: step2. return ok.
2103 */
2104 Assets assets;
2105 for (int64_t j = 0; j < cloudCount; j++) {
2106 Asset asset = g_cloudAsset;
2107 asset.name = g_cloudAsset.name + std::to_string(j);
2108 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
2109 assets.push_back(asset);
2110 }
2111 UpdateLocalAssets(db, assets, 0);
2112 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2113 /**
2114 * @tc.steps:step3. Do sync
2115 * @tc.expected: step3. return ok.
2116 */
2117 Query query = Query::Select().FromTable(g_tables);
2118 std::vector<SyncProcess> expectProcess;
__anona0f839581502(const std::map<std::string, SyncProcess> &process) 2119 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2120 ASSERT_EQ(process.size(), 1u);
2121 g_syncProcess = std::move(process.begin()->second);
2122
2123 if (g_syncProcess.process == FINISHED) {
2124 g_processCondition.notify_one();
2125 }
2126 };
2127 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2128 DBStatus::OK);
2129 {
2130 std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581602() 2131 g_processCondition.wait(lock, []() {
2132 return g_syncProcess.process == FINISHED;
2133 });
2134 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2135 }
2136 /**
2137 * @tc.steps:step4. Check result. Cloud db should not contain asset.
2138 * @tc.expected: step4. return ok.
2139 */
2140 CheckAssetForAssetTest006();
2141 CloseDb();
2142 }
2143
2144 /**
2145 * @tc.name: DownloadAssetTest001
2146 * @tc.desc: Test the sync of different Asset status out of parameters when the download is successful
2147 * @tc.type: FUNC
2148 * @tc.require:
2149 * @tc.author: bty
2150 */
2151 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest001, TestSize.Level1)
2152 {
2153 /**
2154 * @tc.steps:step1. Set different status out of parameters, and the code returns OK
2155 * @tc.expected: step1. return ok.
2156 */
2157 DBStatus expectStatus = DBStatus::OK;
2158 int index = 0;
2159 InitMockAssetLoader(expectStatus, index);
2160
2161 /**
2162 * @tc.steps:step2. init download data
2163 * @tc.expected: step2. return ok.
2164 */
2165 int64_t paddingSize = 1;
2166 int localCount = 120;
2167 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2168 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
2169
2170 /**
2171 * @tc.steps:step3. sync
2172 * @tc.expected: step3. return ok.
2173 */
2174 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2175
2176 /**
2177 * @tc.steps:step4. Expect all states to be normal
2178 * @tc.expected: step4. return ok.
2179 */
2180 CheckAssetAfterDownload(db, localCount);
2181 CloseDb();
2182 }
2183
2184 /*
2185 * @tc.name: CloudSyncAssetTest008
2186 * @tc.desc: sync failed with download asset
2187 * @tc.type: FUNC
2188 * @tc.require:
2189 * @tc.author: zhangqiquan
2190 */
2191 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest008, TestSize.Level1)
2192 {
2193 /**
2194 * @tc.steps:step1. prepare asset data
2195 */
2196 int64_t paddingSize = 10;
2197 int localCount = 1;
2198 int cloudCount = 1;
2199 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2200 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2201 /**
2202 * @tc.steps:step2. set download asset status failed
2203 */
2204 g_virtualAssetLoader->SetDownloadStatus(CLOUD_ASSET_SPACE_INSUFFICIENT);
2205 Query query = Query::Select().FromTable(g_tables);
2206 std::vector<SyncProcess> expectProcess;
__anona0f839581702(const std::map<std::string, SyncProcess> &process) 2207 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2208 for (const auto &item: process) {
2209 g_syncProcess = item.second;
2210 }
2211 if (g_syncProcess.process == FINISHED) {
2212 g_processCondition.notify_one();
2213 }
2214 };
2215 /**
2216 * @tc.steps:step3. sync and wait sync finished.
2217 * @tc.expected: step3. sync return CLOUD_ASSET_SPACE_INSUFFICIENT.
2218 */
2219 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2220 DBStatus::OK);
2221 {
2222 std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581802() 2223 g_processCondition.wait(lock, []() {
2224 return g_syncProcess.process == FINISHED;
2225 });
2226 ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT);
2227 }
2228 /**
2229 * @tc.steps:step4. clear data.
2230 */
2231 g_virtualAssetLoader->SetDownloadStatus(OK);
2232 CloseDb();
2233 }
2234
2235 /*
2236 * @tc.name: CloudSyncAssetTest009
2237 * @tc.desc:
2238 * @tc.type: FUNC
2239 * @tc.require:
2240 * @tc.author: zhangqiquan
2241 */
2242 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest009, TestSize.Level1)
2243 {
2244 // insert 3 data with asset 3 data without asset into local
2245 // sync them to cloud
2246 int64_t paddingSize = 10;
2247 int localCount = 3;
2248 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2249 InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2250 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2251 // update these data and sync again
2252 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2253 InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2254 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2255 EXPECT_EQ(g_syncProcess.errCode, DBStatus::OK);
2256 CloseDb();
2257 }
2258
2259 /**
2260 * @tc.name: DownloadAssetTest002
2261 * @tc.desc: Test the sync of different Asset status out of parameters when the download is failed
2262 * @tc.type: FUNC
2263 * @tc.require:
2264 * @tc.author: bty
2265 */
2266 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest002, TestSize.Level1)
2267 {
2268 /**
2269 * @tc.steps:step1. Set different status out of parameters, and the code returns CLOUD_ERROR
2270 * @tc.expected: step1. return ok.
2271 */
2272 DBStatus expectStatus = DBStatus::CLOUD_ERROR;
2273 int index = 0;
2274 InitMockAssetLoader(expectStatus, index);
2275 int64_t paddingSize = 1;
2276 int localCount = 100;
2277
2278 /**
2279 * @tc.steps:step2. init download data
2280 * @tc.expected: step2. return ok.
2281 */
2282 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2283 InsertCloudTableRecord(0, localCount, paddingSize, false);
2284
2285 /**
2286 * @tc.steps:step3. sync
2287 * @tc.expected: step3. return ok.
2288 */
2289 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2290
2291 /**
2292 * @tc.steps:step4. Those status that are not normal are all be abnormal after sync.
2293 * @tc.expected: step4. return ok.
2294 */
2295 CheckAssetAfterDownload2(db, localCount);
2296 CloseDb();
2297 }
2298
2299 /**
2300 * @tc.name: DownloadAssetTest003
2301 * @tc.desc: Init different asset name between local and cloud, then sync to test download
2302 * @tc.type: FUNC
2303 * @tc.require:
2304 * @tc.author: bty
2305 */
2306 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest003, TestSize.Level1)
2307 {
2308 /**
2309 * @tc.steps:step1. Init data and sync
2310 * @tc.expected: step1. return ok.
2311 */
2312 int64_t paddingSize = 1;
2313 int localCount = 10;
2314 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2315 InsertCloudTableRecord(0, localCount, paddingSize, false);
2316 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2317
2318 /**
2319 * @tc.steps:step2. update cloud Asset where gid = 0
2320 * @tc.expected: step2. return ok.
2321 */
2322 UpdateCloudAssetForDownloadAssetTest003();
2323
2324 /**
2325 * @tc.steps:step3. sync again
2326 * @tc.expected: step3. return ok.
2327 */
2328 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2329
2330 /**
2331 * @tc.steps:step4. check asset after download where gid = 0
2332 * @tc.expected: step4. return ok.
2333 */
2334 CheckAssetForDownloadAssetTest003(db);
2335 CloseDb();
2336 }
2337
2338 /**
2339 * @tc.name: DownloadAssetTest004
2340 * @tc.desc: Test total count, fail count and success count when drop table
2341 * @tc.type: FUNC
2342 * @tc.require:
2343 * @tc.author: liufuchenxing
2344 */
2345 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest004, TestSize.Level1)
2346 {
2347 /**
2348 * @tc.steps:step1. Init data and sync
2349 * @tc.expected: step1. return ok.
2350 */
2351 int64_t paddingSize = 1;
2352 int count = 10;
2353 InsertUserTableRecord(db, 0, count, paddingSize, false);
2354 g_syncProcess = {};
__anona0f839581902(const std::map<std::string, SyncProcess> &process) 2355 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2356 for (const auto &item : process) {
2357 g_syncProcess = item.second;
2358 }
2359 if (g_syncProcess.process == FINISHED) {
2360 g_processCondition.notify_one();
2361 }
2362 };
2363 Query query = Query::Select().FromTable(g_tables);
2364 EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2365 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2366
2367 /**
2368 * @tc.steps:step2. drop table work2. sync failed, check total, success and fail count.
2369 * @tc.expected: step2. total = 20, success=0, fail=20
2370 */
2371 g_syncProcess = {};
2372 InsertCloudTableRecord(0, count, paddingSize, false);
2373 EXPECT_EQ(RelationalTestUtils::ExecSql(db, DROP_INTEGER_PRIMARY_KEY_TABLE_SQL), E_OK);
2374 EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2375 DBStatus::NOT_FOUND);
2376
2377 /**
2378 * @tc.steps:step3. close db.
2379 * @tc.expected: step3. close success.
2380 */
2381 CloseDb();
2382 }
2383
2384 /**
2385 * @tc.name: SchemaTest001
2386 * @tc.desc: Create table with Cloud cooperation mode and do sync
2387 * @tc.type: FUNC
2388 * @tc.require:
2389 * @tc.author: wanyi
2390 */
2391 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest001, TestSize.Level1)
2392 {
2393 /**
2394 * @tc.steps:step1. Create table with Cloud cooperation mode
2395 * @tc.expected: step1. return ok.
2396 */
2397 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2398 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, CLOUD_COOPERATION), DBStatus::OK);
2399 /**
2400 * @tc.steps:step1. do sync
2401 * @tc.expected: step1. return ok.
2402 */
2403 callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2404 CloseDb();
2405 }
2406
2407 /**
2408 * @tc.name: SchemaTest002
2409 * @tc.desc: Create table with DEVICE_COOPERATION mode and do sync
2410 * @tc.type: FUNC
2411 * @tc.require:
2412 * @tc.author: wanyi
2413 */
2414 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest002, TestSize.Level1)
2415 {
2416 /**
2417 * @tc.steps:step1. Create table with DEVICE_COOPERATION mode
2418 * @tc.expected: step1. return ok.
2419 */
2420 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2421 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, DEVICE_COOPERATION), DBStatus::OK);
2422 /**
2423 * @tc.steps:step1. do sync
2424 * @tc.expected: step1. return NOT_SUPPORT.
2425 */
2426 callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::NOT_SUPPORT);
2427 CloseDb();
2428 }
2429
2430 /**
2431 * @tc.name: CloudCursorTest001
2432 * @tc.desc: Init different asset name between local and cloud, then sync to test download
2433 * @tc.type: FUNC
2434 * @tc.require:
2435 * @tc.author: bty
2436 */
2437 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudCursorTest001, TestSize.Level0)
2438 {
2439 /**
2440 * @tc.steps:step1. Init data and sync
2441 * @tc.expected: step1. return ok.
2442 */
2443 int64_t paddingSize = 1;
2444 int localCount = 10;
2445 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2446 InsertCloudTableRecord(0, localCount, paddingSize, true);
2447 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2448
2449 /**
2450 * @tc.steps:step2. the cursor does not increase during upload, the cursor will increase during download
2451 * although it is unTrackerTable
2452 * @tc.expected: step2. return ok.
2453 */
2454 string sql = "select cursor from " + std::string(DBConstant::RELATIONAL_PREFIX) + g_tableName1 + "_log";
2455 sqlite3_stmt *stmt = nullptr;
2456 EXPECT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2457 int64_t index = 0;
2458 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2459 EXPECT_EQ(static_cast<int64_t>(sqlite3_column_int64(stmt, 0)), ++index);
2460 }
2461 int errCode;
2462 SQLiteUtils::ResetStatement(stmt, true, errCode);
2463 CloseDb();
2464 }
2465
2466 /*
2467 * @tc.name: RDBSupportEncryptTest001
2468 * @tc.desc: Test sync when security label is not set and different encryption para is set
2469 * @tc.type: FUNC
2470 * @tc.require:
2471 * @tc.author: suyue
2472 */
2473 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, RDBSupportEncryptTest001, TestSize.Level1)
2474 {
2475 /**
2476 * @tc.steps: step1. sync when security label is not set and encryption is not supported
2477 * @tc.expected: step1. return ok.
2478 */
2479 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2480 RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
2481 Query query = Query::Select().FromTable({g_tableName3});
2482 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2483
2484 /**
2485 * @tc.steps: step2. sync when security label is not set and encryption is supported
2486 * @tc.expected: step2. return ok.
2487 */
2488 CloudSyncConfig config;
2489 config.isSupportEncrypt = true;
2490 g_delegate->SetCloudSyncConfig(config);
2491 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2492 RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
2493 CloseDb();
2494 }
2495
2496 /*
2497 * @tc.name: RDBSupportEncryptTest002
2498 * @tc.desc: Test sync when security label is S4 and different encryption para is set
2499 * @tc.type: FUNC
2500 * @tc.require:
2501 * @tc.author: suyue
2502 */
2503 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, RDBSupportEncryptTest002, TestSize.Level0)
2504 {
2505 /**
2506 * @tc.steps: step1. sync when security label is S4 and encryption is not supported
2507 * @tc.expected: step1. return SECURITY_OPTION_CHECK_ERROR.
2508 */
2509 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2510 RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
2511 Query query = Query::Select().FromTable({g_tableName3});
__anona0f839581a02(const std::string&, SecurityOption &option) 2512 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
2513 option.securityLabel = S4;
2514 return OK;
2515 });
2516 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
2517 SECURITY_OPTION_CHECK_ERROR);
2518
2519 /**
2520 * @tc.steps: step2. sync when security label is S4 and encryption is supported
2521 * @tc.expected: step2. return OK.
2522 */
2523 CloudSyncConfig config;
2524 config.isSupportEncrypt = true;
2525 g_delegate->SetCloudSyncConfig(config);
2526 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2527
2528 /**
2529 * @tc.steps: step3. sync when isSupportEncrypt is set to false for the second time
2530 * @tc.expected: step3. return SECURITY_OPTION_CHECK_ERROR.
2531 */
2532 config.isSupportEncrypt = false;
2533 g_delegate->SetCloudSyncConfig(config);
2534 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
2535 SECURITY_OPTION_CHECK_ERROR);
2536 RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
2537 CloseDb();
2538 }
2539
2540 /*
2541 * @tc.name: RDBSupportEncryptTest003
2542 * @tc.desc: Test sync when SecurityOption is not supported and different encryption para is set
2543 * @tc.type: FUNC
2544 * @tc.require:
2545 * @tc.author: suyue
2546 */
2547 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, RDBSupportEncryptTest003, TestSize.Level1)
2548 {
2549 /**
2550 * @tc.steps: step1. sync when unable to get security options
2551 * @tc.expected: step1. return OK whether or not encryption is supported.
2552 */
2553 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2554 RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
2555 Query query = Query::Select().FromTable({g_tableName3});
__anona0f839581b02(const std::string&, SecurityOption &option) 2556 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
2557 return NOT_SUPPORT;
2558 });
2559 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2560 CloudSyncConfig config;
2561 config.isSupportEncrypt = true;
2562 g_delegate->SetCloudSyncConfig(config);
2563 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2564 RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
2565 CloseDb();
2566 }
2567 }
2568 #endif // RELATIONAL_STORE
2569