• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "cloud/cloud_sync_utils.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "cloud_syncer.h"
22 #include "db_common.h"
23 #include "distributeddb_data_generate_unit_test.h"
24 #include "log_print.h"
25 #include "relational_store_client.h"
26 #include "relational_store_delegate.h"
27 #include "relational_store_instance.h"
28 #include "relational_store_manager.h"
29 #include "relational_sync_able_storage.h"
30 #include "runtime_config.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 
37 namespace {
38 using namespace testing::ext;
39 using namespace DistributedDB;
40 using namespace DistributedDBUnitTest;
41 const char *g_createSQL =
42     "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(" \
43     "id TEXT PRIMARY KEY," \
44     "name TEXT," \
45     "height REAL ," \
46     "photo BLOB," \
47     "age INT);";
48 const char *g_createNonPrimaryKeySQL =
49     "CREATE TABLE IF NOT EXISTS NonPrimaryKeyTable(" \
50     "id TEXT," \
51     "name TEXT," \
52     "height REAL ," \
53     "photo BLOB," \
54     "age INT);";
55 const int64_t g_syncWaitTime = 60;
56 
57 const Asset g_cloudAsset = {
58     .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
59     .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
60 };
61 
62 std::vector<DBStatus> g_actualDBStatus;
63 
CreateUserDBAndTable(sqlite3 * & db)64 void CreateUserDBAndTable(sqlite3 *&db)
65 {
66     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
67     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
68     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createNonPrimaryKeySQL), SQLITE_OK);
69 }
70 
PrepareOption(CloudSyncOption & option,const Query & query,bool isPriorityTask,bool isCompensatedSyncOnly=false)71 void PrepareOption(CloudSyncOption &option, const Query &query, bool isPriorityTask, bool isCompensatedSyncOnly = false)
72 {
73     option.devices = { "CLOUD" };
74     option.mode = SYNC_MODE_CLOUD_MERGE;
75     option.query = query;
76     option.waitTime = g_syncWaitTime;
77     option.priorityTask = isPriorityTask;
78     option.compensatedSyncOnly = isCompensatedSyncOnly;
79 }
80 
BlockSync(const Query & query,RelationalStoreDelegate * delegate,std::vector<DBStatus> & actualDBStatus,bool prioritySync=false)81 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, std::vector<DBStatus> &actualDBStatus,
82     bool prioritySync = false)
83 {
84     std::mutex dataMutex;
85     std::condition_variable cv;
86     bool finish = false;
87     auto callback = [&actualDBStatus, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
88         for (const auto &item: process) {
89             actualDBStatus.push_back(item.second.errCode);
90             if (item.second.process == DistributedDB::FINISHED) {
91                 {
92                     std::lock_guard<std::mutex> autoLock(dataMutex);
93                     finish = true;
94                 }
95                 cv.notify_one();
96             }
97         }
98     };
99     CloudSyncOption option;
100     option.devices = { "CLOUD" };
101     option.mode = SYNC_MODE_CLOUD_MERGE;
102     option.query = query;
103     option.waitTime = g_syncWaitTime;
104     option.priorityTask = prioritySync;
105     ASSERT_EQ(delegate->Sync(option, callback), OK);
106     std::unique_lock<std::mutex> uniqueLock(dataMutex);
107     cv.wait(uniqueLock, [&finish]() {
108         return finish;
109     });
110 }
111 
BlockPrioritySync(const Query & query,RelationalStoreDelegate * delegate,bool isPriority,DBStatus expectResult,bool isCompensatedSyncOnly=false)112 void BlockPrioritySync(const Query &query, RelationalStoreDelegate *delegate, bool isPriority, DBStatus expectResult,
113     bool isCompensatedSyncOnly = false)
114 {
115     std::mutex dataMutex;
116     std::condition_variable cv;
117     bool finish = false;
118     auto callback = [&cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
119         for (const auto &item: process) {
120             if (item.second.process == DistributedDB::FINISHED) {
121                 {
122                     std::lock_guard<std::mutex> autoLock(dataMutex);
123                     finish = true;
124                 }
125                 cv.notify_one();
126             }
127         }
128     };
129     CloudSyncOption option;
130     PrepareOption(option, query, isPriority, isCompensatedSyncOnly);
131     ASSERT_EQ(delegate->Sync(option, callback), expectResult);
132     if (expectResult == OK) {
133         std::unique_lock<std::mutex> uniqueLock(dataMutex);
134         cv.wait(uniqueLock, [&finish]() {
135             return finish;
136         });
137     }
138 }
139 
BlockCompensatedSync(const Query & query,RelationalStoreDelegate * delegate,DBStatus expectResult,const std::function<void (const std::map<std::string,SyncProcess> & syncProcess)> & processCallback)140 void BlockCompensatedSync(const Query &query, RelationalStoreDelegate *delegate, DBStatus expectResult,
141     const std::function<void(const std::map<std::string, SyncProcess> &syncProcess)> &processCallback)
142 {
143     std::mutex dataMutex;
144     std::condition_variable cv;
145     bool finish = false;
146     auto callback = [&processCallback, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
147         for (const auto &item: process) {
148             if (item.second.process == DistributedDB::FINISHED) {
149                 {
150                     std::lock_guard<std::mutex> autoLock(dataMutex);
151                     finish = true;
152                 }
153                 cv.notify_one();
154             }
155         }
156         if (processCallback != nullptr) {
157             processCallback(process);
158         }
159     };
160     CloudSyncOption option;
161     PrepareOption(option, query, false, true);
162     ASSERT_EQ(delegate->Sync(option, callback), expectResult);
163     if (expectResult == OK) {
164         std::unique_lock<std::mutex> uniqueLock(dataMutex);
165         cv.wait(uniqueLock, [&finish]() {
166             return finish;
167         });
168     }
169 }
170 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)171 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
172 {
173     if (count != 1) {
174         return 0;
175     }
176     auto expectCount = reinterpret_cast<int64_t>(data);
177     EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
178     return 0;
179 }
180 
CheckUserTableResult(sqlite3 * & db,const std::string & tableName,int64_t expectCount)181 void CheckUserTableResult(sqlite3 *&db, const std::string &tableName, int64_t expectCount)
182 {
183     string query = "select count(*) from " + tableName + ";";
184     EXPECT_EQ(sqlite3_exec(db, query.c_str(), QueryCountCallback,
185         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
186 }
187 
188 class DistributedDBCloudCheckSyncTest : public testing::Test {
189 public:
190     static void SetUpTestCase();
191     static void TearDownTestCase();
192     void SetUp() override;
193     void TearDown() override;
194 protected:
195     void InitTestDir();
196     DataBaseSchema GetSchema();
197     void CloseDb();
198     void InitDataAndSync();
199     void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
200     void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull);
201     void InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count, int64_t photoSize,
202         bool assetIsNull);
203     void DeleteUserTableRecord(int64_t id);
204     void DeleteUserTableRecord(int64_t begin, int64_t end);
205     void DeleteCloudTableRecord(int64_t gid);
206     void CheckCloudTableCount(const std::string &tableName, int64_t expectCount);
207     bool CheckSyncCount(const Info actualInfo, const Info expectInfo);
208     bool CheckSyncProcessInner(SyncProcess &actualSyncProcess, SyncProcess &expectSyncProcess);
209     bool CheckSyncProcess(std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess,
210         vector<SyncProcess> &expectSyncProcessV);
211     void PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
212         RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
213         bool isCheckProcess);
214     void DeleteCloudDBData(int64_t begin, int64_t count);
215     void SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count);
216     void SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count);
217     void InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync = false);
218     void CheckLocalCount(int64_t expectCount);
219     void CheckLogCleaned(int64_t expectCount);
220     void CheckUploadInfo(const Info &actualUploadInfo, const Info &expectUploadInfo);
221     void CheckDownloadInfo(const Info &actualDownloadInfo, const Info &expectDownloadInfo);
222     void SyncDataStatusTest(bool isCompensatedSyncOnly);
223     void CheckUploadInfoAfterSync(int recordCount, SyncProcess &normalLast);
224     std::string testDir_;
225     std::string storePath_;
226     sqlite3 *db_ = nullptr;
227     RelationalStoreDelegate *delegate_ = nullptr;
228     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
229     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
230     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
231     std::string tableName_ = "DistributedDBCloudCheckSyncTest";
232     std::string tableNameShared_ = "DistributedDBCloudCheckSyncTest_shared";
233     std::string tableWithoutPrimaryName_ = "NonPrimaryKeyTable";
234     std::string tableWithoutPrimaryNameShared_ = "NonPrimaryKeyTable_shared";
235     std::string lowerTableName_ = "distributeddbCloudCheckSyncTest";
236     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
237 };
238 
SetUpTestCase()239 void DistributedDBCloudCheckSyncTest::SetUpTestCase()
240 {
241     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
242 }
243 
TearDownTestCase()244 void DistributedDBCloudCheckSyncTest::TearDownTestCase()
245 {}
246 
SetUp()247 void DistributedDBCloudCheckSyncTest::SetUp()
248 {
249     DistributedDBToolsUnitTest::PrintTestCaseInfo();
250     InitTestDir();
251     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
252         LOGE("rm test db files error.");
253     }
254     DistributedDBToolsUnitTest::PrintTestCaseInfo();
255     LOGD("Test dir is %s", testDir_.c_str());
256     db_ = RelationalTestUtils::CreateDataBase(storePath_);
257     ASSERT_NE(db_, nullptr);
258     CreateUserDBAndTable(db_);
259     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
260     RelationalStoreDelegate::Option option;
261     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
262     ASSERT_NE(delegate_, nullptr);
263     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
264     ASSERT_EQ(delegate_->CreateDistributedTable(tableWithoutPrimaryName_, CLOUD_COOPERATION), DBStatus::OK);
265     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
266     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
267     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
268     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
269     DataBaseSchema dataBaseSchema = GetSchema();
270     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
271     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
272     ASSERT_TRUE(communicatorAggregator_ != nullptr);
273     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
274 }
275 
TearDown()276 void DistributedDBCloudCheckSyncTest::TearDown()
277 {
278     virtualCloudDb_->ForkQuery(nullptr);
279     virtualCloudDb_->SetCloudError(false);
280     CloseDb();
281     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
282     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
283         LOGE("rm test db files error.");
284     }
285     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
286     communicatorAggregator_ = nullptr;
287     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
288 }
289 
InitTestDir()290 void DistributedDBCloudCheckSyncTest::InitTestDir()
291 {
292     if (!testDir_.empty()) {
293         return;
294     }
295     DistributedDBToolsUnitTest::TestDirInit(testDir_);
296     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
297     LOGI("The test db is:%s", testDir_.c_str());
298 }
299 
GetSchema()300 DataBaseSchema DistributedDBCloudCheckSyncTest::GetSchema()
301 {
302     DataBaseSchema schema;
303     TableSchema tableSchema;
304     tableSchema.name = tableName_;
305     tableSchema.sharedTableName = tableNameShared_;
306     tableSchema.fields = {
307         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
308         {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
309     };
310     TableSchema tableWithoutPrimaryKeySchema;
311     tableWithoutPrimaryKeySchema.name = tableWithoutPrimaryName_;
312     tableWithoutPrimaryKeySchema.sharedTableName = tableWithoutPrimaryNameShared_;
313     tableWithoutPrimaryKeySchema.fields = {
314         {"id", TYPE_INDEX<std::string>}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
315         {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
316     };
317     schema.tables.push_back(tableSchema);
318     schema.tables.push_back(tableWithoutPrimaryKeySchema);
319     return schema;
320 }
321 
CloseDb()322 void DistributedDBCloudCheckSyncTest::CloseDb()
323 {
324     virtualCloudDb_ = nullptr;
325     if (mgr_ != nullptr) {
326         EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
327         delegate_ = nullptr;
328         mgr_ = nullptr;
329     }
330 }
331 
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)332 void DistributedDBCloudCheckSyncTest::InsertUserTableRecord(const std::string &tableName,
333     int64_t recordCounts, int64_t begin)
334 {
335     ASSERT_NE(db_, nullptr);
336     for (int64_t i = begin; i < begin + recordCounts; ++i) {
337         string sql = "INSERT OR REPLACE INTO " + tableName
338             + " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local"
339             + std::to_string(i) + "', '155.10',  'text', '21');";
340         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
341     }
342 }
343 
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)344 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize,
345     bool assetIsNull)
346 {
347     InsertCloudTableRecord(tableName_, begin, count, photoSize, assetIsNull);
348 }
349 
InsertCloudTableRecord(const std::string & tableName,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)350 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count,
351     int64_t photoSize, bool assetIsNull)
352 {
353     std::vector<uint8_t> photo(photoSize, 'v');
354     std::vector<VBucket> record1;
355     std::vector<VBucket> extend1;
356     std::vector<VBucket> record2;
357     std::vector<VBucket> extend2;
358     Timestamp now = TimeHelper::GetSysCurrentTime();
359     for (int64_t i = begin; i < begin + count; ++i) {
360         VBucket data;
361         data.insert_or_assign("id", std::to_string(i));
362         data.insert_or_assign("name", "Cloud" + std::to_string(i));
363         data.insert_or_assign("height", 166.0); // 166.0 is random double value
364         data.insert_or_assign("married", false);
365         data.insert_or_assign("photo", photo);
366         data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
367         Asset asset = g_cloudAsset;
368         asset.name = asset.name + std::to_string(i);
369         assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
370         record1.push_back(data);
371         VBucket log;
372         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
373             now / CloudDbConstant::TEN_THOUSAND + i));
374         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
375             now / CloudDbConstant::TEN_THOUSAND + i));
376         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
377         extend1.push_back(log);
378 
379         std::vector<Asset> assets;
380         data.insert_or_assign("height", 180.3); // 180.3 is random double value
381         for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
382             asset.name = g_cloudAsset.name + std::to_string(j);
383             assets.push_back(asset);
384         }
385         data.erase("assert");
386         data.erase("married");
387         assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
388         record2.push_back(data);
389         extend2.push_back(log);
390     }
391     ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName, std::move(record1), extend1), DBStatus::OK);
392     std::this_thread::sleep_for(std::chrono::milliseconds(count));
393 }
394 
DeleteUserTableRecord(int64_t id)395 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t id)
396 {
397     ASSERT_NE(db_, nullptr);
398     string sql = "DELETE FROM " + tableName_ + " WHERE id ='" + std::to_string(id) + "';";
399     ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
400 }
401 
DeleteUserTableRecord(int64_t begin,int64_t end)402 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t begin, int64_t end)
403 {
404     ASSERT_NE(db_, nullptr);
405     std::string sql = "DELETE FROM " + tableName_ + " WHERE id IN (";
406     for (int64_t i = begin; i <= end; ++i) {
407         sql += "'" + std::to_string(i) + "',";
408     }
409     if (sql.back() == ',') {
410         sql.pop_back();
411     }
412     sql += ");";
413     ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
414 }
415 
DeleteCloudTableRecord(int64_t gid)416 void DistributedDBCloudCheckSyncTest::DeleteCloudTableRecord(int64_t gid)
417 {
418     VBucket idMap;
419     idMap.insert_or_assign("#_gid", std::to_string(gid));
420     ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
421 }
422 
CheckCloudTableCount(const std::string & tableName,int64_t expectCount)423 void DistributedDBCloudCheckSyncTest::CheckCloudTableCount(const std::string &tableName, int64_t expectCount)
424 {
425     VBucket extend;
426     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
427     int64_t realCount = 0;
428     std::vector<VBucket> data;
429     virtualCloudDb_->Query(tableName, extend, data);
430     for (size_t j = 0; j < data.size(); ++j) {
431         auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
432         if (entry != data[j].end() && std::get<bool>(entry->second)) {
433             continue;
434         }
435         realCount++;
436     }
437     EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
438 }
439 
CheckSyncCount(const Info actualInfo,const Info expectInfo)440 bool DistributedDBCloudCheckSyncTest::CheckSyncCount(const Info actualInfo, const Info expectInfo)
441 {
442     if (actualInfo.batchIndex != expectInfo.batchIndex) {
443         return false;
444     }
445     if (actualInfo.total != expectInfo.total) {
446         return false;
447     }
448     if (actualInfo.successCount != expectInfo.successCount) {
449         return false;
450     }
451     if (actualInfo.failCount != expectInfo.failCount) {
452         return false;
453     }
454     return true;
455 }
456 
CheckSyncProcessInner(SyncProcess & actualSyncProcess,SyncProcess & expectSyncProcess)457 bool DistributedDBCloudCheckSyncTest::CheckSyncProcessInner(SyncProcess &actualSyncProcess,
458     SyncProcess &expectSyncProcess)
459 {
460     for (const auto &itInner : actualSyncProcess.tableProcess) {
461         std::string tableName = itInner.first;
462         if (expectSyncProcess.tableProcess.find(tableName) == expectSyncProcess.tableProcess.end()) {
463             return false;
464         }
465         TableProcessInfo actualTableProcessInfo = itInner.second;
466         TableProcessInfo expectTableProcessInfo = expectSyncProcess.tableProcess.find(tableName)->second;
467         if (!CheckSyncCount(actualTableProcessInfo.downLoadInfo, expectTableProcessInfo.downLoadInfo)) {
468             return false;
469         }
470         if (!CheckSyncCount(actualTableProcessInfo.upLoadInfo, expectTableProcessInfo.upLoadInfo)) {
471             return false;
472         }
473     }
474     return true;
475 }
476 
CheckSyncProcess(std::vector<std::map<std::string,SyncProcess>> & actualSyncProcess,vector<SyncProcess> & expectSyncProcessV)477 bool DistributedDBCloudCheckSyncTest::CheckSyncProcess(
478     std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess, vector<SyncProcess> &expectSyncProcessV)
479 {
480     vector<map<string, SyncProcess>> expectSyncProcess;
481     for (auto syncProcess : expectSyncProcessV) {
482         map<string, SyncProcess> expectSyncProcessMap = {{"CLOUD", syncProcess}};
483         expectSyncProcess.emplace_back(expectSyncProcessMap);
484     }
485     for (int i = 0; i < (int) actualSyncProcess.size(); i++) {
486         map<string, SyncProcess> actualSyncProcessMap = actualSyncProcess[i];
487         map<string, SyncProcess> expectSyncProcessMap = expectSyncProcess[i];
488         for (auto &it : actualSyncProcessMap) {
489             string mapKey = it.first;
490             if (expectSyncProcessMap.find(mapKey) == expectSyncProcessMap.end()) {
491                 return false;
492             }
493             SyncProcess actualSyncProcess = it.second;
494             SyncProcess expectSyncProcess = expectSyncProcessMap.find(mapKey)->second;
495             if (!CheckSyncProcessInner(actualSyncProcess, expectSyncProcess)) {
496                 return false;
497             }
498         }
499     }
500     return true;
501 }
502 
PriorityAndNormalSync(const Query & normalQuery,const Query & priorityQuery,RelationalStoreDelegate * delegate,std::vector<std::map<std::string,SyncProcess>> & prioritySyncProcess,bool isCheckProcess)503 void DistributedDBCloudCheckSyncTest::PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
504     RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
505     bool isCheckProcess)
506 {
507     std::mutex dataMutex;
508     std::condition_variable cv;
509     bool normalFinish = false;
510     bool priorityFinish = false;
511     auto normalCallback = [&cv, &dataMutex, &normalFinish, &priorityFinish, &prioritySyncProcess, &isCheckProcess](
512         const std::map<std::string, SyncProcess> &process) {
513         auto foundFinishedProcess = std::find_if(process.begin(), process.end(), [](const auto &item) {
514             return item.second.process == DistributedDB::FINISHED;
515         });
516         if (foundFinishedProcess != process.end()) {
517             normalFinish = true;
518             if (isCheckProcess) {
519                 ASSERT_EQ(priorityFinish, true);
520             }
521             cv.notify_one();
522         }
523         prioritySyncProcess.emplace_back(process);
524     };
525     auto priorityCallback = [&cv, &priorityFinish, &prioritySyncProcess](
526         const std::map<std::string, SyncProcess> &process) {
527         auto it = std::find_if(process.begin(), process.end(), [](const auto &item) {
528             return item.second.process == DistributedDB::FINISHED;
529         });
530         if (it != process.end()) {
531             priorityFinish = true;
532             cv.notify_one();
533         }
534         prioritySyncProcess.emplace_back(process);
535     };
536     CloudSyncOption option;
537     PrepareOption(option, normalQuery, false);
538     virtualCloudDb_->SetBlockTime(500); // 500 ms
539     ASSERT_EQ(delegate->Sync(option, normalCallback), OK);
540     PrepareOption(option, priorityQuery, true);
541     std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50 ms
542     ASSERT_EQ(delegate->Sync(option, priorityCallback), OK);
543     std::unique_lock<std::mutex> uniqueLock(dataMutex);
544     cv.wait(uniqueLock, [&normalFinish]() {
545         return normalFinish;
546     });
547 }
548 
DeleteCloudDBData(int64_t begin,int64_t count)549 void DistributedDBCloudCheckSyncTest::DeleteCloudDBData(int64_t begin, int64_t count)
550 {
551     for (int64_t i = begin; i < begin + count; i++) {
552         VBucket idMap;
553         idMap.insert_or_assign("#_gid", std::to_string(i));
554         ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
555     }
556 }
557 
SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> & count)558 void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count)
559 {
560     virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
561         count++;
562         if (count == 1) { // taskid1
563             std::this_thread::sleep_for(std::chrono::seconds(1));
564         }
565         if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
566             CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
567         }
568         if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
569             CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records after last sync
570         }
571         if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
572             CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
573         }
574     });
575 }
576 
SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> & count)577 void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count)
578 {
579     virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
580         count++;
581         if (count == 1) { // taskid1
582             std::this_thread::sleep_for(std::chrono::seconds(1));
583         }
584         if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
585             CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
586         }
587         if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
588             CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
589         }
590         if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
591             CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
592         }
593     });
594 }
595 
InitLogicDeleteDataEnv(int64_t dataCount,bool prioritySync)596 void DistributedDBCloudCheckSyncTest::InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync)
597 {
598     // prepare data
599     InsertUserTableRecord(tableName_, dataCount);
600     // sync
601     Query query = Query::Select().FromTable({ tableName_ });
602     BlockSync(query, delegate_, g_actualDBStatus);
603     // delete cloud data
604     for (int i = 0; i < dataCount; ++i) {
605         DeleteCloudTableRecord(i);
606     }
607     // sync again
608     BlockSync(query, delegate_, g_actualDBStatus);
609 }
610 
CheckLocalCount(int64_t expectCount)611 void DistributedDBCloudCheckSyncTest::CheckLocalCount(int64_t expectCount)
612 {
613     // check local data
614     int dataCnt = -1;
615     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
616     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
617         dataCnt = sqlite3_column_int(stmt, 0);
618         return E_OK;
619     });
620     EXPECT_EQ(dataCnt, expectCount);
621 }
622 
CheckLogCleaned(int64_t expectCount)623 void DistributedDBCloudCheckSyncTest::CheckLogCleaned(int64_t expectCount)
624 {
625     std::string sql1 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
626         " where device = 'cloud';";
627     EXPECT_EQ(sqlite3_exec(db_, sql1.c_str(), QueryCountCallback,
628         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
629     std::string sql2 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where cloud_gid "
630         " is not null and cloud_gid != '';";
631     EXPECT_EQ(sqlite3_exec(db_, sql2.c_str(), QueryCountCallback,
632         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
633     std::string sql3 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
634         " where flag & 0x02 != 0;";
635     EXPECT_EQ(sqlite3_exec(db_, sql3.c_str(), QueryCountCallback,
636         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
637 }
638 
CheckUploadInfo(const Info & actualUploadInfo,const Info & expectUploadInfo)639 void DistributedDBCloudCheckSyncTest::CheckUploadInfo(const Info &actualUploadInfo, const Info &expectUploadInfo)
640 {
641     EXPECT_EQ(actualUploadInfo.batchIndex, expectUploadInfo.batchIndex);
642     EXPECT_EQ(actualUploadInfo.total, expectUploadInfo.total);
643     EXPECT_EQ(actualUploadInfo.successCount, expectUploadInfo.successCount);
644     EXPECT_EQ(actualUploadInfo.failCount, expectUploadInfo.failCount);
645     EXPECT_EQ(actualUploadInfo.insertCount, expectUploadInfo.insertCount);
646     EXPECT_EQ(actualUploadInfo.updateCount, expectUploadInfo.updateCount);
647     EXPECT_EQ(actualUploadInfo.deleteCount, expectUploadInfo.deleteCount);
648 }
649 
CheckDownloadInfo(const Info & actualDownloadInfo,const Info & expectDownloadInfo)650 void DistributedDBCloudCheckSyncTest::CheckDownloadInfo(const Info &actualDownloadInfo, const Info &expectDownloadInfo)
651 {
652     EXPECT_EQ(actualDownloadInfo.batchIndex, expectDownloadInfo.batchIndex);
653     EXPECT_EQ(actualDownloadInfo.total, expectDownloadInfo.total);
654     EXPECT_EQ(actualDownloadInfo.successCount, expectDownloadInfo.successCount);
655     EXPECT_EQ(actualDownloadInfo.failCount, expectDownloadInfo.failCount);
656     EXPECT_EQ(actualDownloadInfo.insertCount, expectDownloadInfo.insertCount);
657     EXPECT_EQ(actualDownloadInfo.updateCount, expectDownloadInfo.updateCount);
658     EXPECT_EQ(actualDownloadInfo.deleteCount, expectDownloadInfo.deleteCount);
659 }
660 
661 /**
662  * @tc.name: CloudSyncTest001
663  * @tc.desc: sync with device sync query
664  * @tc.type: FUNC
665  * @tc.require:
666  * @tc.author: zhangqiquan
667  */
668 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest001, TestSize.Level1)
669 {
670     // prepare data
671     const int actualCount = 10;
672     InsertUserTableRecord(tableName_, actualCount);
673     // sync twice
674     Query query = Query::Select().FromTable({ tableName_ });
675     BlockSync(query, delegate_, g_actualDBStatus);
676     BlockSync(query, delegate_, g_actualDBStatus);
677     // remove cloud data
678     delegate_->RemoveDeviceData("CLOUD", ClearMode::FLAG_AND_DATA);
679     // check local data
680     int dataCnt = -1;
681     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anon6d2f4c681002(sqlite3_stmt *stmt) 682     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
683         dataCnt = sqlite3_column_int(stmt, 0);
684         return E_OK;
685     });
686     EXPECT_EQ(dataCnt, 0);
687 }
688 
689 /**
690  * @tc.name: CloudSyncTest002
691  * @tc.desc: sync with same data in one batch
692  * @tc.type: FUNC
693  * @tc.require:
694  * @tc.author: zhangqiquan
695  */
696 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest002, TestSize.Level1)
697 {
698     // prepare data
699     const int actualCount = 1;
700     InsertUserTableRecord(tableName_, actualCount);
701     // sync twice
702     Query query = Query::Select().FromTable({ tableName_ });
703     BlockSync(query, delegate_, g_actualDBStatus);
704     // cloud delete id=0 and insert id=0 but its gid is 1
705     // local delete id=0
706     DeleteCloudTableRecord(0); // cloud gid is 0
707     InsertCloudTableRecord(0, actualCount, 0, false); // 0 is id
708     DeleteUserTableRecord(0); // 0 is id
709     BlockSync(query, delegate_, g_actualDBStatus);
710     bool deleteStatus = true;
711     EXPECT_EQ(virtualCloudDb_->GetDataStatus("1", deleteStatus), OK);
712     EXPECT_EQ(deleteStatus, false);
713 }
714 
715 /**
716  * @tc.name: CloudSyncTest003
717  * @tc.desc: local data is delete before sync, then sync, cloud data will insert into local
718  * @tc.type: FUNC
719  * @tc.require:
720  * @tc.author: zhangshijie
721  */
722 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest003, TestSize.Level1)
723 {
724     // prepare data
725     const int actualCount = 1;
726     InsertUserTableRecord(tableName_, actualCount);
727 
728     InsertCloudTableRecord(0, actualCount, 0, false);
729     // delete local data
730     DeleteUserTableRecord(0);
731     Query query = Query::Select().FromTable({ tableName_ });
732     BlockSync(query, delegate_, g_actualDBStatus);
733 
734     // check local data, cloud date will insert into local
735     int dataCnt = -1;
736     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anon6d2f4c681102(sqlite3_stmt *stmt) 737     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
738         dataCnt = sqlite3_column_int(stmt, 0);
739         return E_OK;
740     });
741     EXPECT_EQ(dataCnt, actualCount);
742 }
743 
744 /**
745  * @tc.name: CloudSyncTest004
746  * @tc.desc: sync after insert failed
747  * @tc.type: FUNC
748  * @tc.require:
749  * @tc.author: zhangqiquan
750  */
751 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest004, TestSize.Level1)
752 {
753     // prepare data
754     const int actualCount = 1;
755     InsertUserTableRecord(tableName_, actualCount);
756     // sync twice
757     Query query = Query::Select().FromTable({ tableName_ });
758     LOGW("Block Sync");
759     virtualCloudDb_->SetInsertFailed(1);
760     BlockSync(query, delegate_, g_actualDBStatus);
761     // delete local data
762     DeleteUserTableRecord(0); // 0 is id
763     LOGW("Block Sync");
764     // sync again and this record with be synced to cloud
765     BlockSync(query, delegate_, g_actualDBStatus);
766     bool deleteStatus = true;
767     EXPECT_EQ(virtualCloudDb_->GetDataStatus("0", deleteStatus), OK);
768     EXPECT_EQ(deleteStatus, true);
769 }
770 
771 /**
772  * @tc.name: CloudSyncTest005
773  * @tc.desc: check device in process after sync
774  * @tc.type: FUNC
775  * @tc.require:
776  * @tc.author: liaoyonghuang
777  */
778 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest005, TestSize.Level1)
779 {
780     /**
781      * @tc.steps:step1. init data and sync
782      * @tc.expected: step1. ok.
783      */
784     const int localCount = 20; // 20 is count of local
785     const int cloudCount = 10; // 10 is count of cloud
786     InsertUserTableRecord(tableName_, localCount);
787     std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
788     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
789     InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
790 
791     /**
792      * @tc.steps:step2. check device name in process
793      * @tc.expected: step2. ok.
794      */
795     Query query = Query::Select().FromTable({tableName_});
__anon6d2f4c681202(const std::map<std::string, SyncProcess> &syncProcess) 796     auto callback = [](const std::map<std::string, SyncProcess> &syncProcess) {
797         EXPECT_TRUE(syncProcess.find("CLOUD") != syncProcess.end());
798     };
799     BlockCompensatedSync(query, delegate_, OK, callback);
800 }
801 
InitDataAndSync()802 void DistributedDBCloudCheckSyncTest::InitDataAndSync()
803 {
804     const int localCount = 120; // 120 is count of local
805     const int cloudCount = 100; // 100 is count of cloud
806     InsertUserTableRecord(tableName_, localCount, 0);
807     InsertUserTableRecord(tableWithoutPrimaryName_, cloudCount, 0);
808     InsertCloudTableRecord(tableWithoutPrimaryName_, 80, cloudCount, 0, false); // 80 is begin sync number
809 }
810 
811 /**
812  * @tc.name: CloudSyncTest006
813  * @tc.desc: check reDownload when common sync pause.
814  * @tc.type: FUNC
815  * @tc.require:
816  * @tc.author: luoguo
817  */
818 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest006, TestSize.Level1)
819 {
820     /**
821      * @tc.steps:step1. init data and sync
822      * @tc.expected: step1. ok.
823      */
824     InitDataAndSync();
825 
826     /**
827      * @tc.steps:step2. common sync will pause
828      * @tc.expected: step2. ok.
829      */
830     std::vector<std::string> tableNames = {tableName_, tableWithoutPrimaryName_};
831     Query normalQuery = Query::Select().FromTable({tableNames});
832     std::vector<std::string> idValue = {"0", "1", "2"};
833     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
834     CloudSyncOption option;
835     CloudSyncOption priorityOption;
836     PrepareOption(option, normalQuery, false);
837     PrepareOption(priorityOption, priorityQuery, true);
838     bool isUpload = false;
839     uint32_t blockTime = 2000;
__anon6d2f4c681302(const std::string &tableName, VBucket &extend) 840     virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
841         if (isUpload == false) {
842             isUpload = true;
843             std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
844         }
845     });
846     bool isFinished = false;
847     bool priorityFinish = false;
__anon6d2f4c681402(const std::map<std::string, SyncProcess> &process) 848     auto normalCallback = [&isFinished, &priorityFinish](const std::map<std::string, SyncProcess> &process) {
849         for (const auto &item : process) {
850             if (item.second.process == DistributedDB::FINISHED) {
851                 isFinished = true;
852                 ASSERT_EQ(priorityFinish, true);
853             }
854         }
855     };
856     ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
857 
858     /**
859      * @tc.steps:step3. wait common upload and priority sync.
860      * @tc.expected: step3. ok.
861      */
862     while (isUpload == false) {
863         std::this_thread::sleep_for(std::chrono::milliseconds(50));
864     }
__anon6d2f4c681502(const std::map<std::string, SyncProcess> &process) 865     auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
866         for (const auto &item : process) {
867             if (item.second.process == DistributedDB::FINISHED) {
868                 priorityFinish = true;
869             }
870         }
871     };
872     ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
873     while (isFinished == false || priorityFinish == false) {
874         std::this_thread::sleep_for(std::chrono::milliseconds(50));
875     }
876 
877     /**
878      * @tc.steps:step4. wait common sync and priority sync finish, check query Times.
879      * @tc.expected: step4. ok.
880      */
881     uint32_t times = virtualCloudDb_->GetQueryTimes(tableName_);
882     ASSERT_EQ(times, 3u);
883     virtualCloudDb_->ForkUpload(nullptr);
884 }
885 
886 /**
887  * @tc.name: CloudSyncTest007
888  * @tc.desc: check process info when version conflict sync process.
889  * @tc.type: FUNC
890  * @tc.require:
891  * @tc.author: luoguo
892  */
893 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest007, TestSize.Level1)
894 {
895     /**
896      * @tc.steps:step1. init data and sync
897      * @tc.expected: step1. ok.
898      */
899     const int localCount = 60;
900     InsertUserTableRecord(tableName_, localCount, 0);
901     Query query = Query::Select().FromTable({tableName_});
902     BlockSync(query, delegate_, g_actualDBStatus);
903 
904     /**
905      * @tc.steps:step2. delete 30 - 59 records in user table, and set callback func.
906      * @tc.expected: step2. ok.
907      */
908     DeleteUserTableRecord(30, 59);
909     bool isUpload = false;
__anon6d2f4c681602(const std::string &tableName, VBucket &extend) 910     virtualCloudDb_->ForkUpload([&isUpload](const std::string &tableName, VBucket &extend) {
911         if (isUpload == false) {
912             isUpload = true;
913             std::this_thread::sleep_for(std::chrono::milliseconds(2000));
914         }
915     });
916     bool isFinished = false;
917     std::map<std::string, TableProcessInfo> retSyncProcess;
__anon6d2f4c681702(const std::map<std::string, SyncProcess> &process) 918     auto normalCallback = [&isFinished, &retSyncProcess](const std::map<std::string, SyncProcess> &process) {
919         for (const auto &item : process) {
920             if (item.second.process == DistributedDB::FINISHED) {
921                 isFinished = true;
922                 ASSERT_EQ(process.empty(), false);
923                 auto lastProcess = process.rbegin();
924                 retSyncProcess = lastProcess->second.tableProcess;
925             }
926         }
927     };
928 
929     /**
930      * @tc.steps:step3. sync.
931      * @tc.expected: step3. ok.
932      */
933     std::vector<std::string> tableNames = {tableName_};
934     Query normalQuery = Query::Select().FromTable({tableNames});
935     CloudSyncOption option;
936     PrepareOption(option, normalQuery, false);
937     ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
938 
939     /**
940      * @tc.steps:step4. wait upload process and delete 30 record in cloud table.
941      * @tc.expected: step4. ok.
942      */
943     while (isUpload == false) {
944         std::this_thread::sleep_for(std::chrono::milliseconds(50));
945     }
946     DeleteCloudTableRecord(30);
947 
948     /**
949      * @tc.steps:step5. wait sync process end and check data.
950      * @tc.expected: step5. ok.
951      */
952     while (isFinished == false) {
953         std::this_thread::sleep_for(std::chrono::milliseconds(50));
954     }
955     ASSERT_EQ(retSyncProcess.empty(), false);
956     auto taskInfo = retSyncProcess.rbegin();
957     ASSERT_EQ(taskInfo->second.upLoadInfo.total, 30u);
958     virtualCloudDb_->ForkUpload(nullptr);
959 }
960 
961 /**
962  * @tc.name: CloudSyncTest008
963  * @tc.desc: test when normal sync interrupted by priority sync, process info should be consistent
964  * @tc.type: FUNC
965  * @tc.require:
966  * @tc.author: suyuchen
967  */
968 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest008, TestSize.Level1)
969 {
970     /**
971      * @tc.steps: step1. insert 35 records to user table
972      * @tc.expected: step1. OK.
973      */
974     const int localCount = 35;
975     InsertUserTableRecord(tableName_, localCount, 0);
976     Query query = Query::Select().FromTable({tableName_});
977 
978     /**
979      * @tc.steps: step2. Set CLOUD_VERSION_CONFLICT when normal sync task upload
980      * @tc.expected: step2. OK.
981      */
982     int recordIndex = 0;
983     virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6d2f4c681802(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 984         vector<VirtualCloudDb::CloudData> &cloudDataVec) {
985         recordIndex++;
986         if (recordIndex == 20) {
987             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
988             std::this_thread::sleep_for(std::chrono::seconds(1));
989             return CLOUD_VERSION_CONFLICT;
990         }
991         return OK;
992     });
993 
994     /**
995      * @tc.steps: step3. set callback function for normal sync
996      * @tc.expected: step3. OK.
997      */
998     std::map<std::string, TableProcessInfo> retSyncProcess;
__anon6d2f4c681902(const std::map<std::string, SyncProcess> &process) 999     auto normalCallback = [&retSyncProcess](const std::map<std::string, SyncProcess> &process) {
1000         for (const auto &item : process) {
1001             if (item.second.process == DistributedDB::FINISHED) {
1002                 ASSERT_EQ(process.empty(), false);
1003                 auto lastProcess = process.rbegin();
1004                 retSyncProcess = lastProcess->second.tableProcess;
1005             }
1006         }
1007     };
1008 
1009     /**
1010      * @tc.steps: step4. start normal sync
1011      * @tc.expected: step4. OK.
1012      */
1013     CloudSyncOption option;
1014     PrepareOption(option, query, false);
1015     ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
__anon6d2f4c681a02() 1016     std::thread syncThread1([&]() {
1017         BlockSync(query, delegate_, g_actualDBStatus);
1018     });
1019 
1020     /**
1021      * @tc.steps: step5. start priority sync
1022      * @tc.expected: step5. OK.
1023      */
1024     std::this_thread::sleep_for(std::chrono::milliseconds(200));
__anon6d2f4c681b02() 1025     std::thread syncThread2([&]() {
1026         BlockSync(query, delegate_, g_actualDBStatus, true);
1027     });
1028     syncThread1.join();
1029     syncThread2.join();
1030 
1031     /**
1032      * @tc.steps: step6. Check notification of normal sync
1033      * @tc.expected: step6. OK.
1034      */
1035     ASSERT_EQ(retSyncProcess.empty(), false);
1036     auto taskInfo = retSyncProcess.rbegin();
1037     ASSERT_EQ(taskInfo->second.upLoadInfo.total, 35u);
1038     ASSERT_EQ(taskInfo->second.upLoadInfo.successCount, 35u);
1039     ASSERT_EQ(taskInfo->second.upLoadInfo.failCount, 0u);
1040 
1041     virtualCloudDb_->ForkInsertConflict(nullptr);
1042 }
1043 
1044 /**
1045  * @tc.name: CloudSyncTest009
1046  * @tc.desc: reopen database and sync
1047  * @tc.type: FUNC
1048  * @tc.require:
1049  * @tc.author: wangxiangdong
1050  */
1051 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest009, TestSize.Level1)
1052 {
1053     /**
1054      * @tc.steps: step1. insert 1 record to user table
1055      * @tc.expected: step1. OK.
1056      */
1057     const int actualCount = 1;
1058     InsertUserTableRecord(tableName_, actualCount);
1059     /**
1060      * @tc.steps: step2. sync data to cloud
1061      * @tc.expected: step2. OK.
1062      */
1063     Query query = Query::Select().FromTable({ tableName_ });
1064     BlockSync(query, delegate_, g_actualDBStatus);
1065     CheckCloudTableCount(tableName_, 1);
1066     /**
1067      * @tc.steps: step3. drop data table then close db
1068      * @tc.expected: step3. OK.
1069      */
1070     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1071     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1072     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1073     delegate_ = nullptr;
1074     /**
1075      * @tc.steps: step4. recreate data table and reopen database
1076      * @tc.expected: step4. OK.
1077      */
1078     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1079     RelationalStoreDelegate::Option option;
1080     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1081     ASSERT_NE(delegate_, nullptr);
1082     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1083     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1084     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1085     DataBaseSchema dataBaseSchema = GetSchema();
1086     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1087     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1088     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1089     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1090     /**
1091      * @tc.steps: step5. sync and cloud data should be deleted
1092      * @tc.expected: step5. OK.
1093      */
1094     BlockSync(query, delegate_, g_actualDBStatus);
1095     CheckCloudTableCount(tableName_, 0);
1096 }
1097 
1098 /**
1099  * @tc.name: CloudSyncTest010
1100  * @tc.desc: reopen database, recreate table with less columns and sync
1101  * @tc.type: FUNC
1102  * @tc.require:
1103  * @tc.author: wangxiangdong
1104  */
1105 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest010, TestSize.Level1)
1106 {
1107     /**
1108      * @tc.steps: step1. insert 1 record to user table
1109      * @tc.expected: step1. OK.
1110      */
1111     const int actualCount = 1;
1112     InsertUserTableRecord(tableName_, actualCount);
1113     /**
1114      * @tc.steps: step2. sync data to cloud
1115      * @tc.expected: step2. OK.
1116      */
1117     Query query = Query::Select().FromTable({ tableName_ });
1118     BlockSync(query, delegate_, g_actualDBStatus);
1119     CheckCloudTableCount(tableName_, 1);
1120     /**
1121      * @tc.steps: step3. drop data table then close db
1122      * @tc.expected: step3. OK.
1123      */
1124     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1125     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1126     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1127     delegate_ = nullptr;
1128     /**
1129      * @tc.steps: step4. recreate data table and reopen database
1130      * @tc.expected: step4. OK.
1131      */
1132     std::string createSql = "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(id INT PRIMARY KEY);";
1133     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, createSql), DBStatus::OK);
1134     RelationalStoreDelegate::Option option;
1135     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1136     ASSERT_NE(delegate_, nullptr);
1137     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1138     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1139     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1140     DataBaseSchema dataBaseSchema = GetSchema();
1141     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1142     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1143     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1144     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1145     /**
1146      * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
1147      * @tc.expected: step5. OK.
1148      */
1149     BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
1150     CheckCloudTableCount(tableName_, 1);
1151 }
1152 
1153 /**
1154  * @tc.name: CloudSyncTest011
1155  * @tc.desc: reopen database, do not recreate table and sync
1156  * @tc.type: FUNC
1157  * @tc.require:
1158  * @tc.author: wangxiangdong
1159  */
1160 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest011, TestSize.Level1)
1161 {
1162     /**
1163      * @tc.steps: step1. insert 1 record to user table
1164      * @tc.expected: step1. OK.
1165      */
1166     const int actualCount = 1;
1167     InsertUserTableRecord(tableName_, actualCount);
1168     /**
1169      * @tc.steps: step2. sync data to cloud
1170      * @tc.expected: step2. OK.
1171      */
1172     Query query = Query::Select().FromTable({ tableName_ });
1173     BlockSync(query, delegate_, g_actualDBStatus);
1174     CheckCloudTableCount(tableName_, 1);
1175     /**
1176      * @tc.steps: step3. drop data table then close db
1177      * @tc.expected: step3. OK.
1178      */
1179     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1180     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1181     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1182     delegate_ = nullptr;
1183     /**
1184      * @tc.steps: step4. reopen database
1185      * @tc.expected: step4. OK.
1186      */
1187     RelationalStoreDelegate::Option option;
1188     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1189     ASSERT_NE(delegate_, nullptr);
1190     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1191     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1192     DataBaseSchema dataBaseSchema = GetSchema();
1193     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1194     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1195     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1196     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1197     /**
1198      * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
1199      * @tc.expected: step5. OK.
1200      */
1201     BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
1202 }
1203 
1204 /**
1205  * @tc.name: CloudSyncTest012
1206  * @tc.desc: insert data before re-SetDistributedTable and sync is ok
1207  * @tc.type: FUNC
1208  * @tc.require:
1209  * @tc.author: wangxiangdong
1210  */
1211 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest012, TestSize.Level1)
1212 {
1213     /**
1214      * @tc.steps: step1. insert 1 record to user table
1215      * @tc.expected: step1. OK.
1216      */
1217     const int actualCount = 1;
1218     InsertUserTableRecord(tableName_, actualCount);
1219     /**
1220      * @tc.steps: step2. sync data to cloud
1221      * @tc.expected: step2. OK.
1222      */
1223     Query query = Query::Select().FromTable({ tableName_ });
1224     BlockSync(query, delegate_, g_actualDBStatus);
1225     CheckCloudTableCount(tableName_, 1);
1226     /**
1227      * @tc.steps: step3. drop data table then close db
1228      * @tc.expected: step3. OK.
1229      */
1230     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1231     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1232     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1233     delegate_ = nullptr;
1234     /**
1235      * @tc.steps: step4. recreate data table and reopen database
1236      * @tc.expected: step4. OK.
1237      */
1238     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1239     RelationalStoreDelegate::Option option;
1240     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1241     ASSERT_NE(delegate_, nullptr);
1242     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1243     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1244     DataBaseSchema dataBaseSchema = GetSchema();
1245     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1246     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1247     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1248     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1249     /**
1250      * @tc.steps: step5. insert data to new table
1251      * @tc.expected: step5. OK.
1252      */
1253     int begin = 1;
1254     InsertUserTableRecord(tableName_, actualCount, begin);
1255     /**
1256      * @tc.steps: step6. sync and cloud data should be deleted
1257      * @tc.expected: step6. OK.
1258      */
1259     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1260     BlockSync(query, delegate_, g_actualDBStatus);
1261     CheckCloudTableCount(tableName_, 1);
1262 }
1263 
1264 /**
1265  * @tc.name: CloudSyncTest013
1266  * @tc.desc: insert data before re-SetDistributedTable and sync is ok
1267  * @tc.type: FUNC
1268  * @tc.require:
1269  * @tc.author: tankaisheng
1270  */
1271 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest013, TestSize.Level0)
1272 {
1273     /**
1274      * @tc.steps: step1. insert 1 record to user table
1275      * @tc.expected: step1. OK.
1276      */
1277     const int actualCount = 10;
1278     InsertUserTableRecord(tableName_, actualCount);
1279     /**
1280      * @tc.steps: step2. sync data to cloud
1281      * @tc.expected: step2. OK.
1282      */
1283     Query query = Query::Select().FromTable({ tableName_ });
1284     BlockSync(query, delegate_, g_actualDBStatus);
1285     CheckCloudTableCount(tableName_, 10);
1286     /**
1287      * @tc.steps: step3. drop data table then close db
1288      * @tc.expected: step3. OK.
1289      */
1290     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1291     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1292     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1293     delegate_ = nullptr;
1294     /**
1295      * @tc.steps: step4. recreate data table and reopen database
1296      * @tc.expected: step4. OK.
1297      */
1298     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1299     RelationalStoreDelegate::Option option;
1300     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1301     ASSERT_NE(delegate_, nullptr);
1302     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1303     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1304     DataBaseSchema dataBaseSchema = GetSchema();
1305     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1306     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1307     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1308     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1309     /**
1310      * @tc.steps: step5. insert data to new table
1311      * @tc.expected: step5. OK.
1312      */
1313     int begin = 0;
1314     InsertUserTableRecord(tableName_, actualCount, begin);
1315     /**
1316      * @tc.steps: step6. sync and cloud data should be deleted
1317      * @tc.expected: step6. OK.
1318      */
1319     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1320     BlockSync(query, delegate_, g_actualDBStatus);
1321     CheckCloudTableCount(tableName_, 10);
1322 }
1323 
1324 /**
1325  * @tc.name: CloudSyncObserverTest001
1326  * @tc.desc: test cloud sync multi observer
1327  * @tc.type: FUNC
1328  * @tc.require:
1329  * @tc.author: zhangshijie
1330  */
1331 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncObserverTest001, TestSize.Level1)
1332 {
1333     // prepare data
1334     const int actualCount = 10;
1335     InsertUserTableRecord(tableName_, actualCount);
1336 
1337     /**
1338      * @tc.steps:step1. open two delegate with two observer.
1339      * @tc.expected: step1. ok.
1340      */
1341     RelationalStoreDelegate::Option option;
1342     auto observer1 = new (std::nothrow) RelationalStoreObserverUnitTest();
1343     ASSERT_NE(observer1, nullptr);
1344     option.observer = observer1;
1345     RelationalStoreDelegate *delegate1 = nullptr;
1346     EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
1347     ASSERT_NE(delegate1, nullptr);
1348 
1349     auto observer2 = new (std::nothrow) RelationalStoreObserverUnitTest();
1350     ASSERT_NE(observer2, nullptr);
1351     option.observer = observer2;
1352     RelationalStoreDelegate *delegate2 = nullptr;
1353     EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate2), DBStatus::OK);
1354     ASSERT_NE(delegate2, nullptr);
1355 
1356     /**
1357      * @tc.steps:step2. insert 1-10 cloud data, start.
1358      * @tc.expected: step2. ok.
1359      */
1360     InsertCloudTableRecord(0, actualCount, actualCount, false);
1361     Query query = Query::Select().FromTable({ tableName_ });
1362     BlockSync(query, delegate_, g_actualDBStatus);
1363 
1364     /**
1365      * @tc.steps:step3. check observer.
1366      * @tc.expected: step3. ok.
1367      */
1368     EXPECT_EQ(observer1->GetCloudCallCount(), 1u);
1369     EXPECT_EQ(observer2->GetCloudCallCount(), 1u);
1370 
1371     /**
1372      * @tc.steps:step4. insert 11-20 cloud data, start.
1373      * @tc.expected: step4. ok.
1374      */
1375     delegate2->UnRegisterObserver();
1376     observer2->ResetCloudSyncToZero();
1377     int64_t begin = 11;
1378     InsertCloudTableRecord(begin, actualCount, actualCount, false);
1379     BlockSync(query, delegate_, g_actualDBStatus);
1380 
1381     /**
1382      * @tc.steps:step5. check observer.
1383      * @tc.expected: step5. ok.
1384      */
1385     EXPECT_EQ(observer1->GetCloudCallCount(), 2u); // 2 is observer1 triggered times
1386     EXPECT_EQ(observer2->GetCloudCallCount(), 0u);
1387 
1388     delete observer1;
1389     observer1 = nullptr;
1390     EXPECT_EQ(mgr_->CloseStore(delegate1), DBStatus::OK);
1391 
1392     delete observer2;
1393     observer2 = nullptr;
1394     EXPECT_EQ(mgr_->CloseStore(delegate2), DBStatus::OK);
1395 }
1396 
1397 /**
1398  * @tc.name: CloudPrioritySyncTest001
1399  * @tc.desc: use priority sync interface when query in or from table
1400  * @tc.type: FUNC
1401  * @tc.require:
1402  * @tc.author: chenchaohao
1403  */
1404 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest001, TestSize.Level1)
1405 {
1406     /**
1407      * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1408      * @tc.expected: step1. ok.
1409      */
1410     const int actualCount = 10; // 10 is count of records
1411     InsertUserTableRecord(tableName_, actualCount);
1412     std::vector<std::string> idValue = {"0", "1", "2"};
1413     Query query = Query::Select().From(tableName_).In("id", idValue);
1414 
1415     /**
1416      * @tc.steps:step2. check ParserQueryNodes
1417      * @tc.expected: step2. ok.
1418      */
__anon6d2f4c681c02(const std::string &tableName, VBucket &extend) 1419     virtualCloudDb_->ForkQuery([this, &idValue](const std::string &tableName, VBucket &extend) {
1420         EXPECT_EQ(tableName_, tableName);
1421         if (extend.find(CloudDbConstant::QUERY_FIELD) == extend.end()) {
1422             return;
1423         }
1424         Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1425         DBStatus status = OK;
1426         auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1427         EXPECT_EQ(status, OK);
1428         ASSERT_EQ(queryNodes.size(), 1u);
1429         EXPECT_EQ(queryNodes[0].type, QueryNodeType::IN);
1430         EXPECT_EQ(queryNodes[0].fieldName, "id");
1431         ASSERT_EQ(queryNodes[0].fieldValue.size(), idValue.size());
1432         for (size_t i = 0u; i < idValue.size(); i++) {
1433             std::string val = std::get<std::string>(queryNodes[0].fieldValue[i]);
1434             EXPECT_EQ(val, idValue[i]);
1435         }
1436     });
1437     BlockPrioritySync(query, delegate_, true, OK);
1438     virtualCloudDb_->ForkQuery(nullptr);
1439     CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1440 
1441     /**
1442      * @tc.steps:step3. use priority sync interface but not priority.
1443      * @tc.expected: step3. ok.
1444      */
1445     query = Query::Select().FromTable({ tableName_ });
1446     BlockPrioritySync(query, delegate_, false, OK);
1447     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1448 
1449     /**
1450      * @tc.steps:step4. insert user table record and query from table, then priority sync.
1451      * @tc.expected: step4. ok.
1452      */
1453     InsertUserTableRecord(tableName_, actualCount, actualCount);
1454     BlockPrioritySync(query, delegate_, true, OK);
1455     CheckCloudTableCount(tableName_, 20); // 20 is count of cloud records
1456 }
1457 
1458 
1459 /**
1460  * @tc.name: CloudPrioritySyncTest002
1461  * @tc.desc: priority sync in some abnormal query situations
1462  * @tc.type: FUNC
1463  * @tc.require:
1464  * @tc.author: chenchaohao
1465  */
1466 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest002, TestSize.Level1)
1467 {
1468     /**
1469      * @tc.steps:step1. insert user table record.
1470      * @tc.expected: step1. ok.
1471      */
1472     const int actualCount = 1; // 1 is count of records
1473     InsertUserTableRecord(tableName_, actualCount);
1474 
1475     /**
1476      * @tc.steps:step2. query select tablename then priority sync.
1477      * @tc.expected: step2. invalid.
1478      */
1479     Query query = Query::Select(tableName_);
1480     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1481     CheckCloudTableCount(tableName_, 0);
1482 
1483     /**
1484      * @tc.steps:step3. query select without from then priority sync.
1485      * @tc.expected: step3. invalid.
1486      */
1487     query = Query::Select();
1488     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1489     CheckCloudTableCount(tableName_, 0);
1490 
1491     /**
1492      * @tc.steps:step4. query select and from without in then priority sync.
1493      * @tc.expected: step4. invalid.
1494      */
1495     query = Query::Select().From(tableName_);
1496     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1497     CheckCloudTableCount(tableName_, 0);
1498 
1499     /**
1500      * @tc.steps:step5. query select and fromtable then priority sync.
1501      * @tc.expected: step5. not support.
1502      */
1503     query = Query::Select().From(tableName_).FromTable({tableName_});
1504     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1505     CheckCloudTableCount(tableName_, 0);
1506 
1507     /**
1508      * @tc.steps:step6. query select and from with other predicates then priority sync.
1509      * @tc.expected: step6. not support.
1510      */
1511     query = Query::Select().From(tableName_).IsNotNull("id");
1512     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1513     CheckCloudTableCount(tableName_, 0);
1514 
1515     /**
1516      * @tc.steps:step7. query select and from with in and other predicates then priority sync.
1517      * @tc.expected: step7 not support.
1518      */
1519     std::vector<std::string> idValue = {"0"};
1520     query = Query::Select().From(tableName_).IsNotNull("id").In("id", idValue);
1521     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1522     CheckCloudTableCount(tableName_, 0);
1523 
1524     /**
1525      * @tc.steps:step8. query select and from with in non-primary key then priority sync.
1526      * @tc.expected: step8. not support.
1527      */
1528     std::vector<std::string> heightValue = {"155.10"};
1529     query = Query::Select().From(tableName_).In("height", heightValue);
1530     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1531     CheckCloudTableCount(tableName_, 0);
1532 
1533     /**
1534      * @tc.steps:step9. query in count greater than 100.
1535      * @tc.expected: step9. over max limits.
1536      */
1537     idValue.resize(101); // 101 > 100
1538     query = Query::Select().From(tableName_).In("id", idValue);
1539     BlockPrioritySync(query, delegate_, true, OVER_MAX_LIMITS);
1540     CheckCloudTableCount(tableName_, 0);
1541 }
1542 
1543 /**
1544  * @tc.name: CloudPrioritySyncTest003
1545  * @tc.desc: priority sync when normal syncing
1546  * @tc.type: FUNC
1547  * @tc.require:
1548  * @tc.author: chenchaohao
1549  */
1550 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest003, TestSize.Level1)
1551 {
1552     /**
1553      * @tc.steps:step1. insert user table record.
1554      * @tc.expected: step1. ok.
1555      */
1556     const int actualCount = 10; // 10 is count of records
1557     InsertUserTableRecord(tableName_, actualCount);
1558 
1559     /**
1560      * @tc.steps:step2. begin normal sync and priority sync.
1561      * @tc.expected: step2. ok.
1562      */
1563     Query normalQuery = Query::Select().FromTable({tableName_});
1564     std::vector<std::string> idValue = {"0", "1", "2"};
1565     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
1566     std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
1567     PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, true);
1568     EXPECT_EQ(virtualCloudDb_->GetLockCount(), 2);
1569     virtualCloudDb_->Reset();
1570     EXPECT_EQ(virtualCloudDb_->GetLockCount(), 0);
1571     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1572 }
1573 
1574 /**
1575  * @tc.name: CloudPrioritySyncTest004
1576  * @tc.desc: non-primarykey table priority sync
1577  * @tc.type: FUNC
1578  * @tc.require:
1579  * @tc.author: chenchaohao
1580  */
1581 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest004, TestSize.Level1)
1582 {
1583     /**
1584      * @tc.steps:step1. insert user non-primarykey table record.
1585      * @tc.expected: step1. ok.
1586      */
1587     const int actualCount = 10; // 10 is count of records
1588     InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1589 
1590     /**
1591      * @tc.steps:step2. begin priority sync.
1592      * @tc.expected: step2. not support.
1593      */
1594     std::vector<std::string> idValue = {"0", "1", "2"};
1595     Query query = Query::Select().From(tableWithoutPrimaryName_).In("id", idValue);
1596     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1597     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1598 
1599     /**
1600      * @tc.steps:step3. begin priority sync when in rowid.
1601      * @tc.expected: step3. invalid.
1602      */
1603     std::vector<int64_t> rowidValue = {0, 1, 2}; // 0,1,2 are rowid value
1604     query = Query::Select().From(tableWithoutPrimaryName_).In("rowid", rowidValue);
1605     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1606     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1607 }
1608 
1609 /**
1610  * @tc.name: CloudPrioritySyncTest005
1611  * @tc.desc: priority sync but don't have records
1612  * @tc.type: FUNC
1613  * @tc.require:
1614  * @tc.author: chenchaohao
1615  */
1616 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest005, TestSize.Level1)
1617 {
1618     /**
1619      * @tc.steps:step1. insert user non-primarykey table record.
1620      * @tc.expected: step1. ok.
1621      */
1622     const int actualCount = 10; // 10 is count of records
1623     InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1624 
1625     /**
1626      * @tc.steps:step2. begin DistributedDBCloudCheckSyncTest priority sync and check records.
1627      * @tc.expected: step2. ok.
1628      */
1629     std::vector<std::string> idValue = {"0", "1", "2"};
1630     Query query = Query::Select().From(tableName_).In("id", idValue);
1631     BlockPrioritySync(query, delegate_, true, OK);
1632     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1633     CheckCloudTableCount(tableName_, 0);
1634 }
1635 
1636 /**
1637  * @tc.name: CloudPrioritySyncTest006
1638  * @tc.desc: priority sync tasks greater than limit
1639  * @tc.type: FUNC
1640  * @tc.require:
1641  * @tc.author: chenchaohao
1642  */
1643 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest006, TestSize.Level1)
1644 {
1645     /**
1646      * @tc.steps:step1. insert user table record.
1647      * @tc.expected: step1. ok.
1648      */
1649     const int actualCount = 10; // 10 is count of records
1650     InsertUserTableRecord(tableName_, actualCount);
1651 
1652     /**
1653      * @tc.steps:step2. begin 32 priority sync tasks and then begin 1 priority sync task.
1654      * @tc.expected: step2. ok.
1655      */
1656     std::vector<std::string> idValue = {"0", "1", "2"};
1657     Query query = Query::Select().From(tableName_).In("id", idValue);
1658     std::mutex dataMutex;
1659     std::condition_variable cv;
1660     std::mutex callbackMutex;
1661     std::condition_variable callbackCv;
1662     bool finish = false;
1663     size_t finishCount = 0u;
__anon6d2f4c681d02(const std::string &tableName, VBucket &extend) 1664     virtualCloudDb_->ForkQuery([&cv, &finish, &dataMutex](const std::string &tableName, VBucket &extend) {
1665         std::unique_lock<std::mutex> uniqueLock(dataMutex);
1666         cv.wait(uniqueLock, [&finish]() {
1667             return finish;
1668         });
1669     });
__anon6d2f4c681f02(const std::map<std::string, SyncProcess> &process) 1670     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1671         for (const auto &item: process) {
1672             if (item.second.process == DistributedDB::FINISHED) {
1673                 {
1674                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1675                     finishCount++;
1676                 }
1677                 callbackCv.notify_one();
1678             }
1679         }
1680     };
1681     CloudSyncOption option;
1682     PrepareOption(option, query, true);
1683     for (int i = 0; i < 32; i++) { // 32 is count of sync tasks
1684         ASSERT_EQ(delegate_->Sync(option, callback), OK);
1685     }
1686     ASSERT_EQ(delegate_->Sync(option, nullptr), BUSY);
1687     {
1688         std::lock_guard<std::mutex> autoLock(dataMutex);
1689         finish = true;
1690     }
1691     cv.notify_all();
1692     virtualCloudDb_->ForkQuery(nullptr);
1693     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon6d2f4c682002() 1694     callbackCv.wait(callbackLock, [&finishCount]() {
1695         return (finishCount == 32u); // 32 is count of finished sync tasks
1696     });
1697     CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1698 }
1699 
1700 /**
1701  * @tc.name: CloudPrioritySyncTest007
1702  * @tc.desc: priority normal priority normal when different query
1703  * @tc.type: FUNC
1704  * @tc.require:
1705  * @tc.author: chenchaohao
1706  */
1707 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest007, TestSize.Level1)
1708 {
1709     /**
1710      * @tc.steps:step1. insert user table record.
1711      * @tc.expected: step1. ok.
1712      */
1713     const int actualCount = 10; // 10 is count of records
1714     InsertUserTableRecord(tableName_, actualCount);
1715 
1716     /**
1717      * @tc.steps:step2. set callback to check during sync.
1718      * @tc.expected: step2. ok.
1719      */
1720     std::atomic<int> count = 0;
1721     SetForkQueryForCloudPrioritySyncTest007(count);
1722 
1723     /**
1724      * @tc.steps:step3. perform priority normal priority normal sync.
1725      * @tc.expected: step3. ok.
1726      */
1727     std::vector<std::string> idValue = {"0"};
1728     Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1729     CloudSyncOption option;
1730     PrepareOption(option, priorytyQuery, true);
1731     option.lockAction = static_cast<LockAction>(0xff); // lock all
1732     std::mutex callbackMutex;
1733     std::condition_variable callbackCv;
1734     size_t finishCount = 0u;
__anon6d2f4c682102(const std::map<std::string, SyncProcess> &process) 1735     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1736         for (const auto &item: process) {
1737             if (item.second.process == DistributedDB::FINISHED) {
1738                 {
1739                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1740                     finishCount++;
1741                 }
1742                 callbackCv.notify_one();
1743             }
1744         }
1745     };
1746     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1747     Query normalQuery = Query::Select().FromTable({tableName_});
1748     PrepareOption(option, normalQuery, false);
1749     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1750     idValue = {"1"};
1751     priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1752     PrepareOption(option, priorytyQuery, true);
1753     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1754     PrepareOption(option, normalQuery, false);
1755     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1756     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon6d2f4c682202() 1757     callbackCv.wait(callbackLock, [&finishCount]() {
1758         return (finishCount == 4u); // 4 is count of finished sync tasks
1759     });
1760     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1761 }
1762 
1763 /**
1764  * @tc.name: CloudPrioritySyncTest008
1765  * @tc.desc: priority normal priority normal when different query
1766  * @tc.type: FUNC
1767  * @tc.require:
1768  * @tc.author: chenchaohao
1769  */
1770 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest008, TestSize.Level1)
1771 {
1772     /**
1773      * @tc.steps:step1. insert user table record.
1774      * @tc.expected: step1. ok.
1775      */
1776     const int actualCount = 10; // 10 is count of records
1777     InsertUserTableRecord(tableName_, actualCount);
1778 
1779     /**
1780      * @tc.steps:step2. set callback to check during sync.
1781      * @tc.expected: step2. ok.
1782      */
1783     std::atomic<int> count = 0;
1784     SetForkQueryForCloudPrioritySyncTest008(count);
1785 
1786     /**
1787      * @tc.steps:step3. perform priority normal priority normal sync.
1788      * @tc.expected: step3. ok.
1789      */
1790     std::vector<std::string> idValue = {"0"};
1791     Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1792     CloudSyncOption option;
1793     option.lockAction = static_cast<LockAction>(0xff); // lock all
1794     PrepareOption(option, priorytyQuery, true);
1795     std::mutex callbackMutex;
1796     std::condition_variable callbackCv;
1797     size_t finishCount = 0u;
__anon6d2f4c682302(const std::map<std::string, SyncProcess> &process) 1798     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1799         for (const auto &item: process) {
1800             if (item.second.process == DistributedDB::FINISHED) {
1801                 {
1802                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1803                     finishCount++;
1804                 }
1805                 callbackCv.notify_one();
1806             }
1807         }
1808     };
1809     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1810     Query normalQuery = Query::Select().FromTable({tableName_});
1811     PrepareOption(option, normalQuery, false);
1812     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1813     priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1814     PrepareOption(option, priorytyQuery, true);
1815     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1816     PrepareOption(option, normalQuery, false);
1817     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1818     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon6d2f4c682402() 1819     callbackCv.wait(callbackLock, [&finishCount]() {
1820         return (finishCount == 4u); // 4 is count of finished sync tasks
1821     });
1822     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1823 }
1824 
1825 /**
1826  * @tc.name: CloudPrioritySyncTest009
1827  * @tc.desc: use priority sync interface when query equal to from table
1828  * @tc.type: FUNC
1829  * @tc.require:
1830  * @tc.author: zhangqiquan
1831  */
1832 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest009, TestSize.Level1)
1833 {
1834     /**
1835      * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1836      * @tc.expected: step1. ok.
1837      */
1838     const int actualCount = 5; // 5 is count of records
1839     InsertUserTableRecord(tableName_, actualCount);
1840     Query query = Query::Select().From(tableName_).BeginGroup().EqualTo("id", "0").Or().EqualTo("id", "1").EndGroup();
1841 
1842     /**
1843      * @tc.steps:step2. check ParserQueryNodes
1844      * @tc.expected: step2. ok.
1845      */
__anon6d2f4c682502(const std::string &tableName, VBucket &extend) 1846     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
1847         EXPECT_EQ(tableName_, tableName);
1848         Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1849         DBStatus status = OK;
1850         auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1851         EXPECT_EQ(status, OK);
1852         ASSERT_EQ(queryNodes.size(), 5u); // 5 is query nodes count
1853     });
1854     BlockPrioritySync(query, delegate_, true, OK);
1855     virtualCloudDb_->ForkQuery(nullptr);
1856     CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records
1857 }
1858 
1859 /**
1860  * @tc.name: CloudPrioritySyncTest010
1861  * @tc.desc: priority sync after cloud delete
1862  * @tc.type: FUNC
1863  * @tc.require:
1864  * @tc.author: chenchaohao
1865  */
1866 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest010, TestSize.Level1)
1867 {
1868     /**
1869      * @tc.steps:step1. insert user table record.
1870      * @tc.expected: step1. ok.
1871      */
1872     const int actualCount = 10; // 10 is count of records
1873     InsertUserTableRecord(tableName_, actualCount);
1874 
1875     /**
1876      * @tc.steps:step2. normal sync and then delete cloud records.
1877      * @tc.expected: step2. ok.
1878      */
1879     Query query = Query::Select().FromTable({tableName_});
1880     BlockSync(query, delegate_, g_actualDBStatus);
1881     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after sync
1882     DeleteCloudDBData(0, 3); // delete 0 1 2 record in cloud
1883     CheckCloudTableCount(tableName_, 7); // 7 is count of cloud records after delete
1884     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1885 
1886     /**
1887      * @tc.steps:step3. priory sync and set query then check user table records.
1888      * @tc.expected: step3. ok.
1889      */
1890     std::vector<std::string> idValue = {"3", "4", "5"};
1891     query = Query::Select().From(tableName_).In("id", idValue);
1892     BlockPrioritySync(query, delegate_, true, OK);
1893     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records after sync
1894     idValue = {"0", "1", "2"};
1895     query = Query::Select().From(tableName_).In("id", idValue);
1896     BlockPrioritySync(query, delegate_, true, OK);
1897     CheckUserTableResult(db_, tableName_, 7); // 7 is count of user records after sync
1898 }
1899 
1900 /**
1901  * @tc.name: CloudPrioritySyncTest011
1902  * @tc.desc: priority sync after cloud insert
1903  * @tc.type: FUNC
1904  * @tc.require:
1905  * @tc.author: chenchaohao
1906  */
1907 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest011, TestSize.Level1)
1908 {
1909     /**
1910      * @tc.steps:step1. insert cloud table record.
1911      * @tc.expected: step1. ok.
1912      */
1913     const int actualCount = 10; // 10 is count of records
1914     InsertCloudTableRecord(0, actualCount, actualCount, false);
1915     std::vector<std::string> idValue = {"0", "1", "2"};
1916     Query query = Query::Select().From(tableName_).In("id", idValue);
1917     std::atomic<int> count = 0;
1918 
1919     /**
1920      * @tc.steps:step2. check user records when query.
1921      * @tc.expected: step1. ok.
1922      */
__anon6d2f4c682602(const std::string &, VBucket &) 1923     virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
1924         count++;
1925         if (count == 1) { // taskid1
1926             std::this_thread::sleep_for(std::chrono::seconds(1));
1927         }
1928         if (count == 2) { // taskid2
1929             CheckUserTableResult(db_, tableName_, 3); // 3 is count of user records after first sync
1930         }
1931     });
1932     CloudSyncOption option;
1933     PrepareOption(option, query, true);
1934     std::mutex callbackMutex;
1935     std::condition_variable callbackCv;
1936     size_t finishCount = 0u;
__anon6d2f4c682702(const std::map<std::string, SyncProcess> &process) 1937     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1938         for (const auto &item: process) {
1939             if (item.second.process == DistributedDB::FINISHED) {
1940                 {
1941                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1942                     finishCount++;
1943                 }
1944                 callbackCv.notify_one();
1945             }
1946         }
1947     };
1948 
1949     /**
1950      * @tc.steps:step3. begin sync and check user record.
1951      * @tc.expected: step3. ok.
1952      */
1953     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1954     idValue = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
1955     query = Query::Select().From(tableName_).In("id", idValue);
1956     PrepareOption(option, query, true);
1957     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1958     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon6d2f4c682802() 1959     callbackCv.wait(callbackLock, [&finishCount]() {
1960         return (finishCount == 2u); // 2 is count of finished sync tasks
1961     });
1962     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1963 }
1964 
1965 /**
1966  * @tc.name: CloudPrioritySyncTest012
1967  * @tc.desc: priority or normal sync when waittime > 300s or < -1
1968  * @tc.type: FUNC
1969  * @tc.require:
1970  * @tc.author: chenchaohao
1971  */
1972 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest012, TestSize.Level1)
1973 {
1974     /**
1975      * @tc.steps:step1. insert cloud table record.
1976      * @tc.expected: step1. ok.
1977      */
1978     const int actualCount = 10; // 10 is count of records
1979     InsertCloudTableRecord(0, actualCount, actualCount, false);
1980     std::vector<std::string> idValue = {"0", "1", "2"};
1981     Query query = Query::Select().From(tableName_).In("id", idValue);
1982 
1983     /**
1984      * @tc.steps:step2. set waittime < -1 then begin sync.
1985      * @tc.expected: step2. invalid.
1986      */
1987     CloudSyncOption option;
1988     PrepareOption(option, query, true);
1989     option.waitTime = -2; // -2 < -1;
1990     ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1991     CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1992 
1993     /**
1994      * @tc.steps:step3. set waittime > 300s then begin sync.
1995      * @tc.expected: step3. invalid.
1996      */
1997 
1998     option.waitTime = 300001; // 300001 > 300s
1999     ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
2000     CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
2001 }
2002 
2003 /**
2004  * @tc.name: CloudPrioritySyncTest013
2005  * @tc.desc: priority sync in some abnormal composite pk query situations
2006  * @tc.type: FUNC
2007  * @tc.require:
2008  * @tc.author: chenchaohao
2009  */
2010 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest013, TestSize.Level1)
2011 {
2012     /**
2013      * @tc.steps:step1. insert user table record.
2014      * @tc.expected: step1. ok.
2015      */
2016     const int actualCount = 1; // 1 is count of records
2017     InsertUserTableRecord(tableName_, actualCount);
2018 
2019     /**
2020      * @tc.steps:step2. query only begingroup then priority sync.
2021      * @tc.expected: step2. invalid.
2022      */
2023     Query query = Query::Select().From(tableName_).BeginGroup();
2024     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2025     CheckCloudTableCount(tableName_, 0);
2026 
2027     /**
2028      * @tc.steps:step3. query only endgroup then priority sync.
2029      * @tc.expected: step3. invalid.
2030      */
2031     query = Query::Select().From(tableName_).EndGroup();
2032     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2033     CheckCloudTableCount(tableName_, 0);
2034 
2035     /**
2036      * @tc.steps:step4. query only begingroup and endgroup then priority sync.
2037      * @tc.expected: step4. invalid.
2038      */
2039     query = Query::Select().From(tableName_).BeginGroup().EndGroup();
2040     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2041     CheckCloudTableCount(tableName_, 0);
2042 
2043     /**
2044      * @tc.steps:step5. query and from table then priority sync.
2045      * @tc.expected: step5. invalid.
2046      */
2047     query = Query::Select().And().From(tableName_);
2048     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2049     CheckCloudTableCount(tableName_, 0);
2050 
2051     /**
2052      * @tc.steps:step6. query or from table then priority sync.
2053      * @tc.expected: step6. invalid.
2054      */
2055     query = Query::Select().Or().From(tableName_);
2056     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2057     CheckCloudTableCount(tableName_, 0);
2058 
2059     /**
2060      * @tc.steps:step7. query begingroup from table then priority sync.
2061      * @tc.expected: step7 invalid.
2062      */
2063     query = Query::Select().BeginGroup().From(tableName_);
2064     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2065     CheckCloudTableCount(tableName_, 0);
2066 
2067     /**
2068      * @tc.steps:step8. query endgroup from table then priority sync.
2069      * @tc.expected: step8 invalid.
2070      */
2071     query = Query::Select().EndGroup().From(tableName_);
2072     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2073     CheckCloudTableCount(tableName_, 0);
2074 
2075     /**
2076      * @tc.steps:step9. query and in then priority sync.
2077      * @tc.expected: step9. invalid.
2078      */
2079     std::vector<std::string> idValue = {"0"};
2080     query = Query::Select().From(tableName_).And().In("id", idValue);
2081     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2082     CheckCloudTableCount(tableName_, 0);
2083 
2084     /**
2085      * @tc.steps:step10. query when the table name does not exit then priority sync.
2086      * @tc.expected: step10. schema mismatch.
2087      */
2088     query = Query::Select().From("tableName").And().In("id", idValue);
2089     BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
2090     CheckCloudTableCount(tableName_, 0);
2091 
2092     /**
2093      * @tc.steps:step11. query when the table name does not exit then priority sync.
2094      * @tc.expected: step11. schema mismatch.
2095      */
2096     query = Query::Select().From("tableName").In("id", idValue);
2097     BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
2098     CheckCloudTableCount(tableName_, 0);
2099 
2100     /**
2101      * @tc.steps:step12. query when the table name does not exit then sync.
2102      * @tc.expected: step12. schema mismatch.
2103      */
2104     query = Query::Select().FromTable({"tableName"});
2105     BlockPrioritySync(query, delegate_, false, SCHEMA_MISMATCH);
2106     CheckCloudTableCount(tableName_, 0);
2107 }
2108 
CheckUploadInfoAfterSync(int recordCount,SyncProcess & normalLast)2109 void DistributedDBCloudCheckSyncTest::CheckUploadInfoAfterSync(int recordCount, SyncProcess &normalLast)
2110 {
2111     uint32_t uintRecordCount = static_cast<uint32_t>(recordCount);
2112     const Info expectUploadInfo = {2u, uintRecordCount, uintRecordCount, 0u, uintRecordCount, 0u, 0u};
2113     for (const auto &table : normalLast.tableProcess) {
2114         CheckUploadInfo(table.second.upLoadInfo, expectUploadInfo);
2115         EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
2116     }
2117 }
2118 
2119 /**
2120  * @tc.name: CloudPrioritySyncTest014
2121  * @tc.desc: Check the uploadInfo after the normal sync is paused by the priority sync
2122  * @tc.type: FUNC
2123  * @tc.require:
2124  * @tc.author: suyue
2125  */
2126 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest014, TestSize.Level1)
2127 {
2128     /**
2129      * @tc.steps:step1. insert data and sync pause.
2130      * @tc.expected: step1. ok.
2131      */
2132     const int recordCount = 50; // 50 is count of data records
2133     InsertUserTableRecord(tableName_, recordCount, 0);
2134     Query normalQuery = Query::Select().FromTable({tableName_});
2135     CloudSyncOption normalOption;
2136     PrepareOption(normalOption, normalQuery, false);
2137     bool isUpload = false;
2138     uint32_t blockTime = 1000;
__anon6d2f4c682902(const std::string &tableName, VBucket &extend) 2139     virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
2140         if (isUpload == false) {
2141             isUpload = true;
2142             std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
2143         }
2144     });
2145     bool isFinish = false;
2146     bool priorityFinish = false;
2147     SyncProcess normalLast;
__anon6d2f4c682a02(const std::map<std::string, SyncProcess> &process) 2148     auto normalCallback = [&isFinish, &priorityFinish, &normalLast](const std::map<std::string, SyncProcess> &process) {
2149         for (const auto &item : process) {
2150             if (item.second.process == DistributedDB::FINISHED) {
2151                 isFinish = true;
2152                 ASSERT_EQ(priorityFinish, true);
2153                 normalLast = item.second;
2154             }
2155         }
2156     };
2157     ASSERT_EQ(delegate_->Sync(normalOption, normalCallback), OK);
2158 
2159     /**
2160      * @tc.steps:step2. priority sync.
2161      * @tc.expected: step2. ok.
2162      */
2163     while (isUpload == false) {
2164         std::this_thread::sleep_for(std::chrono::milliseconds(50));
2165     }
2166     std::vector<std::string> idValues = {"0", "1", "2", "3", "4"};
2167     Query priorityQuery = Query::Select().From(tableName_).In("id", idValues);
2168     CloudSyncOption priorityOption;
2169     PrepareOption(priorityOption, priorityQuery, true);
__anon6d2f4c682b02(const std::map<std::string, SyncProcess> &process) 2170     auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
2171         for (const auto &item : process) {
2172             if (item.second.process == DistributedDB::FINISHED) {
2173                 priorityFinish = true;
2174             }
2175         }
2176     };
2177     ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
2178     while (isFinish == false || priorityFinish == false) {
2179         std::this_thread::sleep_for(std::chrono::milliseconds(50));
2180     }
2181 
2182     /**
2183      * @tc.steps:step3. check uploadInfo after sync finished.
2184      * @tc.expected: step3. ok.
2185      */
2186     CheckUploadInfoAfterSync(recordCount, normalLast);
2187     virtualCloudDb_->ForkUpload(nullptr);
2188 }
2189 
2190 /**
2191  * @tc.name: CloudPrioritySyncTest015
2192  * @tc.desc: Check the uploadInfo and the downloadInfo after the normal sync is paused by the priority sync
2193  * @tc.type: FUNC
2194  * @tc.require:
2195  * @tc.author: caihaoting
2196  */
2197 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest015, TestSize.Level1)
2198 {
2199     /**
2200      * @tc.steps:step1. insert data and sync pause.
2201      * @tc.expected: step1. ok.
2202      */
2203     const int localCount = 10; // 10 is count of local data records
2204     const int cloudCount = 50; // 50 is count of cloud data records
2205     InsertUserTableRecord(tableName_, localCount, 0);
2206     InsertCloudTableRecord(20, cloudCount, 0, false); // 20 is begin number
2207     uint32_t blockTime = 500; // 500ms
2208     virtualCloudDb_->SetBlockTime(blockTime);
2209     Query normalQuery = Query::Select().FromTable({tableName_});
2210     CloudSyncOption normalOption;
2211     PrepareOption(normalOption, normalQuery, false);
2212     bool isFinish = false;
2213     bool priorityFinish = false;
2214     SyncProcess normalLast;
__anon6d2f4c682c02(const std::map<std::string, SyncProcess> &process) 2215     auto normalCallback = [&isFinish, &priorityFinish, &normalLast](const std::map<std::string, SyncProcess> &process) {
2216         for (const auto &item : process) {
2217             if (item.second.process == DistributedDB::FINISHED) {
2218                 isFinish = true;
2219                 ASSERT_EQ(priorityFinish, true);
2220                 normalLast = item.second;
2221             }
2222         }
2223     };
2224     ASSERT_EQ(delegate_->Sync(normalOption, normalCallback), OK);
2225 
2226     /**
2227      * @tc.steps:step2. priority sync.
2228      * @tc.expected: step2. ok.
2229      */
2230     std::vector<std::string> idValues = {"10", "11", "12", "13", "14"};
2231     Query priorityQuery = Query::Select().From(tableName_).In("id", idValues);
2232     CloudSyncOption priorityOption;
2233     PrepareOption(priorityOption, priorityQuery, true);
__anon6d2f4c682d02(const std::map<std::string, SyncProcess> &process) 2234     auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
2235         for (const auto &item : process) {
2236             if (item.second.process == DistributedDB::FINISHED) {
2237                 priorityFinish = true;
2238             }
2239         }
2240     };
2241     ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
2242     while (isFinish == false || priorityFinish == false) {
2243         std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
2244     }
2245 
2246     /**
2247      * @tc.steps:step3. check uploadInfo and downloadInfo after sync finished.
2248      * @tc.expected: step3. ok.
2249      */
2250     uint32_t uintLocalCount = static_cast<uint32_t>(localCount);
2251     uint32_t uintCloudCount = static_cast<uint32_t>(cloudCount);
2252     const Info expectUploadInfo = {1u, uintLocalCount, uintLocalCount, 0u, uintLocalCount, 0u, 0u};
2253     const Info expectDownloadInfo = {1u, uintCloudCount, uintCloudCount, 0u, uintCloudCount, 0u, 0u};
2254     for (const auto &table : normalLast.tableProcess) {
2255         CheckUploadInfo(table.second.upLoadInfo, expectUploadInfo);
2256         CheckDownloadInfo(table.second.downLoadInfo, expectDownloadInfo);
2257         EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
2258     }
2259     CheckUserTableResult(db_, tableName_, 60);
2260 }
2261 
2262 /**
2263  * @tc.name: CloudPrioritySyncTest016
2264  * @tc.desc: priority sync when normal syncing
2265  * @tc.type: FUNC
2266  * @tc.require:
2267  * @tc.author: wangxiangdong
2268  */
2269 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest016, TestSize.Level1)
2270 {
2271     /**
2272      * @tc.steps:step1. insert cloud table record.
2273      * @tc.expected: step1. ok.
2274      */
2275     const int actualCount = 60; // 60 is count of records
2276     InsertCloudTableRecord(0, actualCount, 0, false);
2277     InsertUserTableRecord(tableName_, 10);
2278 
2279     /**
2280      * @tc.steps:step2. begin normal sync and priority sync.
2281      * @tc.expected: step2. ok.
2282      */
2283     Query normalQuery = Query::Select().FromTable({tableName_});
2284     std::vector<std::string> idValue = {"0", "1", "2"};
2285     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
2286     std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
2287     PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, false);
2288     virtualCloudDb_->Reset();
2289     CheckCloudTableCount(tableName_, 60); // 10 is count of cloud records
2290     /**
2291      * @tc.steps:step3. check sync process result.
2292      * @tc.expected: step3. ok.
2293      */
2294     std::vector<DistributedDB::SyncProcess> expectSyncResult = {
2295                 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 60, 60, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2296                 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 3, 3, 0, 0, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2297                 {FINISHED, OK, {{tableName_, {FINISHED, {1, 3, 3, 0, 0, 0, 0}, {1, 3, 3, 0, 0, 3, 0}}}}},
2298                 {PROCESSING, OK, {{tableName_, {PROCESSING, {2, 63, 63, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2299                 {FINISHED, OK, {{tableName_, {FINISHED, {2, 63, 63, 0, 50, 0, 0}, {1, 7, 7, 0, 0, 7, 0}}}}}
2300         };
2301     EXPECT_EQ(CheckSyncProcess(prioritySyncProcess, expectSyncResult), true);
2302 }
2303 
2304 /**
2305  * @tc.name: LogicDeleteSyncTest001
2306  * @tc.desc: sync with logic delete
2307  * @tc.type: FUNC
2308  * @tc.require:
2309  * @tc.author: zhangqiquan
2310  */
2311 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest001, TestSize.Level1)
2312 {
2313     bool logicDelete = true;
2314     auto data = static_cast<PragmaData>(&logicDelete);
2315     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2316     int actualCount = 10;
2317     InitLogicDeleteDataEnv(actualCount, true);
2318     CheckLocalCount(actualCount);
2319     std::string device = "";
2320     ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_AND_DATA), DBStatus::OK);
2321     CheckLocalCount(actualCount);
2322 }
2323 
2324 /**
2325  * @tc.name: LogicDeleteSyncTest002
2326  * @tc.desc: sync without logic delete
2327  * @tc.type: FUNC
2328  * @tc.require:
2329  * @tc.author: zhangqiquan
2330  */
2331 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest002, TestSize.Level1)
2332 {
2333     bool logicDelete = false;
2334     auto data = static_cast<PragmaData>(&logicDelete);
2335     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2336     int actualCount = 10;
2337     InitLogicDeleteDataEnv(actualCount);
2338     CheckLocalCount(0);
2339 }
2340 
2341 /**
2342  * @tc.name: LogicDeleteSyncTest003
2343  * @tc.desc: sync with logic delete and check observer
2344  * @tc.type: FUNC
2345  * @tc.require:
2346  * @tc.author: bty
2347  */
2348 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest003, TestSize.Level1)
2349 {
2350     /**
2351      * @tc.steps:step1. register observer.
2352      * @tc.expected: step1. ok.
2353      */
2354     RelationalStoreDelegate::Option option;
2355     auto observer = new (std::nothrow) RelationalStoreObserverUnitTest();
2356     ASSERT_NE(observer, nullptr);
2357     observer->SetCallbackDetailsType(static_cast<uint32_t>(CallbackDetailsType::DETAILED));
2358     EXPECT_EQ(delegate_->RegisterObserver(observer), OK);
2359     ChangedData expectData;
2360     expectData.tableName = tableName_;
2361     expectData.type = ChangedDataType::DATA;
2362     expectData.field.push_back(std::string("id"));
2363     const int count = 10;
2364     for (int64_t i = 0; i < count; ++i) {
2365         expectData.primaryData[ChangeType::OP_DELETE].push_back({std::to_string(i)});
2366     }
2367     expectData.properties = { .isTrackedDataChange = true };
2368     observer->SetExpectedResult(expectData);
2369 
2370     /**
2371      * @tc.steps:step2. set tracker table
2372      * @tc.expected: step2. ok.
2373      */
2374     TrackerSchema trackerSchema;
2375     trackerSchema.tableName = tableName_;
2376     trackerSchema.trackerColNames = { "id" };
2377     EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
2378 
2379     /**
2380      * @tc.steps:step3. set logic delete and sync
2381      * @tc.expected: step3. ok.
2382      */
2383     bool logicDelete = true;
2384     auto data = static_cast<PragmaData>(&logicDelete);
2385     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2386     int actualCount = 10;
2387     InitLogicDeleteDataEnv(actualCount);
2388     CheckLocalCount(actualCount);
2389     EXPECT_EQ(observer->IsAllChangedDataEq(), true);
2390     observer->ClearChangedData();
2391 
2392     /**
2393      * @tc.steps:step4. unSetTrackerTable and sync
2394      * @tc.expected: step4. ok.
2395      */
2396     expectData.properties = { .isTrackedDataChange = false };
2397     observer->SetExpectedResult(expectData);
2398     trackerSchema.trackerColNames = {};
2399     EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
2400     InsertUserTableRecord(tableName_, actualCount);
2401     BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
2402     for (int i = 0; i < actualCount + actualCount; ++i) {
2403         DeleteCloudTableRecord(i);
2404     }
2405     BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
2406     EXPECT_EQ(observer->IsAllChangedDataEq(), true);
2407 
2408     EXPECT_EQ(delegate_->UnRegisterObserver(observer), OK);
2409     delete observer;
2410     observer = nullptr;
2411 }
2412 
2413 /**
2414  * @tc.name: LogicDeleteSyncTest004
2415  * @tc.desc: test removedevicedata in mode FLAG_ONLY when sync with logic delete
2416  * @tc.type: FUNC
2417  * @tc.require:
2418  * @tc.author: chenchaohao
2419  */
2420 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest004, TestSize.Level1)
2421 {
2422     /**
2423      * @tc.steps:step1. set logic delete
2424      * @tc.expected: step1. ok.
2425      */
2426     bool logicDelete = true;
2427     auto data = static_cast<PragmaData>(&logicDelete);
2428     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2429 
2430     /**
2431      * @tc.steps:step2. cloud delete data then sync, check removedevicedata
2432      * @tc.expected: step2. ok.
2433      */
2434     int actualCount = 10;
2435     InitLogicDeleteDataEnv(actualCount);
2436     CheckLocalCount(actualCount);
2437     std::string device = "";
2438     ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
2439     CheckLocalCount(actualCount);
2440     CheckLogCleaned(0);
2441 }
2442 
2443 /**
2444  * @tc.name: LogicDeleteSyncTest005
2445  * @tc.desc: test pragma when set cmd is not logic delete
2446  * @tc.type: FUNC
2447  * @tc.require:
2448  * @tc.author: chenchaohao
2449  */
2450 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest005, TestSize.Level0)
2451 {
2452     /**
2453      * @tc.steps:step1. set cmd is auto sync
2454      * @tc.expected: step1. ok.
2455      */
2456     bool logicDelete = true;
2457     auto data = static_cast<PragmaData>(&logicDelete);
2458     EXPECT_EQ(delegate_->Pragma(AUTO_SYNC, data), DBStatus::NOT_SUPPORT);
2459 }
2460 
2461 /**
2462  * @tc.name: LogicDeleteSyncTest006
2463  * @tc.desc: sync with logic delete after lock table.
2464  * @tc.type: FUNC
2465  * @tc.require:
2466  * @tc.author: liaoyonghuang
2467  */
2468 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest006, TestSize.Level1)
2469 {
2470     /**
2471      * @tc.steps:step1. set logic delete
2472      * @tc.expected: step1. ok.
2473      */
2474     bool logicDelete = true;
2475     auto data = static_cast<PragmaData>(&logicDelete);
2476     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2477 
2478     /**
2479      * @tc.steps:step2. insert user table record and sync.
2480      * @tc.expected: step2. ok.
2481      */
2482     int dataCount = 10;
2483     InsertUserTableRecord(tableName_, dataCount);
2484     Query query = Query::Select().FromTable({ tableName_ });
2485     BlockSync(query, delegate_, g_actualDBStatus);
2486 
2487     /**
2488      * @tc.steps:step3. Lock log table, and delete data from cloud table.
2489      * @tc.expected: step3. ok.
2490      */
2491     std::vector<std::vector<uint8_t>> hashKey;
2492     CloudDBSyncUtilsTest::GetHashKey(tableName_, " 1=1 ", db_, hashKey);
2493     Lock(tableName_, hashKey, db_);
2494     for (int i = 0; i < dataCount; ++i) {
2495         DeleteCloudTableRecord(i);
2496     }
2497     /**
2498      * @tc.steps:step4. sync.
2499      * @tc.expected: step4. ok.
2500      */
2501     std::vector<DBStatus> actualDBStatus;
2502     BlockSync(query, delegate_, actualDBStatus);
2503     for (auto status : actualDBStatus) {
2504         EXPECT_EQ(status, OK);
2505     }
2506 }
2507 
2508 /**
2509  * @tc.name: LogicDeleteSyncTest008
2510  * @tc.desc: Test sync when data with flag 0x800 locally but there is updated data on the cloud.
2511  * @tc.type: FUNC
2512  * @tc.require:
2513  * @tc.author: liaoyonghuang
2514  */
2515 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest008, TestSize.Level1)
2516 {
2517     /**
2518      * @tc.steps:step1. Insert user table record with flag 0x800. Insert cloud table record.
2519      * @tc.expected: step1. ok.
2520      */
2521     int dataCount = 10;
2522     uint32_t logicDeleteCount = 4;
2523     InsertUserTableRecord(tableName_, dataCount);
2524     std::string sql = "update " + DBCommon::GetLogTableName(tableName_) +
2525         " set flag = flag | 0x800 where data_key <= " + std::to_string(logicDeleteCount);
2526     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2527     InsertCloudTableRecord(0, dataCount, 0, false);
2528     sql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where flag & 0x800=0x800";
2529     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2530         reinterpret_cast<void *>(logicDeleteCount), nullptr), SQLITE_OK);
2531     /**
2532      * @tc.steps:step2. Do sync.
2533      * @tc.expected: step2. ok.
2534      */
2535     Query query = Query::Select().FromTable({ tableName_ });
2536     BlockSync(query, delegate_, g_actualDBStatus);
2537     /**
2538      * @tc.steps:step3. Check data flag in local DB.
2539      * @tc.expected: step3. No data flag is 0x800.
2540      */
2541     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2542         reinterpret_cast<void *>(0), nullptr), SQLITE_OK);
2543 }
2544 
2545 /**
2546  * @tc.name: LockActionTest001
2547  * @tc.desc: InitCompensatedSyncTaskInfo and check lockAction.
2548  * @tc.type: FUNC
2549  * @tc.require:
2550  * @tc.author: wangxiangdong
2551  */
2552 HWTEST_F(DistributedDBCloudCheckSyncTest, LockActionTest001, TestSize.Level0)
2553 {
2554     /**
2555      * @tc.steps:step1. InitCompensatedSyncTaskInfo and check.
2556      * @tc.expected: step1. ok.
2557      */
2558     CloudSyncOption option;
2559     option.devices = { "CLOUD" };
2560     option.mode = SYNC_MODE_CLOUD_MERGE;
2561     option.query = Query::Select().FromTable({ tableName_ });
2562     option.waitTime = g_syncWaitTime;
2563     auto action = static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE)
2564                       | static_cast<uint32_t>(LockAction::DELETE);
2565     option.lockAction = static_cast<LockAction>(action);
2566     option.priorityTask = true;
2567     option.compensatedSyncOnly = true;
2568     const SyncProcessCallback onProcess;
2569     CloudSyncer::CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
2570     EXPECT_EQ(taskInfo.lockAction, option.lockAction);
2571 }
2572 
2573 /**
2574  * @tc.name: LogicCreateRepeatedTableNameTest001
2575  * @tc.desc: test create repeated table name with different cases
2576  * @tc.type: FUNC
2577  * @tc.require:
2578  * @tc.author: wangxiangdong
2579  */
2580 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicCreateRepeatedTableNameTest001, TestSize.Level0)
2581 {
2582     /**
2583      * @tc.steps:step1. CreateDistributedTable with same name but different cases.
2584      * @tc.expected: step1. operate successfully.
2585      */
2586     DBStatus createStatus = delegate_->CreateDistributedTable(lowerTableName_, CLOUD_COOPERATION);
2587     ASSERT_EQ(createStatus, DBStatus::OK);
2588 }
2589 
2590 /**
2591  * @tc.name: SaveCursorTest001
2592  * @tc.desc: test whether cloud cursor is saved when first sync
2593  * @tc.type: FUNC
2594  * @tc.require:
2595  * @tc.author: chenchaohao
2596  */
2597 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest001, TestSize.Level1)
2598 {
2599     /**
2600      * @tc.steps:step1. insert cloud records
2601      * @tc.expected: step1. OK
2602      */
2603     const int actualCount = 10;
2604     InsertCloudTableRecord(0, actualCount, 0, false);
2605 
2606     /**
2607      * @tc.steps:step2. check cursor when first sync
2608      * @tc.expected: step2. OK
2609      */
__anon6d2f4c682e02(const std::string &tableName, VBucket &extend) 2610     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2611         EXPECT_EQ(tableName_, tableName);
2612         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2613         EXPECT_EQ(cursor, "0");
2614     });
2615     Query query = Query::Select().FromTable({ tableName_ });
2616     BlockSync(query, delegate_, g_actualDBStatus);
2617     CheckLocalCount(actualCount);
2618 }
2619 
2620 /**
2621  * @tc.name: SaveCursorTest002
2622  * @tc.desc: test whether cloud cursor is saved when first download failed
2623  * @tc.type: FUNC
2624  * @tc.require:
2625  * @tc.author: chenchaohao
2626  */
2627 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest002, TestSize.Level1)
2628 {
2629     /**
2630      * @tc.steps:step1. insert cloud records
2631      * @tc.expected: step1. OK
2632      */
2633     const int actualCount = 10;
2634     InsertCloudTableRecord(0, actualCount, 0, false);
2635 
2636     /**
2637      * @tc.steps:step2. set download failed
2638      * @tc.expected: step2. OK
2639      */
2640     virtualCloudDb_->SetCloudError(true);
2641     Query query = Query::Select().FromTable({ tableName_ });
2642     BlockPrioritySync(query, delegate_, false, OK);
2643     CheckLocalCount(0);
2644 
2645     /**
2646      * @tc.steps:step3. check cursor when query
2647      * @tc.expected: step3. OK
2648      */
2649     virtualCloudDb_->SetCloudError(false);
__anon6d2f4c682f02(const std::string &tableName, VBucket &extend) 2650     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2651         EXPECT_EQ(tableName_, tableName);
2652         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2653         EXPECT_EQ(cursor, "0");
2654     });
2655     BlockSync(query, delegate_, g_actualDBStatus);
2656     CheckLocalCount(actualCount);
2657 }
2658 
2659 /**
2660  * @tc.name: SaveCursorTest003
2661  * @tc.desc: test whether cloud cursor is saved when first upload failed
2662  * @tc.type: FUNC
2663  * @tc.require:
2664  * @tc.author: chenchaohao
2665  */
2666 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest003, TestSize.Level1)
2667 {
2668     /**
2669      * @tc.steps:step1. insert local records
2670      * @tc.expected: step1. OK
2671      */
2672     const int actualCount = 10;
2673     InsertUserTableRecord(tableName_, actualCount);
2674 
2675     /**
2676      * @tc.steps:step2. set upload failed
2677      * @tc.expected: step2. OK
2678      */
2679     virtualCloudDb_->SetCloudError(true);
2680     Query query = Query::Select().FromTable({ tableName_ });
2681     BlockPrioritySync(query, delegate_, false, OK);
2682     CheckCloudTableCount(tableName_, 0);
2683 
2684     /**
2685      * @tc.steps:step3. check cursor when query
2686      * @tc.expected: step3. OK
2687      */
2688     virtualCloudDb_->SetCloudError(false);
__anon6d2f4c683002(const std::string &tableName, VBucket &extend) 2689     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2690         EXPECT_EQ(tableName_, tableName);
2691         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2692         EXPECT_EQ(cursor, "0");
2693     });
2694     BlockSync(query, delegate_, g_actualDBStatus);
2695     CheckCloudTableCount(tableName_, actualCount);
2696 }
2697 
2698 /**
2699  * @tc.name: RangeQuerySyncTest001
2700  * @tc.desc: Test sync that has option parameter with range query.
2701  * @tc.type: FUNC
2702  * @tc.require:
2703  * @tc.author: chenchaohao
2704  */
2705 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest001, TestSize.Level1)
2706 {
2707     /**
2708      * @tc.steps:step1. insert user table record.
2709      * @tc.expected: step1. ok.
2710      */
2711     CloudSyncOption option;
2712     option.devices = { "CLOUD" };
2713     option.mode = SYNC_MODE_CLOUD_MERGE;
2714     option.waitTime = g_syncWaitTime;
2715     Query query = Query::Select().From(tableName_).Range({}, {});
2716     option.query = query;
2717 
2718     /**
2719      * @tc.steps:step2. test normal sync with range query.
2720      * @tc.expected: step2. not support.
2721      */
2722     option.priorityTask = false;
2723     ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2724 
2725     /**
2726      * @tc.steps:step3. test Priority sync with range query.
2727      * @tc.expected: step3. not support.
2728      */
2729     option.priorityTask = true;
2730     ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2731 }
2732 
2733 /*
2734  * @tc.name: RangeQuerySyncTest002
2735  * @tc.desc: Test sync that has not option parameter with range query.
2736  * @tc.type: FUNC
2737  * @tc.require:
2738  * @tc.author: mazhao
2739  */
2740 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest002, TestSize.Level1)
2741 {
2742     Query query = Query::Select().FromTable({ tableName_ }).Range({}, {});
2743     ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_FORCE_PULL, query, nullptr, g_syncWaitTime),
2744         DBStatus::NOT_SUPPORT);
2745 }
2746 
2747 /*
2748  * @tc.name: SameDataSync001
2749  * @tc.desc: Test query same data in one batch.
2750  * @tc.type: FUNC
2751  * @tc.require:
2752  * @tc.author: zqq
2753  */
2754 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync001, TestSize.Level1)
2755 {
2756     /**
2757      * @tc.steps:step1. insert cloud records, cloud has two batch id:0-4
2758      * @tc.expected: step1. OK
2759      */
2760     const int actualCount = 5;
2761     InsertCloudTableRecord(0, actualCount, 0, false);
2762     InsertCloudTableRecord(0, actualCount, 0, false);
2763     /**
2764      * @tc.steps:step2. call sync, local has one batch id:0-4
2765      * @tc.expected: step2. OK
2766      */
2767     Query query = Query::Select().FromTable({ tableName_ });
2768     BlockSync(query, delegate_, g_actualDBStatus);
2769     CheckLocalCount(actualCount);
2770 }
2771 
2772 /*
2773  * @tc.name: SameDataSync002
2774  * @tc.desc: Test sync when there are two data with the same primary key on the cloud.
2775  * @tc.type: FUNC
2776  * @tc.require:
2777  * @tc.author: liaoyonghuang
2778  */
2779 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync002, TestSize.Level1)
2780 {
2781     /**
2782      * @tc.steps:step1. insert local 1 record and sync to cloud.
2783      * @tc.expected: step1. OK
2784      */
2785     const int actualCount = 1;
2786     InsertUserTableRecord(tableName_, actualCount);
2787     Query query = Query::Select().FromTable({ tableName_ });
2788     BlockSync(query, delegate_, g_actualDBStatus);
2789 
2790     /**
2791      * @tc.steps:step2. insert 2 records with the same primary key.
2792      * @tc.expected: step2. OK
2793      */
2794     std::vector<VBucket> record;
2795     std::vector<VBucket> extend;
2796     Timestamp now = TimeHelper::GetSysCurrentTime();
2797     VBucket data;
2798     std::vector<uint8_t> photo(0, 'v');
2799     data.insert_or_assign("id", std::string("0"));
2800     data.insert_or_assign("name", std::string("Cloud"));
2801     data.insert_or_assign("height", 166.0); // 166.0 is random double value
2802     data.insert_or_assign("married", false);
2803     data.insert_or_assign("photo", photo);
2804     data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
2805     record.push_back(data);
2806     data.insert_or_assign("age", static_cast<int64_t>(14L)); // 14 is random age
2807     record.push_back(data);
2808     VBucket log;
2809     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
2810         now / CloudDbConstant::TEN_THOUSAND));
2811     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
2812         now / CloudDbConstant::TEN_THOUSAND));
2813     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
2814     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("1"));
2815     extend.push_back(log);
2816     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("2"));
2817     extend.push_back(log);
2818     ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend), DBStatus::OK);
2819 
2820     /**
2821      * @tc.steps:step3. sync from cloud and check record.
2822      * @tc.expected: step3. The record with age of 14 has been updated locally.
2823      */
2824     BlockSync(query, delegate_, g_actualDBStatus);
2825     std::string sql = "SELECT age FROM " + tableName_ + " where id=0;";
2826     int64_t actualAge = 0;
2827     int64_t expectAge = 14L;
__anon6d2f4c683102(sqlite3_stmt *stmt) 2828     RelationalTestUtils::ExecSql(db_, sql, nullptr, [&actualAge](sqlite3_stmt *stmt) {
2829         actualAge = sqlite3_column_int(stmt, 0);
2830         return E_OK;
2831     });
2832     EXPECT_EQ(actualAge, expectAge);
2833 }
2834 
2835 /*
2836  * @tc.name: CreateDistributedTable001
2837  * @tc.desc: Test create distributed table when table not empty.
2838  * @tc.type: FUNC
2839  * @tc.require:
2840  * @tc.author: zqq
2841  */
2842 HWTEST_F(DistributedDBCloudCheckSyncTest, CreateDistributedTable001, TestSize.Level1)
2843 {
2844     const std::string table = "CreateDistributedTable001";
2845     const std::string createSQL =
2846         "CREATE TABLE IF NOT EXISTS " + table + "(" \
2847         "id TEXT PRIMARY KEY," \
2848         "name TEXT," \
2849         "height REAL ," \
2850         "photo BLOB," \
2851         "age INT);";
2852     ASSERT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
2853     int actualCount = 10;
2854     InsertUserTableRecord(table, actualCount);
2855     InsertCloudTableRecord(table, 0, actualCount, 0, true);
2856     ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
2857     DataBaseSchema dataBaseSchema = GetSchema();
2858     TableSchema schema = dataBaseSchema.tables.at(0);
2859     schema.name = table;
2860     schema.sharedTableName = "";
2861     dataBaseSchema.tables.push_back(schema);
2862     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
2863     /**
2864      * @tc.steps:step2. call sync, local has one batch id:0-4
2865      * @tc.expected: step2. OK
2866      */
2867     Query query = Query::Select().FromTable({ table });
2868     BlockSync(query, delegate_, g_actualDBStatus);
2869     CheckCloudTableCount(table, actualCount);
2870 }
2871 
2872 /*
2873  * @tc.name: CloseDbTest001
2874  * @tc.desc: Test process of db close during sync
2875  * @tc.type: FUNC
2876  * @tc.require:
2877  * @tc.author: bty
2878  */
2879 HWTEST_F(DistributedDBCloudCheckSyncTest, CloseDbTest001, TestSize.Level1)
2880 {
2881     /**
2882      * @tc.steps:step1. insert user table record.
2883      * @tc.expected: step1. ok.
2884      */
2885     const int actualCount = 10; // 10 is count of records
2886     InsertUserTableRecord(tableName_, actualCount);
2887 
2888     /**
2889      * @tc.steps:step2. wait for 2 seconds during the query to close the database.
2890      * @tc.expected: step2. ok.
2891      */
2892     std::mutex callMutex;
2893     int callCount = 0;
__anon6d2f4c683202(const std::string &, VBucket &) 2894     virtualCloudDb_->ForkQuery([](const std::string &, VBucket &) {
2895         std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2896     });
2897     const auto callback = [&callCount, &callMutex](
__anon6d2f4c683302( const std::map<std::string, SyncProcess> &) 2898         const std::map<std::string, SyncProcess> &) {
2899         {
2900             std::lock_guard<std::mutex> autoLock(callMutex);
2901             callCount++;
2902         }
2903     };
2904     Query query = Query::Select().FromTable({ tableName_ });
2905     ASSERT_EQ(delegate_->Sync({ "CLOUD" }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
2906     std::this_thread::sleep_for(std::chrono::seconds(1)); // block notify 1s
2907     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
2908     delegate_ = nullptr;
2909     mgr_ = nullptr;
2910 
2911     /**
2912      * @tc.steps:step3. wait for 2 seconds to check the process call count.
2913      * @tc.expected: step3. ok.
2914      */
2915     std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2916     EXPECT_EQ(callCount, 0L);
2917 }
2918 
2919 /*
2920  * @tc.name: ConsistentFlagTest001
2921  * @tc.desc: Test the consistency flag of no asset table
2922  * @tc.type: FUNC
2923  * @tc.require:
2924  * @tc.author: bty
2925  */
2926 HWTEST_F(DistributedDBCloudCheckSyncTest, ConsistentFlagTest001, TestSize.Level1)
2927 {
2928     /**
2929      * @tc.steps:step1. init data and sync
2930      * @tc.expected: step1. ok.
2931      */
2932     const int localCount = 20; // 20 is count of local
2933     const int cloudCount = 10; // 10 is count of cloud
2934     InsertUserTableRecord(tableName_, localCount);
2935     InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2936     Query query = Query::Select().FromTable({ tableName_ });
2937     BlockSync(query, delegate_, g_actualDBStatus);
2938 
2939     /**
2940      * @tc.steps:step2. check the 0x20 bit of flag after sync
2941      * @tc.expected: step2. ok.
2942      */
2943     std::string querySql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
2944         " where flag&0x20=0;";
2945     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2946         reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2947 
2948     /**
2949      * @tc.steps:step3. delete local data and check
2950      * @tc.expected: step3. ok.
2951      */
2952     std::string sql = "delete from " + tableName_ + " where id = '1';";
2953     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2954     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2955         reinterpret_cast<void *>(localCount - 1), nullptr), SQLITE_OK);
2956 
2957     /**
2958      * @tc.steps:step4. check the 0x20 bit of flag after sync
2959      * @tc.expected: step4. ok.
2960      */
2961     BlockSync(query, delegate_, g_actualDBStatus);
2962     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2963         reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2964 }
2965 
SyncDataStatusTest(bool isCompensatedSyncOnly)2966 void DistributedDBCloudCheckSyncTest::SyncDataStatusTest(bool isCompensatedSyncOnly)
2967 {
2968     /**
2969      * @tc.steps:step1. init data and sync
2970      * @tc.expected: step1. ok.
2971      */
2972     const int localCount = 20; // 20 is count of local
2973     const int cloudCount = 10; // 10 is count of cloud
2974     InsertUserTableRecord(tableName_, localCount);
2975     std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
2976     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2977     sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 2 where data_key in (2,12);";
2978     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2979     sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 3 where data_key in (3,13);";
2980     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2981     std::this_thread::sleep_for(std::chrono::milliseconds(1));
2982     InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2983     Query query = Query::Select().FromTable({tableName_});
2984 
2985     /**
2986      * @tc.steps:step2. check count
2987      * @tc.expected: step2. ok.
2988      */
2989     int64_t syncCount = 2;
2990     BlockPrioritySync(query, delegate_, false, OK, isCompensatedSyncOnly);
2991     if (!isCompensatedSyncOnly) {
2992         std::this_thread::sleep_for(std::chrono::seconds(1)); // wait compensated sync finish
2993     }
2994     std::string preSql = "select count(*) from " + DBCommon::GetLogTableName(tableName_);
2995     std::string querySql = preSql + " where status=0 and data_key in (1,11) and cloud_gid !='';";
2996     CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2997     if (isCompensatedSyncOnly) {
2998         querySql = preSql + " where status=2 and data_key in (2,12) and cloud_gid ='';";
2999         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
3000         querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid ='';";
3001         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
3002         querySql = preSql + " where status=0 and cloud_gid ='';";
3003         int unSyncCount = 14; // 14 is the num of unSync data with status 0
3004         CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
3005     } else {
3006         // gid 12、13 are upload insert, lock to lock_change
3007         querySql = preSql + " where status=3 and data_key in (2,12) and cloud_gid !='';";
3008         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
3009         querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid !='';";
3010         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
3011         querySql = preSql + " where status=0 and cloud_gid !='';";
3012         int unSyncCount = 16; // 16 is the num of sync finish
3013         CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
3014     }
3015 }
3016 
3017 /*
3018  * @tc.name: SyncDataStatusTest001
3019  * @tc.desc: Test the status after compensated sync the no asset table
3020  * @tc.type: FUNC
3021  * @tc.require:
3022  * @tc.author: bty
3023  */
3024 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest001, TestSize.Level1)
3025 {
3026     SyncDataStatusTest(true);
3027 }
3028 
3029 /*
3030  * @tc.name: SyncDataStatusTest002
3031  * @tc.desc: Test the status after normal sync the no asset table
3032  * @tc.type: FUNC
3033  * @tc.require:
3034  * @tc.author: bty
3035  */
3036 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest002, TestSize.Level1)
3037 {
3038     SyncDataStatusTest(false);
3039 }
3040 }
3041 #endif
3042