• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_instance.h"
24 #include "relational_store_manager.h"
25 #include "runtime_config.h"
26 #include "sqlite_relational_store.h"
27 #include "sqlite_relational_utils.h"
28 #include "store_observer.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "mock_asset_loader.h"
34 
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38 using namespace std;
39 
40 namespace {
41     string g_storeID = "Relational_Store_SYNC";
42     const string g_tableName1 = "worker1";
43     const string g_tableName2 = "worker2";
44     const string g_tableName3 = "worker3";
45     const string g_tableName4 = "worker4";
46     const string DEVICE_CLOUD = "cloud_dev";
47     const string DB_SUFFIX = ".db";
48     const int64_t g_syncWaitTime = 60;
49     const int g_arrayHalfSub = 2;
50     int g_syncIndex = 0;
51     string g_testDir;
52     string g_storePath;
53     std::mutex g_processMutex;
54     std::condition_variable g_processCondition;
55     std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
56     std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
57     DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
58     RelationalStoreObserverUnitTest *g_observer = nullptr;
59     RelationalStoreDelegate *g_delegate = nullptr;
60     SyncProcess g_syncProcess;
61     using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
62     const std::string CREATE_LOCAL_TABLE_SQL =
63             "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \
64     "name TEXT PRIMARY KEY," \
65     "height REAL ," \
66     "married BOOLEAN ," \
67     "photo BLOB NOT NULL," \
68     "assert BLOB," \
69     "age INT);";
70     const std::string INTEGER_PRIMARY_KEY_TABLE_SQL =
71             "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \
72     "id INTEGER PRIMARY KEY," \
73     "name TEXT ," \
74     "height REAL ," \
75     "photo BLOB ," \
76     "asserts BLOB," \
77     "age INT);";
78     const std::string DROP_INTEGER_PRIMARY_KEY_TABLE_SQL = "DROP TABLE " + g_tableName2 + ";";
79     const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL =
80             "CREATE TABLE IF NOT EXISTS " + g_tableName3 + "(" \
81     "name TEXT," \
82     "height REAL ," \
83     "married BOOLEAN ," \
84     "photo BLOB NOT NULL," \
85     "assert BLOB," \
86     "age INT);";
87     const std::string INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE =
88             "CREATE TABLE IF NOT EXISTS " + g_tableName4 + "(" \
89     "id INTEGER PRIMARY KEY," \
90     "name TEXT ," \
91     "height REAL ," \
92     "photo BLOB ," \
93     "asserts BLOB," \
94     "age INT);";
95     const std::vector<Field> g_cloudFiled1 = {
96         {"Name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<double>},
97         {"MArried", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
98         {"Assert", TYPE_INDEX<Asset>}, {"age", TYPE_INDEX<int64_t>}
99     };
100     const std::vector<Field> g_invalidCloudFiled1 = {
101         {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<int>},
102         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
103         {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
104     };
105     const std::vector<Field> g_cloudFiled2 = {
106         {"id", TYPE_INDEX<int64_t>, true}, {"name", TYPE_INDEX<std::string>},
107         {"height", TYPE_INDEX<double>},  {"photo", TYPE_INDEX<Bytes>},
108         {"asserts", TYPE_INDEX<Assets>}, {"age", TYPE_INDEX<int64_t>}
109     };
110     const std::vector<Field> g_cloudFiledWithOutPrimaryKey3 = {
111         {"name", TYPE_INDEX<std::string>, false, true}, {"height", TYPE_INDEX<double>},
112         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
113         {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
114     };
115     const std::vector<std::string> g_tables = {g_tableName1, g_tableName2};
116     const std::vector<std::string> g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName};
117     const std::vector<string> g_prefix = {"Local", ""};
118     const Asset g_localAsset = {
119         .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
120         .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
121     };
122     const Asset g_cloudAsset = {
123         .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
124         .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
125     };
126 
CreateUserDBAndTable(sqlite3 * & db)127     void CreateUserDBAndTable(sqlite3 *&db)
128     {
129         EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
130         EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK);
131         EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL), SQLITE_OK);
132         EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK);
133     }
134 
InsertUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)135     void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
136     {
137         std::string photo(photoSize, 'v');
138         int errCode;
139         std::vector<uint8_t> assetBlob;
140         for (int64_t i = begin; i < begin + count; ++i) {
141             Asset asset = g_localAsset;
142             asset.name = asset.name + std::to_string(i);
143             RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
144             string sql = "INSERT OR REPLACE INTO " + g_tableName1
145                          + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
146                          "', '175.8', '0', '" + photo + "', ? , '18');";
147             sqlite3_stmt *stmt = nullptr;
148             ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
149             if (assetIsNull) {
150                 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
151             } else {
152                 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
153             }
154             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
155             SQLiteUtils::ResetStatement(stmt, true, errCode);
156         }
157         for (int64_t i = begin; i < begin + count; ++i) {
158             std::vector<Asset> assets;
159             Asset asset = g_localAsset;
160             asset.name = g_localAsset.name + std::to_string(i);
161             assets.push_back(asset);
162             asset.name = g_localAsset.name + std::to_string(i + 1);
163             assets.push_back(asset);
164             RuntimeContext::GetInstance()->AssetsToBlob(assets, assetBlob);
165             string sql = "INSERT OR REPLACE INTO " + g_tableName2
166                          + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local"
167                          + std::to_string(i) + "', '155.10', '"+ photo + "',  ? , '21');";
168             sqlite3_stmt *stmt = nullptr;
169             ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
170             if (assetIsNull) {
171                 ASSERT_EQ(sqlite3_bind_null(stmt, 1), E_OK);
172             } else {
173                 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
174             }
175             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
176             SQLiteUtils::ResetStatement(stmt, true, errCode);
177         }
178         LOGD("insert user record worker1[primary key]:[Local%" PRId64 " - Local%" PRId64
179             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
180     }
181 
UpdateUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)182     void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
183     {
184         for (size_t i = 0; i < g_tables.size(); i++) {
185             string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + " in (";
186             for (int64_t j = begin; j < begin + count; ++j) {
187                 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
188             }
189             updateAge.pop_back();
190             updateAge += ");";
191             ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
192         }
193         LOGD("update local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
194             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
195     }
196 
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)197     void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
198     {
199         for (size_t i = 0; i < g_tables.size(); i++) {
200             string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + " in (";
201             for (int64_t j = begin; j < count; ++j) {
202                 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
203             }
204             updateAge.pop_back();
205             updateAge += ");";
206             ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
207         }
208         LOGD("delete local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
209             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
210     }
211 
InsertRecordWithoutPk2LocalAndCloud(sqlite3 * & db,int64_t begin,int64_t count,int photoSize)212     void InsertRecordWithoutPk2LocalAndCloud(sqlite3 *&db, int64_t begin, int64_t count, int photoSize)
213     {
214         std::vector<uint8_t> photo(photoSize, 'v');
215         std::string photoLocal(photoSize, 'v');
216         Asset asset = { .version = 1, .name = "Phone" };
217         std::vector<uint8_t> assetBlob;
218         RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset);
219         std::string assetStr(assetBlob.begin(), assetBlob.end());
220         std::vector<VBucket> record1;
221         std::vector<VBucket> extend1;
222         for (int64_t i = begin; i < count; ++i) {
223             Timestamp now = TimeHelper::GetSysCurrentTime();
224             VBucket data;
225             data.insert_or_assign("name", "Cloud" + std::to_string(i));
226             data.insert_or_assign("height", 166.0); // 166.0 is random double value
227             data.insert_or_assign("married", (bool)0);
228             data.insert_or_assign("photo", photo);
229             data.insert_or_assign("assert", KEY_1);
230             data.insert_or_assign("age", 13L);
231             record1.push_back(data);
232             VBucket log;
233             log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
234             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
235             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
236             extend1.push_back(log);
237             std::this_thread::sleep_for(std::chrono::milliseconds(1));  // wait for 1 ms
238         }
239         int errCode = g_virtualCloudDb->BatchInsert(g_tableName3, std::move(record1), extend1);
240         ASSERT_EQ(errCode, DBStatus::OK);
241         for (int64_t i = begin; i < count; ++i) {
242             string sql = "INSERT OR REPLACE INTO " + g_tableName3
243                          + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
244                          "', '175.8', '0', '" + photoLocal + "', '" + assetStr + "', '18');";
245             ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
246         }
247     }
248 
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)249     void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
250     {
251         std::vector<uint8_t> photo(photoSize, 'v');
252         std::vector<VBucket> record1;
253         std::vector<VBucket> extend1;
254         std::vector<VBucket> record2;
255         std::vector<VBucket> extend2;
256         Timestamp now = TimeHelper::GetSysCurrentTime();
257         for (int64_t i = begin; i < begin + count; ++i) {
258             VBucket data;
259             data.insert_or_assign("name", "Cloud" + std::to_string(i));
260             data.insert_or_assign("height", 166.0); // 166.0 is random double value
261             data.insert_or_assign("married", false);
262             data.insert_or_assign("photo", photo);
263             data.insert_or_assign("AGE", 13L);
264             Asset asset = g_cloudAsset;
265             asset.name = asset.name + std::to_string(i);
266             assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
267             record1.push_back(data);
268             VBucket log;
269             log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
270             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
271             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
272             extend1.push_back(log);
273 
274             std::vector<Asset> assets;
275             data.insert_or_assign("id", i);
276             data.insert_or_assign("height", 180.3); // 180.3 is random double value
277             for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
278                 asset.name = g_cloudAsset.name + std::to_string(j);
279                 assets.push_back(asset);
280             }
281             data.erase("assert");
282             data.erase("married");
283             assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
284             record2.push_back(data);
285             extend2.push_back(log);
286         }
287         ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1), DBStatus::OK);
288         ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2), DBStatus::OK);
289         LOGD("insert cloud record worker1[primary key]:[cloud%" PRId64 " - cloud%" PRId64
290             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
291         std::this_thread::sleep_for(std::chrono::milliseconds(count));
292     }
293 
UpdateAssetForTest(sqlite3 * & db,AssetOpType opType,int64_t cloudCount,int64_t rowid)294     void UpdateAssetForTest(sqlite3 *&db, AssetOpType opType, int64_t cloudCount, int64_t rowid)
295     {
296         string sql = "UPDATE " + g_tables[0] + " SET assert = ? where rowid = '" + std::to_string(rowid) + "';";
297         std::vector<uint8_t> assetBlob;
298         int errCode;
299         Asset asset = g_cloudAsset;
300         asset.name = "Phone" + std::to_string(rowid - cloudCount - 1);
301         if (opType == AssetOpType::UPDATE) {
302             asset.uri = "/data/test";
303             asset.hash = "";
304         } else if (opType == AssetOpType::INSERT) {
305             asset.name = "Test10";
306         }
307         asset.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
308         sqlite3_stmt *stmt = nullptr;
309         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
310         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
311         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false) == E_OK) {
312             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
313         }
314         SQLiteUtils::ResetStatement(stmt, true, errCode);
315     }
316 
UpdateAssetsForTest(sqlite3 * & db,AssetOpType opType,int64_t rowid)317     void UpdateAssetsForTest(sqlite3 *&db, AssetOpType opType, int64_t rowid)
318     {
319         string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
320         Asset asset1 = g_localAsset;
321         Asset asset2 = g_localAsset;
322         Assets assets;
323         asset1.name = g_localAsset.name + std::to_string(rowid);
324         asset1.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
325         asset2.name = g_localAsset.name + std::to_string(rowid + 1);
326         asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
327         if (opType == AssetOpType::UPDATE) {
328             assets.push_back(asset1);
329             asset2.uri = "/data/test";
330             asset2.hash = "";
331             asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
332             assets.push_back(asset2);
333         } else if (opType == AssetOpType::INSERT) {
334             assets.push_back(asset1);
335             assets.push_back(asset2);
336             Asset asset3;
337             asset3.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
338             asset3.name = "Test10";
339             assets.push_back(asset3);
340         } else if (opType == AssetOpType::DELETE) {
341             assets.push_back(asset1);
342             asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
343             assets.push_back(asset2);
344         } else {
345             assets.push_back(asset1);
346             assets.push_back(asset2);
347         }
348         sqlite3_stmt *stmt = nullptr;
349         std::vector<uint8_t> assetsBlob;
350         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
351         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
352         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
353             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
354         }
355         int errCode;
356         SQLiteUtils::ResetStatement(stmt, true, errCode);
357     }
358 
UpdateLocalAssets(sqlite3 * & db,Assets & assets,int64_t rowid)359     void UpdateLocalAssets(sqlite3 *&db, Assets &assets, int64_t rowid)
360     {
361         string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
362         std::vector<uint8_t> assetsBlob;
363         int errCode;
364         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
365         sqlite3_stmt *stmt = nullptr;
366         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
367         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
368             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
369         }
370         SQLiteUtils::ResetStatement(stmt, true, errCode);
371     }
372 
UpdateDiffType(int64_t begin)373     void UpdateDiffType(int64_t begin)
374     {
375         std::vector<std::string> hash = {"DEC", "update_", "insert_"};
376         std::vector<std::string> name = {
377             g_cloudAsset.name + std::to_string(0),
378             g_cloudAsset.name + std::to_string(1),
379             g_cloudAsset.name + std::to_string(3) // 3 is insert id
380         };
381         std::vector<VBucket> record;
382         std::vector<VBucket> extend;
383         Assets assets;
384         for (int i = 0; i < 3; i ++) { // 3 is type num
385             Asset asset = g_cloudAsset;
386             asset.name = name[i];
387             asset.hash = hash[i];
388             assets.push_back(asset);
389         }
390         VBucket data;
391         data.insert_or_assign("name", "Cloud" + std::to_string(0));
392         data.insert_or_assign("id", 0L);
393         data.insert_or_assign("asserts", assets);
394         Timestamp now = TimeHelper::GetSysCurrentTime();
395         VBucket log;
396         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
397         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(begin));
398         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
399         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
400         record.push_back(data);
401         extend.push_back(log);
402         ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName2, std::move(record), extend), DBStatus::OK);
403     }
404 
CheckDiffTypeAsset(sqlite3 * & db)405     void CheckDiffTypeAsset(sqlite3 *&db)
406     {
407         std::vector<std::string> names = {
408             g_cloudAsset.name + std::to_string(0),
409             g_cloudAsset.name + std::to_string(1),
410             g_cloudAsset.name + std::to_string(3) // 3 is insert id
411         };
412         std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid = 0;";
413         sqlite3_stmt *stmt = nullptr;
414         int index = 0;
415         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
416         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
417             ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
418             Type cloudValue;
419             ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
420             std::vector<uint8_t> assetsBlob;
421             Assets assets;
422             ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
423             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
424             for (const Asset &asset: assets) {
425                 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
426                 ASSERT_EQ(asset.name, names[index]);
427                 LOGE("lyh_test: name: %s", names[index].c_str());
428                 index++;
429             }
430         }
431         int errCode;
432         SQLiteUtils::ResetStatement(stmt, true, errCode);
433     }
434 
CheckAssetForAssetTest006()435     void CheckAssetForAssetTest006()
436     {
437         VBucket extend;
438         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
439         std::vector<VBucket> data;
440         g_virtualCloudDb->Query(g_tables[1], extend, data);
441         for (size_t j = 0; j < data.size(); ++j) {
442             ASSERT_NE(data[j].find("asserts"), data[j].end());
443             ASSERT_TRUE((data[j]["asserts"]).index() == TYPE_INDEX<Assets>);
444             Assets &assets = std::get<Assets>(data[j]["asserts"]);
445             ASSERT_TRUE(assets.size() > 0);
446             Asset &asset = assets[0];
447             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::DELETE));
448             EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::DELETE));
449         }
450     }
451 
CheckFillAssetForTest10(sqlite3 * & db)452     void CheckFillAssetForTest10(sqlite3 *&db)
453     {
454         std::string sql = "SELECT assert from " + g_tables[0] + " WHERE rowid in ('27','28','29','30');";
455         sqlite3_stmt *stmt = nullptr;
456         int index = 0;
457         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
458         int suffixId = 6;
459         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
460             if (index == 0 || index == 1 || index == 3) { // 3 is rowid index of 29
461                 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
462                 Type cloudValue;
463                 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, cloudValue), E_OK);
464                 std::vector<uint8_t> assetBlob;
465                 Asset asset;
466                 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetBlob), E_OK);
467                 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset), E_OK);
468                 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
469                 if (index == 0) {
470                     ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
471                 } else if (index == 1) {
472                     ASSERT_EQ(asset.name, "Test10");
473                 } else {
474                     ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
475                     ASSERT_EQ(asset.uri, "/data/test");
476                 }
477             } else {
478                 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
479             }
480             index++;
481         }
482         int errCode;
483         SQLiteUtils::ResetStatement(stmt, true, errCode);
484     }
485 
CheckFillAssetsForTest10(sqlite3 * & db)486     void CheckFillAssetsForTest10(sqlite3 *&db)
487     {
488         std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid in ('0','1','2','3');";
489         sqlite3_stmt *stmt = nullptr;
490         int index = 0;
491         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
492         int insertIndex = 2;
493         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
494             ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
495             Type cloudValue;
496             ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
497             std::vector<uint8_t> assetsBlob;
498             Assets assets;
499             ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
500             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
501             if (index == 0) {
502                 ASSERT_EQ(assets.size(), 2u);
503                 ASSERT_EQ(assets[0].name, g_localAsset.name + std::to_string(index));
504                 ASSERT_EQ(assets[1].name, g_localAsset.name + std::to_string(index + 1));
505             } else if (index == 1) {
506                 ASSERT_EQ(assets.size(), 3u);
507                 ASSERT_EQ(assets[insertIndex].name, "Test10");
508                 ASSERT_EQ(assets[insertIndex].status, static_cast<uint32_t>(AssetStatus::NORMAL));
509             } else if (index == 2) { // 2 is the third element
510                 ASSERT_EQ(assets.size(), 1u);
511                 ASSERT_EQ(assets[0].name, g_cloudAsset.name + std::to_string(index));
512             } else {
513                 ASSERT_EQ(assets.size(), 2u);
514                 ASSERT_EQ(assets[1].uri, "/data/test");
515                 ASSERT_EQ(assets[1].status, static_cast<uint32_t>(AssetStatus::NORMAL));
516             }
517             index++;
518         }
519         int errCode;
520         SQLiteUtils::ResetStatement(stmt, true, errCode);
521     }
522 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)523     int QueryCountCallback(void *data, int count, char **colValue, char **colName)
524     {
525         if (count != 1) {
526             return 0;
527         }
528         auto expectCount = reinterpret_cast<int64_t>(data);
529         EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
530         return 0;
531     }
532 
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,std::string keyStr="Cloud")533     void CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, std::string keyStr = "Cloud")
534     {
535         for (size_t i = 0; i < g_tables.size(); ++i) {
536             string queryDownload = "select count(*) from " + g_tables[i] + " where name "
537                                    + " like '" + keyStr + "%'";
538             EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
539                 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
540         }
541     }
542 
CheckCloudTotalCount(std::vector<int64_t> expectCounts)543     void CheckCloudTotalCount(std::vector<int64_t> expectCounts)
544     {
545         VBucket extend;
546         for (size_t i = 0; i < g_tables.size(); ++i) {
547             extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
548             int64_t realCount = 0;
549             std::vector<VBucket> data;
550             g_virtualCloudDb->Query(g_tables[i], extend, data);
551             for (size_t j = 0; j < data.size(); ++j) {
552                 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
553                 if (entry != data[j].end() && std::get<bool>(entry->second)) {
554                     continue;
555                 }
556                 realCount++;
557             }
558             EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
559         }
560     }
561 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)562     void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
563     {
564         TableSchema tableSchema1 = {
565             .name = g_tableName1,
566             .sharedTableName = g_tableName1 + "_shared",
567             .fields = g_cloudFiled1
568         };
569         TableSchema tableSchema2 = {
570             .name = g_tableName2,
571             .sharedTableName = g_tableName2 + "_shared",
572             .fields = g_cloudFiled2
573         };
574         TableSchema tableSchemaWithOutPrimaryKey = {
575             .name = g_tableName3,
576             .sharedTableName = g_tableName3 + "_shared",
577             .fields = g_cloudFiledWithOutPrimaryKey3
578         };
579         TableSchema tableSchema4 = {
580             .name = g_tableName4,
581             .sharedTableName = g_tableName4 + "_shared",
582             .fields = g_cloudFiled2
583         };
584         dataBaseSchema.tables.push_back(tableSchema1);
585         dataBaseSchema.tables.push_back(tableSchema2);
586         dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey);
587         dataBaseSchema.tables.push_back(tableSchema4);
588     }
589 
590 
GetInvalidCloudDbSchema(DataBaseSchema & dataBaseSchema)591     void GetInvalidCloudDbSchema(DataBaseSchema &dataBaseSchema)
592     {
593         TableSchema tableSchema1 = {
594             .name = g_tableName1,
595             .sharedTableName = "",
596             .fields = g_invalidCloudFiled1
597         };
598         TableSchema tableSchema2 = {
599             .name = g_tableName2,
600             .sharedTableName = "",
601             .fields = g_cloudFiled2
602         };
603         dataBaseSchema.tables.push_back(tableSchema1);
604         dataBaseSchema.tables.push_back(tableSchema2);
605     }
606 
InitProcessForTest1(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)607     void InitProcessForTest1(const uint32_t &cloudCount, const uint32_t &localCount,
608         std::vector<SyncProcess> &expectProcess)
609     {
610         expectProcess.clear();
611         std::vector<TableProcessInfo> infos;
612         uint32_t index = 1;
613         infos.push_back(TableProcessInfo{
614             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
615         });
616         infos.push_back(TableProcessInfo{
617             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
618         });
619 
620         infos.push_back(TableProcessInfo{
621             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
622         });
623         infos.push_back(TableProcessInfo{
624             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
625         });
626 
627         infos.push_back(TableProcessInfo{
628             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
629         });
630         infos.push_back(TableProcessInfo{
631             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
632         });
633 
634         infos.push_back(TableProcessInfo{
635             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
636         });
637         infos.push_back(TableProcessInfo{
638             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
639         });
640 
641         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
642             SyncProcess syncProcess;
643             syncProcess.errCode = OK;
644             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
645             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
646             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
647             expectProcess.push_back(syncProcess);
648         }
649     }
650 
InitProcessForMannualSync1(std::vector<SyncProcess> & expectProcess)651     void InitProcessForMannualSync1(std::vector<SyncProcess> &expectProcess)
652     {
653         expectProcess.clear();
654         std::vector<TableProcessInfo> infos;
655         // first notify, first table
656         infos.push_back(TableProcessInfo{
657             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
658         });
659         // first notify, second table
660         infos.push_back(TableProcessInfo{
661             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
662         });
663         // second notify, first table
664         infos.push_back(TableProcessInfo{
665             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
666         });
667         // second notify, second table
668         infos.push_back(TableProcessInfo{
669             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
670         });
671 
672         infos.push_back(TableProcessInfo{
673             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
674         });
675         // second notify, second table
676         infos.push_back(TableProcessInfo{
677             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
678         });
679         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
680             SyncProcess syncProcess;
681             syncProcess.errCode = OK;
682             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
683             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
684             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
685             expectProcess.push_back(syncProcess);
686         }
687     }
688 
InitProcessForTest2(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)689     void InitProcessForTest2(const uint32_t &cloudCount, const uint32_t &localCount,
690         std::vector<SyncProcess> &expectProcess)
691     {
692         expectProcess.clear();
693         std::vector<TableProcessInfo> infos;
694         uint32_t index = 1;
695         infos.push_back(TableProcessInfo{
696             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
697         });
698         infos.push_back(TableProcessInfo{
699             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
700         });
701 
702         infos.push_back(TableProcessInfo{
703             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
704         });
705         infos.push_back(TableProcessInfo{
706             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
707         });
708 
709         infos.push_back(TableProcessInfo{
710             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
711         });
712         infos.push_back(TableProcessInfo{
713             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
714         });
715 
716         infos.push_back(TableProcessInfo{
717             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
718         });
719         infos.push_back(TableProcessInfo{
720             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount - cloudCount, localCount - cloudCount, 0}
721         });
722 
723         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
724             SyncProcess syncProcess;
725             syncProcess.errCode = OK;
726             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
727             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
728             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
729             expectProcess.push_back(syncProcess);
730         }
731     }
732 
InitProcessForTest9(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)733     void InitProcessForTest9(const uint32_t &cloudCount, const uint32_t &localCount,
734         std::vector<SyncProcess> &expectProcess)
735     {
736         expectProcess.clear();
737         std::vector<TableProcessInfo> infos;
738         uint32_t index = 1;
739         infos.push_back(TableProcessInfo{
740             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
741         });
742         infos.push_back(TableProcessInfo{
743             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
744         });
745 
746         infos.push_back(TableProcessInfo{
747             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
748         });
749         infos.push_back(TableProcessInfo{
750             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
751         });
752 
753         infos.push_back(TableProcessInfo{
754             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
755         });
756         infos.push_back(TableProcessInfo{
757             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
758         });
759 
760         infos.push_back(TableProcessInfo{
761             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
762         });
763         infos.push_back(TableProcessInfo{
764             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
765         });
766 
767         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
768             SyncProcess syncProcess;
769             syncProcess.errCode = OK;
770             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
771             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
772             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
773             expectProcess.push_back(syncProcess);
774         }
775     }
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)776     void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
777         std::vector<SyncProcess> &expectProcess)
778     {
779         g_syncIndex = 0;
780         callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
781             LOGI("devices size = %d", process.size());
782             ASSERT_EQ(process.size(), 1u);
783             syncProcess = std::move(process.begin()->second);
784             ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
785             ASSERT_NE(syncProcess.tableProcess.empty(), true);
786             LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
787             std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
788                 auto table1 = syncProcess.tableProcess.find(item);
789                 if (table1 != syncProcess.tableProcess.end()) {
790                     LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
791                          "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
792                          item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
793                          table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
794                          table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
795                          table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
796                          table1->second.upLoadInfo.failCount);
797                 }
798             });
799             if (expectProcess.empty()) {
800                 if (syncProcess.process == FINISHED) {
801                     g_processCondition.notify_one();
802                 }
803                 return;
804             }
805             ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
806             for (size_t i = 0; i < g_tables.size() && static_cast<size_t>(g_syncIndex) < expectProcess.size(); ++i) {
807                 SyncProcess head = expectProcess[g_syncIndex];
808                 for (auto &expect : head.tableProcess) {
809                     auto real = syncProcess.tableProcess.find(expect.first);
810                     ASSERT_NE(real, syncProcess.tableProcess.end());
811                     EXPECT_EQ(expect.second.process, real->second.process);
812                     EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
813                     EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
814                     EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
815                     EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
816                     EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
817                     EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
818                     EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
819                     EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
820                 }
821             }
822             g_syncIndex++;
823             if (syncProcess.process == FINISHED) {
824                 g_processCondition.notify_one();
825             }
826         };
827     }
828 
CheckAllAssetAfterUpload(int64_t localCount)829     void CheckAllAssetAfterUpload(int64_t localCount)
830     {
831         VBucket extend;
832         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
833         std::vector<VBucket> data1;
834         g_virtualCloudDb->Query(g_tables[0], extend, data1);
835         for (size_t j = 0; j < data1.size(); ++j) {
836             Type entry;
837             bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("assert", data1[j], entry);
838             ASSERT_TRUE(isExisted);
839             Asset asset = std::get<Asset>(entry);
840             bool isLocal = j >= (size_t)(localCount / g_arrayHalfSub);
841             Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
842             EXPECT_EQ(asset.version, baseAsset.version);
843             EXPECT_EQ(asset.name, baseAsset.name + std::to_string(isLocal ? j - localCount / g_arrayHalfSub : j));
844             EXPECT_EQ(asset.uri, baseAsset.uri);
845             EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
846             EXPECT_EQ(asset.createTime, baseAsset.createTime);
847             EXPECT_EQ(asset.size, baseAsset.size);
848             EXPECT_EQ(asset.hash, baseAsset.hash);
849         }
850 
851         std::vector<VBucket> data2;
852         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
853         g_virtualCloudDb->Query(g_tables[1], extend, data2);
854         for (size_t j = 0; j < data2.size(); ++j) {
855             Type entry;
856             bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("asserts", data2[j], entry);
857             ASSERT_TRUE(isExisted);
858             Assets assets = std::get<Assets>(entry);
859             Asset baseAsset = j >= (size_t)(localCount / g_arrayHalfSub) ? g_localAsset : g_cloudAsset;
860             int index = static_cast<int>(j);
861             for (const auto &asset: assets) {
862                 EXPECT_EQ(asset.version, baseAsset.version);
863                 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(index++));
864                 EXPECT_EQ(asset.uri, baseAsset.uri);
865                 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
866                 EXPECT_EQ(asset.createTime, baseAsset.createTime);
867                 EXPECT_EQ(asset.size, baseAsset.size);
868                 EXPECT_EQ(asset.hash, baseAsset.hash);
869             }
870         }
871     }
872 
CheckAssetsAfterDownload(sqlite3 * & db,int64_t localCount)873     void CheckAssetsAfterDownload(sqlite3 *&db, int64_t localCount)
874     {
875         string queryDownload = "select asserts from " + g_tables[1] + " where rowid in (";
876         for (int64_t i = 0; i < localCount; ++i) {
877             queryDownload +=  "'" + std::to_string(i) + "',";
878         }
879         queryDownload.pop_back();
880         queryDownload += ");";
881         sqlite3_stmt *stmt = nullptr;
882         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
883         int index = 0;
884         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
885             std::vector<uint8_t> blobValue;
886             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
887             Assets assets;
888             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets), E_OK);
889             bool isLocal = index >= localCount / g_arrayHalfSub;
890             Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
891             int nameIndex = index;
892             for (const auto &asset: assets) {
893                 EXPECT_EQ(asset.version, baseAsset.version);
894                 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(nameIndex));
895                 EXPECT_EQ(asset.uri, baseAsset.uri);
896                 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
897                 EXPECT_EQ(asset.createTime, baseAsset.createTime);
898                 EXPECT_EQ(asset.size, baseAsset.size);
899                 EXPECT_EQ(asset.hash, baseAsset.hash);
900                 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
901                 nameIndex++;
902             }
903             index++;
904         }
905         int errCode;
906         SQLiteUtils::ResetStatement(stmt, true, errCode);
907     }
908 
CheckAssetAfterDownload(sqlite3 * & db,int64_t localCount)909     void CheckAssetAfterDownload(sqlite3 *&db, int64_t localCount)
910     {
911         string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
912         for (int64_t i = 0; i < localCount; ++i) {
913             queryDownload +=  "'" + std::to_string(i) + "',";
914         }
915         queryDownload.pop_back();
916         queryDownload += ");";
917         sqlite3_stmt *stmt = nullptr;
918         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
919         int index = 0;
920         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
921             std::vector<uint8_t> blobValue;
922             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
923             Asset asset;
924             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
925             bool isCloud = index >= localCount;
926             Asset baseAsset = isCloud ? g_cloudAsset : g_localAsset;
927             EXPECT_EQ(asset.version, baseAsset.version);
928             EXPECT_EQ(asset.name,
929                 baseAsset.name + std::to_string(isCloud ?  index - localCount / g_arrayHalfSub : index));
930             EXPECT_EQ(asset.uri, baseAsset.uri);
931             EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
932             EXPECT_EQ(asset.createTime, baseAsset.createTime);
933             EXPECT_EQ(asset.size, baseAsset.size);
934             EXPECT_EQ(asset.hash, baseAsset.hash);
935             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
936             index++;
937         }
938         int errCode;
939         SQLiteUtils::ResetStatement(stmt, true, errCode);
940     }
941 
UpdateCloudAssetForDownloadAssetTest003()942     void UpdateCloudAssetForDownloadAssetTest003()
943     {
944         VBucket data;
945         std::vector<uint8_t> photo(1, 'x');
946         data.insert_or_assign("name", "Cloud" + std::to_string(0));
947         data.insert_or_assign("photo", photo);
948         data.insert_or_assign("assert", g_cloudAsset);
949         Timestamp now = TimeHelper::GetSysCurrentTime();
950         VBucket log;
951         std::vector<VBucket> record;
952         std::vector<VBucket> extend;
953         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
954         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
955         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
956         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
957         record.push_back(data);
958         extend.push_back(log);
959         ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName1, std::move(record), extend), DBStatus::OK);
960     }
961 
CheckAssetForDownloadAssetTest003(sqlite3 * & db)962     void CheckAssetForDownloadAssetTest003(sqlite3 *&db)
963     {
964         string queryDownload = "select assert from " + g_tables[0] + " where rowid = '11';";
965         sqlite3_stmt *stmt = nullptr;
966         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
967         int index = 0;
968         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
969             std::vector<uint8_t> blobValue;
970             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
971             Asset asset;
972             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
973             EXPECT_EQ(asset.name, g_cloudAsset.name);
974             EXPECT_EQ(asset.hash, g_cloudAsset.hash);
975             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
976             index++;
977         }
978         int errCode;
979         SQLiteUtils::ResetStatement(stmt, true, errCode);
980     }
981 
CheckAssetAfterDownload2(sqlite3 * & db,int64_t localCount)982     void CheckAssetAfterDownload2(sqlite3 *&db, int64_t localCount)
983     {
984         string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
985         for (int64_t i = localCount + 1; i < localCount + localCount; ++i) {
986             queryDownload +=  "'" + std::to_string(i) + "',";
987         }
988         queryDownload.pop_back();
989         queryDownload += ");";
990         sqlite3_stmt *stmt = nullptr;
991         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
992         int index = 0;
993         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
994             std::vector<uint8_t> blobValue;
995             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
996             Asset asset;
997             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
998             EXPECT_EQ(asset.version, g_cloudAsset.version);
999             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::ABNORMAL));
1000             index++;
1001         }
1002         int errCode;
1003         SQLiteUtils::ResetStatement(stmt, true, errCode);
1004     }
1005 
InsertCloudForCloudProcessNotify001(std::vector<VBucket> & record,std::vector<VBucket> & extend)1006     void InsertCloudForCloudProcessNotify001(std::vector<VBucket> &record, std::vector<VBucket> &extend)
1007     {
1008         VBucket data;
1009         std::vector<uint8_t> photo(1, 'v');
1010         data.insert_or_assign("name", "Local" + std::to_string(0));
1011         data.insert_or_assign("height", 166.0); // 166.0 is random double value
1012         data.insert_or_assign("married", false);
1013         data.insert_or_assign("age", 13L);
1014         data.insert_or_assign("photo", photo);
1015         Asset asset = g_cloudAsset;
1016         asset.name = asset.name + std::to_string(0);
1017         data.insert_or_assign("assert", asset);
1018         record.push_back(data);
1019         VBucket log;
1020         Timestamp now = TimeHelper::GetSysCurrentTime();
1021         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1022         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1023         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1024         log.insert_or_assign("#_gid", std::to_string(2)); // 2 is gid
1025         extend.push_back(log);
1026     }
1027 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)1028     void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
1029     {
1030         std::unique_lock<std::mutex> lock(g_processMutex);
1031         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
1032             return syncProcess.process == FINISHED;
1033         });
1034         ASSERT_EQ(result, true);
1035         LOGD("-------------------sync end--------------");
1036     }
1037 
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus)1038     void callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus)
1039     {
1040         g_syncProcess = {};
1041         Query query = Query::Select().FromTable(tableNames);
1042         std::vector<SyncProcess> expectProcess;
1043         CloudSyncStatusCallback callback;
1044         GetCallback(g_syncProcess, callback, expectProcess);
1045         ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus);
1046         if (dbStatus == DBStatus::OK) {
1047             WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1048         }
1049     }
1050 
CloseDb()1051     void CloseDb()
1052     {
1053         delete g_observer;
1054         g_virtualCloudDb = nullptr;
1055         if (g_delegate != nullptr) {
1056             EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
1057             g_delegate = nullptr;
1058         }
1059     }
1060 
InitMockAssetLoader(DBStatus & status,int & index)1061     void InitMockAssetLoader(DBStatus &status, int &index)
1062     {
1063         std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1064         ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1065         EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1066             .WillRepeatedly([&status, &index](const std::string &, const std::string &gid, const Type &,
1067                 std::map<std::string, Assets> &assets) {
1068                 LOGD("Download GID:%s", gid.c_str());
1069                 for (auto &item: assets) {
1070                     for (auto &asset: item.second) {
1071                         uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset.status);
1072                         EXPECT_TRUE(lowBitStatus == static_cast<uint32_t>(AssetStatus::INSERT) ||
1073                             lowBitStatus == static_cast<uint32_t>(AssetStatus::UPDATE));
1074                         LOGD("asset [name]:%s, [status]:%u, [flag]:%u", asset.name.c_str(), asset.status, asset.flag);
1075                         asset.status = (index++) % 5u + 1; // 6 is AssetStatus type num, include invalid type
1076                     }
1077                 }
1078                 return status;
1079         });
1080     }
1081 
1082     class DistributedDBCloudInterfacesRelationalSyncTest : public testing::Test {
1083     public:
1084         static void SetUpTestCase(void);
1085         static void TearDownTestCase(void);
1086         void SetUp();
1087         void TearDown();
1088     protected:
1089         sqlite3 *db = nullptr;
1090     };
1091 
1092 
SetUpTestCase(void)1093     void DistributedDBCloudInterfacesRelationalSyncTest::SetUpTestCase(void)
1094     {
1095         DistributedDBToolsUnitTest::TestDirInit(g_testDir);
1096         g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX;
1097         LOGI("The test db is:%s", g_testDir.c_str());
1098         RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
1099     }
1100 
TearDownTestCase(void)1101     void DistributedDBCloudInterfacesRelationalSyncTest::TearDownTestCase(void)
1102     {}
1103 
SetUp(void)1104     void DistributedDBCloudInterfacesRelationalSyncTest::SetUp(void)
1105     {
1106         if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1107             LOGE("rm test db files error.");
1108         }
1109         DistributedDBToolsUnitTest::PrintTestCaseInfo();
1110         LOGD("Test dir is %s", g_testDir.c_str());
1111         db = RelationalTestUtils::CreateDataBase(g_storePath);
1112         ASSERT_NE(db, nullptr);
1113         CreateUserDBAndTable(db);
1114         g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1115         ASSERT_NE(g_observer, nullptr);
1116         ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer },
1117             g_delegate), DBStatus::OK);
1118         ASSERT_NE(g_delegate, nullptr);
1119         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK);
1120         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK);
1121         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName3, CLOUD_COOPERATION), DBStatus::OK);
1122         g_virtualCloudDb = make_shared<VirtualCloudDb>();
1123         g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
1124         g_syncProcess = {};
1125         ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1126         ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
1127         // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
1128         Query query = Query::Select().FromTable(g_tables);
1129         CloudSyncStatusCallback callback;
1130         ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1131             DBStatus::SCHEMA_MISMATCH);
1132         DataBaseSchema dataBaseSchema;
1133         GetCloudDbSchema(dataBaseSchema);
1134         ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1135     }
1136 
TearDown(void)1137     void DistributedDBCloudInterfacesRelationalSyncTest::TearDown(void)
1138     {
1139         EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
1140         if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1141             LOGE("rm test db files error.");
1142         }
1143     }
1144 
1145 /**
1146  * @tc.name: CloudSyncTest001
1147  * @tc.desc: Cloud data is older than local data.
1148  * @tc.type: FUNC
1149  * @tc.require:
1150  * @tc.author: bty
1151  */
1152 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest001, TestSize.Level0)
1153 {
1154     int64_t paddingSize = 10;
1155     int64_t cloudCount = 20;
1156     int64_t localCount = cloudCount / g_arrayHalfSub;
1157     ChangedData changedDataForTable1;
1158     ChangedData changedDataForTable2;
1159     changedDataForTable1.tableName = g_tableName1;
1160     changedDataForTable2.tableName = g_tableName2;
1161     changedDataForTable1.field.push_back(std::string("name"));
1162     changedDataForTable2.field.push_back(std::string("id"));
1163     for (int i = 0; i < cloudCount; i++) {
1164         changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1165         changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1166     }
1167     g_observer->SetExpectedResult(changedDataForTable1);
1168     g_observer->SetExpectedResult(changedDataForTable2);
1169     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1170     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1171     Query query = Query::Select().FromTable(g_tables);
1172     std::vector<SyncProcess> expectProcess;
1173     InitProcessForTest1(cloudCount, localCount, expectProcess);
1174     CloudSyncStatusCallback callback;
1175     GetCallback(g_syncProcess, callback, expectProcess);
1176     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1177     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1178     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1179     g_observer->ClearChangedData();
1180     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1181     CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1182     LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1183     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1184     CloseDb();
1185 }
1186 
1187 /**
1188  * @tc.name: CloudSyncTest002
1189  * @tc.desc: Local data is older than cloud data.
1190  * @tc.type: FUNC
1191  * @tc.require:
1192  * @tc.author: bty
1193  */
1194 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest002, TestSize.Level0)
1195 {
1196     int64_t localCount = 20;
1197     int64_t cloudCount = 10;
1198     int64_t paddingSize = 100;
1199     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1200     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1201     Query query = Query::Select().FromTable(g_tables);
1202     std::vector<SyncProcess> expectProcess;
1203     InitProcessForTest2(cloudCount, localCount, expectProcess);
1204     CloudSyncStatusCallback callback;
1205     GetCallback(g_syncProcess, callback, expectProcess);
1206     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1207     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1208     LOGD("expect download:worker1[primary key]:[cloud0 - cloud10), worker2[primary key]:[0 - 10)");
1209     CheckDownloadResult(db, {10L, 10L}); // 10 and 10 means the num of downloads from cloud db by worker1 and worker2
1210     LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[10 - 20)");
1211     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1212     CloseDb();
1213 }
1214 
1215 /**
1216  * @tc.name: CloudSyncTest003
1217  * @tc.desc: test with update and delete operator
1218  * @tc.type: FUNC
1219  * @tc.require:
1220  * @tc.author: bty
1221  */
1222 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest003, TestSize.Level0)
1223 {
1224     int64_t paddingSize = 10;
1225     int cloudCount = 20;
1226     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1227     InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1228     Query query = Query::Select().FromTable(g_tables);
1229     std::vector<SyncProcess> expectProcess;
1230     InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1231     CloudSyncStatusCallback callback;
1232     GetCallback(g_syncProcess, callback, expectProcess);
1233     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1234     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1235     CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1236     CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1237 
1238     int updateCount = 10;
1239     UpdateUserTableRecord(db, 5, updateCount); // 5 is start id to be updated
1240     g_syncProcess = {};
1241     InitProcessForTest1(cloudCount, updateCount, expectProcess);
1242     GetCallback(g_syncProcess, callback, expectProcess);
1243     LOGD("-------------------sync after update--------------");
1244     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1245     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1246 
1247     VBucket extend;
1248     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1249     std::vector<VBucket> data1;
1250     g_virtualCloudDb->Query(g_tables[0], extend, data1);
1251     for (int j = 25; j < 35; ++j) { // index[25, 35) in cloud db expected to be updated
1252         EXPECT_EQ(std::get<int64_t>(data1[j]["age"]), 99); // 99 is the updated age field of cloud db
1253     }
1254 
1255     std::vector<VBucket> data2;
1256     g_virtualCloudDb->Query(g_tables[1], extend, data2);
1257     for (int j = 5; j < 15; ++j) { // index[5, 15) in cloud db expected to be updated
1258         EXPECT_EQ(std::get<int64_t>(data2[j]["age"]), 99); // 99 is the updated age field of cloud db
1259     }
1260 
1261     int deleteCount = 3;
1262     DeleteUserTableRecord(db, 0, deleteCount);
1263     g_syncProcess = {};
1264     InitProcessForTest1(updateCount, deleteCount, expectProcess);
1265     GetCallback(g_syncProcess, callback, expectProcess);
1266     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1267     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1268 
1269     CheckCloudTotalCount({37L, 17L}); // 37 and 17 means the total num of worker1 and worker2 from the cloud db
1270     CloseDb();
1271 }
1272 
1273 /**
1274  * @tc.name: CloudSyncTest004
1275  * @tc.desc: Random write of local and cloud data
1276  * @tc.type: FUNC
1277  * @tc.require:
1278  * @tc.author: bty
1279  */
1280 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest004, TestSize.Level0)
1281 {
1282     int64_t paddingSize = 1024 * 8;
1283     vector<thread> threads;
1284     int cloudCount = 1024;
1285     threads.emplace_back(InsertCloudTableRecord, 0, cloudCount, paddingSize, false);
1286     threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, cloudCount, paddingSize, false);
1287     for (auto &thread: threads) {
1288         thread.join();
1289     }
1290     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1291     CloseDb();
1292 }
1293 
1294 /**
1295  * @tc.name: CloudSyncTest005
1296  * @tc.desc: sync with device sync query
1297  * @tc.type: FUNC
1298  * @tc.require:
1299  * @tc.author: bty
1300  */
1301 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest005, TestSize.Level0)
1302 {
1303     Query query = Query::Select().FromTable(g_tables).OrderBy("123", true);
1304     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1305         DBStatus::NOT_SUPPORT);
1306 
1307     query = Query::Select();
1308     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1309         DBStatus::INVALID_ARGS);
1310     CloseDb();
1311 }
1312 
1313 /**
1314  * @tc.name: CloudSyncTest006
1315  * @tc.desc: Firstly set a correct schema, and then null or invalid schema
1316  * @tc.type: FUNC
1317  * @tc.require:
1318  * @tc.author: wanyi
1319  */
1320 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest006, TestSize.Level0)
1321 {
1322     int64_t paddingSize = 10;
1323     int cloudCount = 20;
1324     ChangedData changedDataForTable1;
1325     ChangedData changedDataForTable2;
1326     changedDataForTable1.tableName = g_tableName1;
1327     changedDataForTable2.tableName = g_tableName2;
1328     changedDataForTable1.field.push_back(std::string("name"));
1329     changedDataForTable2.field.push_back(std::string("id"));
1330     for (int i = 0; i < cloudCount; i++) {
1331         changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1332         changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1333     }
1334     g_observer->SetExpectedResult(changedDataForTable1);
1335     g_observer->SetExpectedResult(changedDataForTable2);
1336     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1337     InsertUserTableRecord(db, 0, cloudCount / g_arrayHalfSub, paddingSize, false);
1338     // Set correct cloudDbSchema (correct version)
1339     DataBaseSchema correctSchema;
1340     GetCloudDbSchema(correctSchema);
1341     ASSERT_EQ(g_delegate->SetCloudDbSchema(correctSchema), DBStatus::OK);
1342     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1343     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1344     g_observer->ClearChangedData();
1345     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1346     CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1347     LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1348     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1349 
1350     // Reset cloudDbSchema (invalid version - null)
1351     DataBaseSchema nullSchema;
1352     ASSERT_EQ(g_delegate->SetCloudDbSchema(nullSchema), DBStatus::OK);
1353     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1354 
1355     // Reset cloudDbSchema (invalid version - field mismatch)
1356     DataBaseSchema invalidSchema;
1357     GetInvalidCloudDbSchema(invalidSchema);
1358     ASSERT_EQ(g_delegate->SetCloudDbSchema(invalidSchema), DBStatus::OK);
1359     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1360     CloseDb();
1361 }
1362 
1363 /**
1364  * @tc.name: CloudSyncTest007
1365  * @tc.desc: Check the asset types after sync
1366  * @tc.type: FUNC
1367  * @tc.require:
1368  * @tc.author: bty
1369  */
1370 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest007, TestSize.Level1)
1371 {
1372     int64_t paddingSize = 100;
1373     int localCount = 20;
1374     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1375     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1376     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1377 
1378     CheckAssetAfterDownload(db, localCount);
1379     CheckAllAssetAfterUpload(localCount);
1380     CheckAssetsAfterDownload(db, localCount);
1381     CloseDb();
1382 }
1383 
1384 /*
1385  * @tc.name: CloudSyncTest008
1386  * @tc.desc: Test sync with invalid param
1387  * @tc.type: FUNC
1388  * @tc.require:
1389  * @tc.author: zhangqiquan
1390  */
1391 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest008, TestSize.Level0)
1392 {
1393     ASSERT_EQ(g_delegate->SetCloudDB(nullptr), OK);   // it will not happen because cloudb has been set in SetUp()
1394     Query query = Query::Select().FromTable({g_tableName3});
1395     // clouddb has been set in SetUp() and it's not null
1396     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
1397     CloseDb();
1398 }
1399 
1400 /**
1401  * @tc.name: CloudSyncTest009
1402  * @tc.desc: The second time there was no data change and sync was called.
1403  * @tc.type: FUNC
1404  * @tc.require:
1405  * @tc.author: bty
1406  */
1407 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest009, TestSize.Level0)
1408 {
1409     int64_t paddingSize = 10;
1410     int cloudCount = 20;
1411     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1412     InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1413     Query query = Query::Select().FromTable(g_tables);
1414     std::vector<SyncProcess> expectProcess;
1415     InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1416     CloudSyncStatusCallback callback;
1417     GetCallback(g_syncProcess, callback, expectProcess);
1418     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1419     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1420     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:none");
1421     CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1422     LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[0 - 20)");
1423     CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1424 
1425     g_syncProcess = {};
1426     InitProcessForTest9(cloudCount, 0, expectProcess);
1427     GetCallback(g_syncProcess, callback, expectProcess);
1428     LOGD("--------------the second sync-------------");
1429     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1430     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1431     CloseDb();
1432 }
1433 
1434 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest0010, TestSize.Level0)
1435 {
1436     int64_t paddingSize = 10;
1437     int cloudCount = 20;
1438     int localCount = 10;
1439     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1440     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1441     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1442 
1443     int rowid = 27;
1444     UpdateAssetForTest(db, AssetOpType::NO_CHANGE, cloudCount, rowid++);
1445     UpdateAssetForTest(db, AssetOpType::INSERT, cloudCount, rowid++);
1446     UpdateAssetForTest(db, AssetOpType::DELETE, cloudCount, rowid++);
1447     UpdateAssetForTest(db, AssetOpType::UPDATE, cloudCount, rowid++);
1448 
1449     int id = 0;
1450     UpdateAssetsForTest(db, AssetOpType::NO_CHANGE, id++);
1451     UpdateAssetsForTest(db, AssetOpType::INSERT, id++);
1452     UpdateAssetsForTest(db, AssetOpType::DELETE, id++);
1453     UpdateAssetsForTest(db, AssetOpType::UPDATE, id++);
1454 
1455     LOGD("--------------the second sync-------------");
1456     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1457 
1458     CheckFillAssetForTest10(db);
1459     CheckFillAssetsForTest10(db);
1460     CloseDb();
1461 }
1462 
1463 /**
1464  * @tc.name: CloudSyncTest011
1465  * @tc.desc: Test sync with same table name.
1466  * @tc.type: FUNC
1467  * @tc.require:
1468  * @tc.author: zhangqiquan
1469  */
1470 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest011, TestSize.Level0)
1471 {
1472     Query query = Query::Select().FromTable({g_tableName1, g_tableName1});
1473     bool syncFinish = false;
1474     std::mutex syncMutex;
1475     std::condition_variable cv;
1476     std::atomic<int> callCount = 0;
1477     CloudSyncStatusCallback callback = [&callCount, &cv, &syncFinish, &syncMutex](
__anona8df80760602( const std::map<std::string, SyncProcess> &onProcess) 1478         const std::map<std::string, SyncProcess> &onProcess) {
1479         ASSERT_NE(onProcess.find(DEVICE_CLOUD), onProcess.end());
1480         SyncProcess syncProcess = onProcess.at(DEVICE_CLOUD);
1481         callCount++;
1482         if (syncProcess.process == FINISHED) {
1483             std::lock_guard<std::mutex> autoLock(syncMutex);
1484             syncFinish = true;
1485         }
1486         cv.notify_all();
1487     };
1488     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1489     std::unique_lock<std::mutex> uniqueLock(syncMutex);
__anona8df80760702() 1490     cv.wait(uniqueLock, [&syncFinish]() {
1491         return syncFinish;
1492     });
1493     RuntimeContext::GetInstance()->StopTaskPool();
1494     EXPECT_EQ(callCount, 2); // 2 is onProcess count
1495     CloseDb();
1496 }
1497 
1498 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest012, TestSize.Level0)
1499 {
1500     int64_t localCount = 20;
1501     int64_t cloudCount = 10;
1502     int64_t paddingSize = 10;
1503     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1504     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1505     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1506 
1507     InsertCloudTableRecord(localCount + cloudCount, cloudCount, paddingSize, false);
1508     InsertUserTableRecord(db, localCount + cloudCount, localCount, paddingSize, true);
1509     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1510 
1511     InsertCloudTableRecord(2 * (localCount + cloudCount), cloudCount, paddingSize, false); // 2 is offset
1512     InsertUserTableRecord(db, 2 * (localCount + cloudCount), localCount, paddingSize, false); // 2 is offset
1513     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1514 
1515 
1516     InsertCloudTableRecord(3 * (localCount + cloudCount), cloudCount, paddingSize, true); // 3 is offset
1517     InsertUserTableRecord(db, 3 * (localCount + cloudCount), localCount, paddingSize, true); // 3 is offset
1518     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1519     CloseDb();
1520 }
1521 
1522 /*
1523  * @tc.name: CloudSyncTest013
1524  * @tc.desc: test increment watermark when cloud db query data size is 0
1525  * @tc.type: FUNC
1526  * @tc.require:
1527  * @tc.author: zhuwentao
1528  */
1529 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest013, TestSize.Level0)
1530 {
1531     /**
1532      * @tc.steps: insert some data into cloud db
1533      * @tc.expected: return ok.
1534      */
1535     int64_t paddingSize = 10;
1536     int64_t cloudCount = 10;
1537     SyncProcess syncProcess;
1538     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1539     /**
1540      * @tc.steps: try to cloud sync
1541      * @tc.expected: return ok.
1542      */
1543     Query query = Query::Select().FromTable(g_tables);
__anona8df80760802(const std::map<std::string, SyncProcess> &process) 1544     CloudSyncStatusCallback callback = [&syncProcess](const std::map<std::string, SyncProcess> &process) {
1545         LOGI("devices size = %d", process.size());
1546         ASSERT_EQ(process.size(), 1u);
1547         syncProcess = std::move(process.begin()->second);
1548         if (syncProcess.process == FINISHED) {
1549             g_processCondition.notify_one();
1550         }
1551     };
1552     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1553     WaitForSyncFinish(syncProcess, g_syncWaitTime);
1554     uint32_t queryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1555     /**
1556      * @tc.steps: insert some increment data into cloud db
1557      * @tc.expected: return ok.
1558      */
1559     VBucket data;
1560     Timestamp now = TimeHelper::GetSysCurrentTime();
1561     data.insert_or_assign("name", "Cloud" + std::to_string(0));
1562     data.insert_or_assign("height", 166.0); // 166.0 is random double value
1563     data.insert_or_assign("married", false);
1564     data.insert_or_assign("age", 13L);
1565     VBucket log;
1566     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1567     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1568     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1569     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1570     log.insert_or_assign(CloudDbConstant::CURSOR_FIELD, "0123");
1571     g_virtualCloudDb->SetIncrementData(g_tableName1, data, log);
1572     syncProcess.process = PREPARED;
1573     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1574     WaitForSyncFinish(syncProcess, g_syncWaitTime);
1575     uint32_t lastQueryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1576     ASSERT_EQ(lastQueryTimes - queryTimes, 2u);
1577     CloseDb();
1578 }
1579 
TestSyncForStatus(RelationalStoreDelegate * delegate,DBStatus expectStatus)1580 void TestSyncForStatus(RelationalStoreDelegate *delegate, DBStatus expectStatus)
1581 {
1582     std::mutex dataMutex;
1583     std::condition_variable cv;
1584     bool finish = false;
1585     DBStatus res = OK;
1586     CloudSyncStatusCallback callback = [&dataMutex, &cv, &finish, &res](
1587         const std::map<std::string, SyncProcess> &process) {
1588         std::map<std::string, SyncProcess> syncProcess;
1589         {
1590             std::lock_guard<std::mutex> autoLock(dataMutex);
1591             syncProcess = process;
1592             if (syncProcess[DEVICE_CLOUD].process == FINISHED) {
1593                 finish = true;
1594             }
1595             res = syncProcess[DEVICE_CLOUD].errCode;
1596         }
1597         cv.notify_one();
1598     };
1599     Query query = Query::Select().FromTable({g_tableName3});
1600     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1601     {
1602         std::unique_lock<std::mutex> uniqueLock(dataMutex);
1603         cv.wait(uniqueLock, [&finish] {
1604             return finish;
1605         });
1606     }
1607     EXPECT_EQ(res, expectStatus);
1608 }
1609 
1610 /*
1611  * @tc.name: CloudSyncTest015
1612  * @tc.desc: Test sync with cloud error
1613  * @tc.type: FUNC
1614  * @tc.require:
1615  * @tc.author: zhangqiquan
1616  */
1617 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest015, TestSize.Level0)
1618 {
1619     g_virtualCloudDb->SetActionStatus(CLOUD_NETWORK_ERROR);
1620     TestSyncForStatus(g_delegate, CLOUD_NETWORK_ERROR);
1621 
1622     g_virtualCloudDb->SetActionStatus(CLOUD_SYNC_UNSET);
1623     TestSyncForStatus(g_delegate, CLOUD_SYNC_UNSET);
1624 
1625     g_virtualCloudDb->SetActionStatus(CLOUD_FULL_RECORDS);
1626     TestSyncForStatus(g_delegate, CLOUD_FULL_RECORDS);
1627 
1628     g_virtualCloudDb->SetActionStatus(CLOUD_LOCK_ERROR);
1629     TestSyncForStatus(g_delegate, CLOUD_LOCK_ERROR);
1630 
1631     g_virtualCloudDb->SetActionStatus(DB_ERROR);
1632     TestSyncForStatus(g_delegate, CLOUD_ERROR);
1633 
1634     g_virtualCloudDb->SetActionStatus(OK);
1635     CloseDb();
1636 }
1637 
1638 /*
1639  * @tc.name: CloudSyncTest014
1640  * @tc.desc: Test sync with s4
1641  * @tc.type: FUNC
1642  * @tc.require:
1643  * @tc.author: zhangqiquan
1644  */
1645 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest014, TestSize.Level0)
1646 {
1647     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
1648     RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
1649 
1650     // sync failed because get security option failed
__anona8df80760b02(const std::string&, SecurityOption &option) 1651     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1652         option.securityLabel = S0;
1653         return DB_ERROR;
1654     });
1655     Query query = Query::Select().FromTable({g_tableName3});
1656     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1657         SECURITY_OPTION_CHECK_ERROR);
1658 
1659     // sync failed because get S4
__anona8df80760c02(const std::string&, SecurityOption &option) 1660     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1661         option.securityLabel = S4;
1662         return NOT_SUPPORT;
1663     });
1664     Query invalidQuery = Query::Select().FromTable({g_tableName3}).PrefixKey({'k'});
1665     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, invalidQuery, nullptr, g_syncWaitTime),
1666         NOT_SUPPORT);
1667 
1668     // sync failed because get S4
__anona8df80760d02(const std::string&, SecurityOption &option) 1669     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1670         option.securityLabel = S4;
1671         return OK;
1672     });
1673     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1674         SECURITY_OPTION_CHECK_ERROR);
1675 
1676     // sync failed because S4 has been cached
__anona8df80760e02(const std::string&, SecurityOption &option) 1677     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1678         option.securityLabel = S0;
1679         return OK;
1680     });
1681     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1682         SECURITY_OPTION_CHECK_ERROR);
1683     RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
1684     CloseDb();
1685 }
1686 
1687 /*
1688  * @tc.name: CloudSyncTest016
1689  * @tc.desc: Test sync when push before merge
1690  * @tc.type: FUNC
1691  * @tc.require:
1692  * @tc.author: chenchaohao
1693  */
1694 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest016, TestSize.Level0)
1695 {
1696     int64_t localCount = 10;
1697     int64_t paddingSize = 10;
1698     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1699     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1700     CheckCloudTotalCount({10L, 10L});
1701     UpdateUserTableRecord(db, 0, localCount);
1702     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1703 
1704     VBucket extend;
1705     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1706     std::vector<VBucket> data1;
1707     g_virtualCloudDb->Query(g_tables[0], extend, data1);
1708     for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1709         EXPECT_EQ(std::get<int64_t>(data1[i]["age"]), 99); // 99 is the updated age field of cloud db
1710     }
1711 
1712     std::vector<VBucket> data2;
1713     g_virtualCloudDb->Query(g_tables[1], extend, data2);
1714     for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1715         EXPECT_EQ(std::get<int64_t>(data2[i]["age"]), 99); // 99 is the updated age field of cloud db
1716     }
1717 
1718     CloseDb();
1719 }
1720 
1721 /*
1722  * @tc.name: DataNotifier001
1723  * @tc.desc: Notify data without primary key
1724  * @tc.type: FUNC
1725  * @tc.require:
1726  * @tc.author: wanyi
1727  */
1728 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DataNotifier001, TestSize.Level0)
1729 {
1730     int64_t paddingSize = 10;
1731     int localCount = 20;
1732     InsertRecordWithoutPk2LocalAndCloud(db, 0, localCount, paddingSize);
1733     callSync({g_tableName3}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1734     CloseDb();
1735 }
1736 
1737 /**
1738  * @tc.name: CloudSyncAssetTest001
1739  * @tc.desc:
1740  * @tc.type: FUNC
1741  * @tc.require:
1742  * @tc.author: wanyi
1743  */
1744 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest001, TestSize.Level1)
1745 {
1746     int64_t paddingSize = 100;
1747     int localCount = 20;
1748     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1749     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1750     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1751 
1752     CheckAssetAfterDownload(db, localCount);
1753     CheckAllAssetAfterUpload(localCount);
1754     CloseDb();
1755 }
1756 
1757 /*
1758  * @tc.name: MannualNotify001
1759  * @tc.desc: Test FLAG_ONLY mode of RemoveDeviceData
1760  * @tc.type: FUNC
1761  * @tc.require:
1762  * @tc.author: huangboxin
1763  */
1764 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, MannualNotify001, TestSize.Level0)
1765 {
1766     int64_t paddingSize = 10;
1767     int localCount = 10;
1768     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1769     Query query = Query::Select().FromTable(g_tables);
1770     std::vector<SyncProcess> expectProcess;
1771     InitProcessForMannualSync1(expectProcess);
1772     CloudSyncStatusCallback callback;
1773     GetCallback(g_syncProcess, callback, expectProcess);
1774     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime),
1775         DBStatus::OK);
1776     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1777     CloseDb();
1778 }
1779 
1780 /**
1781  * @tc.name: CloudProcessNotify001
1782  * @tc.desc: Test duplicate cloud records. SYNC_MODE_CLOUD_MERGE
1783  * @tc.type: FUNC
1784  * @tc.require:
1785  * @tc.author: liufuchenxing
1786  */
1787 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudProcessNotify001, TestSize.Level1)
1788 {
1789     /**
1790      * @tc.steps: step1. table work1 and work2 insert 1 record which name is local0, then sync().
1791      * @tc.expected: step 1. table work1 and work2 download result is 0. table work1 and work2 upload 1 record.
1792      */
1793     int64_t paddingSize = 10;
1794     int64_t localCount = 1;
1795     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1796     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1797     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1798     g_observer->ClearChangedData();
1799     LOGD("expect download:worker1[primary key]:[], worker2[primary key]:[]");
1800     CheckDownloadResult(db, {0L, 0L}); // 0 and 0 means the num of downloads from cloud db by worker1 and worker2
1801     LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1802     CheckCloudTotalCount({1L, 1L}); // 1 and 1 means the total num of worker1 and worker2 from the cloud db
1803 
1804     /**
1805      * @tc.steps: step2. reset data
1806      * @tc.expected: step2. return ok.
1807      */
1808     std::this_thread::sleep_for(std::chrono::milliseconds(100));
1809     g_syncProcess = {};
1810     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1811 
1812     /**
1813      * @tc.steps: step3. table work1 delete record which gid is 0 and name is local0 on cloud.
1814      * @tc.expected: step3. return ok.
1815      */
1816     VBucket idMap;
1817     idMap.insert_or_assign("#_gid", std::to_string(0));
1818     ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName1, idMap), DBStatus::OK);
1819 
1820     /**
1821      * @tc.steps: step4. table work1 insert record which gid is 0 and name is local0 on cloud.
1822      * @tc.expected: step4. return ok.
1823      */
1824     std::vector<VBucket> record1;
1825     std::vector<VBucket> extend1;
1826     InsertCloudForCloudProcessNotify001(record1, extend1);
1827     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(g_tableName1, std::move(record1), extend1), DBStatus::OK);
1828 
1829     /**
1830      * @tc.steps: step5. sync() and check local data.
1831      * @tc.expected: step5. return ok.
1832      */
1833     ChangedData changedDataForTable1;
1834     changedDataForTable1.tableName = g_tableName1;
1835     changedDataForTable1.field.push_back(std::string("name"));
1836     changedDataForTable1.primaryData[ChangeType::OP_UPDATE].push_back({"Local" + std::to_string(0)});
1837     g_observer->SetExpectedResult(changedDataForTable1);
1838 
1839     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1840     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1841     g_observer->ClearChangedData();
1842     LOGD("expect download:worker1[primary key]:[Local0], worker2[primary key]:[0]");
1843     // 1 and 1 means the num of downloads from cloud db by worker1 and worker2
1844     CheckDownloadResult(db, {1L, 1L}, "Local");
1845     LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1846     CheckCloudTotalCount({1L, 1L}); // 0 and 0 means the total num of worker1 and worker2 from the cloud db
1847 
1848     /**
1849      * @tc.steps: step6. CloseDb().
1850      * @tc.expected: step6. return ok.
1851      */
1852     CloseDb();
1853 }
1854 
1855 /*
1856  * @tc.name: CloudSyncAssetTest002
1857  * @tc.desc:
1858  * @tc.type: FUNC
1859  * @tc.require:
1860  * @tc.author: huangboxin
1861  */
1862 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest002, TestSize.Level0)
1863 {
1864     int64_t paddingSize = 10;
1865     int localCount = 3;
1866     int cloudCount = 3;
1867     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1868     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1869     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1870     CloseDb();
1871 }
1872 
1873 /*
1874  * @tc.name: CloudSyncAssetTest003
1875  * @tc.desc:
1876  * @tc.type: FUNC
1877  * @tc.require:
1878  * @tc.author: bty
1879  */
1880 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest003, TestSize.Level0)
1881 {
1882     int64_t paddingSize = 10;
1883     int localCount = 3;
1884     int cloudCount = 3;
1885     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1886     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1887     Assets assets;
1888     assets.push_back(g_localAsset);
1889     assets.push_back(g_localAsset);
1890     UpdateLocalAssets(db, assets, 1);
1891     Query query = Query::Select().FromTable(g_tables);
1892     std::vector<SyncProcess> expectProcess;
__anona8df80760f02(const std::map<std::string, SyncProcess> &process) 1893     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1894         ASSERT_EQ(process.size(), 1u);
1895         g_syncProcess = std::move(process.begin()->second);
1896 
1897         if (g_syncProcess.process == FINISHED) {
1898             g_processCondition.notify_one();
1899         }
1900     };
1901     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1902         DBStatus::OK);
1903     {
1904         std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761002() 1905         g_processCondition.wait(lock, []() {
1906             return g_syncProcess.process == FINISHED;
1907         });
1908         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1909     }
1910     CloseDb();
1911 }
1912 
1913 /*
1914  * @tc.name: CloudSyncAssetTest004
1915  * @tc.desc:
1916  * @tc.type: FUNC
1917  * @tc.require:
1918  * @tc.author: bty
1919  */
1920 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest004, TestSize.Level0)
1921 {
1922     int64_t paddingSize = 10;
1923     int localCount = 3;
1924     int cloudCount = 3;
1925     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1926     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1927     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1928 
1929     UpdateDiffType(localCount);
1930     g_syncProcess = {};
__anona8df80761102(const std::map<std::string, SyncProcess> &process) 1931     CloudSyncStatusCallback callback1 = [](const std::map<std::string, SyncProcess> &process) {
1932         ASSERT_EQ(process.size(), 1u);
1933         g_syncProcess = std::move(process.begin()->second);
1934         if (g_syncProcess.process == FINISHED) {
1935             g_processCondition.notify_one();
1936         }
1937     };
1938     Query query = Query::Select().FromTable(g_tables);
1939     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback1, g_syncWaitTime),
1940         DBStatus::OK);
1941     {
1942         std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761202() 1943         g_processCondition.wait(lock, []() {
1944             return g_syncProcess.process == FINISHED;
1945         });
1946         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1947     }
1948     CheckDiffTypeAsset(db);
1949     CloseDb();
1950 }
1951 
1952 /*
1953  * @tc.name: CloudSyncAssetTest005
1954  * @tc.desc: Test erase all no change Asset
1955  * @tc.type: FUNC
1956  * @tc.require:
1957  * @tc.author: bty
1958  */
1959 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest005, TestSize.Level0)
1960 {
1961     /**
1962      * @tc.steps:step1. Construct local data with asset names and hashes consistent with the cloud
1963      * @tc.expected: step1. return ok.
1964      */
1965     int64_t paddingSize = 10;
1966     int localCount = 3;
1967     int cloudCount = 3;
1968     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1969     Assets assets;
1970     for (int64_t j = 0; j < cloudCount; j++) {
1971         Asset asset = g_cloudAsset;
1972         asset.name = g_cloudAsset.name + std::to_string(j);
1973         assets.push_back(asset);
1974     }
1975     UpdateLocalAssets(db, assets, 0);
1976     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
1977 
1978     /**
1979      * @tc.steps:step2. Construct cloud data
1980      * @tc.expected: step2. return ok.
1981      */
1982     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1983 
1984     /**
1985      * @tc.steps:step3. sync, expect EraseNoChangeAsset to erase all Nochange assets
1986      * @tc.expected: step3. return ok.
1987      */
1988     Query query = Query::Select().FromTable(g_tables);
1989     std::vector<SyncProcess> expectProcess;
__anona8df80761302(const std::map<std::string, SyncProcess> &process) 1990     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1991         ASSERT_EQ(process.size(), 1u);
1992         g_syncProcess = std::move(process.begin()->second);
1993 
1994         if (g_syncProcess.process == FINISHED) {
1995             g_processCondition.notify_one();
1996         }
1997     };
1998     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1999         DBStatus::OK);
2000     {
2001         std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761402() 2002         g_processCondition.wait(lock, []() {
2003             return g_syncProcess.process == FINISHED;
2004         });
2005         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2006     }
2007     CloseDb();
2008 }
2009 
2010 /*
2011  * @tc.name: CloudSyncAssetTest006
2012  * @tc.desc: Test upload new data without assets
2013  * @tc.type: FUNC
2014  * @tc.require:
2015  * @tc.author: bty
2016  */
2017 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest006, TestSize.Level0)
2018 {
2019     /**
2020      * @tc.steps:step1. Construct local data with NULL asset and the local count is greater than the cloud
2021      * @tc.expected: step1. return ok.
2022      */
2023     int64_t paddingSize = 10;
2024     int localCount = 6;
2025     int cloudCount = 3;
2026     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2027     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2028     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2029 
2030     /**
2031      * @tc.steps:step2. sync, upload new data without assets,
2032      * @tc.expected: step2. return ok.
2033      */
2034     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2035     CloseDb();
2036 }
2037 
2038 /*
2039  * @tc.name: CloudSyncAssetTest007
 * @tc.desc: For explicitly set no-change assets: if an asset is marked as deleted but its hash is not set to empty,
 * it is regarded as NO-CHANGE rather than DELETE
2042  * @tc.type: FUNC
2043  * @tc.require:
2044  * @tc.author: wanyi
2045  */
2046 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest007, TestSize.Level0)
2047 {
2048     /**
2049      * @tc.steps:step1. local asset contain an asset which has a corresponding asset in cloud
2050      * @tc.expected: step1. return ok.
2051      */
2052     int64_t paddingSize = 10;
2053     int localCount = 1;
2054     int cloudCount = 1;
2055     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2056     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2057     /**
2058      * @tc.steps:step2. local asset is set to delete, but hash is not set to empty
2059      * @tc.expected: step2. return ok.
2060      */
2061     Assets assets;
2062     for (int64_t j = 0; j < cloudCount; j++) {
2063         Asset asset = g_cloudAsset;
2064         asset.name = g_cloudAsset.name + std::to_string(j);
2065         asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
2066         assets.push_back(asset);
2067     }
2068     UpdateLocalAssets(db, assets, 0);
2069     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2070     /**
2071      * @tc.steps:step3. Do sync
2072      * @tc.expected: step3. return ok.
2073      */
2074     Query query = Query::Select().FromTable(g_tables);
2075     std::vector<SyncProcess> expectProcess;
__anona8df80761502(const std::map<std::string, SyncProcess> &process) 2076     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2077         ASSERT_EQ(process.size(), 1u);
2078         g_syncProcess = std::move(process.begin()->second);
2079 
2080         if (g_syncProcess.process == FINISHED) {
2081             g_processCondition.notify_one();
2082         }
2083     };
2084     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2085         DBStatus::OK);
2086     {
2087         std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761602() 2088         g_processCondition.wait(lock, []() {
2089             return g_syncProcess.process == FINISHED;
2090         });
2091         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2092     }
2093     /**
2094      * @tc.steps:step4. Check result. Cloud db should not contain asset.
2095      * @tc.expected: step4. return ok.
2096      */
2097     CheckAssetForAssetTest006();
2098     CloseDb();
2099 }
2100 
2101 /**
2102  * @tc.name: DownloadAssetTest001
2103  * @tc.desc: Test the sync of different Asset status out of parameters when the download is successful
2104  * @tc.type: FUNC
2105  * @tc.require:
2106  * @tc.author: bty
2107  */
2108 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest001, TestSize.Level0)
2109 {
2110     /**
2111      * @tc.steps:step1. Set different status out of parameters, and the code returns OK
2112      * @tc.expected: step1. return ok.
2113      */
2114     DBStatus expectStatus = DBStatus::OK;
2115     int index = 0;
2116     InitMockAssetLoader(expectStatus, index);
2117 
2118     /**
2119      * @tc.steps:step2. init download data
2120      * @tc.expected: step2. return ok.
2121      */
2122     int64_t paddingSize = 1;
2123     int localCount = 120;
2124     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2125     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
2126 
2127     /**
2128      * @tc.steps:step3. sync
2129      * @tc.expected: step3. return ok.
2130      */
2131     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2132 
2133     /**
2134      * @tc.steps:step4. Expect all states to be normal
2135      * @tc.expected: step4. return ok.
2136      */
2137     CheckAssetAfterDownload(db, localCount);
2138     CloseDb();
2139 }
2140 
2141 /*
2142  * @tc.name: CloudSyncAssetTest008
2143  * @tc.desc: sync failed with download asset
2144  * @tc.type: FUNC
2145  * @tc.require:
2146  * @tc.author: zhangqiquan
2147  */
2148 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest008, TestSize.Level0)
2149 {
2150     /**
2151      * @tc.steps:step1. prepare asset data
2152      */
2153     int64_t paddingSize = 10;
2154     int localCount = 1;
2155     int cloudCount = 1;
2156     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2157     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2158     /**
2159      * @tc.steps:step2. set download asset status failed
2160      */
2161     g_virtualAssetLoader->SetDownloadStatus(CLOUD_ASSET_SPACE_INSUFFICIENT);
2162     Query query = Query::Select().FromTable(g_tables);
2163     std::vector<SyncProcess> expectProcess;
__anona8df80761702(const std::map<std::string, SyncProcess> &process) 2164     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2165         for (const auto &item: process) {
2166             g_syncProcess = item.second;
2167         }
2168         if (g_syncProcess.process == FINISHED) {
2169             g_processCondition.notify_one();
2170         }
2171     };
2172     /**
2173      * @tc.steps:step3. sync and wait sync finished.
2174      * @tc.expected: step3. sync return CLOUD_ASSET_SPACE_INSUFFICIENT.
2175      */
2176     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2177         DBStatus::OK);
2178     {
2179         std::unique_lock<std::mutex> lock(g_processMutex);
__anona8df80761802() 2180         g_processCondition.wait(lock, []() {
2181             return g_syncProcess.process == FINISHED;
2182         });
2183         ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT);
2184     }
2185     /**
2186      * @tc.steps:step4. clear data.
2187      */
2188     g_virtualAssetLoader->SetDownloadStatus(OK);
2189     CloseDb();
2190 }
2191 
2192 /*
2193  * @tc.name: CloudSyncAssetTest009
2194  * @tc.desc:
2195  * @tc.type: FUNC
2196  * @tc.require:
2197  * @tc.author: zhangqiquan
2198  */
2199 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest009, TestSize.Level0)
2200 {
2201     // insert 3 data with asset 3 data without asset into local
2202     // sync them to cloud
2203     int64_t paddingSize = 10;
2204     int localCount = 3;
2205     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2206     InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2207     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2208     // update these data and sync again
2209     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2210     InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2211     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2212     EXPECT_EQ(g_syncProcess.errCode, DBStatus::OK);
2213     CloseDb();
2214 }
2215 
2216 /**
2217  * @tc.name: DownloadAssetTest002
2218  * @tc.desc: Test the sync of different Asset status out of parameters when the download is failed
2219  * @tc.type: FUNC
2220  * @tc.require:
2221  * @tc.author: bty
2222  */
2223 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest002, TestSize.Level0)
2224 {
2225     /**
2226      * @tc.steps:step1. Set different status out of parameters, and the code returns CLOUD_ERROR
2227      * @tc.expected: step1. return ok.
2228      */
2229     DBStatus expectStatus = DBStatus::CLOUD_ERROR;
2230     int index = 0;
2231     InitMockAssetLoader(expectStatus, index);
2232     int64_t paddingSize = 1;
2233     int localCount = 100;
2234 
2235     /**
2236      * @tc.steps:step2. init download data
2237      * @tc.expected: step2. return ok.
2238      */
2239     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2240     InsertCloudTableRecord(0, localCount, paddingSize, false);
2241 
2242     /**
2243      * @tc.steps:step3. sync
2244      * @tc.expected: step3. return ok.
2245      */
2246     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2247 
2248     /**
2249      * @tc.steps:step4. Those status that are not normal are all be abnormal after sync.
2250      * @tc.expected: step4. return ok.
2251      */
2252     CheckAssetAfterDownload2(db, localCount);
2253     CloseDb();
2254 }
2255 
2256 /**
2257  * @tc.name: DownloadAssetTest003
2258  * @tc.desc: Init different asset name between local and cloud, then sync to test download
2259  * @tc.type: FUNC
2260  * @tc.require:
2261  * @tc.author: bty
2262  */
2263 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest003, TestSize.Level0)
2264 {
2265     /**
2266      * @tc.steps:step1. Init data and sync
2267      * @tc.expected: step1. return ok.
2268      */
2269     int64_t paddingSize = 1;
2270     int localCount = 10;
2271     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2272     InsertCloudTableRecord(0, localCount, paddingSize, false);
2273     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2274 
2275     /**
2276      * @tc.steps:step2. update cloud Asset where gid = 0
2277      * @tc.expected: step2. return ok.
2278      */
2279     UpdateCloudAssetForDownloadAssetTest003();
2280 
2281     /**
2282      * @tc.steps:step3. sync again
2283      * @tc.expected: step3. return ok.
2284      */
2285     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2286 
2287     /**
2288      * @tc.steps:step4. check asset after download where gid = 0
2289      * @tc.expected: step4. return ok.
2290      */
2291     CheckAssetForDownloadAssetTest003(db);
2292     CloseDb();
2293 }
2294 
2295 /**
2296  * @tc.name: DownloadAssetTest004
2297  * @tc.desc: Test total count, fail count and success count when drop table
2298  * @tc.type: FUNC
2299  * @tc.require:
2300  * @tc.author: liufuchenxing
2301  */
2302 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest004, TestSize.Level0)
2303 {
2304     /**
2305      * @tc.steps:step1. Init data and sync
2306      * @tc.expected: step1. return ok.
2307      */
2308     int64_t paddingSize = 1;
2309     int count = 10;
2310     InsertUserTableRecord(db, 0, count, paddingSize, false);
2311     g_syncProcess = {};
__anona8df80761902(const std::map<std::string, SyncProcess> &process) 2312     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2313         for (const auto &item : process) {
2314             g_syncProcess = item.second;
2315         }
2316         if (g_syncProcess.process == FINISHED) {
2317             g_processCondition.notify_one();
2318         }
2319     };
2320     Query query = Query::Select().FromTable(g_tables);
2321     EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2322     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2323 
2324     /**
2325      * @tc.steps:step2. drop table work2. sync failed, check total, success and fail count.
2326      * @tc.expected: step2. total = 20, success=0, fail=20
2327      */
2328     g_syncProcess = {};
2329     InsertCloudTableRecord(0, count, paddingSize, false);
2330     EXPECT_EQ(RelationalTestUtils::ExecSql(db, DROP_INTEGER_PRIMARY_KEY_TABLE_SQL), E_OK);
2331     EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2332         DBStatus::NOT_FOUND);
2333 
2334     /**
2335      * @tc.steps:step3. close db.
2336      * @tc.expected: step3. close success.
2337      */
2338     CloseDb();
2339 }
2340 
2341 /**
2342  * @tc.name: SchemaTest001
2343  * @tc.desc: Create table with Cloud cooperation mode and do sync
2344  * @tc.type: FUNC
2345  * @tc.require:
2346  * @tc.author: wanyi
2347  */
2348 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest001, TestSize.Level0)
2349 {
2350     /**
2351      * @tc.steps:step1. Create table with Cloud cooperation mode
2352      * @tc.expected: step1. return ok.
2353      */
2354     EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2355     ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, CLOUD_COOPERATION), DBStatus::OK);
2356     /**
2357      * @tc.steps:step1. do sync
2358      * @tc.expected: step1. return ok.
2359      */
2360     callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2361     CloseDb();
2362 }
2363 
2364 /**
2365  * @tc.name: SchemaTest002
2366  * @tc.desc: Create table with DEVICE_COOPERATION mode and do sync
2367  * @tc.type: FUNC
2368  * @tc.require:
2369  * @tc.author: wanyi
2370  */
2371 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest002, TestSize.Level0)
2372 {
2373     /**
2374      * @tc.steps:step1. Create table with DEVICE_COOPERATION mode
2375      * @tc.expected: step1. return ok.
2376      */
2377     EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2378     ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, DEVICE_COOPERATION), DBStatus::OK);
2379     /**
2380      * @tc.steps:step1. do sync
2381      * @tc.expected: step1. return NOT_SUPPORT.
2382      */
2383     callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::NOT_SUPPORT);
2384     CloseDb();
2385 }
2386 
2387 /**
2388  * @tc.name: CloudCursorTest001
 * @tc.desc: Check the cursor values recorded in the log table after a cloud merge sync
2390  * @tc.type: FUNC
2391  * @tc.require:
2392  * @tc.author: bty
2393  */
2394 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudCursorTest001, TestSize.Level0)
2395 {
2396     /**
2397      * @tc.steps:step1. Init data and sync
2398      * @tc.expected: step1. return ok.
2399      */
2400     int64_t paddingSize = 1;
2401     int localCount = 10;
2402     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2403     InsertCloudTableRecord(0, localCount, paddingSize, true);
2404     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2405 
2406     /**
2407      * @tc.steps:step2. the cursor does not increase during upload, the cursor will increase during download
2408      * although it is unTrackerTable
2409      * @tc.expected: step2. return ok.
2410      */
2411     string sql = "select cursor from " + DBConstant::RELATIONAL_PREFIX + g_tableName1 + "_log";
2412     sqlite3_stmt *stmt = nullptr;
2413     EXPECT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2414     int64_t index = 0;
2415     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2416         EXPECT_EQ(static_cast<int64_t>(sqlite3_column_int64(stmt, 0)), ++index);
2417     }
2418     int errCode;
2419     SQLiteUtils::ResetStatement(stmt, true, errCode);
2420     CloseDb();
2421 }
2422 }
2423 #endif // RELATIONAL_STORE
2424