• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud_db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_instance.h"
24 #include "relational_store_manager.h"
25 #include "runtime_config.h"
26 #include "sqlite_relational_store.h"
27 #include "sqlite_relational_utils.h"
28 #include "store_observer.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "mock_asset_loader.h"
34 
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38 using namespace std;
39 
40 namespace {
41     string g_storeID = "Relational_Store_SYNC";
42     const string g_tableName1 = "worker1";
43     const string g_tableName2 = "worker2";
44     const string g_tableName3 = "worker3";
45     const string g_tableName4 = "worker4";
46     const string DEVICE_CLOUD = "cloud_dev";
47     const string DB_SUFFIX = ".db";
48     const int64_t g_syncWaitTime = 60;
49     const int g_arrayHalfSub = 2;
50     int g_syncIndex = 0;
51     string g_testDir;
52     string g_storePath;
53     std::mutex g_processMutex;
54     std::condition_variable g_processCondition;
55     std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
56     std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
57     DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
58     RelationalStoreObserverUnitTest *g_observer = nullptr;
59     RelationalStoreDelegate *g_delegate = nullptr;
60     SyncProcess g_syncProcess;
61     using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
62     const std::string CREATE_LOCAL_TABLE_SQL =
63             "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \
64     "name TEXT PRIMARY KEY," \
65     "height REAL ," \
66     "married BOOLEAN ," \
67     "photo BLOB NOT NULL," \
68     "assert BLOB," \
69     "age INT);";
70     const std::string INTEGER_PRIMARY_KEY_TABLE_SQL =
71             "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \
72     "id INTEGER PRIMARY KEY," \
73     "name TEXT ," \
74     "height REAL ," \
75     "photo BLOB ," \
76     "asserts BLOB," \
77     "age INT);";
78     const std::string DROP_INTEGER_PRIMARY_KEY_TABLE_SQL = "DROP TABLE " + g_tableName2 + ";";
79     const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL =
80             "CREATE TABLE IF NOT EXISTS " + g_tableName3 + "(" \
81     "name TEXT," \
82     "height REAL ," \
83     "married BOOLEAN ," \
84     "photo BLOB NOT NULL," \
85     "assert BLOB," \
86     "age INT);";
87     const std::string INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE =
88             "CREATE TABLE IF NOT EXISTS " + g_tableName4 + "(" \
89     "id INTEGER PRIMARY KEY," \
90     "name TEXT ," \
91     "height REAL ," \
92     "photo BLOB ," \
93     "asserts BLOB," \
94     "age INT);";
95     const std::vector<Field> g_cloudFiled1 = {
96         {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<double>},
97         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
98         {"assert", TYPE_INDEX<Asset>}, {"age", TYPE_INDEX<int64_t>}
99     };
100     const std::vector<Field> g_invalidCloudFiled1 = {
101         {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<int>},
102         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
103         {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
104     };
105     const std::vector<Field> g_cloudFiled2 = {
106         {"id", TYPE_INDEX<int64_t>, true}, {"name", TYPE_INDEX<std::string>},
107         {"height", TYPE_INDEX<double>},  {"photo", TYPE_INDEX<Bytes>},
108         {"asserts", TYPE_INDEX<Assets>}, {"age", TYPE_INDEX<int64_t>}
109     };
110     const std::vector<Field> g_cloudFiledWithOutPrimaryKey3 = {
111         {"name", TYPE_INDEX<std::string>, false, true}, {"height", TYPE_INDEX<double>},
112         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
113         {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
114     };
115     const std::vector<std::string> g_tables = {g_tableName1, g_tableName2};
116     const std::vector<std::string> g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName};
117     const std::vector<string> g_prefix = {"Local", ""};
118     const Asset g_localAsset = {
119         .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
120         .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
121     };
122     const Asset g_cloudAsset = {
123         .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
124         .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
125     };
126 
CreateUserDBAndTable(sqlite3 * & db)127     void CreateUserDBAndTable(sqlite3 *&db)
128     {
129         EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
130         EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK);
131         EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL), SQLITE_OK);
132         EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK);
133     }
134 
InsertUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)135     void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
136     {
137         std::string photo(photoSize, 'v');
138         int errCode;
139         std::vector<uint8_t> assetBlob;
140         for (int64_t i = begin; i < begin + count; ++i) {
141             Asset asset = g_localAsset;
142             asset.name = asset.name + std::to_string(i);
143             RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
144             string sql = "INSERT OR REPLACE INTO " + g_tableName1
145                          + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
146                          "', '175.8', '0', '" + photo + "', ? , '18');";
147             sqlite3_stmt *stmt = nullptr;
148             ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
149             if (assetIsNull) {
150                 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
151             } else {
152                 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
153             }
154             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
155             SQLiteUtils::ResetStatement(stmt, true, errCode);
156         }
157         for (int64_t i = begin; i < begin + count; ++i) {
158             std::vector<Asset> assets;
159             Asset asset = g_localAsset;
160             asset.name = g_localAsset.name + std::to_string(i);
161             assets.push_back(asset);
162             asset.name = g_localAsset.name + std::to_string(i + 1);
163             assets.push_back(asset);
164             RuntimeContext::GetInstance()->AssetsToBlob(assets, assetBlob);
165             string sql = "INSERT OR REPLACE INTO " + g_tableName2
166                          + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local"
167                          + std::to_string(i) + "', '155.10', '"+ photo + "',  ? , '21');";
168             sqlite3_stmt *stmt = nullptr;
169             ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
170             if (assetIsNull) {
171                 ASSERT_EQ(sqlite3_bind_null(stmt, 1), E_OK);
172             } else {
173                 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
174             }
175             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
176             SQLiteUtils::ResetStatement(stmt, true, errCode);
177         }
178         LOGD("insert user record worker1[primary key]:[Local%" PRId64 " - Local%" PRId64
179             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
180     }
181 
UpdateUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)182     void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
183     {
184         for (size_t i = 0; i < g_tables.size(); i++) {
185             string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + " in (";
186             for (int64_t j = begin; j < begin + count; ++j) {
187                 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
188             }
189             updateAge.pop_back();
190             updateAge += ");";
191             ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
192         }
193         LOGD("update local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
194             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
195     }
196 
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)197     void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
198     {
199         for (size_t i = 0; i < g_tables.size(); i++) {
200             string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + " in (";
201             for (int64_t j = begin; j < count; ++j) {
202                 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
203             }
204             updateAge.pop_back();
205             updateAge += ");";
206             ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
207         }
208         LOGD("delete local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
209             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
210     }
211 
InsertRecordWithoutPk2LocalAndCloud(sqlite3 * & db,int64_t begin,int64_t count,int photoSize)212     void InsertRecordWithoutPk2LocalAndCloud(sqlite3 *&db, int64_t begin, int64_t count, int photoSize)
213     {
214         std::vector<uint8_t> photo(photoSize, 'v');
215         std::string photoLocal(photoSize, 'v');
216         Asset asset = { .version = 1, .name = "Phone" };
217         std::vector<uint8_t> assetBlob;
218         RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset);
219         std::string assetStr(assetBlob.begin(), assetBlob.end());
220         std::vector<VBucket> record1;
221         std::vector<VBucket> extend1;
222         for (int64_t i = begin; i < count; ++i) {
223             Timestamp now = TimeHelper::GetSysCurrentTime();
224             VBucket data;
225             data.insert_or_assign("name", "Cloud" + std::to_string(i));
226             data.insert_or_assign("height", 166.0); // 166.0 is random double value
227             data.insert_or_assign("married", (bool)0);
228             data.insert_or_assign("photo", photo);
229             data.insert_or_assign("assert", KEY_1);
230             data.insert_or_assign("age", 13L);
231             record1.push_back(data);
232             VBucket log;
233             log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
234             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
235             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
236             extend1.push_back(log);
237             std::this_thread::sleep_for(std::chrono::milliseconds(1));  // wait for 1 ms
238         }
239         int errCode = g_virtualCloudDb->BatchInsert(g_tableName3, std::move(record1), extend1);
240         ASSERT_EQ(errCode, DBStatus::OK);
241         for (int64_t i = begin; i < count; ++i) {
242             string sql = "INSERT OR REPLACE INTO " + g_tableName3
243                          + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
244                          "', '175.8', '0', '" + photoLocal + "', '" + assetStr + "', '18');";
245             ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
246         }
247     }
248 
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)249     void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
250     {
251         std::vector<uint8_t> photo(photoSize, 'v');
252         std::vector<VBucket> record1;
253         std::vector<VBucket> extend1;
254         std::vector<VBucket> record2;
255         std::vector<VBucket> extend2;
256         Timestamp now = TimeHelper::GetSysCurrentTime();
257         for (int64_t i = begin; i < begin + count; ++i) {
258             VBucket data;
259             data.insert_or_assign("name", "Cloud" + std::to_string(i));
260             data.insert_or_assign("height", 166.0); // 166.0 is random double value
261             data.insert_or_assign("married", false);
262             data.insert_or_assign("photo", photo);
263             data.insert_or_assign("age", 13L);
264             Asset asset = g_cloudAsset;
265             asset.name = asset.name + std::to_string(i);
266             assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
267             record1.push_back(data);
268             VBucket log;
269             log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
270             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
271             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
272             extend1.push_back(log);
273 
274             std::vector<Asset> assets;
275             data.insert_or_assign("id", i);
276             data.insert_or_assign("height", 180.3); // 180.3 is random double value
277             for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
278                 asset.name = g_cloudAsset.name + std::to_string(j);
279                 assets.push_back(asset);
280             }
281             data.erase("assert");
282             data.erase("married");
283             assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
284             record2.push_back(data);
285             extend2.push_back(log);
286         }
287         ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1), DBStatus::OK);
288         ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2), DBStatus::OK);
289         LOGD("insert cloud record worker1[primary key]:[cloud%" PRId64 " - cloud%" PRId64
290             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
291         std::this_thread::sleep_for(std::chrono::milliseconds(count));
292     }
293 
UpdateAssetForTest(sqlite3 * & db,AssetOpType opType,int64_t cloudCount,int64_t rowid)294     void UpdateAssetForTest(sqlite3 *&db, AssetOpType opType, int64_t cloudCount, int64_t rowid)
295     {
296         string sql = "UPDATE " + g_tables[0] + " SET assert = ? where rowid = '" + std::to_string(rowid) + "';";
297         std::vector<uint8_t> assetBlob;
298         int errCode;
299         Asset asset = g_cloudAsset;
300         asset.name = "Phone" + std::to_string(rowid - cloudCount - 1);
301         if (opType == AssetOpType::UPDATE) {
302             asset.uri = "/data/test";
303         } else if (opType == AssetOpType::INSERT) {
304             asset.name = "Test10";
305         }
306         asset.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
307         sqlite3_stmt *stmt = nullptr;
308         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
309         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
310         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false) == E_OK) {
311             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
312         }
313         SQLiteUtils::ResetStatement(stmt, true, errCode);
314     }
315 
UpdateAssetsForTest(sqlite3 * & db,AssetOpType opType,int64_t rowid)316     void UpdateAssetsForTest(sqlite3 *&db, AssetOpType opType, int64_t rowid)
317     {
318         string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
319         Asset asset1 = g_localAsset;
320         Asset asset2 = g_localAsset;
321         Assets assets;
322         asset1.name = g_localAsset.name + std::to_string(rowid);
323         asset1.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
324         asset2.name = g_localAsset.name + std::to_string(rowid + 1);
325         asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
326         if (opType == AssetOpType::UPDATE) {
327             assets.push_back(asset1);
328             asset2.uri = "/data/test";
329             asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
330             assets.push_back(asset2);
331         } else if (opType == AssetOpType::INSERT) {
332             assets.push_back(asset1);
333             assets.push_back(asset2);
334             Asset asset3;
335             asset3.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
336             asset3.name = "Test10";
337             assets.push_back(asset3);
338         } else if (opType == AssetOpType::DELETE) {
339             assets.push_back(asset1);
340             asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
341             assets.push_back(asset2);
342         } else {
343             assets.push_back(asset1);
344             assets.push_back(asset2);
345         }
346         sqlite3_stmt *stmt = nullptr;
347         std::vector<uint8_t> assetsBlob;
348         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
349         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
350         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
351             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
352         }
353         int errCode;
354         SQLiteUtils::ResetStatement(stmt, true, errCode);
355     }
356 
UpdateLocalAssets(sqlite3 * & db,Assets & assets,int64_t rowid)357     void UpdateLocalAssets(sqlite3 *&db, Assets &assets, int64_t rowid)
358     {
359         string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
360         std::vector<uint8_t> assetsBlob;
361         int errCode;
362         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
363         sqlite3_stmt *stmt = nullptr;
364         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
365         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
366             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
367         }
368         SQLiteUtils::ResetStatement(stmt, true, errCode);
369     }
370 
UpdateDiffType(int64_t begin)371     void UpdateDiffType(int64_t begin)
372     {
373         std::vector<std::string> hash = {"DEC", "update_", "insert_"};
374         std::vector<std::string> name = {
375             g_cloudAsset.name + std::to_string(0),
376             g_cloudAsset.name + std::to_string(1),
377             g_cloudAsset.name + std::to_string(3) // 3 is insert id
378         };
379         std::vector<VBucket> record;
380         std::vector<VBucket> extend;
381         Assets assets;
382         for (int i = 0; i < 3; i ++) { // 3 is type num
383             Asset asset = g_cloudAsset;
384             asset.name = name[i];
385             asset.hash = hash[i];
386             assets.push_back(asset);
387         }
388         VBucket data;
389         data.insert_or_assign("name", "Cloud" + std::to_string(0));
390         data.insert_or_assign("id", 0L);
391         data.insert_or_assign("asserts", assets);
392         Timestamp now = TimeHelper::GetSysCurrentTime();
393         VBucket log;
394         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
395         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(begin));
396         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
397         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
398         record.push_back(data);
399         extend.push_back(log);
400         ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName2, std::move(record), extend), DBStatus::OK);
401     }
402 
CheckDiffTypeAsset(sqlite3 * & db)403     void CheckDiffTypeAsset(sqlite3 *&db)
404     {
405         std::vector<std::string> names = {
406             g_cloudAsset.name + std::to_string(0),
407             g_cloudAsset.name + std::to_string(1),
408             g_cloudAsset.name + std::to_string(3) // 3 is insert id
409         };
410         std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid = 0;";
411         sqlite3_stmt *stmt = nullptr;
412         int index = 0;
413         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
414         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
415             ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
416             Type cloudValue;
417             ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
418             std::vector<uint8_t> assetsBlob;
419             Assets assets;
420             ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
421             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
422             for (const Asset &asset: assets) {
423                 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
424                 ASSERT_EQ(asset.name, names[index++]);
425             }
426         }
427         int errCode;
428         SQLiteUtils::ResetStatement(stmt, true, errCode);
429     }
430 
CheckAssetForAssetTest006()431     void CheckAssetForAssetTest006()
432     {
433         VBucket extend;
434         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
435         std::vector<VBucket> data;
436         g_virtualCloudDb->Query(g_tables[1], extend, data);
437         for (size_t j = 0; j < data.size(); ++j) {
438             ASSERT_NE(data[j].find("asserts"), data[j].end());
439             ASSERT_TRUE((data[j]["asserts"]).index() == TYPE_INDEX<Assets>);
440             Assets &assets = std::get<Assets>(data[j]["asserts"]);
441             ASSERT_TRUE(assets.size() > 0);
442             Asset &asset = assets[0];
443             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
444             EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::DELETE));
445         }
446     }
447 
CheckFillAssetForTest10(sqlite3 * & db)448     void CheckFillAssetForTest10(sqlite3 *&db)
449     {
450         std::string sql = "SELECT assert from " + g_tables[0] + " WHERE rowid in ('27','28','29','30');";
451         sqlite3_stmt *stmt = nullptr;
452         int index = 0;
453         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
454         int suffixId = 6;
455         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
456             if (index == 0 || index == 1 || index == 3) { // 3 is rowid index of 29
457                 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
458                 Type cloudValue;
459                 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, cloudValue), E_OK);
460                 std::vector<uint8_t> assetBlob;
461                 Asset asset;
462                 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetBlob), E_OK);
463                 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset), E_OK);
464                 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
465                 if (index == 0) {
466                     ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
467                 } else if (index == 1) {
468                     ASSERT_EQ(asset.name, "Test10");
469                 } else {
470                     ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
471                     ASSERT_EQ(asset.uri, "/data/test");
472                 }
473             } else {
474                 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
475             }
476             index++;
477         }
478         int errCode;
479         SQLiteUtils::ResetStatement(stmt, true, errCode);
480     }
481 
CheckFillAssetsForTest10(sqlite3 * & db)482     void CheckFillAssetsForTest10(sqlite3 *&db)
483     {
484         std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid in ('0','1','2','3');";
485         sqlite3_stmt *stmt = nullptr;
486         int index = 0;
487         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
488         int insertIndex = 2;
489         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
490             ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
491             Type cloudValue;
492             ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
493             std::vector<uint8_t> assetsBlob;
494             Assets assets;
495             ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
496             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
497             if (index == 0) {
498                 ASSERT_EQ(assets.size(), 2u);
499                 ASSERT_EQ(assets[0].name, g_localAsset.name + std::to_string(index));
500                 ASSERT_EQ(assets[1].name, g_localAsset.name + std::to_string(index + 1));
501             } else if (index == 1) {
502                 ASSERT_EQ(assets.size(), 3u);
503                 ASSERT_EQ(assets[insertIndex].name, "Test10");
504                 ASSERT_EQ(assets[insertIndex].status, static_cast<uint32_t>(AssetStatus::NORMAL));
505             } else if (index == 2) { // 2 is the third element
506                 ASSERT_EQ(assets.size(), 1u);
507                 ASSERT_EQ(assets[0].name, g_cloudAsset.name + std::to_string(index));
508             } else {
509                 ASSERT_EQ(assets.size(), 2u);
510                 ASSERT_EQ(assets[1].uri, "/data/test");
511                 ASSERT_EQ(assets[1].status, static_cast<uint32_t>(AssetStatus::NORMAL));
512             }
513             index++;
514         }
515         int errCode;
516         SQLiteUtils::ResetStatement(stmt, true, errCode);
517     }
518 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)519     int QueryCountCallback(void *data, int count, char **colValue, char **colName)
520     {
521         if (count != 1) {
522             return 0;
523         }
524         auto expectCount = reinterpret_cast<int64_t>(data);
525         EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
526         return 0;
527     }
528 
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,std::string keyStr="Cloud")529     void CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, std::string keyStr = "Cloud")
530     {
531         for (size_t i = 0; i < g_tables.size(); ++i) {
532             string queryDownload = "select count(*) from " + g_tables[i] + " where name "
533                                    + " like '" + keyStr + "%'";
534             EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
535                 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
536         }
537     }
538 
CheckCloudTotalCount(std::vector<int64_t> expectCounts)539     void CheckCloudTotalCount(std::vector<int64_t> expectCounts)
540     {
541         VBucket extend;
542         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
543         for (size_t i = 0; i < g_tables.size(); ++i) {
544             int64_t realCount = 0;
545             std::vector<VBucket> data;
546             g_virtualCloudDb->Query(g_tables[i], extend, data);
547             for (size_t j = 0; j < data.size(); ++j) {
548                 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
549                 if (entry != data[j].end() && std::get<bool>(entry->second)) {
550                     continue;
551                 }
552                 realCount++;
553             }
554             EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
555         }
556     }
557 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)558     void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
559     {
560         TableSchema tableSchema1 = {
561             .name = g_tableName1,
562             .fields = g_cloudFiled1
563         };
564         TableSchema tableSchema2 = {
565             .name = g_tableName2,
566             .fields = g_cloudFiled2
567         };
568         TableSchema tableSchemaWithOutPrimaryKey = {
569             .name = g_tableName3,
570             .fields = g_cloudFiledWithOutPrimaryKey3
571         };
572         TableSchema tableSchema4 = {
573             .name = g_tableName4,
574             .fields = g_cloudFiled2
575         };
576         dataBaseSchema.tables.push_back(tableSchema1);
577         dataBaseSchema.tables.push_back(tableSchema2);
578         dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey);
579         dataBaseSchema.tables.push_back(tableSchema4);
580     }
581 
582 
GetInvalidCloudDbSchema(DataBaseSchema & dataBaseSchema)583     void GetInvalidCloudDbSchema(DataBaseSchema &dataBaseSchema)
584     {
585         TableSchema tableSchema1 = {
586             .name = g_tableName1,
587             .fields = g_invalidCloudFiled1
588         };
589         TableSchema tableSchema2 = {
590             .name = g_tableName1,
591             .fields = g_cloudFiled2
592         };
593         dataBaseSchema.tables.push_back(tableSchema1);
594         dataBaseSchema.tables.push_back(tableSchema2);
595     }
596 
InitProcessForTest1(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)597     void InitProcessForTest1(const uint32_t &cloudCount, const uint32_t &localCount,
598         std::vector<SyncProcess> &expectProcess)
599     {
600         expectProcess.clear();
601         std::vector<TableProcessInfo> infos;
602         uint32_t index = 1;
603         infos.push_back(TableProcessInfo{
604             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
605         });
606         infos.push_back(TableProcessInfo{
607             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
608         });
609 
610         infos.push_back(TableProcessInfo{
611             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
612         });
613         infos.push_back(TableProcessInfo{
614             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
615         });
616 
617         infos.push_back(TableProcessInfo{
618             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
619         });
620         infos.push_back(TableProcessInfo{
621             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
622         });
623 
624         infos.push_back(TableProcessInfo{
625             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
626         });
627         infos.push_back(TableProcessInfo{
628             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
629         });
630 
631         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
632             SyncProcess syncProcess;
633             syncProcess.errCode = OK;
634             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
635             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
636             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
637             expectProcess.push_back(syncProcess);
638         }
639     }
640 
InitProcessForMannualSync1(std::vector<SyncProcess> & expectProcess)641     void InitProcessForMannualSync1(std::vector<SyncProcess> &expectProcess)
642     {
643         expectProcess.clear();
644         std::vector<TableProcessInfo> infos;
645         // first notify, first table
646         infos.push_back(TableProcessInfo{
647             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
648         });
649         // first notify, second table
650         infos.push_back(TableProcessInfo{
651             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
652         });
653         // second notify, first table
654         infos.push_back(TableProcessInfo{
655             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
656         });
657         // second notify, second table
658         infos.push_back(TableProcessInfo{
659             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
660         });
661 
662         infos.push_back(TableProcessInfo{
663             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
664         });
665         // second notify, second table
666         infos.push_back(TableProcessInfo{
667             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
668         });
669         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
670             SyncProcess syncProcess;
671             syncProcess.errCode = OK;
672             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
673             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
674             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
675             expectProcess.push_back(syncProcess);
676         }
677     }
678 
InitProcessForTest2(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)679     void InitProcessForTest2(const uint32_t &cloudCount, const uint32_t &localCount,
680         std::vector<SyncProcess> &expectProcess)
681     {
682         expectProcess.clear();
683         std::vector<TableProcessInfo> infos;
684         uint32_t index = 1;
685         infos.push_back(TableProcessInfo{
686             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
687         });
688         infos.push_back(TableProcessInfo{
689             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
690         });
691 
692         infos.push_back(TableProcessInfo{
693             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
694         });
695         infos.push_back(TableProcessInfo{
696             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
697         });
698 
699         infos.push_back(TableProcessInfo{
700             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
701         });
702         infos.push_back(TableProcessInfo{
703             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
704         });
705 
706         infos.push_back(TableProcessInfo{
707             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
708         });
709         infos.push_back(TableProcessInfo{
710             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount - cloudCount, localCount - cloudCount, 0}
711         });
712 
713         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
714             SyncProcess syncProcess;
715             syncProcess.errCode = OK;
716             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
717             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
718             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
719             expectProcess.push_back(syncProcess);
720         }
721     }
722 
InitProcessForTest9(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)723     void InitProcessForTest9(const uint32_t &cloudCount, const uint32_t &localCount,
724         std::vector<SyncProcess> &expectProcess)
725     {
726         expectProcess.clear();
727         std::vector<TableProcessInfo> infos;
728         uint32_t index = 1;
729         infos.push_back(TableProcessInfo{
730             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
731         });
732         infos.push_back(TableProcessInfo{
733             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
734         });
735 
736         infos.push_back(TableProcessInfo{
737             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
738         });
739         infos.push_back(TableProcessInfo{
740             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
741         });
742 
743         infos.push_back(TableProcessInfo{
744             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
745         });
746         infos.push_back(TableProcessInfo{
747             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
748         });
749 
750         infos.push_back(TableProcessInfo{
751             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
752         });
753         infos.push_back(TableProcessInfo{
754             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
755         });
756 
757         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
758             SyncProcess syncProcess;
759             syncProcess.errCode = OK;
760             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
761             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
762             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
763             expectProcess.push_back(syncProcess);
764         }
765     }
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)766     void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
767         std::vector<SyncProcess> &expectProcess)
768     {
769         g_syncIndex = 0;
770         callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
771             LOGI("devices size = %d", process.size());
772             ASSERT_EQ(process.size(), 1u);
773             syncProcess = std::move(process.begin()->second);
774             ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
775             ASSERT_NE(syncProcess.tableProcess.empty(), true);
776             LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
777             std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
778                 auto table1 = syncProcess.tableProcess.find(item);
779                 if (table1 != syncProcess.tableProcess.end()) {
780                     LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
781                          "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
782                          item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
783                          table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
784                          table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
785                          table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
786                          table1->second.upLoadInfo.failCount);
787                 }
788             });
789             if (expectProcess.empty()) {
790                 if (syncProcess.process == FINISHED) {
791                     g_processCondition.notify_one();
792                 }
793                 return;
794             }
795             ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
796             for (size_t i = 0; i < g_tables.size(); ++i) {
797                 SyncProcess head = expectProcess[g_syncIndex];
798                 for (auto &expect : head.tableProcess) {
799                     auto real = syncProcess.tableProcess.find(expect.first);
800                     ASSERT_NE(real, syncProcess.tableProcess.end());
801                     EXPECT_EQ(expect.second.process, real->second.process);
802                     EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
803                     EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
804                     EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
805                     EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
806                     EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
807                     EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
808                     EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
809                     EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
810                 }
811             }
812             g_syncIndex++;
813             if (syncProcess.process == FINISHED) {
814                 g_processCondition.notify_one();
815             }
816         };
817     }
818 
CheckAllAssetAfterUpload(int64_t localCount)819     void CheckAllAssetAfterUpload(int64_t localCount)
820     {
821         VBucket extend;
822         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
823         std::vector<VBucket> data1;
824         g_virtualCloudDb->Query(g_tables[0], extend, data1);
825         for (size_t j = 0; j < data1.size(); ++j) {
826             auto entry = data1[j].find("assert");
827             ASSERT_NE(entry, data1[j].end());
828             Asset asset = std::get<Asset>(entry->second);
829             bool isLocal = j >= (size_t)(localCount / g_arrayHalfSub);
830             Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
831             EXPECT_EQ(asset.version, baseAsset.version);
832             EXPECT_EQ(asset.name, baseAsset.name + std::to_string(isLocal ? j - localCount / g_arrayHalfSub : j));
833             EXPECT_EQ(asset.uri, baseAsset.uri);
834             EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
835             EXPECT_EQ(asset.createTime, baseAsset.createTime);
836             EXPECT_EQ(asset.size, baseAsset.size);
837             EXPECT_EQ(asset.hash, baseAsset.hash);
838         }
839 
840         std::vector<VBucket> data2;
841         g_virtualCloudDb->Query(g_tables[1], extend, data2);
842         for (size_t j = 0; j < data2.size(); ++j) {
843             auto entry = data2[j].find("asserts");
844             ASSERT_NE(entry, data2[j].end());
845             Assets assets = std::get<Assets>(entry->second);
846             Asset baseAsset = j >= (size_t)(localCount / g_arrayHalfSub) ? g_localAsset : g_cloudAsset;
847             int index = j;
848             for (const auto &asset: assets) {
849                 EXPECT_EQ(asset.version, baseAsset.version);
850                 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(index++));
851                 EXPECT_EQ(asset.uri, baseAsset.uri);
852                 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
853                 EXPECT_EQ(asset.createTime, baseAsset.createTime);
854                 EXPECT_EQ(asset.size, baseAsset.size);
855                 EXPECT_EQ(asset.hash, baseAsset.hash);
856             }
857         }
858     }
859 
CheckAssetsAfterDownload(sqlite3 * & db,int64_t localCount)860     void CheckAssetsAfterDownload(sqlite3 *&db, int64_t localCount)
861     {
862         string queryDownload = "select asserts from " + g_tables[1] + " where rowid in (";
863         for (int64_t i = 0; i < localCount; ++i) {
864             queryDownload +=  "'" + std::to_string(i) + "',";
865         }
866         queryDownload.pop_back();
867         queryDownload += ");";
868         sqlite3_stmt *stmt = nullptr;
869         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
870         int index = 0;
871         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
872             std::vector<uint8_t> blobValue;
873             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
874             Assets assets;
875             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets), E_OK);
876             bool isLocal = index >= localCount / g_arrayHalfSub;
877             Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
878             int nameIndex = index;
879             for (const auto &asset: assets) {
880                 EXPECT_EQ(asset.version, baseAsset.version);
881                 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(nameIndex));
882                 EXPECT_EQ(asset.uri, baseAsset.uri);
883                 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
884                 EXPECT_EQ(asset.createTime, baseAsset.createTime);
885                 EXPECT_EQ(asset.size, baseAsset.size);
886                 EXPECT_EQ(asset.hash, baseAsset.hash);
887                 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
888                 nameIndex++;
889             }
890             index++;
891         }
892         int errCode;
893         SQLiteUtils::ResetStatement(stmt, true, errCode);
894     }
895 
CheckAssetAfterDownload(sqlite3 * & db,int64_t localCount)896     void CheckAssetAfterDownload(sqlite3 *&db, int64_t localCount)
897     {
898         string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
899         for (int64_t i = 0; i < localCount; ++i) {
900             queryDownload +=  "'" + std::to_string(i) + "',";
901         }
902         queryDownload.pop_back();
903         queryDownload += ");";
904         sqlite3_stmt *stmt = nullptr;
905         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
906         int index = 0;
907         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
908             std::vector<uint8_t> blobValue;
909             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
910             Asset asset;
911             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
912             bool isCloud = index >= localCount;
913             Asset baseAsset = isCloud ? g_cloudAsset : g_localAsset;
914             EXPECT_EQ(asset.version, baseAsset.version);
915             EXPECT_EQ(asset.name,
916                 baseAsset.name + std::to_string(isCloud ?  index - localCount / g_arrayHalfSub : index));
917             EXPECT_EQ(asset.uri, baseAsset.uri);
918             EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
919             EXPECT_EQ(asset.createTime, baseAsset.createTime);
920             EXPECT_EQ(asset.size, baseAsset.size);
921             EXPECT_EQ(asset.hash, baseAsset.hash);
922             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
923             index++;
924         }
925         int errCode;
926         SQLiteUtils::ResetStatement(stmt, true, errCode);
927     }
928 
UpdateCloudAssetForDownloadAssetTest003()929     void UpdateCloudAssetForDownloadAssetTest003()
930     {
931         VBucket data;
932         std::vector<uint8_t> photo(1, 'x');
933         data.insert_or_assign("name", "Cloud" + std::to_string(0));
934         data.insert_or_assign("photo", photo);
935         data.insert_or_assign("assert", g_cloudAsset);
936         Timestamp now = TimeHelper::GetSysCurrentTime();
937         VBucket log;
938         std::vector<VBucket> record;
939         std::vector<VBucket> extend;
940         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
941         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
942         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
943         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
944         record.push_back(data);
945         extend.push_back(log);
946         ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName1, std::move(record), extend), DBStatus::OK);
947     }
948 
CheckAssetForDownloadAssetTest003(sqlite3 * & db)949     void CheckAssetForDownloadAssetTest003(sqlite3 *&db)
950     {
951         string queryDownload = "select assert from " + g_tables[0] + " where rowid = '11';";
952         sqlite3_stmt *stmt = nullptr;
953         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
954         int index = 0;
955         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
956             std::vector<uint8_t> blobValue;
957             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
958             Asset asset;
959             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
960             EXPECT_EQ(asset.name, g_cloudAsset.name);
961             EXPECT_EQ(asset.hash, g_cloudAsset.hash);
962             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
963             index++;
964         }
965         int errCode;
966         SQLiteUtils::ResetStatement(stmt, true, errCode);
967     }
968 
CheckAssetAfterDownload2(sqlite3 * & db,int64_t localCount)969     void CheckAssetAfterDownload2(sqlite3 *&db, int64_t localCount)
970     {
971         string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
972         for (int64_t i = localCount + 1; i < localCount + localCount; ++i) {
973             queryDownload +=  "'" + std::to_string(i) + "',";
974         }
975         queryDownload.pop_back();
976         queryDownload += ");";
977         sqlite3_stmt *stmt = nullptr;
978         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
979         int index = 0;
980         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
981             std::vector<uint8_t> blobValue;
982             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
983             Asset asset;
984             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
985             EXPECT_EQ(asset.version, g_cloudAsset.version);
986             if (index % 6u == 0) { // 6 is AssetStatus type num, include invalid type
987                 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
988             } else {
989                 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::ABNORMAL));
990             }
991             index++;
992         }
993         int errCode;
994         SQLiteUtils::ResetStatement(stmt, true, errCode);
995     }
996 
InsertCloudForCloudProcessNotify001(std::vector<VBucket> & record,std::vector<VBucket> & extend)997     void InsertCloudForCloudProcessNotify001(std::vector<VBucket> &record, std::vector<VBucket> &extend)
998     {
999         VBucket data;
1000         std::vector<uint8_t> photo(1, 'v');
1001         data.insert_or_assign("name", "Local" + std::to_string(0));
1002         data.insert_or_assign("height", 166.0); // 166.0 is random double value
1003         data.insert_or_assign("married", false);
1004         data.insert_or_assign("age", 13L);
1005         data.insert_or_assign("photo", photo);
1006         Asset asset = g_cloudAsset;
1007         asset.name = asset.name + std::to_string(0);
1008         data.insert_or_assign("assert", asset);
1009         record.push_back(data);
1010         VBucket log;
1011         Timestamp now = TimeHelper::GetSysCurrentTime();
1012         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1013         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1014         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1015         log.insert_or_assign("#_gid", std::to_string(2)); // 2 is gid
1016         extend.push_back(log);
1017     }
1018 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)1019     void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
1020     {
1021         std::unique_lock<std::mutex> lock(g_processMutex);
1022         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
1023             return syncProcess.process == FINISHED;
1024         });
1025         ASSERT_EQ(result, true);
1026         LOGD("-------------------sync end--------------");
1027     }
1028 
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus)1029     void callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus)
1030     {
1031         g_syncProcess = {};
1032         Query query = Query::Select().FromTable(tableNames);
1033         std::vector<SyncProcess> expectProcess;
1034         CloudSyncStatusCallback callback;
1035         GetCallback(g_syncProcess, callback, expectProcess);
1036         ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus);
1037         if (dbStatus == DBStatus::OK) {
1038             WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1039         }
1040     }
1041 
CloseDb()1042     void CloseDb()
1043     {
1044         delete g_observer;
1045         g_virtualCloudDb = nullptr;
1046         if (g_delegate != nullptr) {
1047             EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
1048             g_delegate = nullptr;
1049         }
1050     }
1051 
InitMockAssetLoader(DBStatus & status,int & index)1052     void InitMockAssetLoader(DBStatus &status, int &index)
1053     {
1054         std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1055         ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1056         EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1057             .WillRepeatedly([&status, &index](const std::string &, const std::string &gid, const Type &,
1058                 std::map<std::string, Assets> &assets) {
1059                 LOGD("Download GID:%s", gid.c_str());
1060                 for (auto &item: assets) {
1061                     for (auto &asset: item.second) {
1062                         EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::DOWNLOADING));
1063                         LOGD("asset [name]:%s, [status]:%u, [flag]:%u", asset.name.c_str(), asset.status, asset.flag);
1064                         asset.status = (index++) % 6u; // 6 is AssetStatus type num, include invalid type
1065                     }
1066                 }
1067                 return status;
1068         });
1069     }
1070 
1071     class DistributedDBCloudInterfacesRelationalSyncTest : public testing::Test {
1072     public:
1073         static void SetUpTestCase(void);
1074         static void TearDownTestCase(void);
1075         void SetUp();
1076         void TearDown();
1077     protected:
1078         sqlite3 *db = nullptr;
1079     };
1080 
1081 
SetUpTestCase(void)1082     void DistributedDBCloudInterfacesRelationalSyncTest::SetUpTestCase(void)
1083     {
1084         DistributedDBToolsUnitTest::TestDirInit(g_testDir);
1085         g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX;
1086         LOGI("The test db is:%s", g_testDir.c_str());
1087         RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
1088     }
1089 
TearDownTestCase(void)1090     void DistributedDBCloudInterfacesRelationalSyncTest::TearDownTestCase(void)
1091     {}
1092 
SetUp(void)1093     void DistributedDBCloudInterfacesRelationalSyncTest::SetUp(void)
1094     {
1095         if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1096             LOGE("rm test db files error.");
1097         }
1098         DistributedDBToolsUnitTest::PrintTestCaseInfo();
1099         LOGD("Test dir is %s", g_testDir.c_str());
1100         db = RelationalTestUtils::CreateDataBase(g_storePath);
1101         ASSERT_NE(db, nullptr);
1102         CreateUserDBAndTable(db);
1103         g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1104         ASSERT_NE(g_observer, nullptr);
1105         ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer },
1106             g_delegate), DBStatus::OK);
1107         ASSERT_NE(g_delegate, nullptr);
1108         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK);
1109         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK);
1110         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName3, CLOUD_COOPERATION), DBStatus::OK);
1111         g_virtualCloudDb = make_shared<VirtualCloudDb>();
1112         g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
1113         g_syncProcess = {};
1114         ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1115         ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
1116         // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
1117         Query query = Query::Select().FromTable(g_tables);
1118         CloudSyncStatusCallback callback;
1119         ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1120             DBStatus::SCHEMA_MISMATCH);
1121         DataBaseSchema dataBaseSchema;
1122         GetCloudDbSchema(dataBaseSchema);
1123         ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1124     }
1125 
TearDown(void)1126     void DistributedDBCloudInterfacesRelationalSyncTest::TearDown(void)
1127     {
1128         EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
1129         if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1130             LOGE("rm test db files error.");
1131         }
1132     }
1133 
1134 /**
1135  * @tc.name: CloudSyncTest001
1136  * @tc.desc: Cloud data is older than local data.
1137  * @tc.type: FUNC
1138  * @tc.require:
1139  * @tc.author: bty
1140  */
1141 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest001, TestSize.Level0)
1142 {
1143     int64_t paddingSize = 10;
1144     int64_t cloudCount = 20;
1145     int64_t localCount = cloudCount / g_arrayHalfSub;
1146     ChangedData changedDataForTable1;
1147     ChangedData changedDataForTable2;
1148     changedDataForTable1.tableName = g_tableName1;
1149     changedDataForTable2.tableName = g_tableName2;
1150     changedDataForTable1.field.push_back(std::string("name"));
1151     changedDataForTable2.field.push_back(std::string("id"));
1152     for (int i = 0; i < cloudCount; i++) {
1153         changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1154         changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1155     }
1156     g_observer->SetExpectedResult(changedDataForTable1);
1157     g_observer->SetExpectedResult(changedDataForTable2);
1158     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1159     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1160     Query query = Query::Select().FromTable(g_tables);
1161     std::vector<SyncProcess> expectProcess;
1162     InitProcessForTest1(cloudCount, localCount, expectProcess);
1163     CloudSyncStatusCallback callback;
1164     GetCallback(g_syncProcess, callback, expectProcess);
1165     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1166     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1167     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1168     g_observer->ClearChangedData();
1169     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1170     CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1171     LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1172     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1173     CloseDb();
1174 }
1175 
1176 /**
1177  * @tc.name: CloudSyncTest002
1178  * @tc.desc: Local data is older than cloud data.
1179  * @tc.type: FUNC
1180  * @tc.require:
1181  * @tc.author: bty
1182  */
1183 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest002, TestSize.Level0)
1184 {
1185     int64_t localCount = 20;
1186     int64_t cloudCount = 10;
1187     int64_t paddingSize = 100;
1188     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1189     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1190     Query query = Query::Select().FromTable(g_tables);
1191     std::vector<SyncProcess> expectProcess;
1192     InitProcessForTest2(cloudCount, localCount, expectProcess);
1193     CloudSyncStatusCallback callback;
1194     GetCallback(g_syncProcess, callback, expectProcess);
1195     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1196     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1197     LOGD("expect download:worker1[primary key]:[cloud0 - cloud10), worker2[primary key]:[0 - 10)");
1198     CheckDownloadResult(db, {10L, 10L}); // 10 and 10 means the num of downloads from cloud db by worker1 and worker2
1199     LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[10 - 20)");
1200     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1201     CloseDb();
1202 }
1203 
1204 /**
1205  * @tc.name: CloudSyncTest003
1206  * @tc.desc: test with update and delete operator
1207  * @tc.type: FUNC
1208  * @tc.require:
1209  * @tc.author: bty
1210  */
1211 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest003, TestSize.Level0)
1212 {
1213     int64_t paddingSize = 10;
1214     int cloudCount = 20;
1215     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1216     InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1217     Query query = Query::Select().FromTable(g_tables);
1218     std::vector<SyncProcess> expectProcess;
1219     InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1220     CloudSyncStatusCallback callback;
1221     GetCallback(g_syncProcess, callback, expectProcess);
1222     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1223     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1224     CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1225     CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1226 
1227     int updateCount = 10;
1228     UpdateUserTableRecord(db, 5, updateCount); // 5 is start id to be updated
1229     g_syncProcess = {};
1230     InitProcessForTest1(cloudCount, updateCount, expectProcess);
1231     GetCallback(g_syncProcess, callback, expectProcess);
1232     LOGD("-------------------sync after update--------------");
1233     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1234     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1235 
1236     VBucket extend;
1237     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1238     std::vector<VBucket> data1;
1239     g_virtualCloudDb->Query(g_tables[0], extend, data1);
1240     for (int j = 25; j < 35; ++j) { // index[25, 35) in cloud db expected to be updated
1241         EXPECT_EQ(std::get<int64_t>(data1[j]["age"]), 99); // 99 is the updated age field of cloud db
1242     }
1243 
1244     std::vector<VBucket> data2;
1245     g_virtualCloudDb->Query(g_tables[1], extend, data2);
1246     for (int j = 5; j < 15; ++j) { // index[5, 15) in cloud db expected to be updated
1247         EXPECT_EQ(std::get<int64_t>(data2[j]["age"]), 99); // 99 is the updated age field of cloud db
1248     }
1249 
1250     int deleteCount = 3;
1251     DeleteUserTableRecord(db, 0, deleteCount);
1252     g_syncProcess = {};
1253     InitProcessForTest1(updateCount, deleteCount, expectProcess);
1254     GetCallback(g_syncProcess, callback, expectProcess);
1255     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1256     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1257 
1258     CheckCloudTotalCount({37L, 17L}); // 37 and 17 means the total num of worker1 and worker2 from the cloud db
1259     CloseDb();
1260 }
1261 
1262 /**
1263  * @tc.name: CloudSyncTest004
1264  * @tc.desc: Random write of local and cloud data
1265  * @tc.type: FUNC
1266  * @tc.require:
1267  * @tc.author: bty
1268  */
1269 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest004, TestSize.Level0)
1270 {
1271     int64_t paddingSize = 1024 * 8;
1272     vector<thread> threads;
1273     int cloudCount = 1024;
1274     threads.emplace_back(InsertCloudTableRecord, 0, cloudCount, paddingSize, false);
1275     threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, cloudCount, paddingSize, false);
1276     for (auto &thread: threads) {
1277         thread.join();
1278     }
1279     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1280     CloseDb();
1281 }
1282 
1283 /**
1284  * @tc.name: CloudSyncTest005
1285  * @tc.desc: sync with device sync query
1286  * @tc.type: FUNC
1287  * @tc.require:
1288  * @tc.author: bty
1289  */
1290 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest005, TestSize.Level0)
1291 {
1292     Query query = Query::Select().FromTable(g_tables).OrderBy("123", true);
1293     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1294         DBStatus::NOT_SUPPORT);
1295 
1296     query = Query::Select();
1297     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1298         DBStatus::INVALID_ARGS);
1299     CloseDb();
1300 }
1301 
1302 /**
1303  * @tc.name: CloudSyncTest006
1304  * @tc.desc: Firstly set a correct schema, and then null or invalid schema
1305  * @tc.type: FUNC
1306  * @tc.require:
1307  * @tc.author: wanyi
1308  */
1309 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest006, TestSize.Level0)
1310 {
1311     int64_t paddingSize = 10;
1312     int cloudCount = 20;
1313     ChangedData changedDataForTable1;
1314     ChangedData changedDataForTable2;
1315     changedDataForTable1.tableName = g_tableName1;
1316     changedDataForTable2.tableName = g_tableName2;
1317     changedDataForTable1.field.push_back(std::string("name"));
1318     changedDataForTable2.field.push_back(std::string("id"));
1319     for (int i = 0; i < cloudCount; i++) {
1320         changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1321         changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1322     }
1323     g_observer->SetExpectedResult(changedDataForTable1);
1324     g_observer->SetExpectedResult(changedDataForTable2);
1325     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1326     InsertUserTableRecord(db, 0, cloudCount / g_arrayHalfSub, paddingSize, false);
1327     // Set correct cloudDbSchema (correct version)
1328     DataBaseSchema correctSchema;
1329     GetCloudDbSchema(correctSchema);
1330     ASSERT_EQ(g_delegate->SetCloudDbSchema(correctSchema), DBStatus::OK);
1331     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1332     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1333     g_observer->ClearChangedData();
1334     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1335     CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1336     LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1337     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1338 
1339     // Reset cloudDbSchema (invalid version - null)
1340     DataBaseSchema nullSchema;
1341     ASSERT_EQ(g_delegate->SetCloudDbSchema(nullSchema), DBStatus::OK);
1342     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1343 
1344     // Reset cloudDbSchema (invalid version - field mismatch)
1345     DataBaseSchema invalidSchema;
1346     GetInvalidCloudDbSchema(invalidSchema);
1347     ASSERT_EQ(g_delegate->SetCloudDbSchema(invalidSchema), DBStatus::OK);
1348     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1349     CloseDb();
1350 }
1351 
1352 /**
1353  * @tc.name: CloudSyncTest007
1354  * @tc.desc: Check the asset types after sync
1355  * @tc.type: FUNC
1356  * @tc.require:
1357  * @tc.author: bty
1358  */
1359 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest007, TestSize.Level1)
1360 {
1361     int64_t paddingSize = 100;
1362     int localCount = 20;
1363     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1364     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1365     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1366 
1367     CheckAssetAfterDownload(db, localCount);
1368     CheckAllAssetAfterUpload(localCount);
1369     CheckAssetsAfterDownload(db, localCount);
1370     CloseDb();
1371 }
1372 
1373 /*
1374  * @tc.name: CloudSyncTest008
1375  * @tc.desc: Test sync with invalid param
1376  * @tc.type: FUNC
1377  * @tc.require:
1378  * @tc.author: zhangqiquan
1379  */
1380 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest008, TestSize.Level0)
1381 {
1382     ASSERT_EQ(g_delegate->SetCloudDB(nullptr), OK);   // it will not happen because cloudb has been set in SetUp()
1383     Query query = Query::Select().FromTable({g_tableName3});
1384     // clouddb has been set in SetUp() and it's not null
1385     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
1386     CloseDb();
1387 }
1388 
1389 /**
1390  * @tc.name: CloudSyncTest009
1391  * @tc.desc: The second time there was no data change and sync was called.
1392  * @tc.type: FUNC
1393  * @tc.require:
1394  * @tc.author: bty
1395  */
1396 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest009, TestSize.Level0)
1397 {
1398     int64_t paddingSize = 10;
1399     int cloudCount = 20;
1400     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1401     InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1402     Query query = Query::Select().FromTable(g_tables);
1403     std::vector<SyncProcess> expectProcess;
1404     InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1405     CloudSyncStatusCallback callback;
1406     GetCallback(g_syncProcess, callback, expectProcess);
1407     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1408     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1409     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:none");
1410     CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1411     LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[0 - 20)");
1412     CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1413 
1414     g_syncProcess = {};
1415     InitProcessForTest9(cloudCount, 0, expectProcess);
1416     GetCallback(g_syncProcess, callback, expectProcess);
1417     LOGD("--------------the second sync-------------");
1418     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1419     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1420     CloseDb();
1421 }
1422 
1423 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest0010, TestSize.Level0)
1424 {
1425     int64_t paddingSize = 10;
1426     int cloudCount = 20;
1427     int localCount = 10;
1428     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1429     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1430     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1431 
1432     int rowid = 27;
1433     UpdateAssetForTest(db, AssetOpType::NO_CHANGE, cloudCount, rowid++);
1434     UpdateAssetForTest(db, AssetOpType::INSERT, cloudCount, rowid++);
1435     UpdateAssetForTest(db, AssetOpType::DELETE, cloudCount, rowid++);
1436     UpdateAssetForTest(db, AssetOpType::UPDATE, cloudCount, rowid++);
1437 
1438     int id = 0;
1439     UpdateAssetsForTest(db, AssetOpType::NO_CHANGE, id++);
1440     UpdateAssetsForTest(db, AssetOpType::INSERT, id++);
1441     UpdateAssetsForTest(db, AssetOpType::DELETE, id++);
1442     UpdateAssetsForTest(db, AssetOpType::UPDATE, id++);
1443 
1444     LOGD("--------------the second sync-------------");
1445     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1446 
1447     CheckFillAssetForTest10(db);
1448     CheckFillAssetsForTest10(db);
1449     CloseDb();
1450 }
1451 
1452 /**
1453  * @tc.name: CloudSyncTest011
1454  * @tc.desc: Test sync with same table name.
1455  * @tc.type: FUNC
1456  * @tc.require:
1457  * @tc.author: zhangqiquan
1458  */
1459 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest011, TestSize.Level0)
1460 {
1461     Query query = Query::Select().FromTable({g_tableName1, g_tableName1});
1462     bool syncFinish = false;
1463     std::mutex syncMutex;
1464     std::condition_variable cv;
1465     std::atomic<int> callCount = 0;
1466     CloudSyncStatusCallback callback = [&callCount, &cv, &syncFinish, &syncMutex](
__anon01eba2f50602( const std::map<std::string, SyncProcess> &onProcess) 1467         const std::map<std::string, SyncProcess> &onProcess) {
1468         ASSERT_NE(onProcess.find(DEVICE_CLOUD), onProcess.end());
1469         SyncProcess syncProcess = onProcess.at(DEVICE_CLOUD);
1470         callCount++;
1471         if (syncProcess.process == FINISHED) {
1472             std::lock_guard<std::mutex> autoLock(syncMutex);
1473             syncFinish = true;
1474         }
1475         cv.notify_all();
1476     };
1477     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1478     std::unique_lock<std::mutex> uniqueLock(syncMutex);
__anon01eba2f50702() 1479     cv.wait(uniqueLock, [&syncFinish]() {
1480         return syncFinish;
1481     });
1482     RuntimeContext::GetInstance()->StopTaskPool();
1483     EXPECT_EQ(callCount, 2); // 2 is onProcess count
1484     CloseDb();
1485 }
1486 
1487 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest012, TestSize.Level0)
1488 {
1489     int64_t localCount = 20;
1490     int64_t cloudCount = 10;
1491     int64_t paddingSize = 10;
1492     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1493     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1494     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1495 
1496     InsertCloudTableRecord(localCount + cloudCount, cloudCount, paddingSize, false);
1497     InsertUserTableRecord(db, localCount + cloudCount, localCount, paddingSize, true);
1498     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1499 
1500     InsertCloudTableRecord(2 * (localCount + cloudCount), cloudCount, paddingSize, false); // 2 is offset
1501     InsertUserTableRecord(db, 2 * (localCount + cloudCount), localCount, paddingSize, false); // 2 is offset
1502     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1503 
1504 
1505     InsertCloudTableRecord(3 * (localCount + cloudCount), cloudCount, paddingSize, true); // 3 is offset
1506     InsertUserTableRecord(db, 3 * (localCount + cloudCount), localCount, paddingSize, true); // 3 is offset
1507     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1508     CloseDb();
1509 }
1510 
1511 /*
1512  * @tc.name: CloudSyncTest013
1513  * @tc.desc: test increment watermark when cloud db query data size is 0
1514  * @tc.type: FUNC
1515  * @tc.require:
1516  * @tc.author: zhuwentao
1517  */
1518 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest013, TestSize.Level0)
1519 {
1520     /**
1521      * @tc.steps: insert some data into cloud db
1522      * @tc.expected: return ok.
1523      */
1524     int64_t paddingSize = 10;
1525     int64_t cloudCount = 10;
1526     SyncProcess syncProcess;
1527     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1528     /**
1529      * @tc.steps: try to cloud sync
1530      * @tc.expected: return ok.
1531      */
1532     Query query = Query::Select().FromTable(g_tables);
__anon01eba2f50802(const std::map<std::string, SyncProcess> &process) 1533     CloudSyncStatusCallback callback = [&syncProcess](const std::map<std::string, SyncProcess> &process) {
1534         LOGI("devices size = %d", process.size());
1535         ASSERT_EQ(process.size(), 1u);
1536         syncProcess = std::move(process.begin()->second);
1537         if (syncProcess.process == FINISHED) {
1538             g_processCondition.notify_one();
1539         }
1540     };
1541     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1542     WaitForSyncFinish(syncProcess, g_syncWaitTime);
1543     uint32_t queryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1544     /**
1545      * @tc.steps: insert some increment data into cloud db
1546      * @tc.expected: return ok.
1547      */
1548     VBucket data;
1549     Timestamp now = TimeHelper::GetSysCurrentTime();
1550     data.insert_or_assign("name", "Cloud" + std::to_string(0));
1551     data.insert_or_assign("height", 166.0); // 166.0 is random double value
1552     data.insert_or_assign("married", false);
1553     data.insert_or_assign("age", 13L);
1554     VBucket log;
1555     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1556     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1557     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1558     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1559     log.insert_or_assign(CloudDbConstant::CURSOR_FIELD, "0123");
1560     g_virtualCloudDb->SetIncrementData(g_tableName1, data, log);
1561     syncProcess.process = PREPARED;
1562     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1563     WaitForSyncFinish(syncProcess, g_syncWaitTime);
1564     uint32_t lastQueryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1565     ASSERT_EQ(lastQueryTimes - queryTimes, 2u);
1566     CloseDb();
1567 }
1568 
TestSyncForStatus(RelationalStoreDelegate * delegate,DBStatus expectStatus)1569 void TestSyncForStatus(RelationalStoreDelegate *delegate, DBStatus expectStatus)
1570 {
1571     std::mutex dataMutex;
1572     std::condition_variable cv;
1573     bool finish = false;
1574     DBStatus res = OK;
1575     CloudSyncStatusCallback callback = [&dataMutex, &cv, &finish, &res](
1576         const std::map<std::string, SyncProcess> &process) {
1577         std::map<std::string, SyncProcess> syncProcess;
1578         {
1579             std::lock_guard<std::mutex> autoLock(dataMutex);
1580             syncProcess = process;
1581             if (syncProcess[DEVICE_CLOUD].process == FINISHED) {
1582                 finish = true;
1583             }
1584             res = syncProcess[DEVICE_CLOUD].errCode;
1585         }
1586         cv.notify_one();
1587     };
1588     Query query = Query::Select().FromTable({g_tableName3});
1589     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1590     {
1591         std::unique_lock<std::mutex> uniqueLock(dataMutex);
1592         cv.wait(uniqueLock, [&finish] {
1593             return finish;
1594         });
1595     }
1596     EXPECT_EQ(res, expectStatus);
1597 }
1598 
1599 /*
1600  * @tc.name: CloudSyncTest015
1601  * @tc.desc: Test sync with cloud error
1602  * @tc.type: FUNC
1603  * @tc.require:
1604  * @tc.author: zhangqiquan
1605  */
1606 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest015, TestSize.Level0)
1607 {
1608     g_virtualCloudDb->SetActionStatus(CLOUD_NETWORK_ERROR);
1609     TestSyncForStatus(g_delegate, CLOUD_NETWORK_ERROR);
1610 
1611     g_virtualCloudDb->SetActionStatus(CLOUD_SYNC_UNSET);
1612     TestSyncForStatus(g_delegate, CLOUD_SYNC_UNSET);
1613 
1614     g_virtualCloudDb->SetActionStatus(CLOUD_FULL_RECORDS);
1615     TestSyncForStatus(g_delegate, CLOUD_FULL_RECORDS);
1616 
1617     g_virtualCloudDb->SetActionStatus(CLOUD_LOCK_ERROR);
1618     TestSyncForStatus(g_delegate, CLOUD_LOCK_ERROR);
1619 
1620     g_virtualCloudDb->SetActionStatus(DB_ERROR);
1621     TestSyncForStatus(g_delegate, CLOUD_ERROR);
1622 
1623     g_virtualCloudDb->SetActionStatus(OK);
1624     CloseDb();
1625 }
1626 
1627 /*
1628  * @tc.name: CloudSyncTest014
1629  * @tc.desc: Test sync with s4
1630  * @tc.type: FUNC
1631  * @tc.require:
1632  * @tc.author: zhangqiquan
1633  */
1634 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest014, TestSize.Level0)
1635 {
1636     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
1637     RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
1638 
1639     // sync failed because get security option failed
__anon01eba2f50b02(const std::string&, SecurityOption &option) 1640     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1641         option.securityLabel = S0;
1642         return DB_ERROR;
1643     });
1644     Query query = Query::Select().FromTable({g_tableName3});
1645     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1646         SECURITY_OPTION_CHECK_ERROR);
1647 
1648     // sync failed because get S4
__anon01eba2f50c02(const std::string&, SecurityOption &option) 1649     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1650         option.securityLabel = S4;
1651         return NOT_SUPPORT;
1652     });
1653     Query invalidQuery = Query::Select().FromTable({g_tableName3}).PrefixKey({'k'});
1654     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, invalidQuery, nullptr, g_syncWaitTime),
1655         NOT_SUPPORT);
1656 
1657     // sync failed because get S4
__anon01eba2f50d02(const std::string&, SecurityOption &option) 1658     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1659         option.securityLabel = S4;
1660         return OK;
1661     });
1662     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1663         SECURITY_OPTION_CHECK_ERROR);
1664 
1665     // sync failed because S4 has been cached
__anon01eba2f50e02(const std::string&, SecurityOption &option) 1666     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1667         option.securityLabel = S0;
1668         return OK;
1669     });
1670     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1671         SECURITY_OPTION_CHECK_ERROR);
1672     RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
1673     CloseDb();
1674 }
1675 
1676 /*
1677  * @tc.name: CloudSyncTest016
1678  * @tc.desc: Test sync when push before merge
1679  * @tc.type: FUNC
1680  * @tc.require:
1681  * @tc.author: chenchaohao
1682  */
1683 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest016, TestSize.Level0)
1684 {
1685     int64_t localCount = 10;
1686     int64_t paddingSize = 10;
1687     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1688     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1689     CheckCloudTotalCount({10L, 10L});
1690     UpdateUserTableRecord(db, 0, localCount);
1691     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1692 
1693     VBucket extend;
1694     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1695     std::vector<VBucket> data1;
1696     g_virtualCloudDb->Query(g_tables[0], extend, data1);
1697     for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1698         EXPECT_EQ(std::get<int64_t>(data1[i]["age"]), 99); // 99 is the updated age field of cloud db
1699     }
1700 
1701     std::vector<VBucket> data2;
1702     g_virtualCloudDb->Query(g_tables[1], extend, data2);
1703     for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1704         EXPECT_EQ(std::get<int64_t>(data2[i]["age"]), 99); // 99 is the updated age field of cloud db
1705     }
1706 
1707     CloseDb();
1708 }
1709 
1710 /*
1711  * @tc.name: DataNotifier001
1712  * @tc.desc: Notify data without primary key
1713  * @tc.type: FUNC
1714  * @tc.require:
1715  * @tc.author: wanyi
1716  */
1717 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DataNotifier001, TestSize.Level0)
1718 {
1719     int64_t paddingSize = 10;
1720     int localCount = 20;
1721     InsertRecordWithoutPk2LocalAndCloud(db, 0, localCount, paddingSize);
1722     callSync({g_tableName3}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1723     CloseDb();
1724 }
1725 
1726 /**
1727  * @tc.name: CloudSyncAssetTest001
1728  * @tc.desc:
1729  * @tc.type: FUNC
1730  * @tc.require:
1731  * @tc.author: wanyi
1732  */
1733 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest001, TestSize.Level1)
1734 {
1735     int64_t paddingSize = 100;
1736     int localCount = 20;
1737     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1738     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1739     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1740 
1741     CheckAssetAfterDownload(db, localCount);
1742     CheckAllAssetAfterUpload(localCount);
1743     CloseDb();
1744 }
1745 
1746 /*
1747  * @tc.name: MannualNotify001
1748  * @tc.desc: Test FLAG_ONLY mode of RemoveDeviceData
1749  * @tc.type: FUNC
1750  * @tc.require:
1751  * @tc.author: huangboxin
1752  */
1753 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, MannualNotify001, TestSize.Level0)
1754 {
1755     int64_t paddingSize = 10;
1756     int localCount = 10;
1757     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1758     Query query = Query::Select().FromTable(g_tables);
1759     std::vector<SyncProcess> expectProcess;
1760     InitProcessForMannualSync1(expectProcess);
1761     CloudSyncStatusCallback callback;
1762     GetCallback(g_syncProcess, callback, expectProcess);
1763     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime),
1764         DBStatus::OK);
1765     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1766     CloseDb();
1767 }
1768 
1769 /**
1770  * @tc.name: CloudProcessNotify001
1771  * @tc.desc: Test duplicate cloud records. SYNC_MODE_CLOUD_MERGE
1772  * @tc.type: FUNC
1773  * @tc.require:
1774  * @tc.author: liufuchenxing
1775  */
1776 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudProcessNotify001, TestSize.Level1)
1777 {
1778     /**
1779      * @tc.steps: step1. table work1 and work2 insert 1 record which name is local0, then sync().
1780      * @tc.expected: step 1. table work1 and work2 download result is 0. table work1 and work2 upload 1 record.
1781      */
1782     int64_t paddingSize = 10;
1783     int64_t localCount = 1;
1784     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1785     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1786     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1787     g_observer->ClearChangedData();
1788     LOGD("expect download:worker1[primary key]:[], worker2[primary key]:[]");
1789     CheckDownloadResult(db, {0L, 0L}); // 0 and 0 means the num of downloads from cloud db by worker1 and worker2
1790     LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1791     CheckCloudTotalCount({1L, 1L}); // 1 and 1 means the total num of worker1 and worker2 from the cloud db
1792 
1793     /**
1794      * @tc.steps: step2. reset data
1795      * @tc.expected: step2. return ok.
1796      */
1797     std::this_thread::sleep_for(std::chrono::milliseconds(100));
1798     g_syncProcess = {};
1799     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1800 
1801     /**
1802      * @tc.steps: step3. table work1 delete record which gid is 0 and name is local0 on cloud.
1803      * @tc.expected: step3. return ok.
1804      */
1805     VBucket idMap;
1806     idMap.insert_or_assign("#_gid", std::to_string(0));
1807     ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName1, idMap), DBStatus::OK);
1808 
1809     /**
1810      * @tc.steps: step4. table work1 insert record which gid is 0 and name is local0 on cloud.
1811      * @tc.expected: step4. return ok.
1812      */
1813     std::vector<VBucket> record1;
1814     std::vector<VBucket> extend1;
1815     InsertCloudForCloudProcessNotify001(record1, extend1);
1816     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(g_tableName1, std::move(record1), extend1), DBStatus::OK);
1817 
1818     /**
1819      * @tc.steps: step5. sync() and check local data.
1820      * @tc.expected: step5. return ok.
1821      */
1822     ChangedData changedDataForTable1;
1823     changedDataForTable1.tableName = g_tableName1;
1824     changedDataForTable1.field.push_back(std::string("name"));
1825     changedDataForTable1.primaryData[ChangeType::OP_UPDATE].push_back({"Local" + std::to_string(0)});
1826     g_observer->SetExpectedResult(changedDataForTable1);
1827 
1828     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1829     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1830     g_observer->ClearChangedData();
1831     LOGD("expect download:worker1[primary key]:[Local0], worker2[primary key]:[0]");
1832     // 1 and 1 means the num of downloads from cloud db by worker1 and worker2
1833     CheckDownloadResult(db, {1L, 1L}, "Local");
1834     LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1835     CheckCloudTotalCount({1L, 1L}); // 0 and 0 means the total num of worker1 and worker2 from the cloud db
1836 
1837     /**
1838      * @tc.steps: step6. CloseDb().
1839      * @tc.expected: step6. return ok.
1840      */
1841     CloseDb();
1842 }
1843 
1844 /*
1845  * @tc.name: CloudSyncAssetTest002
1846  * @tc.desc:
1847  * @tc.type: FUNC
1848  * @tc.require:
1849  * @tc.author: huangboxin
1850  */
1851 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest002, TestSize.Level0)
1852 {
1853     int64_t paddingSize = 10;
1854     int localCount = 3;
1855     int cloudCount = 3;
1856     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1857     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1858     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1859     CloseDb();
1860 }
1861 
1862 /*
1863  * @tc.name: CloudSyncAssetTest003
1864  * @tc.desc:
1865  * @tc.type: FUNC
1866  * @tc.require:
1867  * @tc.author: bty
1868  */
1869 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest003, TestSize.Level0)
1870 {
1871     int64_t paddingSize = 10;
1872     int localCount = 3;
1873     int cloudCount = 3;
1874     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1875     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1876     Assets assets;
1877     assets.push_back(g_localAsset);
1878     assets.push_back(g_localAsset);
1879     UpdateLocalAssets(db, assets, 1);
1880     Query query = Query::Select().FromTable(g_tables);
1881     std::vector<SyncProcess> expectProcess;
__anon01eba2f50f02(const std::map<std::string, SyncProcess> &process) 1882     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1883         ASSERT_EQ(process.size(), 1u);
1884         g_syncProcess = std::move(process.begin()->second);
1885 
1886         if (g_syncProcess.process == FINISHED) {
1887             g_processCondition.notify_one();
1888         }
1889     };
1890     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1891         DBStatus::OK);
1892     {
1893         std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51002() 1894         g_processCondition.wait(lock, []() {
1895             return g_syncProcess.process == FINISHED;
1896         });
1897         ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ERROR);
1898     }
1899     CloseDb();
1900 }
1901 
1902 /*
1903  * @tc.name: CloudSyncAssetTest004
1904  * @tc.desc:
1905  * @tc.type: FUNC
1906  * @tc.require:
1907  * @tc.author: bty
1908  */
1909 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest004, TestSize.Level0)
1910 {
1911     int64_t paddingSize = 10;
1912     int localCount = 3;
1913     int cloudCount = 3;
1914     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1915     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1916     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1917 
1918     UpdateDiffType(localCount);
1919     g_syncProcess = {};
__anon01eba2f51102(const std::map<std::string, SyncProcess> &process) 1920     CloudSyncStatusCallback callback1 = [](const std::map<std::string, SyncProcess> &process) {
1921         ASSERT_EQ(process.size(), 1u);
1922         g_syncProcess = std::move(process.begin()->second);
1923         if (g_syncProcess.process == FINISHED) {
1924             g_processCondition.notify_one();
1925         }
1926     };
1927     Query query = Query::Select().FromTable(g_tables);
1928     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback1, g_syncWaitTime),
1929         DBStatus::OK);
1930     {
1931         std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51202() 1932         g_processCondition.wait(lock, []() {
1933             return g_syncProcess.process == FINISHED;
1934         });
1935         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1936     }
1937     CheckDiffTypeAsset(db);
1938     CloseDb();
1939 }
1940 
1941 /*
1942  * @tc.name: CloudSyncAssetTest005
1943  * @tc.desc: Test erase all no change Asset
1944  * @tc.type: FUNC
1945  * @tc.require:
1946  * @tc.author: bty
1947  */
1948 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest005, TestSize.Level0)
1949 {
1950     /**
1951      * @tc.steps:step1. Construct local data with asset names and hashes consistent with the cloud
1952      * @tc.expected: step1. return ok.
1953      */
1954     int64_t paddingSize = 10;
1955     int localCount = 3;
1956     int cloudCount = 3;
1957     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1958     Assets assets;
1959     for (int64_t j = 0; j < cloudCount; j++) {
1960         Asset asset = g_cloudAsset;
1961         asset.name = g_cloudAsset.name + std::to_string(j);
1962         assets.push_back(asset);
1963     }
1964     UpdateLocalAssets(db, assets, 0);
1965     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
1966 
1967     /**
1968      * @tc.steps:step2. Construct cloud data
1969      * @tc.expected: step2. return ok.
1970      */
1971     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1972 
1973     /**
1974      * @tc.steps:step3. sync, expect EraseNoChangeAsset to erase all Nochange assets
1975      * @tc.expected: step3. return ok.
1976      */
1977     Query query = Query::Select().FromTable(g_tables);
1978     std::vector<SyncProcess> expectProcess;
__anon01eba2f51302(const std::map<std::string, SyncProcess> &process) 1979     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1980         ASSERT_EQ(process.size(), 1u);
1981         g_syncProcess = std::move(process.begin()->second);
1982 
1983         if (g_syncProcess.process == FINISHED) {
1984             g_processCondition.notify_one();
1985         }
1986     };
1987     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1988         DBStatus::OK);
1989     {
1990         std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51402() 1991         g_processCondition.wait(lock, []() {
1992             return g_syncProcess.process == FINISHED;
1993         });
1994         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1995     }
1996     CloseDb();
1997 }
1998 
1999 /*
2000  * @tc.name: CloudSyncAssetTest006
2001  * @tc.desc: Test upload new data without assets
2002  * @tc.type: FUNC
2003  * @tc.require:
2004  * @tc.author: bty
2005  */
2006 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest006, TestSize.Level0)
2007 {
2008     /**
2009      * @tc.steps:step1. Construct local data with NULL asset and the local count is greater than the cloud
2010      * @tc.expected: step1. return ok.
2011      */
2012     int64_t paddingSize = 10;
2013     int localCount = 6;
2014     int cloudCount = 3;
2015     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2016     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2017     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2018 
2019     /**
2020      * @tc.steps:step2. sync, upload new data without assets,
2021      * @tc.expected: step2. return ok.
2022      */
2023     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2024     CloseDb();
2025 }
2026 
2027 /*
2028  * @tc.name: CloudSyncAssetTest007
2029  * @tc.desc: for expilictly set not-change assets. If an asset is deleted, and its hash is not set to empty, it will be
2030  * regarded as NO-CHANGE, rather than delete
2031  * @tc.type: FUNC
2032  * @tc.require:
2033  * @tc.author: wanyi
2034  */
2035 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest007, TestSize.Level0)
2036 {
2037     /**
2038      * @tc.steps:step1. local asset contain an asset which has a corresponding asset in cloud
2039      * @tc.expected: step1. return ok.
2040      */
2041     int64_t paddingSize = 10;
2042     int localCount = 1;
2043     int cloudCount = 1;
2044     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2045     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2046     /**
2047      * @tc.steps:step2. local asset is set to delete, but hash is not set to empty
2048      * @tc.expected: step2. return ok.
2049      */
2050     Assets assets;
2051     for (int64_t j = 0; j < cloudCount; j++) {
2052         Asset asset = g_cloudAsset;
2053         asset.name = g_cloudAsset.name + std::to_string(j);
2054         asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
2055         assets.push_back(asset);
2056     }
2057     UpdateLocalAssets(db, assets, 0);
2058     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2059     /**
2060      * @tc.steps:step3. Do sync
2061      * @tc.expected: step3. return ok.
2062      */
2063     Query query = Query::Select().FromTable(g_tables);
2064     std::vector<SyncProcess> expectProcess;
__anon01eba2f51502(const std::map<std::string, SyncProcess> &process) 2065     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2066         ASSERT_EQ(process.size(), 1u);
2067         g_syncProcess = std::move(process.begin()->second);
2068 
2069         if (g_syncProcess.process == FINISHED) {
2070             g_processCondition.notify_one();
2071         }
2072     };
2073     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2074         DBStatus::OK);
2075     {
2076         std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51602() 2077         g_processCondition.wait(lock, []() {
2078             return g_syncProcess.process == FINISHED;
2079         });
2080         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2081     }
2082     /**
2083      * @tc.steps:step4. Check result. Cloud db should not contain asset.
2084      * @tc.expected: step4. return ok.
2085      */
2086     CheckAssetForAssetTest006();
2087     CloseDb();
2088 }
2089 
2090 
2091 /**
2092  * @tc.name: DownloadAssetTest001
2093  * @tc.desc: Test the sync of different Asset status out of parameters when the download is successful
2094  * @tc.type: FUNC
2095  * @tc.require:
2096  * @tc.author: bty
2097  */
2098 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest001, TestSize.Level0)
2099 {
2100     /**
2101      * @tc.steps:step1. Set different status out of parameters, and the code returns OK
2102      * @tc.expected: step1. return ok.
2103      */
2104     DBStatus expectStatus = DBStatus::OK;
2105     int index = 0;
2106     InitMockAssetLoader(expectStatus, index);
2107 
2108     /**
2109      * @tc.steps:step2. init download data
2110      * @tc.expected: step2. return ok.
2111      */
2112     int64_t paddingSize = 1;
2113     int localCount = 120;
2114     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2115     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
2116 
2117     /**
2118      * @tc.steps:step3. sync
2119      * @tc.expected: step3. return ok.
2120      */
2121     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2122 
2123     /**
2124      * @tc.steps:step4. Expect all states to be normal
2125      * @tc.expected: step4. return ok.
2126      */
2127     CheckAssetAfterDownload(db, localCount);
2128     CloseDb();
2129 }
2130 
2131 /*
2132  * @tc.name: CloudSyncAssetTest008
2133  * @tc.desc: sync failed with download asset
2134  * @tc.type: FUNC
2135  * @tc.require:
2136  * @tc.author: zhangqiquan
2137  */
2138 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest008, TestSize.Level0)
2139 {
2140     /**
2141      * @tc.steps:step1. prepare asset data
2142      */
2143     int64_t paddingSize = 10;
2144     int localCount = 1;
2145     int cloudCount = 1;
2146     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2147     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2148     /**
2149      * @tc.steps:step2. set download asset status failed
2150      */
2151     g_virtualAssetLoader->SetDownloadStatus(CLOUD_ASSET_SPACE_INSUFFICIENT);
2152     Query query = Query::Select().FromTable(g_tables);
2153     std::vector<SyncProcess> expectProcess;
__anon01eba2f51702(const std::map<std::string, SyncProcess> &process) 2154     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2155         for (const auto &item: process) {
2156             g_syncProcess = item.second;
2157         }
2158         if (g_syncProcess.process == FINISHED) {
2159             g_processCondition.notify_one();
2160         }
2161     };
2162     /**
2163      * @tc.steps:step3. sync and wait sync finished.
2164      * @tc.expected: step3. sync return CLOUD_ASSET_SPACE_INSUFFICIENT.
2165      */
2166     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2167         DBStatus::OK);
2168     {
2169         std::unique_lock<std::mutex> lock(g_processMutex);
__anon01eba2f51802() 2170         g_processCondition.wait(lock, []() {
2171             return g_syncProcess.process == FINISHED;
2172         });
2173         ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT);
2174     }
2175     /**
2176      * @tc.steps:step4. clear data.
2177      */
2178     g_virtualAssetLoader->SetDownloadStatus(OK);
2179     CloseDb();
2180 }
2181 
2182 /**
2183  * @tc.name: DownloadAssetTest002
2184  * @tc.desc: Test the sync of different Asset status out of parameters when the download is failed
2185  * @tc.type: FUNC
2186  * @tc.require:
2187  * @tc.author: bty
2188  */
2189 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest002, TestSize.Level0)
2190 {
2191     /**
2192      * @tc.steps:step1. Set different status out of parameters, and the code returns CLOUD_ERROR
2193      * @tc.expected: step1. return ok.
2194      */
2195     DBStatus expectStatus = DBStatus::CLOUD_ERROR;
2196     int index = 0;
2197     InitMockAssetLoader(expectStatus, index);
2198     int64_t paddingSize = 1;
2199     int localCount = 100;
2200 
2201     /**
2202      * @tc.steps:step2. init download data
2203      * @tc.expected: step2. return ok.
2204      */
2205     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2206     InsertCloudTableRecord(0, localCount, paddingSize, false);
2207 
2208     /**
2209      * @tc.steps:step3. sync
2210      * @tc.expected: step3. return ok.
2211      */
2212     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2213 
2214     /**
2215      * @tc.steps:step4. Those status that are not normal are all be abnormal after sync.
2216      * @tc.expected: step4. return ok.
2217      */
2218     CheckAssetAfterDownload2(db, localCount);
2219     CloseDb();
2220 }
2221 
2222 /**
2223  * @tc.name: DownloadAssetTest003
2224  * @tc.desc: Init different asset name between local and cloud, then sync to test download
2225  * @tc.type: FUNC
2226  * @tc.require:
2227  * @tc.author: bty
2228  */
2229 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest003, TestSize.Level0)
2230 {
2231     /**
2232      * @tc.steps:step1. Init data and sync
2233      * @tc.expected: step1. return ok.
2234      */
2235     int64_t paddingSize = 1;
2236     int localCount = 10;
2237     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2238     InsertCloudTableRecord(0, localCount, paddingSize, false);
2239     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2240 
2241     /**
2242      * @tc.steps:step2. update cloud Asset where gid = 0
2243      * @tc.expected: step2. return ok.
2244      */
2245     UpdateCloudAssetForDownloadAssetTest003();
2246 
2247     /**
2248      * @tc.steps:step3. sync again
2249      * @tc.expected: step3. return ok.
2250      */
2251     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2252 
2253     /**
2254      * @tc.steps:step4. check asset after download where gid = 0
2255      * @tc.expected: step4. return ok.
2256      */
2257     CheckAssetForDownloadAssetTest003(db);
2258     CloseDb();
2259 }
2260 
2261 /**
2262  * @tc.name: DownloadAssetTest004
2263  * @tc.desc: Test total count, fail count and success count when drop table
2264  * @tc.type: FUNC
2265  * @tc.require:
2266  * @tc.author: liufuchenxing
2267  */
2268 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest004, TestSize.Level0)
2269 {
2270     /**
2271      * @tc.steps:step1. Init data and sync
2272      * @tc.expected: step1. return ok.
2273      */
2274     int64_t paddingSize = 1;
2275     int count = 10;
2276     InsertUserTableRecord(db, 0, count, paddingSize, false);
2277     g_syncProcess = {};
__anon01eba2f51902(const std::map<std::string, SyncProcess> &process) 2278     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2279         for (const auto &item : process) {
2280             g_syncProcess = item.second;
2281         }
2282         if (g_syncProcess.process == FINISHED) {
2283             g_processCondition.notify_one();
2284         }
2285     };
2286     Query query = Query::Select().FromTable(g_tables);
2287     EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2288     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2289 
2290     /**
2291      * @tc.steps:step2. drop table work2. sync failed, check total, success and fail count.
2292      * @tc.expected: step2. total = 20, success=0, fail=20
2293      */
2294     g_syncProcess = {};
2295     InsertCloudTableRecord(0, count, paddingSize, false);
2296     EXPECT_EQ(RelationalTestUtils::ExecSql(db, DROP_INTEGER_PRIMARY_KEY_TABLE_SQL), DBStatus::OK);
2297     EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2298     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2299     EXPECT_EQ(g_syncProcess.errCode, DBStatus::DB_ERROR);
2300     uint32_t expectTotalCnt = 20u;
2301     EXPECT_NE(g_syncProcess.tableProcess.find(g_tableName2), g_syncProcess.tableProcess.end());
2302     EXPECT_EQ(g_syncProcess.tableProcess[g_tableName2].downLoadInfo.batchIndex, 1u);
2303     EXPECT_EQ(g_syncProcess.tableProcess[g_tableName2].downLoadInfo.total, expectTotalCnt);
2304     EXPECT_EQ(g_syncProcess.tableProcess[g_tableName2].downLoadInfo.successCount, 0u);
2305     EXPECT_EQ(g_syncProcess.tableProcess[g_tableName2].downLoadInfo.failCount, expectTotalCnt);
2306 
2307     /**
2308      * @tc.steps:step3. close db.
2309      * @tc.expected: step3. close success.
2310      */
2311     CloseDb();
2312 }
2313 
2314 /**
2315  * @tc.name: SchemaTest001
2316  * @tc.desc: Create table with Cloud cooperation mode and do sync
2317  * @tc.type: FUNC
2318  * @tc.require:
2319  * @tc.author: wanyi
2320  */
2321 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest001, TestSize.Level0)
2322 {
2323     /**
2324      * @tc.steps:step1. Create table with Cloud cooperation mode
2325      * @tc.expected: step1. return ok.
2326      */
2327     EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2328     ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, CLOUD_COOPERATION), DBStatus::OK);
2329     /**
2330      * @tc.steps:step1. do sync
2331      * @tc.expected: step1. return ok.
2332      */
2333     callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2334     CloseDb();
2335 }
2336 
2337 /**
2338  * @tc.name: SchemaTest002
2339  * @tc.desc: Create table with DEVICE_COOPERATION mode and do sync
2340  * @tc.type: FUNC
2341  * @tc.require:
2342  * @tc.author: wanyi
2343  */
2344 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest002, TestSize.Level0)
2345 {
2346     /**
2347      * @tc.steps:step1. Create table with DEVICE_COOPERATION mode
2348      * @tc.expected: step1. return ok.
2349      */
2350     EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2351     ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, DEVICE_COOPERATION), DBStatus::OK);
2352     /**
2353      * @tc.steps:step1. do sync
2354      * @tc.expected: step1. return ok.
2355      */
2356     callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::NOT_SUPPORT);
2357     CloseDb();
2358 }
2359 
2360 }
2361 #endif // RELATIONAL_STORE
2362