1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud_db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_instance.h"
24 #include "relational_store_manager.h"
25 #include "runtime_config.h"
26 #include "sqlite_relational_store.h"
27 #include "sqlite_relational_utils.h"
28 #include "store_observer.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "mock_asset_loader.h"
34
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38 using namespace std;
39
40 namespace {
// Store identifier string used by the tests in this file.
41 string g_storeID = "Relational_Store_SYNC";
// Names of the user tables; each one is referenced by one of the CREATE TABLE statements defined below.
42 const string g_tableName1 = "worker1";
43 const string g_tableName2 = "worker2";
44 const string g_tableName3 = "worker3";
45 const string g_tableName4 = "worker4";
// Key expected in cloud sync process notifications (GetCallback asserts the notified map key equals it).
46 const string DEVICE_CLOUD = "cloud_dev";
// presumably the file suffix appended when building the database path - TODO confirm against the fixture
47 const string DB_SUFFIX = ".db";
// NOTE(review): sync wait budget; the unit is not visible in this part of the file - confirm
48 const int64_t g_syncWaitTime = 60;
// Number of consecutive TableProcessInfo entries folded into one expected SyncProcess by the InitProcessFor* helpers.
49 const int g_arrayHalfSub = 2;
// Reset to 0 by GetCallback whenever a new process callback is built.
50 int g_syncIndex = 0;
51 string g_testDir;
52 string g_storePath;
// presumably used to wait for sync completion notifications - TODO confirm (the waiting code is outside this part)
53 std::mutex g_processMutex;
54 std::condition_variable g_processCondition;
// Virtual cloud database that the helpers below fill via BatchInsert/BatchUpdate and read back via Query.
55 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
56 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
57 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
58 RelationalStoreObserverUnitTest *g_observer = nullptr;
59 RelationalStoreDelegate *g_delegate = nullptr;
60 SyncProcess g_syncProcess;
// Callback type receiving a map from a string key (DEVICE_CLOUD in GetCallback) to the reported SyncProcess.
61 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
// Creates worker1: TEXT primary key "name", a NOT NULL photo blob and a single-asset blob column "assert".
62 const std::string CREATE_LOCAL_TABLE_SQL =
63 "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \
64 "name TEXT PRIMARY KEY," \
65 "height REAL ," \
66 "married BOOLEAN ," \
67 "photo BLOB NOT NULL," \
68 "assert BLOB," \
69 "age INT);";
// Creates worker2: INTEGER primary key "id" and a blob column "asserts" (the helpers store serialized Assets there).
70 const std::string INTEGER_PRIMARY_KEY_TABLE_SQL =
71 "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \
72 "id INTEGER PRIMARY KEY," \
73 "name TEXT ," \
74 "height REAL ," \
75 "photo BLOB ," \
76 "asserts BLOB," \
77 "age INT);";
// Drops worker2.
78 const std::string DROP_INTEGER_PRIMARY_KEY_TABLE_SQL = "DROP TABLE " + g_tableName2 + ";";
// Creates worker3: same columns as worker1 but without a primary key.
79 const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL =
80 "CREATE TABLE IF NOT EXISTS " + g_tableName3 + "(" \
81 "name TEXT," \
82 "height REAL ," \
83 "married BOOLEAN ," \
84 "photo BLOB NOT NULL," \
85 "assert BLOB," \
86 "age INT);";
// Creates worker4 with the same columns as worker2.
// NOTE(review): the name suggests it is meant for a wrong-sync-mode scenario - confirm against the test using it.
87 const std::string INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE =
88 "CREATE TABLE IF NOT EXISTS " + g_tableName4 + "(" \
89 "id INTEGER PRIMARY KEY," \
90 "name TEXT ," \
91 "height REAL ," \
92 "photo BLOB ," \
93 "asserts BLOB," \
94 "age INT);";
// Cloud field list paired with worker1 in GetCloudDbSchema.
// NOTE(review): the third/fourth initializers of a Field are presumably "primary" and "nullable" - confirm
// against the Field declaration, which is not visible here.
95 const std::vector<Field> g_cloudFiled1 = {
96 {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<double>},
97 {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
98 {"assert", TYPE_INDEX<Asset>}, {"age", TYPE_INDEX<int64_t>}
99 };
// Variant of g_cloudFiled1 used by GetInvalidCloudDbSchema: "height" is typed int instead of double and
// "assert" is typed Bytes instead of Asset.
100 const std::vector<Field> g_invalidCloudFiled1 = {
101 {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<int>},
102 {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
103 {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
104 };
// Cloud field list paired with worker2 (and worker4) in GetCloudDbSchema; "asserts" holds an Assets value.
105 const std::vector<Field> g_cloudFiled2 = {
106 {"id", TYPE_INDEX<int64_t>, true}, {"name", TYPE_INDEX<std::string>},
107 {"height", TYPE_INDEX<double>}, {"photo", TYPE_INDEX<Bytes>},
108 {"asserts", TYPE_INDEX<Assets>}, {"age", TYPE_INDEX<int64_t>}
109 };
// Cloud field list paired with worker3 (the table created without a primary key).
110 const std::vector<Field> g_cloudFiledWithOutPrimaryKey3 = {
111 {"name", TYPE_INDEX<std::string>, false, true}, {"height", TYPE_INDEX<double>},
112 {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
113 {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
114 };
// The three vectors below are indexed in parallel: table name, its primary key column and the prefix that
// UpdateUserTableRecord/DeleteUserTableRecord put in front of the numeric key ("Local<n>" for worker1, "<n>" for worker2).
115 const std::vector<std::string> g_tables = {g_tableName1, g_tableName2};
116 const std::vector<std::string> g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName};
117 const std::vector<string> g_prefix = {"Local", ""};
// Template asset copied by the helpers that write local rows; they append a number to the name.
118 const Asset g_localAsset = {
119 .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
120 .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
121 };
// Template asset copied by the helpers that write cloud rows; differs from g_localAsset in version, uri,
// createTime, size and hash.
122 const Asset g_cloudAsset = {
123 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
124 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
125 };
126
CreateUserDBAndTable(sqlite3 * & db)127 void CreateUserDBAndTable(sqlite3 *&db)
128 {
129 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
130 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK);
131 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL), SQLITE_OK);
132 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK);
133 }
134
InsertUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)135 void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
136 {
137 std::string photo(photoSize, 'v');
138 int errCode;
139 std::vector<uint8_t> assetBlob;
140 for (int64_t i = begin; i < begin + count; ++i) {
141 Asset asset = g_localAsset;
142 asset.name = asset.name + std::to_string(i);
143 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
144 string sql = "INSERT OR REPLACE INTO " + g_tableName1
145 + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
146 "', '175.8', '0', '" + photo + "', ? , '18');";
147 sqlite3_stmt *stmt = nullptr;
148 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
149 if (assetIsNull) {
150 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
151 } else {
152 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
153 }
154 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
155 SQLiteUtils::ResetStatement(stmt, true, errCode);
156 }
157 for (int64_t i = begin; i < begin + count; ++i) {
158 std::vector<Asset> assets;
159 Asset asset = g_localAsset;
160 asset.name = g_localAsset.name + std::to_string(i);
161 assets.push_back(asset);
162 asset.name = g_localAsset.name + std::to_string(i + 1);
163 assets.push_back(asset);
164 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetBlob);
165 string sql = "INSERT OR REPLACE INTO " + g_tableName2
166 + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local"
167 + std::to_string(i) + "', '155.10', '"+ photo + "', ? , '21');";
168 sqlite3_stmt *stmt = nullptr;
169 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
170 if (assetIsNull) {
171 ASSERT_EQ(sqlite3_bind_null(stmt, 1), E_OK);
172 } else {
173 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
174 }
175 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
176 SQLiteUtils::ResetStatement(stmt, true, errCode);
177 }
178 LOGD("insert user record worker1[primary key]:[Local%" PRId64 " - Local%" PRId64
179 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
180 }
181
UpdateUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)182 void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
183 {
184 for (size_t i = 0; i < g_tables.size(); i++) {
185 string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + " in (";
186 for (int64_t j = begin; j < begin + count; ++j) {
187 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
188 }
189 updateAge.pop_back();
190 updateAge += ");";
191 ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
192 }
193 LOGD("update local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
194 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
195 }
196
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)197 void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
198 {
199 for (size_t i = 0; i < g_tables.size(); i++) {
200 string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + " in (";
201 for (int64_t j = begin; j < count; ++j) {
202 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
203 }
204 updateAge.pop_back();
205 updateAge += ");";
206 ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
207 }
208 LOGD("delete local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
209 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
210 }
211
InsertRecordWithoutPk2LocalAndCloud(sqlite3 * & db,int64_t begin,int64_t count,int photoSize)212 void InsertRecordWithoutPk2LocalAndCloud(sqlite3 *&db, int64_t begin, int64_t count, int photoSize)
213 {
214 std::vector<uint8_t> photo(photoSize, 'v');
215 std::string photoLocal(photoSize, 'v');
216 Asset asset = { .version = 1, .name = "Phone" };
217 std::vector<uint8_t> assetBlob;
218 RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset);
219 std::string assetStr(assetBlob.begin(), assetBlob.end());
220 std::vector<VBucket> record1;
221 std::vector<VBucket> extend1;
222 for (int64_t i = begin; i < count; ++i) {
223 Timestamp now = TimeHelper::GetSysCurrentTime();
224 VBucket data;
225 data.insert_or_assign("name", "Cloud" + std::to_string(i));
226 data.insert_or_assign("height", 166.0); // 166.0 is random double value
227 data.insert_or_assign("married", (bool)0);
228 data.insert_or_assign("photo", photo);
229 data.insert_or_assign("assert", KEY_1);
230 data.insert_or_assign("age", 13L);
231 record1.push_back(data);
232 VBucket log;
233 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
234 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
235 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
236 extend1.push_back(log);
237 std::this_thread::sleep_for(std::chrono::milliseconds(1)); // wait for 1 ms
238 }
239 int errCode = g_virtualCloudDb->BatchInsert(g_tableName3, std::move(record1), extend1);
240 ASSERT_EQ(errCode, DBStatus::OK);
241 for (int64_t i = begin; i < count; ++i) {
242 string sql = "INSERT OR REPLACE INTO " + g_tableName3
243 + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
244 "', '175.8', '0', '" + photoLocal + "', '" + assetStr + "', '18');";
245 ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
246 }
247 }
248
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)249 void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
250 {
251 std::vector<uint8_t> photo(photoSize, 'v');
252 std::vector<VBucket> record1;
253 std::vector<VBucket> extend1;
254 std::vector<VBucket> record2;
255 std::vector<VBucket> extend2;
256 Timestamp now = TimeHelper::GetSysCurrentTime();
257 for (int64_t i = begin; i < begin + count; ++i) {
258 VBucket data;
259 data.insert_or_assign("name", "Cloud" + std::to_string(i));
260 data.insert_or_assign("height", 166.0); // 166.0 is random double value
261 data.insert_or_assign("married", false);
262 data.insert_or_assign("photo", photo);
263 data.insert_or_assign("age", 13L);
264 Asset asset = g_cloudAsset;
265 asset.name = asset.name + std::to_string(i);
266 assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
267 record1.push_back(data);
268 VBucket log;
269 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
270 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
271 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
272 extend1.push_back(log);
273
274 std::vector<Asset> assets;
275 data.insert_or_assign("id", i);
276 data.insert_or_assign("height", 180.3); // 180.3 is random double value
277 for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
278 asset.name = g_cloudAsset.name + std::to_string(j);
279 assets.push_back(asset);
280 }
281 data.erase("assert");
282 data.erase("married");
283 assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
284 record2.push_back(data);
285 extend2.push_back(log);
286 }
287 ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1), DBStatus::OK);
288 ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2), DBStatus::OK);
289 LOGD("insert cloud record worker1[primary key]:[cloud%" PRId64 " - cloud%" PRId64
290 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
291 std::this_thread::sleep_for(std::chrono::milliseconds(count));
292 }
293
UpdateAssetForTest(sqlite3 * & db,AssetOpType opType,int64_t cloudCount,int64_t rowid)294 void UpdateAssetForTest(sqlite3 *&db, AssetOpType opType, int64_t cloudCount, int64_t rowid)
295 {
296 string sql = "UPDATE " + g_tables[0] + " SET assert = ? where rowid = '" + std::to_string(rowid) + "';";
297 std::vector<uint8_t> assetBlob;
298 int errCode;
299 Asset asset = g_cloudAsset;
300 asset.name = "Phone" + std::to_string(rowid - cloudCount - 1);
301 if (opType == AssetOpType::UPDATE) {
302 asset.uri = "/data/test";
303 } else if (opType == AssetOpType::INSERT) {
304 asset.name = "Test10";
305 }
306 asset.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
307 sqlite3_stmt *stmt = nullptr;
308 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
309 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
310 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false) == E_OK) {
311 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
312 }
313 SQLiteUtils::ResetStatement(stmt, true, errCode);
314 }
315
UpdateAssetsForTest(sqlite3 * & db,AssetOpType opType,int64_t rowid)316 void UpdateAssetsForTest(sqlite3 *&db, AssetOpType opType, int64_t rowid)
317 {
318 string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
319 Asset asset1 = g_localAsset;
320 Asset asset2 = g_localAsset;
321 Assets assets;
322 asset1.name = g_localAsset.name + std::to_string(rowid);
323 asset1.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
324 asset2.name = g_localAsset.name + std::to_string(rowid + 1);
325 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
326 if (opType == AssetOpType::UPDATE) {
327 assets.push_back(asset1);
328 asset2.uri = "/data/test";
329 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
330 assets.push_back(asset2);
331 } else if (opType == AssetOpType::INSERT) {
332 assets.push_back(asset1);
333 assets.push_back(asset2);
334 Asset asset3;
335 asset3.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
336 asset3.name = "Test10";
337 assets.push_back(asset3);
338 } else if (opType == AssetOpType::DELETE) {
339 assets.push_back(asset1);
340 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
341 assets.push_back(asset2);
342 } else {
343 assets.push_back(asset1);
344 assets.push_back(asset2);
345 }
346 sqlite3_stmt *stmt = nullptr;
347 std::vector<uint8_t> assetsBlob;
348 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
349 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
350 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
351 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
352 }
353 int errCode;
354 SQLiteUtils::ResetStatement(stmt, true, errCode);
355 }
356
UpdateLocalAssets(sqlite3 * & db,Assets & assets,int64_t rowid)357 void UpdateLocalAssets(sqlite3 *&db, Assets &assets, int64_t rowid)
358 {
359 string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
360 std::vector<uint8_t> assetsBlob;
361 int errCode;
362 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
363 sqlite3_stmt *stmt = nullptr;
364 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
365 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
366 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
367 }
368 SQLiteUtils::ResetStatement(stmt, true, errCode);
369 }
370
UpdateDiffType(int64_t begin)371 void UpdateDiffType(int64_t begin)
372 {
373 std::vector<std::string> hash = {"DEC", "update_", "insert_"};
374 std::vector<std::string> name = {
375 g_cloudAsset.name + std::to_string(0),
376 g_cloudAsset.name + std::to_string(1),
377 g_cloudAsset.name + std::to_string(3) // 3 is insert id
378 };
379 std::vector<VBucket> record;
380 std::vector<VBucket> extend;
381 Assets assets;
382 for (int i = 0; i < 3; i ++) { // 3 is type num
383 Asset asset = g_cloudAsset;
384 asset.name = name[i];
385 asset.hash = hash[i];
386 assets.push_back(asset);
387 }
388 VBucket data;
389 data.insert_or_assign("name", "Cloud" + std::to_string(0));
390 data.insert_or_assign("id", 0L);
391 data.insert_or_assign("asserts", assets);
392 Timestamp now = TimeHelper::GetSysCurrentTime();
393 VBucket log;
394 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
395 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(begin));
396 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
397 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
398 record.push_back(data);
399 extend.push_back(log);
400 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName2, std::move(record), extend), DBStatus::OK);
401 }
402
CheckDiffTypeAsset(sqlite3 * & db)403 void CheckDiffTypeAsset(sqlite3 *&db)
404 {
405 std::vector<std::string> names = {
406 g_cloudAsset.name + std::to_string(0),
407 g_cloudAsset.name + std::to_string(1),
408 g_cloudAsset.name + std::to_string(3) // 3 is insert id
409 };
410 std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid = 0;";
411 sqlite3_stmt *stmt = nullptr;
412 int index = 0;
413 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
414 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
415 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
416 Type cloudValue;
417 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
418 std::vector<uint8_t> assetsBlob;
419 Assets assets;
420 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
421 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
422 for (const Asset &asset: assets) {
423 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
424 ASSERT_EQ(asset.name, names[index++]);
425 }
426 }
427 int errCode;
428 SQLiteUtils::ResetStatement(stmt, true, errCode);
429 }
430
CheckAssetForAssetTest006()431 void CheckAssetForAssetTest006()
432 {
433 VBucket extend;
434 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
435 std::vector<VBucket> data;
436 g_virtualCloudDb->Query(g_tables[1], extend, data);
437 for (size_t j = 0; j < data.size(); ++j) {
438 ASSERT_NE(data[j].find("asserts"), data[j].end());
439 ASSERT_TRUE((data[j]["asserts"]).index() == TYPE_INDEX<Assets>);
440 Assets &assets = std::get<Assets>(data[j]["asserts"]);
441 ASSERT_TRUE(assets.size() > 0);
442 Asset &asset = assets[0];
443 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
444 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::DELETE));
445 }
446 }
447
CheckFillAssetForTest10(sqlite3 * & db)448 void CheckFillAssetForTest10(sqlite3 *&db)
449 {
450 std::string sql = "SELECT assert from " + g_tables[0] + " WHERE rowid in ('27','28','29','30');";
451 sqlite3_stmt *stmt = nullptr;
452 int index = 0;
453 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
454 int suffixId = 6;
455 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
456 if (index == 0 || index == 1 || index == 3) { // 3 is rowid index of 29
457 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
458 Type cloudValue;
459 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, cloudValue), E_OK);
460 std::vector<uint8_t> assetBlob;
461 Asset asset;
462 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetBlob), E_OK);
463 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset), E_OK);
464 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
465 if (index == 0) {
466 ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
467 } else if (index == 1) {
468 ASSERT_EQ(asset.name, "Test10");
469 } else {
470 ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
471 ASSERT_EQ(asset.uri, "/data/test");
472 }
473 } else {
474 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
475 }
476 index++;
477 }
478 int errCode;
479 SQLiteUtils::ResetStatement(stmt, true, errCode);
480 }
481
CheckFillAssetsForTest10(sqlite3 * & db)482 void CheckFillAssetsForTest10(sqlite3 *&db)
483 {
484 std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid in ('0','1','2','3');";
485 sqlite3_stmt *stmt = nullptr;
486 int index = 0;
487 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
488 int insertIndex = 2;
489 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
490 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
491 Type cloudValue;
492 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
493 std::vector<uint8_t> assetsBlob;
494 Assets assets;
495 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
496 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
497 if (index == 0) {
498 ASSERT_EQ(assets.size(), 2u);
499 ASSERT_EQ(assets[0].name, g_localAsset.name + std::to_string(index));
500 ASSERT_EQ(assets[1].name, g_localAsset.name + std::to_string(index + 1));
501 } else if (index == 1) {
502 ASSERT_EQ(assets.size(), 3u);
503 ASSERT_EQ(assets[insertIndex].name, "Test10");
504 ASSERT_EQ(assets[insertIndex].status, static_cast<uint32_t>(AssetStatus::NORMAL));
505 } else if (index == 2) { // 2 is the third element
506 ASSERT_EQ(assets.size(), 1u);
507 ASSERT_EQ(assets[0].name, g_cloudAsset.name + std::to_string(index));
508 } else {
509 ASSERT_EQ(assets.size(), 2u);
510 ASSERT_EQ(assets[1].uri, "/data/test");
511 ASSERT_EQ(assets[1].status, static_cast<uint32_t>(AssetStatus::NORMAL));
512 }
513 index++;
514 }
515 int errCode;
516 SQLiteUtils::ResetStatement(stmt, true, errCode);
517 }
518
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)519 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
520 {
521 if (count != 1) {
522 return 0;
523 }
524 auto expectCount = reinterpret_cast<int64_t>(data);
525 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
526 return 0;
527 }
528
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,std::string keyStr="Cloud")529 void CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, std::string keyStr = "Cloud")
530 {
531 for (size_t i = 0; i < g_tables.size(); ++i) {
532 string queryDownload = "select count(*) from " + g_tables[i] + " where name "
533 + " like '" + keyStr + "%'";
534 EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
535 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
536 }
537 }
538
CheckCloudTotalCount(std::vector<int64_t> expectCounts)539 void CheckCloudTotalCount(std::vector<int64_t> expectCounts)
540 {
541 VBucket extend;
542 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
543 for (size_t i = 0; i < g_tables.size(); ++i) {
544 int64_t realCount = 0;
545 std::vector<VBucket> data;
546 g_virtualCloudDb->Query(g_tables[i], extend, data);
547 for (size_t j = 0; j < data.size(); ++j) {
548 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
549 if (entry != data[j].end() && std::get<bool>(entry->second)) {
550 continue;
551 }
552 realCount++;
553 }
554 EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
555 }
556 }
557
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)558 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
559 {
560 TableSchema tableSchema1 = {
561 .name = g_tableName1,
562 .fields = g_cloudFiled1
563 };
564 TableSchema tableSchema2 = {
565 .name = g_tableName2,
566 .fields = g_cloudFiled2
567 };
568 TableSchema tableSchemaWithOutPrimaryKey = {
569 .name = g_tableName3,
570 .fields = g_cloudFiledWithOutPrimaryKey3
571 };
572 TableSchema tableSchema4 = {
573 .name = g_tableName4,
574 .fields = g_cloudFiled2
575 };
576 dataBaseSchema.tables.push_back(tableSchema1);
577 dataBaseSchema.tables.push_back(tableSchema2);
578 dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey);
579 dataBaseSchema.tables.push_back(tableSchema4);
580 }
581
582
GetInvalidCloudDbSchema(DataBaseSchema & dataBaseSchema)583 void GetInvalidCloudDbSchema(DataBaseSchema &dataBaseSchema)
584 {
585 TableSchema tableSchema1 = {
586 .name = g_tableName1,
587 .fields = g_invalidCloudFiled1
588 };
589 TableSchema tableSchema2 = {
590 .name = g_tableName1,
591 .fields = g_cloudFiled2
592 };
593 dataBaseSchema.tables.push_back(tableSchema1);
594 dataBaseSchema.tables.push_back(tableSchema2);
595 }
596
InitProcessForTest1(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)597 void InitProcessForTest1(const uint32_t &cloudCount, const uint32_t &localCount,
598 std::vector<SyncProcess> &expectProcess)
599 {
600 expectProcess.clear();
601 std::vector<TableProcessInfo> infos;
602 uint32_t index = 1;
603 infos.push_back(TableProcessInfo{
604 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
605 });
606 infos.push_back(TableProcessInfo{
607 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
608 });
609
610 infos.push_back(TableProcessInfo{
611 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
612 });
613 infos.push_back(TableProcessInfo{
614 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
615 });
616
617 infos.push_back(TableProcessInfo{
618 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
619 });
620 infos.push_back(TableProcessInfo{
621 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
622 });
623
624 infos.push_back(TableProcessInfo{
625 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
626 });
627 infos.push_back(TableProcessInfo{
628 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
629 });
630
631 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
632 SyncProcess syncProcess;
633 syncProcess.errCode = OK;
634 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
635 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
636 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
637 expectProcess.push_back(syncProcess);
638 }
639 }
640
InitProcessForMannualSync1(std::vector<SyncProcess> & expectProcess)641 void InitProcessForMannualSync1(std::vector<SyncProcess> &expectProcess)
642 {
643 expectProcess.clear();
644 std::vector<TableProcessInfo> infos;
645 // first notify, first table
646 infos.push_back(TableProcessInfo{
647 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
648 });
649 // first notify, second table
650 infos.push_back(TableProcessInfo{
651 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
652 });
653 // second notify, first table
654 infos.push_back(TableProcessInfo{
655 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
656 });
657 // second notify, second table
658 infos.push_back(TableProcessInfo{
659 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
660 });
661
662 infos.push_back(TableProcessInfo{
663 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
664 });
665 // second notify, second table
666 infos.push_back(TableProcessInfo{
667 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
668 });
669 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
670 SyncProcess syncProcess;
671 syncProcess.errCode = OK;
672 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
673 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
674 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
675 expectProcess.push_back(syncProcess);
676 }
677 }
678
InitProcessForTest2(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)679 void InitProcessForTest2(const uint32_t &cloudCount, const uint32_t &localCount,
680 std::vector<SyncProcess> &expectProcess)
681 {
682 expectProcess.clear();
683 std::vector<TableProcessInfo> infos;
684 uint32_t index = 1;
685 infos.push_back(TableProcessInfo{
686 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
687 });
688 infos.push_back(TableProcessInfo{
689 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
690 });
691
692 infos.push_back(TableProcessInfo{
693 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
694 });
695 infos.push_back(TableProcessInfo{
696 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
697 });
698
699 infos.push_back(TableProcessInfo{
700 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
701 });
702 infos.push_back(TableProcessInfo{
703 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
704 });
705
706 infos.push_back(TableProcessInfo{
707 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
708 });
709 infos.push_back(TableProcessInfo{
710 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount - cloudCount, localCount - cloudCount, 0}
711 });
712
713 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
714 SyncProcess syncProcess;
715 syncProcess.errCode = OK;
716 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
717 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
718 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
719 expectProcess.push_back(syncProcess);
720 }
721 }
722
InitProcessForTest9(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)723 void InitProcessForTest9(const uint32_t &cloudCount, const uint32_t &localCount,
724 std::vector<SyncProcess> &expectProcess)
725 {
726 expectProcess.clear();
727 std::vector<TableProcessInfo> infos;
728 uint32_t index = 1;
729 infos.push_back(TableProcessInfo{
730 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
731 });
732 infos.push_back(TableProcessInfo{
733 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
734 });
735
736 infos.push_back(TableProcessInfo{
737 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
738 });
739 infos.push_back(TableProcessInfo{
740 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
741 });
742
743 infos.push_back(TableProcessInfo{
744 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
745 });
746 infos.push_back(TableProcessInfo{
747 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
748 });
749
750 infos.push_back(TableProcessInfo{
751 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
752 });
753 infos.push_back(TableProcessInfo{
754 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
755 });
756
757 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
758 SyncProcess syncProcess;
759 syncProcess.errCode = OK;
760 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
761 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
762 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
763 expectProcess.push_back(syncProcess);
764 }
765 }
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)766 void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
767 std::vector<SyncProcess> &expectProcess)
768 {
769 g_syncIndex = 0;
770 callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
771 LOGI("devices size = %d", process.size());
772 ASSERT_EQ(process.size(), 1u);
773 syncProcess = std::move(process.begin()->second);
774 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
775 ASSERT_NE(syncProcess.tableProcess.empty(), true);
776 LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
777 std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
778 auto table1 = syncProcess.tableProcess.find(item);
779 if (table1 != syncProcess.tableProcess.end()) {
780 LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
781 "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
782 item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
783 table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
784 table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
785 table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
786 table1->second.upLoadInfo.failCount);
787 }
788 });
789 if (expectProcess.empty()) {
790 if (syncProcess.process == FINISHED) {
791 g_processCondition.notify_one();
792 }
793 return;
794 }
795 ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
796 for (size_t i = 0; i < g_tables.size(); ++i) {
797 SyncProcess head = expectProcess[g_syncIndex];
798 for (auto &expect : head.tableProcess) {
799 auto real = syncProcess.tableProcess.find(expect.first);
800 ASSERT_NE(real, syncProcess.tableProcess.end());
801 EXPECT_EQ(expect.second.process, real->second.process);
802 EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
803 EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
804 EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
805 EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
806 EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
807 EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
808 EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
809 EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
810 }
811 }
812 g_syncIndex++;
813 if (syncProcess.process == FINISHED) {
814 g_processCondition.notify_one();
815 }
816 };
817 }
818
CheckAllAssetAfterUpload(int64_t localCount)819 void CheckAllAssetAfterUpload(int64_t localCount)
820 {
821 VBucket extend;
822 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
823 std::vector<VBucket> data1;
824 g_virtualCloudDb->Query(g_tables[0], extend, data1);
825 for (size_t j = 0; j < data1.size(); ++j) {
826 auto entry = data1[j].find("assert");
827 ASSERT_NE(entry, data1[j].end());
828 Asset asset = std::get<Asset>(entry->second);
829 bool isLocal = j >= (size_t)(localCount / g_arrayHalfSub);
830 Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
831 EXPECT_EQ(asset.version, baseAsset.version);
832 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(isLocal ? j - localCount / g_arrayHalfSub : j));
833 EXPECT_EQ(asset.uri, baseAsset.uri);
834 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
835 EXPECT_EQ(asset.createTime, baseAsset.createTime);
836 EXPECT_EQ(asset.size, baseAsset.size);
837 EXPECT_EQ(asset.hash, baseAsset.hash);
838 }
839
840 std::vector<VBucket> data2;
841 g_virtualCloudDb->Query(g_tables[1], extend, data2);
842 for (size_t j = 0; j < data2.size(); ++j) {
843 auto entry = data2[j].find("asserts");
844 ASSERT_NE(entry, data2[j].end());
845 Assets assets = std::get<Assets>(entry->second);
846 Asset baseAsset = j >= (size_t)(localCount / g_arrayHalfSub) ? g_localAsset : g_cloudAsset;
847 int index = j;
848 for (const auto &asset: assets) {
849 EXPECT_EQ(asset.version, baseAsset.version);
850 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(index++));
851 EXPECT_EQ(asset.uri, baseAsset.uri);
852 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
853 EXPECT_EQ(asset.createTime, baseAsset.createTime);
854 EXPECT_EQ(asset.size, baseAsset.size);
855 EXPECT_EQ(asset.hash, baseAsset.hash);
856 }
857 }
858 }
859
CheckAssetsAfterDownload(sqlite3 * & db,int64_t localCount)860 void CheckAssetsAfterDownload(sqlite3 *&db, int64_t localCount)
861 {
862 string queryDownload = "select asserts from " + g_tables[1] + " where rowid in (";
863 for (int64_t i = 0; i < localCount; ++i) {
864 queryDownload += "'" + std::to_string(i) + "',";
865 }
866 queryDownload.pop_back();
867 queryDownload += ");";
868 sqlite3_stmt *stmt = nullptr;
869 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
870 int index = 0;
871 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
872 std::vector<uint8_t> blobValue;
873 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
874 Assets assets;
875 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets), E_OK);
876 bool isLocal = index >= localCount / g_arrayHalfSub;
877 Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
878 int nameIndex = index;
879 for (const auto &asset: assets) {
880 EXPECT_EQ(asset.version, baseAsset.version);
881 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(nameIndex));
882 EXPECT_EQ(asset.uri, baseAsset.uri);
883 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
884 EXPECT_EQ(asset.createTime, baseAsset.createTime);
885 EXPECT_EQ(asset.size, baseAsset.size);
886 EXPECT_EQ(asset.hash, baseAsset.hash);
887 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
888 nameIndex++;
889 }
890 index++;
891 }
892 int errCode;
893 SQLiteUtils::ResetStatement(stmt, true, errCode);
894 }
895
CheckAssetAfterDownload(sqlite3 * & db,int64_t localCount)896 void CheckAssetAfterDownload(sqlite3 *&db, int64_t localCount)
897 {
898 string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
899 for (int64_t i = 0; i < localCount; ++i) {
900 queryDownload += "'" + std::to_string(i) + "',";
901 }
902 queryDownload.pop_back();
903 queryDownload += ");";
904 sqlite3_stmt *stmt = nullptr;
905 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
906 int index = 0;
907 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
908 std::vector<uint8_t> blobValue;
909 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
910 Asset asset;
911 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
912 bool isCloud = index >= localCount;
913 Asset baseAsset = isCloud ? g_cloudAsset : g_localAsset;
914 EXPECT_EQ(asset.version, baseAsset.version);
915 EXPECT_EQ(asset.name,
916 baseAsset.name + std::to_string(isCloud ? index - localCount / g_arrayHalfSub : index));
917 EXPECT_EQ(asset.uri, baseAsset.uri);
918 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
919 EXPECT_EQ(asset.createTime, baseAsset.createTime);
920 EXPECT_EQ(asset.size, baseAsset.size);
921 EXPECT_EQ(asset.hash, baseAsset.hash);
922 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
923 index++;
924 }
925 int errCode;
926 SQLiteUtils::ResetStatement(stmt, true, errCode);
927 }
928
UpdateCloudAssetForDownloadAssetTest003()929 void UpdateCloudAssetForDownloadAssetTest003()
930 {
931 VBucket data;
932 std::vector<uint8_t> photo(1, 'x');
933 data.insert_or_assign("name", "Cloud" + std::to_string(0));
934 data.insert_or_assign("photo", photo);
935 data.insert_or_assign("assert", g_cloudAsset);
936 Timestamp now = TimeHelper::GetSysCurrentTime();
937 VBucket log;
938 std::vector<VBucket> record;
939 std::vector<VBucket> extend;
940 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
941 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
942 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
943 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
944 record.push_back(data);
945 extend.push_back(log);
946 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName1, std::move(record), extend), DBStatus::OK);
947 }
948
CheckAssetForDownloadAssetTest003(sqlite3 * & db)949 void CheckAssetForDownloadAssetTest003(sqlite3 *&db)
950 {
951 string queryDownload = "select assert from " + g_tables[0] + " where rowid = '11';";
952 sqlite3_stmt *stmt = nullptr;
953 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
954 int index = 0;
955 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
956 std::vector<uint8_t> blobValue;
957 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
958 Asset asset;
959 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
960 EXPECT_EQ(asset.name, g_cloudAsset.name);
961 EXPECT_EQ(asset.hash, g_cloudAsset.hash);
962 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
963 index++;
964 }
965 int errCode;
966 SQLiteUtils::ResetStatement(stmt, true, errCode);
967 }
968
CheckAssetAfterDownload2(sqlite3 * & db,int64_t localCount)969 void CheckAssetAfterDownload2(sqlite3 *&db, int64_t localCount)
970 {
971 string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
972 for (int64_t i = localCount + 1; i < localCount + localCount; ++i) {
973 queryDownload += "'" + std::to_string(i) + "',";
974 }
975 queryDownload.pop_back();
976 queryDownload += ");";
977 sqlite3_stmt *stmt = nullptr;
978 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
979 int index = 0;
980 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
981 std::vector<uint8_t> blobValue;
982 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
983 Asset asset;
984 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
985 EXPECT_EQ(asset.version, g_cloudAsset.version);
986 if (index % 6u == 0) { // 6 is AssetStatus type num, include invalid type
987 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
988 } else {
989 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::ABNORMAL));
990 }
991 index++;
992 }
993 int errCode;
994 SQLiteUtils::ResetStatement(stmt, true, errCode);
995 }
996
InsertCloudForCloudProcessNotify001(std::vector<VBucket> & record,std::vector<VBucket> & extend)997 void InsertCloudForCloudProcessNotify001(std::vector<VBucket> &record, std::vector<VBucket> &extend)
998 {
999 VBucket data;
1000 std::vector<uint8_t> photo(1, 'v');
1001 data.insert_or_assign("name", "Local" + std::to_string(0));
1002 data.insert_or_assign("height", 166.0); // 166.0 is random double value
1003 data.insert_or_assign("married", false);
1004 data.insert_or_assign("age", 13L);
1005 data.insert_or_assign("photo", photo);
1006 Asset asset = g_cloudAsset;
1007 asset.name = asset.name + std::to_string(0);
1008 data.insert_or_assign("assert", asset);
1009 record.push_back(data);
1010 VBucket log;
1011 Timestamp now = TimeHelper::GetSysCurrentTime();
1012 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1013 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1014 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1015 log.insert_or_assign("#_gid", std::to_string(2)); // 2 is gid
1016 extend.push_back(log);
1017 }
1018
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)1019 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
1020 {
1021 std::unique_lock<std::mutex> lock(g_processMutex);
1022 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
1023 return syncProcess.process == FINISHED;
1024 });
1025 ASSERT_EQ(result, true);
1026 LOGD("-------------------sync end--------------");
1027 }
1028
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus)1029 void callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus)
1030 {
1031 g_syncProcess = {};
1032 Query query = Query::Select().FromTable(tableNames);
1033 std::vector<SyncProcess> expectProcess;
1034 CloudSyncStatusCallback callback;
1035 GetCallback(g_syncProcess, callback, expectProcess);
1036 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus);
1037 if (dbStatus == DBStatus::OK) {
1038 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1039 }
1040 }
1041
CloseDb()1042 void CloseDb()
1043 {
1044 delete g_observer;
1045 g_virtualCloudDb = nullptr;
1046 if (g_delegate != nullptr) {
1047 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
1048 g_delegate = nullptr;
1049 }
1050 }
1051
InitMockAssetLoader(DBStatus & status,int & index)1052 void InitMockAssetLoader(DBStatus &status, int &index)
1053 {
1054 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1055 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1056 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1057 .WillRepeatedly([&status, &index](const std::string &, const std::string &gid, const Type &,
1058 std::map<std::string, Assets> &assets) {
1059 LOGD("Download GID:%s", gid.c_str());
1060 for (auto &item: assets) {
1061 for (auto &asset: item.second) {
1062 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::DOWNLOADING));
1063 LOGD("asset [name]:%s, [status]:%u, [flag]:%u", asset.name.c_str(), asset.status, asset.flag);
1064 asset.status = (index++) % 6u; // 6 is AssetStatus type num, include invalid type
1065 }
1066 }
1067 return status;
1068 });
1069 }
1070
1071 class DistributedDBCloudInterfacesRelationalSyncTest : public testing::Test {
1072 public:
1073 static void SetUpTestCase(void);
1074 static void TearDownTestCase(void);
1075 void SetUp();
1076 void TearDown();
1077 protected:
1078 sqlite3 *db = nullptr;
1079 };
1080
1081
SetUpTestCase(void)1082 void DistributedDBCloudInterfacesRelationalSyncTest::SetUpTestCase(void)
1083 {
1084 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
1085 g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX;
1086 LOGI("The test db is:%s", g_testDir.c_str());
1087 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
1088 }
1089
TearDownTestCase(void)1090 void DistributedDBCloudInterfacesRelationalSyncTest::TearDownTestCase(void)
1091 {}
1092
SetUp(void)1093 void DistributedDBCloudInterfacesRelationalSyncTest::SetUp(void)
1094 {
1095 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1096 LOGE("rm test db files error.");
1097 }
1098 DistributedDBToolsUnitTest::PrintTestCaseInfo();
1099 LOGD("Test dir is %s", g_testDir.c_str());
1100 db = RelationalTestUtils::CreateDataBase(g_storePath);
1101 ASSERT_NE(db, nullptr);
1102 CreateUserDBAndTable(db);
1103 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1104 ASSERT_NE(g_observer, nullptr);
1105 ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer },
1106 g_delegate), DBStatus::OK);
1107 ASSERT_NE(g_delegate, nullptr);
1108 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK);
1109 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK);
1110 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName3, CLOUD_COOPERATION), DBStatus::OK);
1111 g_virtualCloudDb = make_shared<VirtualCloudDb>();
1112 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
1113 g_syncProcess = {};
1114 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1115 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
1116 // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
1117 Query query = Query::Select().FromTable(g_tables);
1118 CloudSyncStatusCallback callback;
1119 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1120 DBStatus::SCHEMA_MISMATCH);
1121 DataBaseSchema dataBaseSchema;
1122 GetCloudDbSchema(dataBaseSchema);
1123 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1124 }
1125
TearDown(void)1126 void DistributedDBCloudInterfacesRelationalSyncTest::TearDown(void)
1127 {
1128 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
1129 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1130 LOGE("rm test db files error.");
1131 }
1132 }
1133
1134 /**
1135 * @tc.name: CloudSyncTest001
1136 * @tc.desc: Cloud data is older than local data.
1137 * @tc.type: FUNC
1138 * @tc.require:
1139 * @tc.author: bty
1140 */
1141 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest001, TestSize.Level0)
1142 {
1143 int64_t paddingSize = 10;
1144 int64_t cloudCount = 20;
1145 int64_t localCount = cloudCount / g_arrayHalfSub;
1146 ChangedData changedDataForTable1;
1147 ChangedData changedDataForTable2;
1148 changedDataForTable1.tableName = g_tableName1;
1149 changedDataForTable2.tableName = g_tableName2;
1150 changedDataForTable1.field.push_back(std::string("name"));
1151 changedDataForTable2.field.push_back(std::string("id"));
1152 for (int i = 0; i < cloudCount; i++) {
1153 changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1154 changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1155 }
1156 g_observer->SetExpectedResult(changedDataForTable1);
1157 g_observer->SetExpectedResult(changedDataForTable2);
1158 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1159 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1160 Query query = Query::Select().FromTable(g_tables);
1161 std::vector<SyncProcess> expectProcess;
1162 InitProcessForTest1(cloudCount, localCount, expectProcess);
1163 CloudSyncStatusCallback callback;
1164 GetCallback(g_syncProcess, callback, expectProcess);
1165 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1166 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1167 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1168 g_observer->ClearChangedData();
1169 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1170 CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1171 LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1172 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1173 CloseDb();
1174 }
1175
1176 /**
1177 * @tc.name: CloudSyncTest002
1178 * @tc.desc: Local data is older than cloud data.
1179 * @tc.type: FUNC
1180 * @tc.require:
1181 * @tc.author: bty
1182 */
1183 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest002, TestSize.Level0)
1184 {
1185 int64_t localCount = 20;
1186 int64_t cloudCount = 10;
1187 int64_t paddingSize = 100;
1188 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1189 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1190 Query query = Query::Select().FromTable(g_tables);
1191 std::vector<SyncProcess> expectProcess;
1192 InitProcessForTest2(cloudCount, localCount, expectProcess);
1193 CloudSyncStatusCallback callback;
1194 GetCallback(g_syncProcess, callback, expectProcess);
1195 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1196 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1197 LOGD("expect download:worker1[primary key]:[cloud0 - cloud10), worker2[primary key]:[0 - 10)");
1198 CheckDownloadResult(db, {10L, 10L}); // 10 and 10 means the num of downloads from cloud db by worker1 and worker2
1199 LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[10 - 20)");
1200 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1201 CloseDb();
1202 }
1203
1204 /**
1205 * @tc.name: CloudSyncTest003
1206 * @tc.desc: test with update and delete operator
1207 * @tc.type: FUNC
1208 * @tc.require:
1209 * @tc.author: bty
1210 */
1211 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest003, TestSize.Level0)
1212 {
1213 int64_t paddingSize = 10;
1214 int cloudCount = 20;
1215 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1216 InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1217 Query query = Query::Select().FromTable(g_tables);
1218 std::vector<SyncProcess> expectProcess;
1219 InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1220 CloudSyncStatusCallback callback;
1221 GetCallback(g_syncProcess, callback, expectProcess);
1222 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1223 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1224 CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1225 CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1226
1227 int updateCount = 10;
1228 UpdateUserTableRecord(db, 5, updateCount); // 5 is start id to be updated
1229 g_syncProcess = {};
1230 InitProcessForTest1(cloudCount, updateCount, expectProcess);
1231 GetCallback(g_syncProcess, callback, expectProcess);
1232 LOGD("-------------------sync after update--------------");
1233 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1234 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1235
1236 VBucket extend;
1237 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1238 std::vector<VBucket> data1;
1239 g_virtualCloudDb->Query(g_tables[0], extend, data1);
1240 for (int j = 25; j < 35; ++j) { // index[25, 35) in cloud db expected to be updated
1241 EXPECT_EQ(std::get<int64_t>(data1[j]["age"]), 99); // 99 is the updated age field of cloud db
1242 }
1243
1244 std::vector<VBucket> data2;
1245 g_virtualCloudDb->Query(g_tables[1], extend, data2);
1246 for (int j = 5; j < 15; ++j) { // index[5, 15) in cloud db expected to be updated
1247 EXPECT_EQ(std::get<int64_t>(data2[j]["age"]), 99); // 99 is the updated age field of cloud db
1248 }
1249
1250 int deleteCount = 3;
1251 DeleteUserTableRecord(db, 0, deleteCount);
1252 g_syncProcess = {};
1253 InitProcessForTest1(updateCount, deleteCount, expectProcess);
1254 GetCallback(g_syncProcess, callback, expectProcess);
1255 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1256 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1257
1258 CheckCloudTotalCount({37L, 17L}); // 37 and 17 means the total num of worker1 and worker2 from the cloud db
1259 CloseDb();
1260 }
1261
1262 /**
1263 * @tc.name: CloudSyncTest004
1264 * @tc.desc: Random write of local and cloud data
1265 * @tc.type: FUNC
1266 * @tc.require:
1267 * @tc.author: bty
1268 */
1269 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest004, TestSize.Level0)
1270 {
1271 int64_t paddingSize = 1024 * 8;
1272 vector<thread> threads;
1273 int cloudCount = 1024;
1274 threads.emplace_back(InsertCloudTableRecord, 0, cloudCount, paddingSize, false);
1275 threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, cloudCount, paddingSize, false);
1276 for (auto &thread: threads) {
1277 thread.join();
1278 }
1279 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1280 CloseDb();
1281 }
1282
1283 /**
1284 * @tc.name: CloudSyncTest005
1285 * @tc.desc: sync with device sync query
1286 * @tc.type: FUNC
1287 * @tc.require:
1288 * @tc.author: bty
1289 */
1290 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest005, TestSize.Level0)
1291 {
1292 Query query = Query::Select().FromTable(g_tables).OrderBy("123", true);
1293 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1294 DBStatus::NOT_SUPPORT);
1295
1296 query = Query::Select();
1297 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1298 DBStatus::INVALID_ARGS);
1299 CloseDb();
1300 }
1301
1302 /**
1303 * @tc.name: CloudSyncTest006
1304 * @tc.desc: Firstly set a correct schema, and then null or invalid schema
1305 * @tc.type: FUNC
1306 * @tc.require:
1307 * @tc.author: wanyi
1308 */
1309 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest006, TestSize.Level0)
1310 {
1311 int64_t paddingSize = 10;
1312 int cloudCount = 20;
1313 ChangedData changedDataForTable1;
1314 ChangedData changedDataForTable2;
1315 changedDataForTable1.tableName = g_tableName1;
1316 changedDataForTable2.tableName = g_tableName2;
1317 changedDataForTable1.field.push_back(std::string("name"));
1318 changedDataForTable2.field.push_back(std::string("id"));
1319 for (int i = 0; i < cloudCount; i++) {
1320 changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1321 changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1322 }
1323 g_observer->SetExpectedResult(changedDataForTable1);
1324 g_observer->SetExpectedResult(changedDataForTable2);
1325 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1326 InsertUserTableRecord(db, 0, cloudCount / g_arrayHalfSub, paddingSize, false);
1327 // Set correct cloudDbSchema (correct version)
1328 DataBaseSchema correctSchema;
1329 GetCloudDbSchema(correctSchema);
1330 ASSERT_EQ(g_delegate->SetCloudDbSchema(correctSchema), DBStatus::OK);
1331 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1332 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1333 g_observer->ClearChangedData();
1334 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1335 CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1336 LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1337 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1338
1339 // Reset cloudDbSchema (invalid version - null)
1340 DataBaseSchema nullSchema;
1341 ASSERT_EQ(g_delegate->SetCloudDbSchema(nullSchema), DBStatus::OK);
1342 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1343
1344 // Reset cloudDbSchema (invalid version - field mismatch)
1345 DataBaseSchema invalidSchema;
1346 GetInvalidCloudDbSchema(invalidSchema);
1347 ASSERT_EQ(g_delegate->SetCloudDbSchema(invalidSchema), DBStatus::OK);
1348 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1349 CloseDb();
1350 }
1351
1352 /**
1353 * @tc.name: CloudSyncTest007
1354 * @tc.desc: Check the asset types after sync
1355 * @tc.type: FUNC
1356 * @tc.require:
1357 * @tc.author: bty
1358 */
1359 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest007, TestSize.Level1)
1360 {
1361 int64_t paddingSize = 100;
1362 int localCount = 20;
1363 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1364 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1365 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1366
1367 CheckAssetAfterDownload(db, localCount);
1368 CheckAllAssetAfterUpload(localCount);
1369 CheckAssetsAfterDownload(db, localCount);
1370 CloseDb();
1371 }
1372
1373 /*
1374 * @tc.name: CloudSyncTest008
1375 * @tc.desc: Test sync with invalid param
1376 * @tc.type: FUNC
1377 * @tc.require:
1378 * @tc.author: zhangqiquan
1379 */
1380 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest008, TestSize.Level0)
1381 {
1382 ASSERT_EQ(g_delegate->SetCloudDB(nullptr), OK); // it will not happen because cloudb has been set in SetUp()
1383 Query query = Query::Select().FromTable({g_tableName3});
1384 // clouddb has been set in SetUp() and it's not null
1385 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
1386 CloseDb();
1387 }
1388
1389 /**
1390 * @tc.name: CloudSyncTest009
1391 * @tc.desc: The second time there was no data change and sync was called.
1392 * @tc.type: FUNC
1393 * @tc.require:
1394 * @tc.author: bty
1395 */
1396 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest009, TestSize.Level0)
1397 {
1398 int64_t paddingSize = 10;
1399 int cloudCount = 20;
1400 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1401 InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1402 Query query = Query::Select().FromTable(g_tables);
1403 std::vector<SyncProcess> expectProcess;
1404 InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1405 CloudSyncStatusCallback callback;
1406 GetCallback(g_syncProcess, callback, expectProcess);
1407 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1408 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1409 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:none");
1410 CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1411 LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[0 - 20)");
1412 CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1413
1414 g_syncProcess = {};
1415 InitProcessForTest9(cloudCount, 0, expectProcess);
1416 GetCallback(g_syncProcess, callback, expectProcess);
1417 LOGD("--------------the second sync-------------");
1418 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1419 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1420 CloseDb();
1421 }
1422
1423 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest0010, TestSize.Level0)
1424 {
1425 int64_t paddingSize = 10;
1426 int cloudCount = 20;
1427 int localCount = 10;
1428 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1429 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1430 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1431
1432 int rowid = 27;
1433 UpdateAssetForTest(db, AssetOpType::NO_CHANGE, cloudCount, rowid++);
1434 UpdateAssetForTest(db, AssetOpType::INSERT, cloudCount, rowid++);
1435 UpdateAssetForTest(db, AssetOpType::DELETE, cloudCount, rowid++);
1436 UpdateAssetForTest(db, AssetOpType::UPDATE, cloudCount, rowid++);
1437
1438 int id = 0;
1439 UpdateAssetsForTest(db, AssetOpType::NO_CHANGE, id++);
1440 UpdateAssetsForTest(db, AssetOpType::INSERT, id++);
1441 UpdateAssetsForTest(db, AssetOpType::DELETE, id++);
1442 UpdateAssetsForTest(db, AssetOpType::UPDATE, id++);
1443
1444 LOGD("--------------the second sync-------------");
1445 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1446
1447 CheckFillAssetForTest10(db);
1448 CheckFillAssetsForTest10(db);
1449 CloseDb();
1450 }
1451
1452 /**
1453 * @tc.name: CloudSyncTest011
1454 * @tc.desc: Test sync with same table name.
1455 * @tc.type: FUNC
1456 * @tc.require:
1457 * @tc.author: zhangqiquan
1458 */
1459 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest011, TestSize.Level0)
1460 {
1461 Query query = Query::Select().FromTable({g_tableName1, g_tableName1});
1462 bool syncFinish = false;
1463 std::mutex syncMutex;
1464 std::condition_variable cv;
1465 std::atomic<int> callCount = 0;
1466 CloudSyncStatusCallback callback = [&callCount, &cv, &syncFinish, &syncMutex](
__anon01eba2f50602( const std::map<std::string, SyncProcess> &onProcess) 1467 const std::map<std::string, SyncProcess> &onProcess) {
1468 ASSERT_NE(onProcess.find(DEVICE_CLOUD), onProcess.end());
1469 SyncProcess syncProcess = onProcess.at(DEVICE_CLOUD);
1470 callCount++;
1471 if (syncProcess.process == FINISHED) {
1472 std::lock_guard<std::mutex> autoLock(syncMutex);
1473 syncFinish = true;
1474 }
1475 cv.notify_all();
1476 };
1477 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1478 std::unique_lock<std::mutex> uniqueLock(syncMutex);
__anon01eba2f50702() 1479 cv.wait(uniqueLock, [&syncFinish]() {
1480 return syncFinish;
1481 });
1482 RuntimeContext::GetInstance()->StopTaskPool();
1483 EXPECT_EQ(callCount, 2); // 2 is onProcess count
1484 CloseDb();
1485 }
1486
1487 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest012, TestSize.Level0)
1488 {
1489 int64_t localCount = 20;
1490 int64_t cloudCount = 10;
1491 int64_t paddingSize = 10;
1492 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1493 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1494 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1495
1496 InsertCloudTableRecord(localCount + cloudCount, cloudCount, paddingSize, false);
1497 InsertUserTableRecord(db, localCount + cloudCount, localCount, paddingSize, true);
1498 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1499
1500 InsertCloudTableRecord(2 * (localCount + cloudCount), cloudCount, paddingSize, false); // 2 is offset
1501 InsertUserTableRecord(db, 2 * (localCount + cloudCount), localCount, paddingSize, false); // 2 is offset
1502 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1503
1504
1505 InsertCloudTableRecord(3 * (localCount + cloudCount), cloudCount, paddingSize, true); // 3 is offset
1506 InsertUserTableRecord(db, 3 * (localCount + cloudCount), localCount, paddingSize, true); // 3 is offset
1507 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1508 CloseDb();
1509 }
1510
1511 /*
1512 * @tc.name: CloudSyncTest013
1513 * @tc.desc: test increment watermark when cloud db query data size is 0
1514 * @tc.type: FUNC
1515 * @tc.require:
1516 * @tc.author: zhuwentao
1517 */
1518 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest013, TestSize.Level0)
1519 {
1520 /**
1521 * @tc.steps: insert some data into cloud db
1522 * @tc.expected: return ok.
1523 */
1524 int64_t paddingSize = 10;
1525 int64_t cloudCount = 10;
1526 SyncProcess syncProcess;
1527 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1528 /**
1529 * @tc.steps: try to cloud sync
1530 * @tc.expected: return ok.
1531 */
1532 Query query = Query::Select().FromTable(g_tables);
__anon01eba2f50802(const std::map<std::string, SyncProcess> &process) 1533 CloudSyncStatusCallback callback = [&syncProcess](const std::map<std::string, SyncProcess> &process) {
1534 LOGI("devices size = %d", process.size());
1535 ASSERT_EQ(process.size(), 1u);
1536 syncProcess = std::move(process.begin()->second);
1537 if (syncProcess.process == FINISHED) {
1538 g_processCondition.notify_one();
1539 }
1540 };
1541 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1542 WaitForSyncFinish(syncProcess, g_syncWaitTime);
1543 uint32_t queryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1544 /**
1545 * @tc.steps: insert some increment data into cloud db
1546 * @tc.expected: return ok.
1547 */
1548 VBucket data;
1549 Timestamp now = TimeHelper::GetSysCurrentTime();
1550 data.insert_or_assign("name", "Cloud" + std::to_string(0));
1551 data.insert_or_assign("height", 166.0); // 166.0 is random double value
1552 data.insert_or_assign("married", false);
1553 data.insert_or_assign("age", 13L);
1554 VBucket log;
1555 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1556 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1557 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1558 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1559 log.insert_or_assign(CloudDbConstant::CURSOR_FIELD, "0123");
1560 g_virtualCloudDb->SetIncrementData(g_tableName1, data, log);
1561 syncProcess.process = PREPARED;
1562 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1563 WaitForSyncFinish(syncProcess, g_syncWaitTime);
1564 uint32_t lastQueryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1565 ASSERT_EQ(lastQueryTimes - queryTimes, 2u);
1566 CloseDb();
1567 }
1568
TestSyncForStatus(RelationalStoreDelegate * delegate,DBStatus expectStatus)1569 void TestSyncForStatus(RelationalStoreDelegate *delegate, DBStatus expectStatus)
1570 {
1571 std::mutex dataMutex;
1572 std::condition_variable cv;
1573 bool finish = false;
1574 DBStatus res = OK;
1575 CloudSyncStatusCallback callback = [&dataMutex, &cv, &finish, &res](
1576 const std::map<std::string, SyncProcess> &process) {
1577 std::map<std::string, SyncProcess> syncProcess;
1578 {
1579 std::lock_guard<std::mutex> autoLock(dataMutex);
1580 syncProcess = process;
1581 if (syncProcess[DEVICE_CLOUD].process == FINISHED) {
1582 finish = true;
1583 }
1584 res = syncProcess[DEVICE_CLOUD].errCode;
1585 }
1586 cv.notify_one();
1587 };
1588 Query query = Query::Select().FromTable({g_tableName3});
1589 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1590 {
1591 std::unique_lock<std::mutex> uniqueLock(dataMutex);
1592 cv.wait(uniqueLock, [&finish] {
1593 return finish;
1594 });
1595 }
1596 EXPECT_EQ(res, expectStatus);
1597 }
1598
1599 /*
1600 * @tc.name: CloudSyncTest015
1601 * @tc.desc: Test sync with cloud error
1602 * @tc.type: FUNC
1603 * @tc.require:
1604 * @tc.author: zhangqiquan
1605 */
1606 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest015, TestSize.Level0)
1607 {
1608 g_virtualCloudDb->SetActionStatus(CLOUD_NETWORK_ERROR);
1609 TestSyncForStatus(g_delegate, CLOUD_NETWORK_ERROR);
1610
1611 g_virtualCloudDb->SetActionStatus(CLOUD_SYNC_UNSET);
1612 TestSyncForStatus(g_delegate, CLOUD_SYNC_UNSET);
1613
1614 g_virtualCloudDb->SetActionStatus(CLOUD_FULL_RECORDS);
1615 TestSyncForStatus(g_delegate, CLOUD_FULL_RECORDS);
1616
1617 g_virtualCloudDb->SetActionStatus(CLOUD_LOCK_ERROR);
1618 TestSyncForStatus(g_delegate, CLOUD_LOCK_ERROR);
1619
1620 g_virtualCloudDb->SetActionStatus(DB_ERROR);
1621 TestSyncForStatus(g_delegate, CLOUD_ERROR);
1622
1623 g_virtualCloudDb->SetActionStatus(OK);
1624 CloseDb();
1625 }
1626
1627 /*
1628 * @tc.name: CloudSyncTest014
1629 * @tc.desc: Test sync with s4
1630 * @tc.type: FUNC
1631 * @tc.require:
1632 * @tc.author: zhangqiquan
1633 */
1634 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest014, TestSize.Level0)
1635 {
1636 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
1637 RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
1638
1639 // sync failed because get security option failed
__anon01eba2f50b02(const std::string&, SecurityOption &option) 1640 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1641 option.securityLabel = S0;
1642 return DB_ERROR;
1643 });
1644 Query query = Query::Select().FromTable({g_tableName3});
1645 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1646 SECURITY_OPTION_CHECK_ERROR);
1647
1648 // sync failed because get S4
__anon01eba2f50c02(const std::string&, SecurityOption &option) 1649 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1650 option.securityLabel = S4;
1651 return NOT_SUPPORT;
1652 });
1653 Query invalidQuery = Query::Select().FromTable({g_tableName3}).PrefixKey({'k'});
1654 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, invalidQuery, nullptr, g_syncWaitTime),
1655 NOT_SUPPORT);
1656
1657 // sync failed because get S4
__anon01eba2f50d02(const std::string&, SecurityOption &option) 1658 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1659 option.securityLabel = S4;
1660 return OK;
1661 });
1662 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1663 SECURITY_OPTION_CHECK_ERROR);
1664
1665 // sync failed because S4 has been cached
__anon01eba2f50e02(const std::string&, SecurityOption &option) 1666 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1667 option.securityLabel = S0;
1668 return OK;
1669 });
1670 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1671 SECURITY_OPTION_CHECK_ERROR);
1672 RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
1673 CloseDb();
1674 }
1675
1676 /*
1677 * @tc.name: CloudSyncTest016
1678 * @tc.desc: Test sync when push before merge
1679 * @tc.type: FUNC
1680 * @tc.require:
1681 * @tc.author: chenchaohao
1682 */
1683 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest016, TestSize.Level0)
1684 {
1685 int64_t localCount = 10;
1686 int64_t paddingSize = 10;
1687 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1688 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1689 CheckCloudTotalCount({10L, 10L});
1690 UpdateUserTableRecord(db, 0, localCount);
1691 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1692
1693 VBucket extend;
1694 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1695 std::vector<VBucket> data1;
1696 g_virtualCloudDb->Query(g_tables[0], extend, data1);
1697 for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1698 EXPECT_EQ(std::get<int64_t>(data1[i]["age"]), 99); // 99 is the updated age field of cloud db
1699 }
1700
1701 std::vector<VBucket> data2;
1702 g_virtualCloudDb->Query(g_tables[1], extend, data2);
1703 for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1704 EXPECT_EQ(std::get<int64_t>(data2[i]["age"]), 99); // 99 is the updated age field of cloud db
1705 }
1706
1707 CloseDb();
1708 }
1709
1710 /*
1711 * @tc.name: DataNotifier001
1712 * @tc.desc: Notify data without primary key
1713 * @tc.type: FUNC
1714 * @tc.require:
1715 * @tc.author: wanyi
1716 */
1717 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DataNotifier001, TestSize.Level0)
1718 {
1719 int64_t paddingSize = 10;
1720 int localCount = 20;
1721 InsertRecordWithoutPk2LocalAndCloud(db, 0, localCount, paddingSize);
1722 callSync({g_tableName3}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1723 CloseDb();
1724 }
1725
1726 /**
1727 * @tc.name: CloudSyncAssetTest001
1728 * @tc.desc:
1729 * @tc.type: FUNC
1730 * @tc.require:
1731 * @tc.author: wanyi
1732 */
1733 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest001, TestSize.Level1)
1734 {
1735 int64_t paddingSize = 100;
1736 int localCount = 20;
1737 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1738 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1739 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1740
1741 CheckAssetAfterDownload(db, localCount);
1742 CheckAllAssetAfterUpload(localCount);
1743 CloseDb();
1744 }
1745
1746 /*
1747 * @tc.name: MannualNotify001
1748 * @tc.desc: Test FLAG_ONLY mode of RemoveDeviceData
1749 * @tc.type: FUNC
1750 * @tc.require:
1751 * @tc.author: huangboxin
1752 */
1753 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, MannualNotify001, TestSize.Level0)
1754 {
1755 int64_t paddingSize = 10;
1756 int localCount = 10;
1757 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1758 Query query = Query::Select().FromTable(g_tables);
1759 std::vector<SyncProcess> expectProcess;
1760 InitProcessForMannualSync1(expectProcess);
1761 CloudSyncStatusCallback callback;
1762 GetCallback(g_syncProcess, callback, expectProcess);
1763 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime),
1764 DBStatus::OK);
1765 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1766 CloseDb();
1767 }
1768
1769 /**
1770 * @tc.name: CloudProcessNotify001
1771 * @tc.desc: Test duplicate cloud records. SYNC_MODE_CLOUD_MERGE
1772 * @tc.type: FUNC
1773 * @tc.require:
1774 * @tc.author: liufuchenxing
1775 */
1776 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudProcessNotify001, TestSize.Level1)
1777 {
1778 /**
1779 * @tc.steps: step1. table work1 and work2 insert 1 record which name is local0, then sync().
1780 * @tc.expected: step 1. table work1 and work2 download result is 0. table work1 and work2 upload 1 record.
1781 */
1782 int64_t paddingSize = 10;
1783 int64_t localCount = 1;
1784 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1785 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1786 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1787 g_observer->ClearChangedData();
1788 LOGD("expect download:worker1[primary key]:[], worker2[primary key]:[]");
1789 CheckDownloadResult(db, {0L, 0L}); // 0 and 0 means the num of downloads from cloud db by worker1 and worker2
1790 LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1791 CheckCloudTotalCount({1L, 1L}); // 1 and 1 means the total num of worker1 and worker2 from the cloud db
1792
1793 /**
1794 * @tc.steps: step2. reset data
1795 * @tc.expected: step2. return ok.
1796 */
1797 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1798 g_syncProcess = {};
1799 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1800
1801 /**
1802 * @tc.steps: step3. table work1 delete record which gid is 0 and name is local0 on cloud.
1803 * @tc.expected: step3. return ok.
1804 */
1805 VBucket idMap;
1806 idMap.insert_or_assign("#_gid", std::to_string(0));
1807 ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName1, idMap), DBStatus::OK);
1808
1809 /**
1810 * @tc.steps: step4. table work1 insert record which gid is 0 and name is local0 on cloud.
1811 * @tc.expected: step4. return ok.
1812 */
1813 std::vector<VBucket> record1;
1814 std::vector<VBucket> extend1;
1815 InsertCloudForCloudProcessNotify001(record1, extend1);
1816 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(g_tableName1, std::move(record1), extend1), DBStatus::OK);
1817
1818 /**
1819 * @tc.steps: step5. sync() and check local data.
1820 * @tc.expected: step5. return ok.
1821 */
1822 ChangedData changedDataForTable1;
1823 changedDataForTable1.tableName = g_tableName1;
1824 changedDataForTable1.field.push_back(std::string("name"));
1825 changedDataForTable1.primaryData[ChangeType::OP_UPDATE].push_back({"Local" + std::to_string(0)});
1826 g_observer->SetExpectedResult(changedDataForTable1);
1827
1828 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1829 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1830 g_observer->ClearChangedData();
1831 LOGD("expect download:worker1[primary key]:[Local0], worker2[primary key]:[0]");
1832 // 1 and 1 means the num of downloads from cloud db by worker1 and worker2
1833 CheckDownloadResult(db, {1L, 1L}, "Local");
1834 LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1835 CheckCloudTotalCount({1L, 1L}); // 0 and 0 means the total num of worker1 and worker2 from the cloud db
1836
1837 /**
1838 * @tc.steps: step6. CloseDb().
1839 * @tc.expected: step6. return ok.
1840 */
1841 CloseDb();
1842 }
1843
1844 /*
1845 * @tc.name: CloudSyncAssetTest002
1846 * @tc.desc:
1847 * @tc.type: FUNC
1848 * @tc.require:
1849 * @tc.author: huangboxin
1850 */
1851 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest002, TestSize.Level0)
1852 {
1853 int64_t paddingSize = 10;
1854 int localCount = 3;
1855 int cloudCount = 3;
1856 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1857 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1858 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1859 CloseDb();
1860 }
1861
1862 /*
1863 * @tc.name: CloudSyncAssetTest003
1864 * @tc.desc:
1865 * @tc.type: FUNC
1866 * @tc.require:
1867 * @tc.author: bty
1868 */
1869 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest003, TestSize.Level0)
1870 {
1871 int64_t paddingSize = 10;
1872 int localCount = 3;
1873 int cloudCount = 3;
1874 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1875 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1876 Assets assets;
1877 assets.push_back(g_localAsset);
1878 assets.push_back(g_localAsset);
1879 UpdateLocalAssets(db, assets, 1);
1880 Query query = Query::Select().FromTable(g_tables);
1881 std::vector<SyncProcess> expectProcess;
__anon01eba2f50f02(const std::map<std::string, SyncProcess> &process) 1882 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1883 ASSERT_EQ(process.size(), 1u);
1884 g_syncProcess = std::move(process.begin()->second);
1885
1886 if (g_syncProcess.process == FINISHED) {
1887 g_processCondition.notify_one();
1888 }
1889 };
1890 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1891 DBStatus::OK);
1892 {
1893 std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51002() 1894 g_processCondition.wait(lock, []() {
1895 return g_syncProcess.process == FINISHED;
1896 });
1897 ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ERROR);
1898 }
1899 CloseDb();
1900 }
1901
1902 /*
1903 * @tc.name: CloudSyncAssetTest004
1904 * @tc.desc:
1905 * @tc.type: FUNC
1906 * @tc.require:
1907 * @tc.author: bty
1908 */
1909 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest004, TestSize.Level0)
1910 {
1911 int64_t paddingSize = 10;
1912 int localCount = 3;
1913 int cloudCount = 3;
1914 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1915 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1916 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1917
1918 UpdateDiffType(localCount);
1919 g_syncProcess = {};
__anon01eba2f51102(const std::map<std::string, SyncProcess> &process) 1920 CloudSyncStatusCallback callback1 = [](const std::map<std::string, SyncProcess> &process) {
1921 ASSERT_EQ(process.size(), 1u);
1922 g_syncProcess = std::move(process.begin()->second);
1923 if (g_syncProcess.process == FINISHED) {
1924 g_processCondition.notify_one();
1925 }
1926 };
1927 Query query = Query::Select().FromTable(g_tables);
1928 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback1, g_syncWaitTime),
1929 DBStatus::OK);
1930 {
1931 std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51202() 1932 g_processCondition.wait(lock, []() {
1933 return g_syncProcess.process == FINISHED;
1934 });
1935 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1936 }
1937 CheckDiffTypeAsset(db);
1938 CloseDb();
1939 }
1940
1941 /*
1942 * @tc.name: CloudSyncAssetTest005
1943 * @tc.desc: Test erase all no change Asset
1944 * @tc.type: FUNC
1945 * @tc.require:
1946 * @tc.author: bty
1947 */
1948 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest005, TestSize.Level0)
1949 {
1950 /**
1951 * @tc.steps:step1. Construct local data with asset names and hashes consistent with the cloud
1952 * @tc.expected: step1. return ok.
1953 */
1954 int64_t paddingSize = 10;
1955 int localCount = 3;
1956 int cloudCount = 3;
1957 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1958 Assets assets;
1959 for (int64_t j = 0; j < cloudCount; j++) {
1960 Asset asset = g_cloudAsset;
1961 asset.name = g_cloudAsset.name + std::to_string(j);
1962 assets.push_back(asset);
1963 }
1964 UpdateLocalAssets(db, assets, 0);
1965 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
1966
1967 /**
1968 * @tc.steps:step2. Construct cloud data
1969 * @tc.expected: step2. return ok.
1970 */
1971 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1972
1973 /**
1974 * @tc.steps:step3. sync, expect EraseNoChangeAsset to erase all Nochange assets
1975 * @tc.expected: step3. return ok.
1976 */
1977 Query query = Query::Select().FromTable(g_tables);
1978 std::vector<SyncProcess> expectProcess;
__anon01eba2f51302(const std::map<std::string, SyncProcess> &process) 1979 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1980 ASSERT_EQ(process.size(), 1u);
1981 g_syncProcess = std::move(process.begin()->second);
1982
1983 if (g_syncProcess.process == FINISHED) {
1984 g_processCondition.notify_one();
1985 }
1986 };
1987 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1988 DBStatus::OK);
1989 {
1990 std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51402() 1991 g_processCondition.wait(lock, []() {
1992 return g_syncProcess.process == FINISHED;
1993 });
1994 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1995 }
1996 CloseDb();
1997 }
1998
1999 /*
2000 * @tc.name: CloudSyncAssetTest006
2001 * @tc.desc: Test upload new data without assets
2002 * @tc.type: FUNC
2003 * @tc.require:
2004 * @tc.author: bty
2005 */
2006 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest006, TestSize.Level0)
2007 {
2008 /**
2009 * @tc.steps:step1. Construct local data with NULL asset and the local count is greater than the cloud
2010 * @tc.expected: step1. return ok.
2011 */
2012 int64_t paddingSize = 10;
2013 int localCount = 6;
2014 int cloudCount = 3;
2015 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2016 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2017 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2018
2019 /**
2020 * @tc.steps:step2. sync, upload new data without assets,
2021 * @tc.expected: step2. return ok.
2022 */
2023 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2024 CloseDb();
2025 }
2026
2027 /*
2028 * @tc.name: CloudSyncAssetTest007
2029 * @tc.desc: for expilictly set not-change assets. If an asset is deleted, and its hash is not set to empty, it will be
2030 * regarded as NO-CHANGE, rather than delete
2031 * @tc.type: FUNC
2032 * @tc.require:
2033 * @tc.author: wanyi
2034 */
2035 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest007, TestSize.Level0)
2036 {
2037 /**
2038 * @tc.steps:step1. local asset contain an asset which has a corresponding asset in cloud
2039 * @tc.expected: step1. return ok.
2040 */
2041 int64_t paddingSize = 10;
2042 int localCount = 1;
2043 int cloudCount = 1;
2044 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2045 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2046 /**
2047 * @tc.steps:step2. local asset is set to delete, but hash is not set to empty
2048 * @tc.expected: step2. return ok.
2049 */
2050 Assets assets;
2051 for (int64_t j = 0; j < cloudCount; j++) {
2052 Asset asset = g_cloudAsset;
2053 asset.name = g_cloudAsset.name + std::to_string(j);
2054 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
2055 assets.push_back(asset);
2056 }
2057 UpdateLocalAssets(db, assets, 0);
2058 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2059 /**
2060 * @tc.steps:step3. Do sync
2061 * @tc.expected: step3. return ok.
2062 */
2063 Query query = Query::Select().FromTable(g_tables);
2064 std::vector<SyncProcess> expectProcess;
__anon01eba2f51502(const std::map<std::string, SyncProcess> &process) 2065 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2066 ASSERT_EQ(process.size(), 1u);
2067 g_syncProcess = std::move(process.begin()->second);
2068
2069 if (g_syncProcess.process == FINISHED) {
2070 g_processCondition.notify_one();
2071 }
2072 };
2073 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2074 DBStatus::OK);
2075 {
2076 std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51602() 2077 g_processCondition.wait(lock, []() {
2078 return g_syncProcess.process == FINISHED;
2079 });
2080 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2081 }
2082 /**
2083 * @tc.steps:step4. Check result. Cloud db should not contain asset.
2084 * @tc.expected: step4. return ok.
2085 */
2086 CheckAssetForAssetTest006();
2087 CloseDb();
2088 }
2089
2090
2091 /**
2092 * @tc.name: DownloadAssetTest001
2093 * @tc.desc: Test the sync of different Asset status out of parameters when the download is successful
2094 * @tc.type: FUNC
2095 * @tc.require:
2096 * @tc.author: bty
2097 */
2098 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest001, TestSize.Level0)
2099 {
2100 /**
2101 * @tc.steps:step1. Set different status out of parameters, and the code returns OK
2102 * @tc.expected: step1. return ok.
2103 */
2104 DBStatus expectStatus = DBStatus::OK;
2105 int index = 0;
2106 InitMockAssetLoader(expectStatus, index);
2107
2108 /**
2109 * @tc.steps:step2. init download data
2110 * @tc.expected: step2. return ok.
2111 */
2112 int64_t paddingSize = 1;
2113 int localCount = 120;
2114 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2115 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
2116
2117 /**
2118 * @tc.steps:step3. sync
2119 * @tc.expected: step3. return ok.
2120 */
2121 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2122
2123 /**
2124 * @tc.steps:step4. Expect all states to be normal
2125 * @tc.expected: step4. return ok.
2126 */
2127 CheckAssetAfterDownload(db, localCount);
2128 CloseDb();
2129 }
2130
2131 /*
2132 * @tc.name: CloudSyncAssetTest008
2133 * @tc.desc: sync failed with download asset
2134 * @tc.type: FUNC
2135 * @tc.require:
2136 * @tc.author: zhangqiquan
2137 */
2138 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest008, TestSize.Level0)
2139 {
2140 /**
2141 * @tc.steps:step1. prepare asset data
2142 */
2143 int64_t paddingSize = 10;
2144 int localCount = 1;
2145 int cloudCount = 1;
2146 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2147 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2148 /**
2149 * @tc.steps:step2. set download asset status failed
2150 */
2151 g_virtualAssetLoader->SetDownloadStatus(CLOUD_ASSET_SPACE_INSUFFICIENT);
2152 Query query = Query::Select().FromTable(g_tables);
2153 std::vector<SyncProcess> expectProcess;
__anon01eba2f51702(const std::map<std::string, SyncProcess> &process) 2154 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2155 for (const auto &item: process) {
2156 g_syncProcess = item.second;
2157 }
2158 if (g_syncProcess.process == FINISHED) {
2159 g_processCondition.notify_one();
2160 }
2161 };
2162 /**
2163 * @tc.steps:step3. sync and wait sync finished.
2164 * @tc.expected: step3. sync return CLOUD_ASSET_SPACE_INSUFFICIENT.
2165 */
2166 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2167 DBStatus::OK);
2168 {
2169 std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51802() 2170 g_processCondition.wait(lock, []() {
2171 return g_syncProcess.process == FINISHED;
2172 });
2173 ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT);
2174 }
2175 /**
2176 * @tc.steps:step4. clear data.
2177 */
2178 g_virtualAssetLoader->SetDownloadStatus(OK);
2179 CloseDb();
2180 }
2181
2182 /**
2183 * @tc.name: DownloadAssetTest002
2184 * @tc.desc: Test the sync of different Asset status out of parameters when the download is failed
2185 * @tc.type: FUNC
2186 * @tc.require:
2187 * @tc.author: bty
2188 */
2189 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest002, TestSize.Level0)
2190 {
2191 /**
2192 * @tc.steps:step1. Set different status out of parameters, and the code returns CLOUD_ERROR
2193 * @tc.expected: step1. return ok.
2194 */
2195 DBStatus expectStatus = DBStatus::CLOUD_ERROR;
2196 int index = 0;
2197 InitMockAssetLoader(expectStatus, index);
2198 int64_t paddingSize = 1;
2199 int localCount = 100;
2200
2201 /**
2202 * @tc.steps:step2. init download data
2203 * @tc.expected: step2. return ok.
2204 */
2205 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2206 InsertCloudTableRecord(0, localCount, paddingSize, false);
2207
2208 /**
2209 * @tc.steps:step3. sync
2210 * @tc.expected: step3. return ok.
2211 */
2212 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2213
2214 /**
2215 * @tc.steps:step4. Those status that are not normal are all be abnormal after sync.
2216 * @tc.expected: step4. return ok.
2217 */
2218 CheckAssetAfterDownload2(db, localCount);
2219 CloseDb();
2220 }
2221
2222 /**
2223 * @tc.name: DownloadAssetTest003
2224 * @tc.desc: Init different asset name between local and cloud, then sync to test download
2225 * @tc.type: FUNC
2226 * @tc.require:
2227 * @tc.author: bty
2228 */
2229 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest003, TestSize.Level0)
2230 {
2231 /**
2232 * @tc.steps:step1. Init data and sync
2233 * @tc.expected: step1. return ok.
2234 */
2235 int64_t paddingSize = 1;
2236 int localCount = 10;
2237 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2238 InsertCloudTableRecord(0, localCount, paddingSize, false);
2239 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2240
2241 /**
2242 * @tc.steps:step2. update cloud Asset where gid = 0
2243 * @tc.expected: step2. return ok.
2244 */
2245 UpdateCloudAssetForDownloadAssetTest003();
2246
2247 /**
2248 * @tc.steps:step3. sync again
2249 * @tc.expected: step3. return ok.
2250 */
2251 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2252
2253 /**
2254 * @tc.steps:step4. check asset after download where gid = 0
2255 * @tc.expected: step4. return ok.
2256 */
2257 CheckAssetForDownloadAssetTest003(db);
2258 CloseDb();
2259 }
2260
2261 /**
2262 * @tc.name: DownloadAssetTest004
2263 * @tc.desc: Test total count, fail count and success count when drop table
2264 * @tc.type: FUNC
2265 * @tc.require:
2266 * @tc.author: liufuchenxing
2267 */
2268 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest004, TestSize.Level0)
2269 {
2270 /**
2271 * @tc.steps:step1. Init data and sync
2272 * @tc.expected: step1. return ok.
2273 */
2274 int64_t paddingSize = 1;
2275 int count = 10;
2276 InsertUserTableRecord(db, 0, count, paddingSize, false);
2277 g_syncProcess = {};
__anon01eba2f51902(const std::map<std::string, SyncProcess> &process) 2278 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2279 for (const auto &item : process) {
2280 g_syncProcess = item.second;
2281 }
2282 if (g_syncProcess.process == FINISHED) {
2283 g_processCondition.notify_one();
2284 }
2285 };
2286 Query query = Query::Select().FromTable(g_tables);
2287 EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2288 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2289
2290 /**
2291 * @tc.steps:step2. drop table work2. sync failed, check total, success and fail count.
2292 * @tc.expected: step2. total = 20, success=0, fail=20
2293 */
2294 g_syncProcess = {};
2295 InsertCloudTableRecord(0, count, paddingSize, false);
2296 EXPECT_EQ(RelationalTestUtils::ExecSql(db, DROP_INTEGER_PRIMARY_KEY_TABLE_SQL), DBStatus::OK);
2297 EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2298 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2299 EXPECT_EQ(g_syncProcess.errCode, DBStatus::DB_ERROR);
2300 uint32_t expectTotalCnt = 20u;
2301 EXPECT_NE(g_syncProcess.tableProcess.find(g_tableName2), g_syncProcess.tableProcess.end());
2302 EXPECT_EQ(g_syncProcess.tableProcess[g_tableName2].downLoadInfo.batchIndex, 1u);
2303 EXPECT_EQ(g_syncProcess.tableProcess[g_tableName2].downLoadInfo.total, expectTotalCnt);
2304 EXPECT_EQ(g_syncProcess.tableProcess[g_tableName2].downLoadInfo.successCount, 0u);
2305 EXPECT_EQ(g_syncProcess.tableProcess[g_tableName2].downLoadInfo.failCount, expectTotalCnt);
2306
2307 /**
2308 * @tc.steps:step3. close db.
2309 * @tc.expected: step3. close success.
2310 */
2311 CloseDb();
2312 }
2313
2314 /**
2315 * @tc.name: SchemaTest001
2316 * @tc.desc: Create table with Cloud cooperation mode and do sync
2317 * @tc.type: FUNC
2318 * @tc.require:
2319 * @tc.author: wanyi
2320 */
2321 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest001, TestSize.Level0)
2322 {
2323 /**
2324 * @tc.steps:step1. Create table with Cloud cooperation mode
2325 * @tc.expected: step1. return ok.
2326 */
2327 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2328 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, CLOUD_COOPERATION), DBStatus::OK);
2329 /**
2330 * @tc.steps:step1. do sync
2331 * @tc.expected: step1. return ok.
2332 */
2333 callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2334 CloseDb();
2335 }
2336
2337 /**
2338 * @tc.name: SchemaTest002
2339 * @tc.desc: Create table with DEVICE_COOPERATION mode and do sync
2340 * @tc.type: FUNC
2341 * @tc.require:
2342 * @tc.author: wanyi
2343 */
2344 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest002, TestSize.Level0)
2345 {
2346 /**
2347 * @tc.steps:step1. Create table with DEVICE_COOPERATION mode
2348 * @tc.expected: step1. return ok.
2349 */
2350 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2351 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, DEVICE_COOPERATION), DBStatus::OK);
2352 /**
2353 * @tc.steps:step1. do sync
2354 * @tc.expected: step1. return ok.
2355 */
2356 callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::NOT_SUPPORT);
2357 CloseDb();
2358 }
2359
2360 }
2361 #endif // RELATIONAL_STORE
2362