• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_instance.h"
24 #include "relational_store_manager.h"
25 #include "runtime_config.h"
26 #include "sqlite_relational_store.h"
27 #include "sqlite_relational_utils.h"
28 #include "store_observer.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "mock_asset_loader.h"
35 
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40 
41 namespace {
42     string g_storeID = "Relational_Store_SYNC";
43     const string g_tableName1 = "worker1";
44     const string g_tableName2 = "worker2";
45     const string g_tableName3 = "worker3";
46     const string g_tableName4 = "worker4";
47     const string DEVICE_CLOUD = "cloud_dev";
48     const string DB_SUFFIX = ".db";
49     const int64_t g_syncWaitTime = 60;
50     const int g_arrayHalfSub = 2;
51     int g_syncIndex = 0;
52     string g_testDir;
53     string g_storePath;
54     std::mutex g_processMutex;
55     std::condition_variable g_processCondition;
56     std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
57     std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
58     DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
59     RelationalStoreObserverUnitTest *g_observer = nullptr;
60     RelationalStoreDelegate *g_delegate = nullptr;
61     SyncProcess g_syncProcess;
62     using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
63     const std::string CREATE_LOCAL_TABLE_SQL =
64             "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \
65     "name TEXT PRIMARY KEY," \
66     "height REAL ," \
67     "married BOOLEAN ," \
68     "photo BLOB NOT NULL," \
69     "assert BLOB," \
70     "age INT);";
71     const std::string INTEGER_PRIMARY_KEY_TABLE_SQL =
72             "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \
73     "id INTEGER PRIMARY KEY," \
74     "name TEXT ," \
75     "height REAL ," \
76     "photo BLOB ," \
77     "asserts BLOB," \
78     "age INT);";
79     const std::string DROP_INTEGER_PRIMARY_KEY_TABLE_SQL = "DROP TABLE " + g_tableName2 + ";";
80     const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL =
81             "CREATE TABLE IF NOT EXISTS " + g_tableName3 + "(" \
82     "name TEXT," \
83     "height REAL ," \
84     "married BOOLEAN ," \
85     "photo BLOB NOT NULL," \
86     "assert BLOB," \
87     "age INT);";
88     const std::string INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE =
89             "CREATE TABLE IF NOT EXISTS " + g_tableName4 + "(" \
90     "id INTEGER PRIMARY KEY," \
91     "name TEXT ," \
92     "height REAL ," \
93     "photo BLOB ," \
94     "asserts BLOB," \
95     "age INT);";
96     const std::vector<Field> g_cloudFiled1 = {
97         {"Name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<double>},
98         {"MArried", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
99         {"Assert", TYPE_INDEX<Asset>}, {"age", TYPE_INDEX<int64_t>}
100     };
101     const std::vector<Field> g_invalidCloudFiled1 = {
102         {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<int>},
103         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
104         {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
105     };
106     const std::vector<Field> g_cloudFiled2 = {
107         {"id", TYPE_INDEX<int64_t>, true}, {"name", TYPE_INDEX<std::string>},
108         {"height", TYPE_INDEX<double>},  {"photo", TYPE_INDEX<Bytes>},
109         {"asserts", TYPE_INDEX<Assets>}, {"age", TYPE_INDEX<int64_t>}
110     };
111     const std::vector<Field> g_cloudFiledWithOutPrimaryKey3 = {
112         {"name", TYPE_INDEX<std::string>, false, true}, {"height", TYPE_INDEX<double>},
113         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
114         {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
115     };
116     const std::vector<std::string> g_tables = {g_tableName1, g_tableName2};
117     const std::vector<std::string> g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName};
118     const std::vector<string> g_prefix = {"Local", ""};
119     const Asset g_localAsset = {
120         .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
121         .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
122     };
123     const Asset g_cloudAsset = {
124         .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
125         .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
126     };
127 
CreateUserDBAndTable(sqlite3 * & db)128     void CreateUserDBAndTable(sqlite3 *&db)
129     {
130         EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
131         EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK);
132         EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL), SQLITE_OK);
133         EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK);
134     }
135 
InsertUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)136     void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
137     {
138         std::string photo(photoSize, 'v');
139         int errCode;
140         std::vector<uint8_t> assetBlob;
141         for (int64_t i = begin; i < begin + count; ++i) {
142             Asset asset = g_localAsset;
143             asset.name = asset.name + std::to_string(i);
144             RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
145             string sql = "INSERT OR REPLACE INTO " + g_tableName1
146                          + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
147                          "', '175.8', '0', '" + photo + "', ? , '18');";
148             sqlite3_stmt *stmt = nullptr;
149             ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
150             if (assetIsNull) {
151                 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
152             } else {
153                 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
154             }
155             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
156             SQLiteUtils::ResetStatement(stmt, true, errCode);
157         }
158         for (int64_t i = begin; i < begin + count; ++i) {
159             std::vector<Asset> assets;
160             Asset asset = g_localAsset;
161             asset.name = g_localAsset.name + std::to_string(i);
162             assets.push_back(asset);
163             asset.name = g_localAsset.name + std::to_string(i + 1);
164             assets.push_back(asset);
165             RuntimeContext::GetInstance()->AssetsToBlob(assets, assetBlob);
166             string sql = "INSERT OR REPLACE INTO " + g_tableName2
167                          + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local"
168                          + std::to_string(i) + "', '155.10', '"+ photo + "',  ? , '21');";
169             sqlite3_stmt *stmt = nullptr;
170             ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
171             if (assetIsNull) {
172                 ASSERT_EQ(sqlite3_bind_null(stmt, 1), E_OK);
173             } else {
174                 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
175             }
176             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
177             SQLiteUtils::ResetStatement(stmt, true, errCode);
178         }
179         LOGD("insert user record worker1[primary key]:[Local%" PRId64 " - Local%" PRId64
180             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
181     }
182 
UpdateUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)183     void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
184     {
185         for (size_t i = 0; i < g_tables.size(); i++) {
186             string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + " in (";
187             for (int64_t j = begin; j < begin + count; ++j) {
188                 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
189             }
190             updateAge.pop_back();
191             updateAge += ");";
192             ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
193         }
194         LOGD("update local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
195             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
196     }
197 
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)198     void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
199     {
200         for (size_t i = 0; i < g_tables.size(); i++) {
201             string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + " in (";
202             for (int64_t j = begin; j < count; ++j) {
203                 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
204             }
205             updateAge.pop_back();
206             updateAge += ");";
207             ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
208         }
209         LOGD("delete local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
210             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
211     }
212 
InsertRecordWithoutPk2LocalAndCloud(sqlite3 * & db,int64_t begin,int64_t count,int photoSize)213     void InsertRecordWithoutPk2LocalAndCloud(sqlite3 *&db, int64_t begin, int64_t count, int photoSize)
214     {
215         std::vector<uint8_t> photo(photoSize, 'v');
216         std::string photoLocal(photoSize, 'v');
217         Asset asset = { .version = 1, .name = "Phone" };
218         std::vector<uint8_t> assetBlob;
219         RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset);
220         std::string assetStr(assetBlob.begin(), assetBlob.end());
221         std::vector<VBucket> record1;
222         std::vector<VBucket> extend1;
223         for (int64_t i = begin; i < count; ++i) {
224             Timestamp now = TimeHelper::GetSysCurrentTime();
225             VBucket data;
226             data.insert_or_assign("name", "Cloud" + std::to_string(i));
227             data.insert_or_assign("height", 166.0); // 166.0 is random double value
228             data.insert_or_assign("married", (bool)0);
229             data.insert_or_assign("photo", photo);
230             data.insert_or_assign("assert", KEY_1);
231             data.insert_or_assign("age", 13L);
232             record1.push_back(data);
233             VBucket log;
234             log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
235             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
236             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
237             extend1.push_back(log);
238             std::this_thread::sleep_for(std::chrono::milliseconds(1));  // wait for 1 ms
239         }
240         int errCode = g_virtualCloudDb->BatchInsert(g_tableName3, std::move(record1), extend1);
241         ASSERT_EQ(errCode, DBStatus::OK);
242         for (int64_t i = begin; i < count; ++i) {
243             string sql = "INSERT OR REPLACE INTO " + g_tableName3
244                          + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
245                          "', '175.8', '0', '" + photoLocal + "', '" + assetStr + "', '18');";
246             ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
247         }
248     }
249 
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)250     void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
251     {
252         std::vector<uint8_t> photo(photoSize, 'v');
253         std::vector<VBucket> record1;
254         std::vector<VBucket> extend1;
255         std::vector<VBucket> record2;
256         std::vector<VBucket> extend2;
257         Timestamp now = TimeHelper::GetSysCurrentTime();
258         for (int64_t i = begin; i < begin + count; ++i) {
259             VBucket data;
260             data.insert_or_assign("name", "Cloud" + std::to_string(i));
261             data.insert_or_assign("height", 166.0); // 166.0 is random double value
262             data.insert_or_assign("married", false);
263             data.insert_or_assign("photo", photo);
264             data.insert_or_assign("AGE", 13L);
265             Asset asset = g_cloudAsset;
266             asset.name = asset.name + std::to_string(i);
267             assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
268             record1.push_back(data);
269             VBucket log;
270             log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
271             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
272             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
273             extend1.push_back(log);
274 
275             std::vector<Asset> assets;
276             data.insert_or_assign("id", i);
277             data.insert_or_assign("height", 180.3); // 180.3 is random double value
278             for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
279                 asset.name = g_cloudAsset.name + std::to_string(j);
280                 assets.push_back(asset);
281             }
282             data.erase("assert");
283             data.erase("married");
284             assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
285             record2.push_back(data);
286             extend2.push_back(log);
287         }
288         ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1), DBStatus::OK);
289         ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2), DBStatus::OK);
290         LOGD("insert cloud record worker1[primary key]:[cloud%" PRId64 " - cloud%" PRId64
291             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
292         std::this_thread::sleep_for(std::chrono::milliseconds(count));
293     }
294 
UpdateAssetForTest(sqlite3 * & db,AssetOpType opType,int64_t cloudCount,int64_t rowid)295     void UpdateAssetForTest(sqlite3 *&db, AssetOpType opType, int64_t cloudCount, int64_t rowid)
296     {
297         string sql = "UPDATE " + g_tables[0] + " SET assert = ? where rowid = '" + std::to_string(rowid) + "';";
298         std::vector<uint8_t> assetBlob;
299         int errCode;
300         Asset asset = g_cloudAsset;
301         asset.name = "Phone" + std::to_string(rowid - cloudCount - 1);
302         if (opType == AssetOpType::UPDATE) {
303             asset.uri = "/data/test";
304             asset.hash = "";
305         } else if (opType == AssetOpType::INSERT) {
306             asset.name = "Test10";
307         }
308         asset.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
309         sqlite3_stmt *stmt = nullptr;
310         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
311         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
312         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false) == E_OK) {
313             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
314         }
315         SQLiteUtils::ResetStatement(stmt, true, errCode);
316     }
317 
UpdateAssetsForTest(sqlite3 * & db,AssetOpType opType,int64_t rowid)318     void UpdateAssetsForTest(sqlite3 *&db, AssetOpType opType, int64_t rowid)
319     {
320         string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
321         Asset asset1 = g_localAsset;
322         Asset asset2 = g_localAsset;
323         Assets assets;
324         asset1.name = g_localAsset.name + std::to_string(rowid);
325         asset1.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
326         asset2.name = g_localAsset.name + std::to_string(rowid + 1);
327         asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
328         if (opType == AssetOpType::UPDATE) {
329             assets.push_back(asset1);
330             asset2.uri = "/data/test";
331             asset2.hash = "";
332             asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
333             assets.push_back(asset2);
334         } else if (opType == AssetOpType::INSERT) {
335             assets.push_back(asset1);
336             assets.push_back(asset2);
337             Asset asset3;
338             asset3.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
339             asset3.name = "Test10";
340             assets.push_back(asset3);
341         } else if (opType == AssetOpType::DELETE) {
342             assets.push_back(asset1);
343             asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
344             assets.push_back(asset2);
345         } else {
346             assets.push_back(asset1);
347             assets.push_back(asset2);
348         }
349         sqlite3_stmt *stmt = nullptr;
350         std::vector<uint8_t> assetsBlob;
351         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
352         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
353         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
354             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
355         }
356         int errCode;
357         SQLiteUtils::ResetStatement(stmt, true, errCode);
358     }
359 
UpdateLocalAssets(sqlite3 * & db,Assets & assets,int64_t rowid)360     void UpdateLocalAssets(sqlite3 *&db, Assets &assets, int64_t rowid)
361     {
362         string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
363         std::vector<uint8_t> assetsBlob;
364         int errCode;
365         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
366         sqlite3_stmt *stmt = nullptr;
367         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
368         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
369             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
370         }
371         SQLiteUtils::ResetStatement(stmt, true, errCode);
372     }
373 
UpdateDiffType(int64_t begin)374     void UpdateDiffType(int64_t begin)
375     {
376         std::vector<std::string> hash = {"DEC", "update_", "insert_"};
377         std::vector<std::string> name = {
378             g_cloudAsset.name + std::to_string(0),
379             g_cloudAsset.name + std::to_string(1),
380             g_cloudAsset.name + std::to_string(3) // 3 is insert id
381         };
382         std::vector<VBucket> record;
383         std::vector<VBucket> extend;
384         Assets assets;
385         for (int i = 0; i < 3; i ++) { // 3 is type num
386             Asset asset = g_cloudAsset;
387             asset.name = name[i];
388             asset.hash = hash[i];
389             assets.push_back(asset);
390         }
391         VBucket data;
392         data.insert_or_assign("name", "Cloud" + std::to_string(0));
393         data.insert_or_assign("id", 0L);
394         data.insert_or_assign("asserts", assets);
395         Timestamp now = TimeHelper::GetSysCurrentTime();
396         VBucket log;
397         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
398         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(begin));
399         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
400         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
401         record.push_back(data);
402         extend.push_back(log);
403         ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName2, std::move(record), extend), DBStatus::OK);
404     }
405 
CheckDiffTypeAsset(sqlite3 * & db)406     void CheckDiffTypeAsset(sqlite3 *&db)
407     {
408         std::vector<std::string> names = {
409             g_cloudAsset.name + std::to_string(0),
410             g_cloudAsset.name + std::to_string(1),
411             g_cloudAsset.name + std::to_string(3) // 3 is insert id
412         };
413         std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid = 0;";
414         sqlite3_stmt *stmt = nullptr;
415         int index = 0;
416         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
417         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
418             ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
419             Type cloudValue;
420             ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
421             std::vector<uint8_t> assetsBlob;
422             Assets assets;
423             ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
424             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
425             for (const Asset &asset: assets) {
426                 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
427                 ASSERT_EQ(asset.name, names[index]);
428                 index++;
429             }
430         }
431         int errCode;
432         SQLiteUtils::ResetStatement(stmt, true, errCode);
433     }
434 
CheckAssetForAssetTest006()435     void CheckAssetForAssetTest006()
436     {
437         VBucket extend;
438         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
439         std::vector<VBucket> data;
440         g_virtualCloudDb->Query(g_tables[1], extend, data);
441         for (size_t j = 0; j < data.size(); ++j) {
442             ASSERT_NE(data[j].find("asserts"), data[j].end());
443             ASSERT_TRUE((data[j]["asserts"]).index() == TYPE_INDEX<Assets>);
444             Assets &assets = std::get<Assets>(data[j]["asserts"]);
445             ASSERT_TRUE(assets.size() > 0);
446             Asset &asset = assets[0];
447             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::DELETE));
448             EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::DELETE));
449         }
450     }
451 
CheckFillAssetForTest10(sqlite3 * & db)452     void CheckFillAssetForTest10(sqlite3 *&db)
453     {
454         std::string sql = "SELECT assert from " + g_tables[0] + " WHERE rowid in ('27','28','29','30');";
455         sqlite3_stmt *stmt = nullptr;
456         int index = 0;
457         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
458         int suffixId = 6;
459         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
460             if (index == 0 || index == 1 || index == 3) { // 3 is rowid index of 29
461                 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
462                 Type cloudValue;
463                 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, cloudValue), E_OK);
464                 std::vector<uint8_t> assetBlob;
465                 Asset asset;
466                 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetBlob), E_OK);
467                 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset), E_OK);
468                 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
469                 if (index == 0) {
470                     ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
471                 } else if (index == 1) {
472                     ASSERT_EQ(asset.name, "Test10");
473                 } else {
474                     ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
475                     ASSERT_EQ(asset.uri, "/data/test");
476                 }
477             } else {
478                 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
479             }
480             index++;
481         }
482         int errCode;
483         SQLiteUtils::ResetStatement(stmt, true, errCode);
484     }
485 
CheckFillAssetsForTest10(sqlite3 * & db)486     void CheckFillAssetsForTest10(sqlite3 *&db)
487     {
488         std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid in ('0','1','2','3');";
489         sqlite3_stmt *stmt = nullptr;
490         int index = 0;
491         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
492         int insertIndex = 2;
493         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
494             ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
495             Type cloudValue;
496             ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
497             std::vector<uint8_t> assetsBlob;
498             Assets assets;
499             ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
500             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
501             if (index == 0) {
502                 ASSERT_EQ(assets.size(), 2u);
503                 ASSERT_EQ(assets[0].name, g_localAsset.name + std::to_string(index));
504                 ASSERT_EQ(assets[1].name, g_localAsset.name + std::to_string(index + 1));
505             } else if (index == 1) {
506                 ASSERT_EQ(assets.size(), 3u);
507                 ASSERT_EQ(assets[insertIndex].name, "Test10");
508                 ASSERT_EQ(assets[insertIndex].status, static_cast<uint32_t>(AssetStatus::NORMAL));
509             } else if (index == 2) { // 2 is the third element
510                 ASSERT_EQ(assets.size(), 1u);
511                 ASSERT_EQ(assets[0].name, g_cloudAsset.name + std::to_string(index));
512             } else {
513                 ASSERT_EQ(assets.size(), 2u);
514                 ASSERT_EQ(assets[1].uri, "/data/test");
515                 ASSERT_EQ(assets[1].status, static_cast<uint32_t>(AssetStatus::NORMAL));
516             }
517             index++;
518         }
519         int errCode;
520         SQLiteUtils::ResetStatement(stmt, true, errCode);
521     }
522 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)523     int QueryCountCallback(void *data, int count, char **colValue, char **colName)
524     {
525         if (count != 1) {
526             return 0;
527         }
528         auto expectCount = reinterpret_cast<int64_t>(data);
529         EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
530         return 0;
531     }
532 
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,std::string keyStr="Cloud")533     void CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, std::string keyStr = "Cloud")
534     {
535         for (size_t i = 0; i < g_tables.size(); ++i) {
536             string queryDownload = "select count(*) from " + g_tables[i] + " where name "
537                                    + " like '" + keyStr + "%'";
538             EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
539                 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
540         }
541     }
542 
CheckCloudTotalCount(std::vector<int64_t> expectCounts)543     void CheckCloudTotalCount(std::vector<int64_t> expectCounts)
544     {
545         VBucket extend;
546         for (size_t i = 0; i < g_tables.size(); ++i) {
547             extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
548             int64_t realCount = 0;
549             std::vector<VBucket> data;
550             g_virtualCloudDb->Query(g_tables[i], extend, data);
551             for (size_t j = 0; j < data.size(); ++j) {
552                 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
553                 if (entry != data[j].end() && std::get<bool>(entry->second)) {
554                     continue;
555                 }
556                 realCount++;
557             }
558             EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
559         }
560     }
561 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)562     void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
563     {
564         TableSchema tableSchema1 = {
565             .name = g_tableName1,
566             .sharedTableName = g_tableName1 + "_shared",
567             .fields = g_cloudFiled1
568         };
569         TableSchema tableSchema2 = {
570             .name = g_tableName2,
571             .sharedTableName = g_tableName2 + "_shared",
572             .fields = g_cloudFiled2
573         };
574         TableSchema tableSchemaWithOutPrimaryKey = {
575             .name = g_tableName3,
576             .sharedTableName = g_tableName3 + "_shared",
577             .fields = g_cloudFiledWithOutPrimaryKey3
578         };
579         TableSchema tableSchema4 = {
580             .name = g_tableName4,
581             .sharedTableName = g_tableName4 + "_shared",
582             .fields = g_cloudFiled2
583         };
584         dataBaseSchema.tables.push_back(tableSchema1);
585         dataBaseSchema.tables.push_back(tableSchema2);
586         dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey);
587         dataBaseSchema.tables.push_back(tableSchema4);
588     }
589 
590 
GetInvalidCloudDbSchema(DataBaseSchema & dataBaseSchema)591     void GetInvalidCloudDbSchema(DataBaseSchema &dataBaseSchema)
592     {
593         TableSchema tableSchema1 = {
594             .name = g_tableName1,
595             .sharedTableName = "",
596             .fields = g_invalidCloudFiled1
597         };
598         TableSchema tableSchema2 = {
599             .name = g_tableName2,
600             .sharedTableName = "",
601             .fields = g_cloudFiled2
602         };
603         dataBaseSchema.tables.push_back(tableSchema1);
604         dataBaseSchema.tables.push_back(tableSchema2);
605     }
606 
InitProcessForTest1(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)607     void InitProcessForTest1(const uint32_t &cloudCount, const uint32_t &localCount,
608         std::vector<SyncProcess> &expectProcess)
609     {
610         expectProcess.clear();
611         std::vector<TableProcessInfo> infos;
612         uint32_t index = 1;
613         infos.push_back(TableProcessInfo{
614             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
615         });
616         infos.push_back(TableProcessInfo{
617             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
618         });
619 
620         infos.push_back(TableProcessInfo{
621             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
622         });
623         infos.push_back(TableProcessInfo{
624             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
625         });
626 
627         infos.push_back(TableProcessInfo{
628             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
629         });
630         infos.push_back(TableProcessInfo{
631             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
632         });
633 
634         infos.push_back(TableProcessInfo{
635             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
636         });
637         infos.push_back(TableProcessInfo{
638             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
639         });
640 
641         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
642             SyncProcess syncProcess;
643             syncProcess.errCode = OK;
644             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
645             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
646             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
647             expectProcess.push_back(syncProcess);
648         }
649     }
650 
InitProcessForMannualSync1(std::vector<SyncProcess> & expectProcess)651     void InitProcessForMannualSync1(std::vector<SyncProcess> &expectProcess)
652     {
653         expectProcess.clear();
654         std::vector<TableProcessInfo> infos;
655         // first notify, first table
656         infos.push_back(TableProcessInfo{
657             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
658         });
659         // first notify, second table
660         infos.push_back(TableProcessInfo{
661             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
662         });
663         // second notify, first table
664         infos.push_back(TableProcessInfo{
665             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
666         });
667         // second notify, second table
668         infos.push_back(TableProcessInfo{
669             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
670         });
671 
672         infos.push_back(TableProcessInfo{
673             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
674         });
675         // second notify, second table
676         infos.push_back(TableProcessInfo{
677             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
678         });
679         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
680             SyncProcess syncProcess;
681             syncProcess.errCode = OK;
682             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
683             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
684             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
685             expectProcess.push_back(syncProcess);
686         }
687     }
688 
InitProcessForTest2(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)689     void InitProcessForTest2(const uint32_t &cloudCount, const uint32_t &localCount,
690         std::vector<SyncProcess> &expectProcess)
691     {
692         expectProcess.clear();
693         std::vector<TableProcessInfo> infos;
694         uint32_t index = 1;
695         infos.push_back(TableProcessInfo{
696             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
697         });
698         infos.push_back(TableProcessInfo{
699             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
700         });
701 
702         infos.push_back(TableProcessInfo{
703             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
704         });
705         infos.push_back(TableProcessInfo{
706             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
707         });
708 
709         infos.push_back(TableProcessInfo{
710             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
711         });
712         infos.push_back(TableProcessInfo{
713             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
714         });
715 
716         infos.push_back(TableProcessInfo{
717             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
718         });
719         infos.push_back(TableProcessInfo{
720             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount - cloudCount, localCount - cloudCount, 0}
721         });
722 
723         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
724             SyncProcess syncProcess;
725             syncProcess.errCode = OK;
726             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
727             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
728             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
729             expectProcess.push_back(syncProcess);
730         }
731     }
732 
InitProcessForTest9(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)733     void InitProcessForTest9(const uint32_t &cloudCount, const uint32_t &localCount,
734         std::vector<SyncProcess> &expectProcess)
735     {
736         expectProcess.clear();
737         std::vector<TableProcessInfo> infos;
738         uint32_t index = 1;
739         infos.push_back(TableProcessInfo{
740             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
741         });
742         infos.push_back(TableProcessInfo{
743             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
744         });
745 
746         infos.push_back(TableProcessInfo{
747             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
748         });
749         infos.push_back(TableProcessInfo{
750             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
751         });
752 
753         infos.push_back(TableProcessInfo{
754             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
755         });
756         infos.push_back(TableProcessInfo{
757             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
758         });
759 
760         infos.push_back(TableProcessInfo{
761             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
762         });
763         infos.push_back(TableProcessInfo{
764             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
765         });
766 
767         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
768             SyncProcess syncProcess;
769             syncProcess.errCode = OK;
770             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
771             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
772             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
773             expectProcess.push_back(syncProcess);
774         }
775     }
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)776     void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
777         std::vector<SyncProcess> &expectProcess)
778     {
779         g_syncIndex = 0;
780         callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
781             LOGI("devices size = %d", process.size());
782             ASSERT_EQ(process.size(), 1u);
783             syncProcess = std::move(process.begin()->second);
784             ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
785             ASSERT_NE(syncProcess.tableProcess.empty(), true);
786             LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
787             std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
788                 auto table1 = syncProcess.tableProcess.find(item);
789                 if (table1 != syncProcess.tableProcess.end()) {
790                     LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
791                          "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
792                          item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
793                          table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
794                          table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
795                          table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
796                          table1->second.upLoadInfo.failCount);
797                 }
798             });
799             if (expectProcess.empty()) {
800                 if (syncProcess.process == FINISHED) {
801                     g_processCondition.notify_one();
802                 }
803                 return;
804             }
805             ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
806             for (size_t i = 0; i < g_tables.size() && static_cast<size_t>(g_syncIndex) < expectProcess.size(); ++i) {
807                 SyncProcess head = expectProcess[g_syncIndex];
808                 for (auto &expect : head.tableProcess) {
809                     auto real = syncProcess.tableProcess.find(expect.first);
810                     ASSERT_NE(real, syncProcess.tableProcess.end());
811                     EXPECT_EQ(expect.second.process, real->second.process);
812                     EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
813                     EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
814                     EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
815                     EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
816                     EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
817                     EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
818                     EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
819                     EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
820                 }
821             }
822             g_syncIndex++;
823             if (syncProcess.process == FINISHED) {
824                 g_processCondition.notify_one();
825             }
826         };
827     }
828 
CheckAllAssetAfterUpload(int64_t localCount)829     void CheckAllAssetAfterUpload(int64_t localCount)
830     {
831         VBucket extend;
832         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
833         std::vector<VBucket> data1;
834         g_virtualCloudDb->Query(g_tables[0], extend, data1);
835         for (size_t j = 0; j < data1.size(); ++j) {
836             Type entry;
837             bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("assert", data1[j], entry);
838             ASSERT_TRUE(isExisted);
839             Asset asset = std::get<Asset>(entry);
840             bool isLocal = j >= (size_t)(localCount / g_arrayHalfSub);
841             Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
842             EXPECT_EQ(asset.version, baseAsset.version);
843             EXPECT_EQ(asset.name, baseAsset.name + std::to_string(isLocal ? j - localCount / g_arrayHalfSub : j));
844             EXPECT_EQ(asset.uri, baseAsset.uri);
845             EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
846             EXPECT_EQ(asset.createTime, baseAsset.createTime);
847             EXPECT_EQ(asset.size, baseAsset.size);
848             EXPECT_EQ(asset.hash, baseAsset.hash);
849         }
850 
851         std::vector<VBucket> data2;
852         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
853         g_virtualCloudDb->Query(g_tables[1], extend, data2);
854         for (size_t j = 0; j < data2.size(); ++j) {
855             Type entry;
856             bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("asserts", data2[j], entry);
857             ASSERT_TRUE(isExisted);
858             Assets assets = std::get<Assets>(entry);
859             Asset baseAsset = j >= (size_t)(localCount / g_arrayHalfSub) ? g_localAsset : g_cloudAsset;
860             int index = static_cast<int>(j);
861             for (const auto &asset: assets) {
862                 EXPECT_EQ(asset.version, baseAsset.version);
863                 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(index++));
864                 EXPECT_EQ(asset.uri, baseAsset.uri);
865                 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
866                 EXPECT_EQ(asset.createTime, baseAsset.createTime);
867                 EXPECT_EQ(asset.size, baseAsset.size);
868                 EXPECT_EQ(asset.hash, baseAsset.hash);
869             }
870         }
871     }
872 
CheckAssetsAfterDownload(sqlite3 * & db,int64_t localCount)873     void CheckAssetsAfterDownload(sqlite3 *&db, int64_t localCount)
874     {
875         string queryDownload = "select asserts from " + g_tables[1] + " where rowid in (";
876         for (int64_t i = 0; i < localCount; ++i) {
877             queryDownload +=  "'" + std::to_string(i) + "',";
878         }
879         queryDownload.pop_back();
880         queryDownload += ");";
881         sqlite3_stmt *stmt = nullptr;
882         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
883         int index = 0;
884         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
885             std::vector<uint8_t> blobValue;
886             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
887             Assets assets;
888             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets), E_OK);
889             bool isLocal = index >= localCount / g_arrayHalfSub;
890             Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
891             int nameIndex = index;
892             for (const auto &asset: assets) {
893                 EXPECT_EQ(asset.version, baseAsset.version);
894                 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(nameIndex));
895                 EXPECT_EQ(asset.uri, baseAsset.uri);
896                 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
897                 EXPECT_EQ(asset.createTime, baseAsset.createTime);
898                 EXPECT_EQ(asset.size, baseAsset.size);
899                 EXPECT_EQ(asset.hash, baseAsset.hash);
900                 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
901                 nameIndex++;
902             }
903             index++;
904         }
905         int errCode;
906         SQLiteUtils::ResetStatement(stmt, true, errCode);
907     }
908 
CheckAssetAfterDownload(sqlite3 * & db,int64_t localCount)909     void CheckAssetAfterDownload(sqlite3 *&db, int64_t localCount)
910     {
911         string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
912         for (int64_t i = 0; i < localCount; ++i) {
913             queryDownload +=  "'" + std::to_string(i) + "',";
914         }
915         queryDownload.pop_back();
916         queryDownload += ");";
917         sqlite3_stmt *stmt = nullptr;
918         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
919         int index = 0;
920         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
921             std::vector<uint8_t> blobValue;
922             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
923             Asset asset;
924             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
925             bool isCloud = index >= localCount;
926             Asset baseAsset = isCloud ? g_cloudAsset : g_localAsset;
927             EXPECT_EQ(asset.version, baseAsset.version);
928             EXPECT_EQ(asset.name,
929                 baseAsset.name + std::to_string(isCloud ?  index - localCount / g_arrayHalfSub : index));
930             EXPECT_EQ(asset.uri, baseAsset.uri);
931             EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
932             EXPECT_EQ(asset.createTime, baseAsset.createTime);
933             EXPECT_EQ(asset.size, baseAsset.size);
934             EXPECT_EQ(asset.hash, baseAsset.hash);
935             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
936             index++;
937         }
938         int errCode;
939         SQLiteUtils::ResetStatement(stmt, true, errCode);
940     }
941 
UpdateCloudAssetForDownloadAssetTest003()942     void UpdateCloudAssetForDownloadAssetTest003()
943     {
944         VBucket data;
945         std::vector<uint8_t> photo(1, 'x');
946         data.insert_or_assign("name", "Cloud" + std::to_string(0));
947         data.insert_or_assign("photo", photo);
948         data.insert_or_assign("assert", g_cloudAsset);
949         Timestamp now = TimeHelper::GetSysCurrentTime();
950         VBucket log;
951         std::vector<VBucket> record;
952         std::vector<VBucket> extend;
953         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
954         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
955         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
956         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
957         record.push_back(data);
958         extend.push_back(log);
959         ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName1, std::move(record), extend), DBStatus::OK);
960     }
961 
CheckAssetForDownloadAssetTest003(sqlite3 * & db)962     void CheckAssetForDownloadAssetTest003(sqlite3 *&db)
963     {
964         string queryDownload = "select assert from " + g_tables[0] + " where rowid = '11';";
965         sqlite3_stmt *stmt = nullptr;
966         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
967         int index = 0;
968         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
969             std::vector<uint8_t> blobValue;
970             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
971             Asset asset;
972             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
973             EXPECT_EQ(asset.name, g_cloudAsset.name);
974             EXPECT_EQ(asset.hash, g_cloudAsset.hash);
975             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
976             index++;
977         }
978         int errCode;
979         SQLiteUtils::ResetStatement(stmt, true, errCode);
980     }
981 
CheckAssetAfterDownload2(sqlite3 * & db,int64_t localCount)982     void CheckAssetAfterDownload2(sqlite3 *&db, int64_t localCount)
983     {
984         string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
985         for (int64_t i = localCount + 1; i < localCount + localCount; ++i) {
986             queryDownload +=  "'" + std::to_string(i) + "',";
987         }
988         queryDownload.pop_back();
989         queryDownload += ");";
990         sqlite3_stmt *stmt = nullptr;
991         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
992         int index = 0;
993         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
994             std::vector<uint8_t> blobValue;
995             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
996             Asset asset;
997             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
998             EXPECT_EQ(asset.version, g_cloudAsset.version);
999             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::ABNORMAL));
1000             index++;
1001         }
1002         int errCode;
1003         SQLiteUtils::ResetStatement(stmt, true, errCode);
1004     }
1005 
InsertCloudForCloudProcessNotify001(std::vector<VBucket> & record,std::vector<VBucket> & extend)1006     void InsertCloudForCloudProcessNotify001(std::vector<VBucket> &record, std::vector<VBucket> &extend)
1007     {
1008         VBucket data;
1009         std::vector<uint8_t> photo(1, 'v');
1010         data.insert_or_assign("name", "Local" + std::to_string(0));
1011         data.insert_or_assign("height", 166.0); // 166.0 is random double value
1012         data.insert_or_assign("married", false);
1013         data.insert_or_assign("age", 13L);
1014         data.insert_or_assign("photo", photo);
1015         Asset asset = g_cloudAsset;
1016         asset.name = asset.name + std::to_string(0);
1017         data.insert_or_assign("assert", asset);
1018         record.push_back(data);
1019         VBucket log;
1020         Timestamp now = TimeHelper::GetSysCurrentTime();
1021         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1022         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1023         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1024         log.insert_or_assign("#_gid", std::to_string(2)); // 2 is gid
1025         extend.push_back(log);
1026     }
1027 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)1028     void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
1029     {
1030         std::unique_lock<std::mutex> lock(g_processMutex);
1031         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
1032             return syncProcess.process == FINISHED;
1033         });
1034         ASSERT_EQ(result, true);
1035         LOGD("-------------------sync end--------------");
1036     }
1037 
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus)1038     void callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus)
1039     {
1040         g_syncProcess = {};
1041         Query query = Query::Select().FromTable(tableNames);
1042         std::vector<SyncProcess> expectProcess;
1043         CloudSyncStatusCallback callback;
1044         GetCallback(g_syncProcess, callback, expectProcess);
1045         ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus);
1046         if (dbStatus == DBStatus::OK) {
1047             WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1048         }
1049     }
1050 
CloseDb()1051     void CloseDb()
1052     {
1053         if (g_delegate != nullptr) {
1054             EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
1055             g_delegate = nullptr;
1056         }
1057         if (g_observer != nullptr) {
1058             delete g_observer;
1059             g_observer = nullptr;
1060         }
1061         g_virtualCloudDb = nullptr;
1062     }
1063 
InitMockAssetLoader(DBStatus & status,int & index)1064     void InitMockAssetLoader(DBStatus &status, int &index)
1065     {
1066         std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1067         ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1068         EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1069             .WillRepeatedly([&status, &index](const std::string &, const std::string &gid, const Type &,
1070                 std::map<std::string, Assets> &assets) {
1071                 LOGD("Download GID:%s", gid.c_str());
1072                 for (auto &item: assets) {
1073                     for (auto &asset: item.second) {
1074                         uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset.status);
1075                         EXPECT_TRUE(lowBitStatus == static_cast<uint32_t>(AssetStatus::INSERT) ||
1076                             lowBitStatus == static_cast<uint32_t>(AssetStatus::UPDATE));
1077                         LOGD("asset [name]:%s, [status]:%u, [flag]:%u", asset.name.c_str(), asset.status, asset.flag);
1078                         asset.status = (index++) % 5u + 1; // 6 is AssetStatus type num, include invalid type
1079                     }
1080                 }
1081                 return status;
1082         });
1083     }
1084 
1085     class DistributedDBCloudInterfacesRelationalSyncTest : public testing::Test {
1086     public:
1087         static void SetUpTestCase(void);
1088         static void TearDownTestCase(void);
1089         void SetUp();
1090         void TearDown();
1091     protected:
1092         sqlite3 *db = nullptr;
1093         VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
1094     };
1095 
1096 
SetUpTestCase(void)1097     void DistributedDBCloudInterfacesRelationalSyncTest::SetUpTestCase(void)
1098     {
1099         DistributedDBToolsUnitTest::TestDirInit(g_testDir);
1100         g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX;
1101         LOGI("The test db is:%s", g_testDir.c_str());
1102         RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
1103     }
1104 
TearDownTestCase(void)1105     void DistributedDBCloudInterfacesRelationalSyncTest::TearDownTestCase(void)
1106     {}
1107 
SetUp(void)1108     void DistributedDBCloudInterfacesRelationalSyncTest::SetUp(void)
1109     {
1110         RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1111         if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1112             LOGE("rm test db files error.");
1113         }
1114         DistributedDBToolsUnitTest::PrintTestCaseInfo();
1115         LOGD("Test dir is %s", g_testDir.c_str());
1116         db = RelationalTestUtils::CreateDataBase(g_storePath);
1117         ASSERT_NE(db, nullptr);
1118         CreateUserDBAndTable(db);
1119         g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1120         ASSERT_NE(g_observer, nullptr);
1121         ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer },
1122             g_delegate), DBStatus::OK);
1123         ASSERT_NE(g_delegate, nullptr);
1124         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK);
1125         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK);
1126         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName3, CLOUD_COOPERATION), DBStatus::OK);
1127         g_virtualCloudDb = make_shared<VirtualCloudDb>();
1128         g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
1129         g_syncProcess = {};
1130         ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1131         ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
1132         // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
1133         Query query = Query::Select().FromTable(g_tables);
1134         CloudSyncStatusCallback callback;
1135         ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1136             DBStatus::SCHEMA_MISMATCH);
1137         DataBaseSchema dataBaseSchema;
1138         GetCloudDbSchema(dataBaseSchema);
1139         ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1140         communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1141         ASSERT_TRUE(communicatorAggregator_ != nullptr);
1142         RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1143     }
1144 
TearDown(void)1145     void DistributedDBCloudInterfacesRelationalSyncTest::TearDown(void)
1146     {
1147         EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
1148         if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1149             LOGE("rm test db files error.");
1150         }
1151         RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1152         communicatorAggregator_ = nullptr;
1153         RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
1154     }
1155 
1156 /**
1157  * @tc.name: CloudSyncTest001
1158  * @tc.desc: Cloud data is older than local data.
1159  * @tc.type: FUNC
1160  * @tc.require:
1161  * @tc.author: bty
1162  */
1163 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest001, TestSize.Level1)
1164 {
1165     int64_t paddingSize = 10;
1166     int64_t cloudCount = 20;
1167     int64_t localCount = cloudCount / g_arrayHalfSub;
1168     ChangedData changedDataForTable1;
1169     ChangedData changedDataForTable2;
1170     changedDataForTable1.tableName = g_tableName1;
1171     changedDataForTable2.tableName = g_tableName2;
1172     changedDataForTable1.field.push_back(std::string("name"));
1173     changedDataForTable2.field.push_back(std::string("id"));
1174     for (int i = 0; i < cloudCount; i++) {
1175         changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1176         changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1177     }
1178     g_observer->SetExpectedResult(changedDataForTable1);
1179     g_observer->SetExpectedResult(changedDataForTable2);
1180     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1181     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1182     Query query = Query::Select().FromTable(g_tables);
1183     std::vector<SyncProcess> expectProcess;
1184     InitProcessForTest1(cloudCount, localCount, expectProcess);
1185     CloudSyncStatusCallback callback;
1186     GetCallback(g_syncProcess, callback, expectProcess);
1187     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1188     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1189     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1190     g_observer->ClearChangedData();
1191     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1192     CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1193     LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1194     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1195     CloseDb();
1196 }
1197 
1198 /**
1199  * @tc.name: CloudSyncTest002
1200  * @tc.desc: Local data is older than cloud data.
1201  * @tc.type: FUNC
1202  * @tc.require:
1203  * @tc.author: bty
1204  */
1205 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest002, TestSize.Level1)
1206 {
1207     int64_t localCount = 20;
1208     int64_t cloudCount = 10;
1209     int64_t paddingSize = 100;
1210     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1211     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1212     Query query = Query::Select().FromTable(g_tables);
1213     std::vector<SyncProcess> expectProcess;
1214     InitProcessForTest2(cloudCount, localCount, expectProcess);
1215     CloudSyncStatusCallback callback;
1216     GetCallback(g_syncProcess, callback, expectProcess);
1217     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1218     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1219     LOGD("expect download:worker1[primary key]:[cloud0 - cloud10), worker2[primary key]:[0 - 10)");
1220     CheckDownloadResult(db, {10L, 10L}); // 10 and 10 means the num of downloads from cloud db by worker1 and worker2
1221     LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[10 - 20)");
1222     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1223     CloseDb();
1224 }
1225 
1226 /**
1227  * @tc.name: CloudSyncTest003
1228  * @tc.desc: test with update and delete operator
1229  * @tc.type: FUNC
1230  * @tc.require:
1231  * @tc.author: bty
1232  */
1233 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest003, TestSize.Level1)
1234 {
1235     int64_t paddingSize = 10;
1236     int cloudCount = 20;
1237     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1238     InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1239     Query query = Query::Select().FromTable(g_tables);
1240     std::vector<SyncProcess> expectProcess;
1241     InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1242     CloudSyncStatusCallback callback;
1243     GetCallback(g_syncProcess, callback, expectProcess);
1244     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1245     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1246     CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1247     CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1248 
1249     int updateCount = 10;
1250     UpdateUserTableRecord(db, 5, updateCount); // 5 is start id to be updated
1251     g_syncProcess = {};
1252     InitProcessForTest1(cloudCount, updateCount, expectProcess);
1253     GetCallback(g_syncProcess, callback, expectProcess);
1254     LOGD("-------------------sync after update--------------");
1255     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1256     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1257 
1258     VBucket extend;
1259     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1260     std::vector<VBucket> data1;
1261     g_virtualCloudDb->Query(g_tables[0], extend, data1);
1262     for (int j = 25; j < 35; ++j) { // index[25, 35) in cloud db expected to be updated
1263         EXPECT_EQ(std::get<int64_t>(data1[j]["age"]), 99); // 99 is the updated age field of cloud db
1264     }
1265 
1266     std::vector<VBucket> data2;
1267     g_virtualCloudDb->Query(g_tables[1], extend, data2);
1268     for (int j = 5; j < 15; ++j) { // index[5, 15) in cloud db expected to be updated
1269         EXPECT_EQ(std::get<int64_t>(data2[j]["age"]), 99); // 99 is the updated age field of cloud db
1270     }
1271 
1272     int deleteCount = 3;
1273     DeleteUserTableRecord(db, 0, deleteCount);
1274     g_syncProcess = {};
1275     InitProcessForTest1(updateCount, deleteCount, expectProcess);
1276     GetCallback(g_syncProcess, callback, expectProcess);
1277     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1278     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1279 
1280     CheckCloudTotalCount({37L, 17L}); // 37 and 17 means the total num of worker1 and worker2 from the cloud db
1281     CloseDb();
1282 }
1283 
1284 /**
1285  * @tc.name: CloudSyncTest004
1286  * @tc.desc: Random write of local and cloud data
1287  * @tc.type: FUNC
1288  * @tc.require:
1289  * @tc.author: bty
1290  */
1291 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest004, TestSize.Level1)
1292 {
1293     int64_t paddingSize = 1024 * 8;
1294     vector<thread> threads;
1295     int cloudCount = 1024;
1296     threads.emplace_back(InsertCloudTableRecord, 0, cloudCount, paddingSize, false);
1297     threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, cloudCount, paddingSize, false);
1298     for (auto &thread: threads) {
1299         thread.join();
1300     }
1301     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1302     CloseDb();
1303 }
1304 
1305 /**
1306  * @tc.name: CloudSyncTest005
1307  * @tc.desc: sync with device sync query
1308  * @tc.type: FUNC
1309  * @tc.require:
1310  * @tc.author: bty
1311  */
1312 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest005, TestSize.Level1)
1313 {
1314     Query query = Query::Select().FromTable(g_tables).OrderBy("123", true);
1315     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1316         DBStatus::NOT_SUPPORT);
1317 
1318     query = Query::Select();
1319     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1320         DBStatus::INVALID_ARGS);
1321     CloseDb();
1322 }
1323 
1324 /**
1325  * @tc.name: CloudSyncTest006
1326  * @tc.desc: Firstly set a correct schema, and then null or invalid schema
1327  * @tc.type: FUNC
1328  * @tc.require:
1329  * @tc.author: wanyi
1330  */
1331 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest006, TestSize.Level1)
1332 {
1333     int64_t paddingSize = 10;
1334     int cloudCount = 20;
1335     ChangedData changedDataForTable1;
1336     ChangedData changedDataForTable2;
1337     changedDataForTable1.tableName = g_tableName1;
1338     changedDataForTable2.tableName = g_tableName2;
1339     changedDataForTable1.field.push_back(std::string("name"));
1340     changedDataForTable2.field.push_back(std::string("id"));
1341     for (int i = 0; i < cloudCount; i++) {
1342         changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1343         changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1344     }
1345     g_observer->SetExpectedResult(changedDataForTable1);
1346     g_observer->SetExpectedResult(changedDataForTable2);
1347     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1348     InsertUserTableRecord(db, 0, cloudCount / g_arrayHalfSub, paddingSize, false);
1349     // Set correct cloudDbSchema (correct version)
1350     DataBaseSchema correctSchema;
1351     GetCloudDbSchema(correctSchema);
1352     ASSERT_EQ(g_delegate->SetCloudDbSchema(correctSchema), DBStatus::OK);
1353     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1354     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1355     g_observer->ClearChangedData();
1356     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1357     CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1358     LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1359     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1360 
1361     // Reset cloudDbSchema (invalid version - null)
1362     DataBaseSchema nullSchema;
1363     ASSERT_EQ(g_delegate->SetCloudDbSchema(nullSchema), DBStatus::OK);
1364     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1365 
1366     // Reset cloudDbSchema (invalid version - field mismatch)
1367     DataBaseSchema invalidSchema;
1368     GetInvalidCloudDbSchema(invalidSchema);
1369     ASSERT_EQ(g_delegate->SetCloudDbSchema(invalidSchema), DBStatus::OK);
1370     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1371     CloseDb();
1372 }
1373 
1374 /**
1375  * @tc.name: CloudSyncTest007
1376  * @tc.desc: Check the asset types after sync
1377  * @tc.type: FUNC
1378  * @tc.require:
1379  * @tc.author: bty
1380  */
1381 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest007, TestSize.Level1)
1382 {
1383     int64_t paddingSize = 100;
1384     int localCount = 20;
1385     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1386     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1387     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1388 
1389     CheckAssetAfterDownload(db, localCount);
1390     CheckAllAssetAfterUpload(localCount);
1391     CheckAssetsAfterDownload(db, localCount);
1392     CloseDb();
1393 }
1394 
1395 /*
1396  * @tc.name: CloudSyncTest008
1397  * @tc.desc: Test sync with invalid param
1398  * @tc.type: FUNC
1399  * @tc.require:
1400  * @tc.author: zhangqiquan
1401  */
1402 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest008, TestSize.Level1)
1403 {
1404     ASSERT_EQ(g_delegate->SetCloudDB(nullptr), OK);   // it will not happen because cloudb has been set in SetUp()
1405     Query query = Query::Select().FromTable({g_tableName3});
1406     // clouddb has been set in SetUp() and it's not null
1407     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
1408     CloseDb();
1409 }
1410 
1411 /**
1412  * @tc.name: CloudSyncTest009
1413  * @tc.desc: The second time there was no data change and sync was called.
1414  * @tc.type: FUNC
1415  * @tc.require:
1416  * @tc.author: bty
1417  */
1418 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest009, TestSize.Level1)
1419 {
1420     int64_t paddingSize = 10;
1421     int cloudCount = 20;
1422     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1423     InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1424     Query query = Query::Select().FromTable(g_tables);
1425     std::vector<SyncProcess> expectProcess;
1426     InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1427     CloudSyncStatusCallback callback;
1428     GetCallback(g_syncProcess, callback, expectProcess);
1429     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1430     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1431     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:none");
1432     CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1433     LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[0 - 20)");
1434     CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1435 
1436     g_syncProcess = {};
1437     InitProcessForTest9(cloudCount, 0, expectProcess);
1438     GetCallback(g_syncProcess, callback, expectProcess);
1439     LOGD("--------------the second sync-------------");
1440     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1441     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1442     CloseDb();
1443 }
1444 
1445 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest0010, TestSize.Level1)
1446 {
1447     int64_t paddingSize = 10;
1448     int cloudCount = 20;
1449     int localCount = 10;
1450     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1451     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1452     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1453 
1454     int rowid = 27;
1455     UpdateAssetForTest(db, AssetOpType::NO_CHANGE, cloudCount, rowid++);
1456     UpdateAssetForTest(db, AssetOpType::INSERT, cloudCount, rowid++);
1457     UpdateAssetForTest(db, AssetOpType::DELETE, cloudCount, rowid++);
1458     UpdateAssetForTest(db, AssetOpType::UPDATE, cloudCount, rowid++);
1459 
1460     int id = 0;
1461     UpdateAssetsForTest(db, AssetOpType::NO_CHANGE, id++);
1462     UpdateAssetsForTest(db, AssetOpType::INSERT, id++);
1463     UpdateAssetsForTest(db, AssetOpType::DELETE, id++);
1464     UpdateAssetsForTest(db, AssetOpType::UPDATE, id++);
1465 
1466     LOGD("--------------the second sync-------------");
1467     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1468 
1469     CheckFillAssetForTest10(db);
1470     CheckFillAssetsForTest10(db);
1471     CloseDb();
1472 }
1473 
1474 /**
1475  * @tc.name: CloudSyncTest011
1476  * @tc.desc: Test sync with same table name.
1477  * @tc.type: FUNC
1478  * @tc.require:
1479  * @tc.author: zhangqiquan
1480  */
1481 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest011, TestSize.Level1)
1482 {
1483     Query query = Query::Select().FromTable({g_tableName1, g_tableName1});
1484     bool syncFinish = false;
1485     std::mutex syncMutex;
1486     std::condition_variable cv;
1487     std::atomic<int> callCount = 0;
1488     CloudSyncStatusCallback callback = [&callCount, &cv, &syncFinish, &syncMutex](
__anona0f839580602( const std::map<std::string, SyncProcess> &onProcess) 1489         const std::map<std::string, SyncProcess> &onProcess) {
1490         ASSERT_NE(onProcess.find(DEVICE_CLOUD), onProcess.end());
1491         SyncProcess syncProcess = onProcess.at(DEVICE_CLOUD);
1492         callCount++;
1493         if (syncProcess.process == FINISHED) {
1494             std::lock_guard<std::mutex> autoLock(syncMutex);
1495             syncFinish = true;
1496         }
1497         cv.notify_all();
1498     };
1499     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1500     std::unique_lock<std::mutex> uniqueLock(syncMutex);
__anona0f839580702() 1501     cv.wait(uniqueLock, [&syncFinish]() {
1502         return syncFinish;
1503     });
1504     EXPECT_EQ(callCount, 2); // 2 is onProcess count
1505     CloseDb();
1506 }
1507 
1508 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest012, TestSize.Level1)
1509 {
1510     int64_t localCount = 20;
1511     int64_t cloudCount = 10;
1512     int64_t paddingSize = 10;
1513     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1514     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1515     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1516 
1517     InsertCloudTableRecord(localCount + cloudCount, cloudCount, paddingSize, false);
1518     InsertUserTableRecord(db, localCount + cloudCount, localCount, paddingSize, true);
1519     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1520 
1521     InsertCloudTableRecord(2 * (localCount + cloudCount), cloudCount, paddingSize, false); // 2 is offset
1522     InsertUserTableRecord(db, 2 * (localCount + cloudCount), localCount, paddingSize, false); // 2 is offset
1523     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1524 
1525 
1526     InsertCloudTableRecord(3 * (localCount + cloudCount), cloudCount, paddingSize, true); // 3 is offset
1527     InsertUserTableRecord(db, 3 * (localCount + cloudCount), localCount, paddingSize, true); // 3 is offset
1528     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1529     CloseDb();
1530 }
1531 
1532 /*
1533  * @tc.name: CloudSyncTest013
1534  * @tc.desc: test increment watermark when cloud db query data size is 0
1535  * @tc.type: FUNC
1536  * @tc.require:
1537  * @tc.author: zhuwentao
1538  */
1539 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest013, TestSize.Level1)
1540 {
1541     /**
1542      * @tc.steps: insert some data into cloud db
1543      * @tc.expected: return ok.
1544      */
1545     int64_t paddingSize = 10;
1546     int64_t cloudCount = 10;
1547     SyncProcess syncProcess;
1548     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1549     /**
1550      * @tc.steps: try to cloud sync
1551      * @tc.expected: return ok.
1552      */
1553     Query query = Query::Select().FromTable(g_tables);
__anona0f839580802(const std::map<std::string, SyncProcess> &process) 1554     CloudSyncStatusCallback callback = [&syncProcess](const std::map<std::string, SyncProcess> &process) {
1555         LOGI("devices size = %d", process.size());
1556         ASSERT_EQ(process.size(), 1u);
1557         syncProcess = std::move(process.begin()->second);
1558         if (syncProcess.process == FINISHED) {
1559             g_processCondition.notify_one();
1560         }
1561     };
1562     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1563     WaitForSyncFinish(syncProcess, g_syncWaitTime);
1564     uint32_t queryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1565     /**
1566      * @tc.steps: insert some increment data into cloud db
1567      * @tc.expected: return ok.
1568      */
1569     VBucket data;
1570     Timestamp now = TimeHelper::GetSysCurrentTime();
1571     data.insert_or_assign("name", "Cloud" + std::to_string(0));
1572     data.insert_or_assign("height", 166.0); // 166.0 is random double value
1573     data.insert_or_assign("married", false);
1574     data.insert_or_assign("age", 13L);
1575     VBucket log;
1576     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1577     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1578     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1579     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1580     log.insert_or_assign(CloudDbConstant::CURSOR_FIELD, "0123");
1581     g_virtualCloudDb->SetIncrementData(g_tableName1, data, log);
1582     syncProcess.process = PREPARED;
1583     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1584     WaitForSyncFinish(syncProcess, g_syncWaitTime);
1585     uint32_t lastQueryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1586     ASSERT_EQ(lastQueryTimes - queryTimes, 2u);
1587     CloseDb();
1588 }
1589 
TestSyncForStatus(RelationalStoreDelegate * delegate,DBStatus expectStatus)1590 void TestSyncForStatus(RelationalStoreDelegate *delegate, DBStatus expectStatus)
1591 {
1592     std::mutex dataMutex;
1593     std::condition_variable cv;
1594     bool finish = false;
1595     DBStatus res = OK;
1596     CloudSyncStatusCallback callback = [&dataMutex, &cv, &finish, &res](
1597         const std::map<std::string, SyncProcess> &process) {
1598         std::map<std::string, SyncProcess> syncProcess;
1599         {
1600             std::lock_guard<std::mutex> autoLock(dataMutex);
1601             syncProcess = process;
1602             if (syncProcess[DEVICE_CLOUD].process == FINISHED) {
1603                 finish = true;
1604             }
1605             res = syncProcess[DEVICE_CLOUD].errCode;
1606         }
1607         cv.notify_one();
1608     };
1609     Query query = Query::Select().FromTable({g_tableName3});
1610     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1611     {
1612         std::unique_lock<std::mutex> uniqueLock(dataMutex);
1613         cv.wait(uniqueLock, [&finish] {
1614             return finish;
1615         });
1616     }
1617     EXPECT_EQ(res, expectStatus);
1618 }
1619 
1620 /*
1621  * @tc.name: CloudSyncTest015
1622  * @tc.desc: Test sync with cloud error
1623  * @tc.type: FUNC
1624  * @tc.require:
1625  * @tc.author: zhangqiquan
1626  */
1627 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest015, TestSize.Level1)
1628 {
1629     g_virtualCloudDb->SetActionStatus(CLOUD_NETWORK_ERROR);
1630     TestSyncForStatus(g_delegate, CLOUD_NETWORK_ERROR);
1631 
1632     g_virtualCloudDb->SetActionStatus(CLOUD_SYNC_UNSET);
1633     TestSyncForStatus(g_delegate, CLOUD_SYNC_UNSET);
1634 
1635     g_virtualCloudDb->SetActionStatus(CLOUD_FULL_RECORDS);
1636     TestSyncForStatus(g_delegate, CLOUD_FULL_RECORDS);
1637 
1638     g_virtualCloudDb->SetActionStatus(CLOUD_LOCK_ERROR);
1639     TestSyncForStatus(g_delegate, CLOUD_LOCK_ERROR);
1640 
1641     g_virtualCloudDb->SetActionStatus(DB_ERROR);
1642     TestSyncForStatus(g_delegate, CLOUD_ERROR);
1643 
1644     g_virtualCloudDb->SetActionStatus(OK);
1645     CloseDb();
1646 }
1647 
1648 /*
1649  * @tc.name: CloudSyncTest014
1650  * @tc.desc: Test sync with s4
1651  * @tc.type: FUNC
1652  * @tc.require:
1653  * @tc.author: zhangqiquan
1654  */
1655 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest014, TestSize.Level1)
1656 {
1657     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
1658     RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
1659 
1660     // sync failed because get security option failed
__anona0f839580b02(const std::string&, SecurityOption &option) 1661     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1662         option.securityLabel = S0;
1663         return DB_ERROR;
1664     });
1665     Query query = Query::Select().FromTable({g_tableName3});
1666     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1667         SECURITY_OPTION_CHECK_ERROR);
1668 
1669     // sync failed because get S4
__anona0f839580c02(const std::string&, SecurityOption &option) 1670     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1671         option.securityLabel = S4;
1672         return NOT_SUPPORT;
1673     });
1674     Query invalidQuery = Query::Select().FromTable({g_tableName3}).PrefixKey({'k'});
1675     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, invalidQuery, nullptr, g_syncWaitTime),
1676         NOT_SUPPORT);
1677 
1678     // sync failed because get S4
__anona0f839580d02(const std::string&, SecurityOption &option) 1679     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1680         option.securityLabel = S4;
1681         return OK;
1682     });
1683     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1684         SECURITY_OPTION_CHECK_ERROR);
1685 
1686     // sync failed because S4 has been cached
__anona0f839580e02(const std::string&, SecurityOption &option) 1687     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1688         option.securityLabel = S0;
1689         return OK;
1690     });
1691     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1692         SECURITY_OPTION_CHECK_ERROR);
1693     RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
1694     CloseDb();
1695 }
1696 
1697 /*
1698  * @tc.name: CloudSyncTest016
1699  * @tc.desc: Test sync when push before merge
1700  * @tc.type: FUNC
1701  * @tc.require:
1702  * @tc.author: chenchaohao
1703  */
1704 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest016, TestSize.Level1)
1705 {
1706     int64_t localCount = 10;
1707     int64_t paddingSize = 10;
1708     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1709     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1710     CheckCloudTotalCount({10L, 10L});
1711     UpdateUserTableRecord(db, 0, localCount);
1712     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1713 
1714     VBucket extend;
1715     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1716     std::vector<VBucket> data1;
1717     g_virtualCloudDb->Query(g_tables[0], extend, data1);
1718     for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1719         EXPECT_EQ(std::get<int64_t>(data1[i]["age"]), 99); // 99 is the updated age field of cloud db
1720     }
1721 
1722     std::vector<VBucket> data2;
1723     g_virtualCloudDb->Query(g_tables[1], extend, data2);
1724     for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1725         EXPECT_EQ(std::get<int64_t>(data2[i]["age"]), 99); // 99 is the updated age field of cloud db
1726     }
1727 
1728     CloseDb();
1729 }
1730 
1731 /*
1732  * @tc.name: CloudSyncTest017
1733  * @tc.desc: Test sync to push when local data deleted and not upload to cloud
1734  * @tc.type: FUNC
1735  * @tc.require:
1736  * @tc.author: wangxiangdong
1737  */
1738 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest017, TestSize.Level1)
1739 {
1740     /**
1741      * @tc.steps: step1. make data: 20 records on local, 20 records on cloud
1742      */
1743     int64_t localCount = 20;
1744     int64_t paddingSize = 20;
1745     InsertCloudTableRecord(0, localCount, paddingSize, true);
1746     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1747     localCount = 10;
1748     /**
1749      * @tc.steps: step2. delete 10 local record before sync
1750      */
1751     DeleteUserTableRecord(db, 0, localCount);
1752     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1753     /**
1754      * @tc.steps: step3. check local and cloud num
1755      */
1756     CheckCloudTotalCount({30L, 20L});
1757     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(g_tables[0]) +
1758         " where data_key=-1 and cloud_gid='';";
1759     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback,
1760         reinterpret_cast<void *>(10), nullptr), SQLITE_OK);
1761     CloseDb();
1762 }
1763 
1764 /*
1765  * @tc.name: DataNotifier001
1766  * @tc.desc: Notify data without primary key
1767  * @tc.type: FUNC
1768  * @tc.require:
1769  * @tc.author: wanyi
1770  */
1771 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DataNotifier001, TestSize.Level1)
1772 {
1773     int64_t paddingSize = 10;
1774     int localCount = 20;
1775     InsertRecordWithoutPk2LocalAndCloud(db, 0, localCount, paddingSize);
1776     callSync({g_tableName3}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1777     CloseDb();
1778 }
1779 
1780 /**
1781  * @tc.name: CloudSyncAssetTest001
1782  * @tc.desc:
1783  * @tc.type: FUNC
1784  * @tc.require:
1785  * @tc.author: wanyi
1786  */
1787 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest001, TestSize.Level1)
1788 {
1789     int64_t paddingSize = 100;
1790     int localCount = 20;
1791     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1792     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1793     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1794 
1795     CheckAssetAfterDownload(db, localCount);
1796     CheckAllAssetAfterUpload(localCount);
1797     CloseDb();
1798 }
1799 
1800 /*
1801  * @tc.name: MannualNotify001
1802  * @tc.desc: Test FLAG_ONLY mode of RemoveDeviceData
1803  * @tc.type: FUNC
1804  * @tc.require:
1805  * @tc.author: huangboxin
1806  */
1807 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, MannualNotify001, TestSize.Level1)
1808 {
1809     int64_t paddingSize = 10;
1810     int localCount = 10;
1811     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1812     Query query = Query::Select().FromTable(g_tables);
1813     std::vector<SyncProcess> expectProcess;
1814     InitProcessForMannualSync1(expectProcess);
1815     CloudSyncStatusCallback callback;
1816     GetCallback(g_syncProcess, callback, expectProcess);
1817     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime),
1818         DBStatus::OK);
1819     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1820     CloseDb();
1821 }
1822 
1823 /**
1824  * @tc.name: CloudProcessNotify001
1825  * @tc.desc: Test duplicate cloud records. SYNC_MODE_CLOUD_MERGE
1826  * @tc.type: FUNC
1827  * @tc.require:
1828  * @tc.author: liufuchenxing
1829  */
1830 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudProcessNotify001, TestSize.Level1)
1831 {
1832     /**
1833      * @tc.steps: step1. table work1 and work2 insert 1 record which name is local0, then sync().
1834      * @tc.expected: step 1. table work1 and work2 download result is 0. table work1 and work2 upload 1 record.
1835      */
1836     int64_t paddingSize = 10;
1837     int64_t localCount = 1;
1838     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1839     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1840     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1841     g_observer->ClearChangedData();
1842     LOGD("expect download:worker1[primary key]:[], worker2[primary key]:[]");
1843     CheckDownloadResult(db, {0L, 0L}); // 0 and 0 means the num of downloads from cloud db by worker1 and worker2
1844     LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1845     CheckCloudTotalCount({1L, 1L}); // 1 and 1 means the total num of worker1 and worker2 from the cloud db
1846 
1847     /**
1848      * @tc.steps: step2. reset data
1849      * @tc.expected: step2. return ok.
1850      */
1851     std::this_thread::sleep_for(std::chrono::milliseconds(100));
1852     g_syncProcess = {};
1853     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1854 
1855     /**
1856      * @tc.steps: step3. table work1 delete record which gid is 0 and name is local0 on cloud.
1857      * @tc.expected: step3. return ok.
1858      */
1859     VBucket idMap;
1860     idMap.insert_or_assign("#_gid", std::to_string(0));
1861     ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName1, idMap), DBStatus::OK);
1862 
1863     /**
1864      * @tc.steps: step4. table work1 insert record which gid is 0 and name is local0 on cloud.
1865      * @tc.expected: step4. return ok.
1866      */
1867     std::vector<VBucket> record1;
1868     std::vector<VBucket> extend1;
1869     InsertCloudForCloudProcessNotify001(record1, extend1);
1870     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(g_tableName1, std::move(record1), extend1), DBStatus::OK);
1871 
1872     /**
1873      * @tc.steps: step5. sync() and check local data.
1874      * @tc.expected: step5. return ok.
1875      */
1876     ChangedData changedDataForTable1;
1877     changedDataForTable1.tableName = g_tableName1;
1878     changedDataForTable1.field.push_back(std::string("name"));
1879     changedDataForTable1.primaryData[ChangeType::OP_UPDATE].push_back({"Local" + std::to_string(0)});
1880     g_observer->SetExpectedResult(changedDataForTable1);
1881 
1882     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1883     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1884     g_observer->ClearChangedData();
1885     LOGD("expect download:worker1[primary key]:[Local0], worker2[primary key]:[0]");
1886     // 1 and 1 means the num of downloads from cloud db by worker1 and worker2
1887     CheckDownloadResult(db, {1L, 1L}, "Local");
1888     LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1889     CheckCloudTotalCount({1L, 1L}); // 0 and 0 means the total num of worker1 and worker2 from the cloud db
1890 
1891     /**
1892      * @tc.steps: step6. CloseDb().
1893      * @tc.expected: step6. return ok.
1894      */
1895     CloseDb();
1896 }
1897 
1898 /*
1899  * @tc.name: CloudSyncAssetTest002
1900  * @tc.desc:
1901  * @tc.type: FUNC
1902  * @tc.require:
1903  * @tc.author: huangboxin
1904  */
1905 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest002, TestSize.Level1)
1906 {
1907     int64_t paddingSize = 10;
1908     int localCount = 3;
1909     int cloudCount = 3;
1910     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1911     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1912     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1913     CloseDb();
1914 }
1915 
1916 /*
1917  * @tc.name: CloudSyncAssetTest003
1918  * @tc.desc:
1919  * @tc.type: FUNC
1920  * @tc.require:
1921  * @tc.author: bty
1922  */
1923 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest003, TestSize.Level1)
1924 {
1925     int64_t paddingSize = 10;
1926     int localCount = 3;
1927     int cloudCount = 3;
1928     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1929     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1930     Assets assets;
1931     assets.push_back(g_localAsset);
1932     assets.push_back(g_localAsset);
1933     UpdateLocalAssets(db, assets, 1);
1934     Query query = Query::Select().FromTable(g_tables);
1935     std::vector<SyncProcess> expectProcess;
__anona0f839580f02(const std::map<std::string, SyncProcess> &process) 1936     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1937         ASSERT_EQ(process.size(), 1u);
1938         g_syncProcess = std::move(process.begin()->second);
1939 
1940         if (g_syncProcess.process == FINISHED) {
1941             g_processCondition.notify_one();
1942         }
1943     };
1944     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1945         DBStatus::OK);
1946     {
1947         std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581002() 1948         g_processCondition.wait(lock, []() {
1949             return g_syncProcess.process == FINISHED;
1950         });
1951         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1952     }
1953     CloseDb();
1954 }
1955 
1956 /*
1957  * @tc.name: CloudSyncAssetTest004
1958  * @tc.desc:
1959  * @tc.type: FUNC
1960  * @tc.require:
1961  * @tc.author: bty
1962  */
1963 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest004, TestSize.Level1)
1964 {
1965     int64_t paddingSize = 10;
1966     int localCount = 3;
1967     int cloudCount = 3;
1968     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1969     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1970     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1971 
1972     UpdateDiffType(localCount);
1973     g_syncProcess = {};
__anona0f839581102(const std::map<std::string, SyncProcess> &process) 1974     CloudSyncStatusCallback callback1 = [](const std::map<std::string, SyncProcess> &process) {
1975         ASSERT_EQ(process.size(), 1u);
1976         g_syncProcess = std::move(process.begin()->second);
1977         if (g_syncProcess.process == FINISHED) {
1978             g_processCondition.notify_one();
1979         }
1980     };
1981     Query query = Query::Select().FromTable(g_tables);
1982     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback1, g_syncWaitTime),
1983         DBStatus::OK);
1984     {
1985         std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581202() 1986         g_processCondition.wait(lock, []() {
1987             return g_syncProcess.process == FINISHED;
1988         });
1989         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1990     }
1991     CheckDiffTypeAsset(db);
1992     CloseDb();
1993 }
1994 
1995 /*
1996  * @tc.name: CloudSyncAssetTest005
1997  * @tc.desc: Test erase all no change Asset
1998  * @tc.type: FUNC
1999  * @tc.require:
2000  * @tc.author: bty
2001  */
2002 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest005, TestSize.Level1)
2003 {
2004     /**
2005      * @tc.steps:step1. Construct local data with asset names and hashes consistent with the cloud
2006      * @tc.expected: step1. return ok.
2007      */
2008     int64_t paddingSize = 10;
2009     int localCount = 3;
2010     int cloudCount = 3;
2011     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2012     Assets assets;
2013     for (int64_t j = 0; j < cloudCount; j++) {
2014         Asset asset = g_cloudAsset;
2015         asset.name = g_cloudAsset.name + std::to_string(j);
2016         assets.push_back(asset);
2017     }
2018     UpdateLocalAssets(db, assets, 0);
2019     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2020 
2021     /**
2022      * @tc.steps:step2. Construct cloud data
2023      * @tc.expected: step2. return ok.
2024      */
2025     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2026 
2027     /**
2028      * @tc.steps:step3. sync, expect EraseNoChangeAsset to erase all Nochange assets
2029      * @tc.expected: step3. return ok.
2030      */
2031     Query query = Query::Select().FromTable(g_tables);
2032     std::vector<SyncProcess> expectProcess;
__anona0f839581302(const std::map<std::string, SyncProcess> &process) 2033     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2034         ASSERT_EQ(process.size(), 1u);
2035         g_syncProcess = std::move(process.begin()->second);
2036 
2037         if (g_syncProcess.process == FINISHED) {
2038             g_processCondition.notify_one();
2039         }
2040     };
2041     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2042         DBStatus::OK);
2043     {
2044         std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581402() 2045         g_processCondition.wait(lock, []() {
2046             return g_syncProcess.process == FINISHED;
2047         });
2048         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2049     }
2050     CloseDb();
2051 }
2052 
2053 /*
2054  * @tc.name: CloudSyncAssetTest006
2055  * @tc.desc: Test upload new data without assets
2056  * @tc.type: FUNC
2057  * @tc.require:
2058  * @tc.author: bty
2059  */
2060 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest006, TestSize.Level1)
2061 {
2062     /**
2063      * @tc.steps:step1. Construct local data with NULL asset and the local count is greater than the cloud
2064      * @tc.expected: step1. return ok.
2065      */
2066     int64_t paddingSize = 10;
2067     int localCount = 6;
2068     int cloudCount = 3;
2069     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2070     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2071     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2072 
2073     /**
2074      * @tc.steps:step2. sync, upload new data without assets,
2075      * @tc.expected: step2. return ok.
2076      */
2077     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2078     CloseDb();
2079 }
2080 
2081 /*
2082  * @tc.name: CloudSyncAssetTest007
2083  * @tc.desc: for expilictly set not-change assets. If an asset is deleted, and its hash is not set to empty, it will be
2084  * regarded as NO-CHANGE, rather than delete
2085  * @tc.type: FUNC
2086  * @tc.require:
2087  * @tc.author: wanyi
2088  */
2089 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest007, TestSize.Level1)
2090 {
2091     /**
2092      * @tc.steps:step1. local asset contain an asset which has a corresponding asset in cloud
2093      * @tc.expected: step1. return ok.
2094      */
2095     int64_t paddingSize = 10;
2096     int localCount = 1;
2097     int cloudCount = 1;
2098     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2099     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2100     /**
2101      * @tc.steps:step2. local asset is set to delete, but hash is not set to empty
2102      * @tc.expected: step2. return ok.
2103      */
2104     Assets assets;
2105     for (int64_t j = 0; j < cloudCount; j++) {
2106         Asset asset = g_cloudAsset;
2107         asset.name = g_cloudAsset.name + std::to_string(j);
2108         asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
2109         assets.push_back(asset);
2110     }
2111     UpdateLocalAssets(db, assets, 0);
2112     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2113     /**
2114      * @tc.steps:step3. Do sync
2115      * @tc.expected: step3. return ok.
2116      */
2117     Query query = Query::Select().FromTable(g_tables);
2118     std::vector<SyncProcess> expectProcess;
__anona0f839581502(const std::map<std::string, SyncProcess> &process) 2119     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2120         ASSERT_EQ(process.size(), 1u);
2121         g_syncProcess = std::move(process.begin()->second);
2122 
2123         if (g_syncProcess.process == FINISHED) {
2124             g_processCondition.notify_one();
2125         }
2126     };
2127     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2128         DBStatus::OK);
2129     {
2130         std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581602() 2131         g_processCondition.wait(lock, []() {
2132             return g_syncProcess.process == FINISHED;
2133         });
2134         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2135     }
2136     /**
2137      * @tc.steps:step4. Check result. Cloud db should not contain asset.
2138      * @tc.expected: step4. return ok.
2139      */
2140     CheckAssetForAssetTest006();
2141     CloseDb();
2142 }
2143 
2144 /**
2145  * @tc.name: DownloadAssetTest001
2146  * @tc.desc: Test the sync of different Asset status out of parameters when the download is successful
2147  * @tc.type: FUNC
2148  * @tc.require:
2149  * @tc.author: bty
2150  */
2151 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest001, TestSize.Level1)
2152 {
2153     /**
2154      * @tc.steps:step1. Set different status out of parameters, and the code returns OK
2155      * @tc.expected: step1. return ok.
2156      */
2157     DBStatus expectStatus = DBStatus::OK;
2158     int index = 0;
2159     InitMockAssetLoader(expectStatus, index);
2160 
2161     /**
2162      * @tc.steps:step2. init download data
2163      * @tc.expected: step2. return ok.
2164      */
2165     int64_t paddingSize = 1;
2166     int localCount = 120;
2167     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2168     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
2169 
2170     /**
2171      * @tc.steps:step3. sync
2172      * @tc.expected: step3. return ok.
2173      */
2174     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2175 
2176     /**
2177      * @tc.steps:step4. Expect all states to be normal
2178      * @tc.expected: step4. return ok.
2179      */
2180     CheckAssetAfterDownload(db, localCount);
2181     CloseDb();
2182 }
2183 
2184 /*
2185  * @tc.name: CloudSyncAssetTest008
2186  * @tc.desc: sync failed with download asset
2187  * @tc.type: FUNC
2188  * @tc.require:
2189  * @tc.author: zhangqiquan
2190  */
2191 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest008, TestSize.Level1)
2192 {
2193     /**
2194      * @tc.steps:step1. prepare asset data
2195      */
2196     int64_t paddingSize = 10;
2197     int localCount = 1;
2198     int cloudCount = 1;
2199     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2200     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2201     /**
2202      * @tc.steps:step2. set download asset status failed
2203      */
2204     g_virtualAssetLoader->SetDownloadStatus(CLOUD_ASSET_SPACE_INSUFFICIENT);
2205     Query query = Query::Select().FromTable(g_tables);
2206     std::vector<SyncProcess> expectProcess;
__anona0f839581702(const std::map<std::string, SyncProcess> &process) 2207     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2208         for (const auto &item: process) {
2209             g_syncProcess = item.second;
2210         }
2211         if (g_syncProcess.process == FINISHED) {
2212             g_processCondition.notify_one();
2213         }
2214     };
2215     /**
2216      * @tc.steps:step3. sync and wait sync finished.
2217      * @tc.expected: step3. sync return CLOUD_ASSET_SPACE_INSUFFICIENT.
2218      */
2219     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2220         DBStatus::OK);
2221     {
2222         std::unique_lock<std::mutex> lock(g_processMutex);
__anona0f839581802() 2223         g_processCondition.wait(lock, []() {
2224             return g_syncProcess.process == FINISHED;
2225         });
2226         ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT);
2227     }
2228     /**
2229      * @tc.steps:step4. clear data.
2230      */
2231     g_virtualAssetLoader->SetDownloadStatus(OK);
2232     CloseDb();
2233 }
2234 
2235 /*
2236  * @tc.name: CloudSyncAssetTest009
2237  * @tc.desc:
2238  * @tc.type: FUNC
2239  * @tc.require:
2240  * @tc.author: zhangqiquan
2241  */
2242 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest009, TestSize.Level1)
2243 {
2244     // insert 3 data with asset 3 data without asset into local
2245     // sync them to cloud
2246     int64_t paddingSize = 10;
2247     int localCount = 3;
2248     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2249     InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2250     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2251     // update these data and sync again
2252     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2253     InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2254     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2255     EXPECT_EQ(g_syncProcess.errCode, DBStatus::OK);
2256     CloseDb();
2257 }
2258 
2259 /**
2260  * @tc.name: DownloadAssetTest002
2261  * @tc.desc: Test the sync of different Asset status out of parameters when the download is failed
2262  * @tc.type: FUNC
2263  * @tc.require:
2264  * @tc.author: bty
2265  */
2266 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest002, TestSize.Level1)
2267 {
2268     /**
2269      * @tc.steps:step1. Set different status out of parameters, and the code returns CLOUD_ERROR
2270      * @tc.expected: step1. return ok.
2271      */
2272     DBStatus expectStatus = DBStatus::CLOUD_ERROR;
2273     int index = 0;
2274     InitMockAssetLoader(expectStatus, index);
2275     int64_t paddingSize = 1;
2276     int localCount = 100;
2277 
2278     /**
2279      * @tc.steps:step2. init download data
2280      * @tc.expected: step2. return ok.
2281      */
2282     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2283     InsertCloudTableRecord(0, localCount, paddingSize, false);
2284 
2285     /**
2286      * @tc.steps:step3. sync
2287      * @tc.expected: step3. return ok.
2288      */
2289     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2290 
2291     /**
2292      * @tc.steps:step4. Those status that are not normal are all be abnormal after sync.
2293      * @tc.expected: step4. return ok.
2294      */
2295     CheckAssetAfterDownload2(db, localCount);
2296     CloseDb();
2297 }
2298 
2299 /**
2300  * @tc.name: DownloadAssetTest003
2301  * @tc.desc: Init different asset name between local and cloud, then sync to test download
2302  * @tc.type: FUNC
2303  * @tc.require:
2304  * @tc.author: bty
2305  */
2306 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest003, TestSize.Level1)
2307 {
2308     /**
2309      * @tc.steps:step1. Init data and sync
2310      * @tc.expected: step1. return ok.
2311      */
2312     int64_t paddingSize = 1;
2313     int localCount = 10;
2314     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2315     InsertCloudTableRecord(0, localCount, paddingSize, false);
2316     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2317 
2318     /**
2319      * @tc.steps:step2. update cloud Asset where gid = 0
2320      * @tc.expected: step2. return ok.
2321      */
2322     UpdateCloudAssetForDownloadAssetTest003();
2323 
2324     /**
2325      * @tc.steps:step3. sync again
2326      * @tc.expected: step3. return ok.
2327      */
2328     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2329 
2330     /**
2331      * @tc.steps:step4. check asset after download where gid = 0
2332      * @tc.expected: step4. return ok.
2333      */
2334     CheckAssetForDownloadAssetTest003(db);
2335     CloseDb();
2336 }
2337 
2338 /**
2339  * @tc.name: DownloadAssetTest004
2340  * @tc.desc: Test total count, fail count and success count when drop table
2341  * @tc.type: FUNC
2342  * @tc.require:
2343  * @tc.author: liufuchenxing
2344  */
2345 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest004, TestSize.Level1)
2346 {
2347     /**
2348      * @tc.steps:step1. Init data and sync
2349      * @tc.expected: step1. return ok.
2350      */
2351     int64_t paddingSize = 1;
2352     int count = 10;
2353     InsertUserTableRecord(db, 0, count, paddingSize, false);
2354     g_syncProcess = {};
__anona0f839581902(const std::map<std::string, SyncProcess> &process) 2355     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2356         for (const auto &item : process) {
2357             g_syncProcess = item.second;
2358         }
2359         if (g_syncProcess.process == FINISHED) {
2360             g_processCondition.notify_one();
2361         }
2362     };
2363     Query query = Query::Select().FromTable(g_tables);
2364     EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2365     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2366 
2367     /**
2368      * @tc.steps:step2. drop table work2. sync failed, check total, success and fail count.
2369      * @tc.expected: step2. total = 20, success=0, fail=20
2370      */
2371     g_syncProcess = {};
2372     InsertCloudTableRecord(0, count, paddingSize, false);
2373     EXPECT_EQ(RelationalTestUtils::ExecSql(db, DROP_INTEGER_PRIMARY_KEY_TABLE_SQL), E_OK);
2374     EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2375         DBStatus::NOT_FOUND);
2376 
2377     /**
2378      * @tc.steps:step3. close db.
2379      * @tc.expected: step3. close success.
2380      */
2381     CloseDb();
2382 }
2383 
2384 /**
2385  * @tc.name: SchemaTest001
2386  * @tc.desc: Create table with Cloud cooperation mode and do sync
2387  * @tc.type: FUNC
2388  * @tc.require:
2389  * @tc.author: wanyi
2390  */
2391 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest001, TestSize.Level1)
2392 {
2393     /**
2394      * @tc.steps:step1. Create table with Cloud cooperation mode
2395      * @tc.expected: step1. return ok.
2396      */
2397     EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2398     ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, CLOUD_COOPERATION), DBStatus::OK);
2399     /**
2400      * @tc.steps:step1. do sync
2401      * @tc.expected: step1. return ok.
2402      */
2403     callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2404     CloseDb();
2405 }
2406 
2407 /**
2408  * @tc.name: SchemaTest002
2409  * @tc.desc: Create table with DEVICE_COOPERATION mode and do sync
2410  * @tc.type: FUNC
2411  * @tc.require:
2412  * @tc.author: wanyi
2413  */
2414 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest002, TestSize.Level1)
2415 {
2416     /**
2417      * @tc.steps:step1. Create table with DEVICE_COOPERATION mode
2418      * @tc.expected: step1. return ok.
2419      */
2420     EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2421     ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, DEVICE_COOPERATION), DBStatus::OK);
2422     /**
2423      * @tc.steps:step1. do sync
2424      * @tc.expected: step1. return NOT_SUPPORT.
2425      */
2426     callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::NOT_SUPPORT);
2427     CloseDb();
2428 }
2429 
2430 /**
2431  * @tc.name: CloudCursorTest001
2432  * @tc.desc: Init different asset name between local and cloud, then sync to test download
2433  * @tc.type: FUNC
2434  * @tc.require:
2435  * @tc.author: bty
2436  */
2437 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudCursorTest001, TestSize.Level0)
2438 {
2439     /**
2440      * @tc.steps:step1. Init data and sync
2441      * @tc.expected: step1. return ok.
2442      */
2443     int64_t paddingSize = 1;
2444     int localCount = 10;
2445     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2446     InsertCloudTableRecord(0, localCount, paddingSize, true);
2447     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2448 
2449     /**
2450      * @tc.steps:step2. the cursor does not increase during upload, the cursor will increase during download
2451      * although it is unTrackerTable
2452      * @tc.expected: step2. return ok.
2453      */
2454     string sql = "select cursor from " + std::string(DBConstant::RELATIONAL_PREFIX) + g_tableName1 + "_log";
2455     sqlite3_stmt *stmt = nullptr;
2456     EXPECT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2457     int64_t index = 0;
2458     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2459         EXPECT_EQ(static_cast<int64_t>(sqlite3_column_int64(stmt, 0)), ++index);
2460     }
2461     int errCode;
2462     SQLiteUtils::ResetStatement(stmt, true, errCode);
2463     CloseDb();
2464 }
2465 
2466 /*
2467  * @tc.name: RDBSupportEncryptTest001
2468  * @tc.desc: Test sync when security label is not set and different encryption para is set
2469  * @tc.type: FUNC
2470  * @tc.require:
2471  * @tc.author: suyue
2472  */
2473 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, RDBSupportEncryptTest001, TestSize.Level1)
2474 {
2475     /**
2476      * @tc.steps: step1. sync when security label is not set and encryption is not supported
2477      * @tc.expected: step1. return ok.
2478      */
2479     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2480     RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
2481     Query query = Query::Select().FromTable({g_tableName3});
2482     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2483 
2484     /**
2485      * @tc.steps: step2. sync when security label is not set and encryption is supported
2486      * @tc.expected: step2. return ok.
2487      */
2488     CloudSyncConfig config;
2489     config.isSupportEncrypt = true;
2490     g_delegate->SetCloudSyncConfig(config);
2491     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2492     RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
2493     CloseDb();
2494 }
2495 
2496 /*
2497  * @tc.name: RDBSupportEncryptTest002
2498  * @tc.desc: Test sync when security label is S4 and different encryption para is set
2499  * @tc.type: FUNC
2500  * @tc.require:
2501  * @tc.author: suyue
2502  */
2503 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, RDBSupportEncryptTest002, TestSize.Level0)
2504 {
2505     /**
2506      * @tc.steps: step1. sync when security label is S4 and encryption is not supported
2507      * @tc.expected: step1. return SECURITY_OPTION_CHECK_ERROR.
2508      */
2509     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2510     RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
2511     Query query = Query::Select().FromTable({g_tableName3});
__anona0f839581a02(const std::string&, SecurityOption &option) 2512     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
2513         option.securityLabel = S4;
2514         return OK;
2515     });
2516     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
2517         SECURITY_OPTION_CHECK_ERROR);
2518 
2519     /**
2520      * @tc.steps: step2. sync when security label is S4 and encryption is supported
2521      * @tc.expected: step2. return OK.
2522      */
2523     CloudSyncConfig config;
2524     config.isSupportEncrypt = true;
2525     g_delegate->SetCloudSyncConfig(config);
2526     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2527 
2528     /**
2529      * @tc.steps: step3. sync when isSupportEncrypt is set to false for the second time
2530      * @tc.expected: step3. return SECURITY_OPTION_CHECK_ERROR.
2531      */
2532     config.isSupportEncrypt = false;
2533     g_delegate->SetCloudSyncConfig(config);
2534     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
2535         SECURITY_OPTION_CHECK_ERROR);
2536     RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
2537     CloseDb();
2538 }
2539 
2540 /*
2541  * @tc.name: RDBSupportEncryptTest003
2542  * @tc.desc: Test sync when SecurityOption is not supported and different encryption para is set
2543  * @tc.type: FUNC
2544  * @tc.require:
2545  * @tc.author: suyue
2546  */
2547 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, RDBSupportEncryptTest003, TestSize.Level1)
2548 {
2549     /**
2550      * @tc.steps: step1. sync when unable to get security options
2551      * @tc.expected: step1. return OK whether or not encryption is supported.
2552      */
2553     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
2554     RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
2555     Query query = Query::Select().FromTable({g_tableName3});
__anona0f839581b02(const std::string&, SecurityOption &option) 2556     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
2557         return NOT_SUPPORT;
2558     });
2559     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2560     CloudSyncConfig config;
2561     config.isSupportEncrypt = true;
2562     g_delegate->SetCloudSyncConfig(config);
2563     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
2564     RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
2565     CloseDb();
2566 }
2567 }
2568 #endif // RELATIONAL_STORE
2569