• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "cloud/cloud_sync_utils.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "cloud_syncer.h"
22 #include "db_common.h"
23 #include "distributeddb_data_generate_unit_test.h"
24 #include "log_print.h"
25 #include "relational_store_client.h"
26 #include "relational_store_delegate.h"
27 #include "relational_store_instance.h"
28 #include "relational_store_manager.h"
29 #include "relational_sync_able_storage.h"
30 #include "runtime_config.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 
37 namespace {
38 using namespace testing::ext;
39 using namespace DistributedDB;
40 using namespace DistributedDBUnitTest;
41 const char *g_createSQL =
42     "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(" \
43     "id TEXT PRIMARY KEY," \
44     "name TEXT," \
45     "height REAL ," \
46     "photo BLOB," \
47     "age INT);";
48 const char *g_createNonPrimaryKeySQL =
49     "CREATE TABLE IF NOT EXISTS NonPrimaryKeyTable(" \
50     "id TEXT," \
51     "name TEXT," \
52     "height REAL ," \
53     "photo BLOB," \
54     "age INT);";
55 const int64_t g_syncWaitTime = 60;
56 
57 const Asset g_cloudAsset = {
58     .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
59     .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
60 };
61 
62 std::vector<DBStatus> g_actualDBStatus;
63 
CreateUserDBAndTable(sqlite3 * & db)64 void CreateUserDBAndTable(sqlite3 *&db)
65 {
66     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
67     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
68     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createNonPrimaryKeySQL), SQLITE_OK);
69 }
70 
PrepareOption(CloudSyncOption & option,const Query & query,bool isPriorityTask,bool isCompensatedSyncOnly=false)71 void PrepareOption(CloudSyncOption &option, const Query &query, bool isPriorityTask, bool isCompensatedSyncOnly = false)
72 {
73     option.devices = { "CLOUD" };
74     option.mode = SYNC_MODE_CLOUD_MERGE;
75     option.query = query;
76     option.waitTime = g_syncWaitTime;
77     option.priorityTask = isPriorityTask;
78     option.compensatedSyncOnly = isCompensatedSyncOnly;
79 }
80 
BlockSync(const Query & query,RelationalStoreDelegate * delegate,std::vector<DBStatus> & actualDBStatus,bool prioritySync=false)81 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, std::vector<DBStatus> &actualDBStatus,
82     bool prioritySync = false)
83 {
84     std::mutex dataMutex;
85     std::condition_variable cv;
86     bool finish = false;
87     auto callback = [&actualDBStatus, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
88         for (const auto &item: process) {
89             actualDBStatus.push_back(item.second.errCode);
90             if (item.second.process == DistributedDB::FINISHED) {
91                 {
92                     std::lock_guard<std::mutex> autoLock(dataMutex);
93                     finish = true;
94                 }
95                 cv.notify_one();
96             }
97         }
98     };
99     CloudSyncOption option;
100     option.devices = { "CLOUD" };
101     option.mode = SYNC_MODE_CLOUD_MERGE;
102     option.query = query;
103     option.waitTime = g_syncWaitTime;
104     option.priorityTask = prioritySync;
105     ASSERT_EQ(delegate->Sync(option, callback), OK);
106     std::unique_lock<std::mutex> uniqueLock(dataMutex);
107     cv.wait(uniqueLock, [&finish]() {
108         return finish;
109     });
110 }
111 
BlockPrioritySync(const Query & query,RelationalStoreDelegate * delegate,bool isPriority,DBStatus expectResult,bool isCompensatedSyncOnly=false)112 void BlockPrioritySync(const Query &query, RelationalStoreDelegate *delegate, bool isPriority, DBStatus expectResult,
113     bool isCompensatedSyncOnly = false)
114 {
115     std::mutex dataMutex;
116     std::condition_variable cv;
117     bool finish = false;
118     auto callback = [&cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
119         for (const auto &item: process) {
120             if (item.second.process == DistributedDB::FINISHED) {
121                 {
122                     std::lock_guard<std::mutex> autoLock(dataMutex);
123                     finish = true;
124                 }
125                 cv.notify_one();
126             }
127         }
128     };
129     CloudSyncOption option;
130     PrepareOption(option, query, isPriority, isCompensatedSyncOnly);
131     ASSERT_EQ(delegate->Sync(option, callback), expectResult);
132     if (expectResult == OK) {
133         std::unique_lock<std::mutex> uniqueLock(dataMutex);
134         cv.wait(uniqueLock, [&finish]() {
135             return finish;
136         });
137     }
138 }
139 
BlockCompensatedSync(const Query & query,RelationalStoreDelegate * delegate,DBStatus expectResult,const std::function<void (const std::map<std::string,SyncProcess> & syncProcess)> & processCallback)140 void BlockCompensatedSync(const Query &query, RelationalStoreDelegate *delegate, DBStatus expectResult,
141     const std::function<void(const std::map<std::string, SyncProcess> &syncProcess)> &processCallback)
142 {
143     std::mutex dataMutex;
144     std::condition_variable cv;
145     bool finish = false;
146     auto callback = [&processCallback, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
147         for (const auto &item: process) {
148             if (item.second.process == DistributedDB::FINISHED) {
149                 {
150                     std::lock_guard<std::mutex> autoLock(dataMutex);
151                     finish = true;
152                 }
153                 cv.notify_one();
154             }
155         }
156         if (processCallback != nullptr) {
157             processCallback(process);
158         }
159     };
160     CloudSyncOption option;
161     PrepareOption(option, query, false, true);
162     ASSERT_EQ(delegate->Sync(option, callback), expectResult);
163     if (expectResult == OK) {
164         std::unique_lock<std::mutex> uniqueLock(dataMutex);
165         cv.wait(uniqueLock, [&finish]() {
166             return finish;
167         });
168     }
169 }
170 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)171 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
172 {
173     if (count != 1) {
174         return 0;
175     }
176     auto expectCount = reinterpret_cast<int64_t>(data);
177     EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
178     return 0;
179 }
180 
CheckUserTableResult(sqlite3 * & db,const std::string & tableName,int64_t expectCount)181 void CheckUserTableResult(sqlite3 *&db, const std::string &tableName, int64_t expectCount)
182 {
183     string query = "select count(*) from " + tableName + ";";
184     EXPECT_EQ(sqlite3_exec(db, query.c_str(), QueryCountCallback,
185         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
186 }
187 
188 class DistributedDBCloudCheckSyncTest : public testing::Test {
189 public:
190     static void SetUpTestCase();
191     static void TearDownTestCase();
192     void SetUp() override;
193     void TearDown() override;
194 protected:
195     void InitTestDir();
196     DataBaseSchema GetSchema();
197     void CloseDb();
198     void InitDataAndSync();
199     void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
200     void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull);
201     void InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count, int64_t photoSize,
202         bool assetIsNull);
203     void DeleteUserTableRecord(int64_t id);
204     void DeleteUserTableRecord(int64_t begin, int64_t end);
205     void DeleteCloudTableRecord(int64_t gid);
206     void CheckCloudTableCount(const std::string &tableName, int64_t expectCount);
207     bool CheckSyncCount(const Info actualInfo, const Info expectInfo);
208     bool CheckSyncProcessInner(SyncProcess &actualSyncProcess, SyncProcess &expectSyncProcess);
209     bool CheckSyncProcess(std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess,
210         vector<SyncProcess> &expectSyncProcessV);
211     void PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
212         RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
213         bool isCheckProcess);
214     void DeleteCloudDBData(int64_t begin, int64_t count);
215     void SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count);
216     void SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count);
217     void InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync = false);
218     void CheckLocalCount(int64_t expectCount);
219     void CheckLogCleaned(int64_t expectCount);
220     void CheckUploadInfo(const Info &actualUploadInfo, const Info &expectUploadInfo);
221     void CheckDownloadInfo(const Info &actualDownloadInfo, const Info &expectDownloadInfo);
222     void SyncDataStatusTest(bool isCompensatedSyncOnly);
223     void CheckUploadInfoAfterSync(int recordCount, SyncProcess &normalLast);
224     std::string testDir_;
225     std::string storePath_;
226     sqlite3 *db_ = nullptr;
227     RelationalStoreDelegate *delegate_ = nullptr;
228     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
229     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
230     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
231     std::string tableName_ = "DistributedDBCloudCheckSyncTest";
232     std::string tableNameShared_ = "DistributedDBCloudCheckSyncTest_shared";
233     std::string tableWithoutPrimaryName_ = "NonPrimaryKeyTable";
234     std::string tableWithoutPrimaryNameShared_ = "NonPrimaryKeyTable_shared";
235     std::string lowerTableName_ = "distributeddbCloudCheckSyncTest";
236     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
237 };
238 
SetUpTestCase()239 void DistributedDBCloudCheckSyncTest::SetUpTestCase()
240 {
241     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
242 }
243 
TearDownTestCase()244 void DistributedDBCloudCheckSyncTest::TearDownTestCase()
245 {}
246 
SetUp()247 void DistributedDBCloudCheckSyncTest::SetUp()
248 {
249     DistributedDBToolsUnitTest::PrintTestCaseInfo();
250     InitTestDir();
251     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
252         LOGE("rm test db files error.");
253     }
254     DistributedDBToolsUnitTest::PrintTestCaseInfo();
255     LOGD("Test dir is %s", testDir_.c_str());
256     db_ = RelationalTestUtils::CreateDataBase(storePath_);
257     ASSERT_NE(db_, nullptr);
258     CreateUserDBAndTable(db_);
259     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
260     RelationalStoreDelegate::Option option;
261     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
262     ASSERT_NE(delegate_, nullptr);
263     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
264     ASSERT_EQ(delegate_->CreateDistributedTable(tableWithoutPrimaryName_, CLOUD_COOPERATION), DBStatus::OK);
265     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
266     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
267     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
268     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
269     DataBaseSchema dataBaseSchema = GetSchema();
270     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
271     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
272     ASSERT_TRUE(communicatorAggregator_ != nullptr);
273     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
274 }
275 
TearDown()276 void DistributedDBCloudCheckSyncTest::TearDown()
277 {
278     virtualCloudDb_->ForkQuery(nullptr);
279     virtualCloudDb_->SetCloudError(false);
280     CloseDb();
281     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
282     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
283         LOGE("rm test db files error.");
284     }
285     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
286     communicatorAggregator_ = nullptr;
287     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
288 }
289 
InitTestDir()290 void DistributedDBCloudCheckSyncTest::InitTestDir()
291 {
292     if (!testDir_.empty()) {
293         return;
294     }
295     DistributedDBToolsUnitTest::TestDirInit(testDir_);
296     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
297     LOGI("The test db is:%s", testDir_.c_str());
298 }
299 
GetSchema()300 DataBaseSchema DistributedDBCloudCheckSyncTest::GetSchema()
301 {
302     DataBaseSchema schema;
303     TableSchema tableSchema;
304     tableSchema.name = tableName_;
305     tableSchema.sharedTableName = tableNameShared_;
306     tableSchema.fields = {
307         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
308         {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
309     };
310     TableSchema tableWithoutPrimaryKeySchema;
311     tableWithoutPrimaryKeySchema.name = tableWithoutPrimaryName_;
312     tableWithoutPrimaryKeySchema.sharedTableName = tableWithoutPrimaryNameShared_;
313     tableWithoutPrimaryKeySchema.fields = {
314         {"id", TYPE_INDEX<std::string>}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
315         {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
316     };
317     schema.tables.push_back(tableSchema);
318     schema.tables.push_back(tableWithoutPrimaryKeySchema);
319     return schema;
320 }
321 
CloseDb()322 void DistributedDBCloudCheckSyncTest::CloseDb()
323 {
324     virtualCloudDb_ = nullptr;
325     if (mgr_ != nullptr) {
326         EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
327         delegate_ = nullptr;
328         mgr_ = nullptr;
329     }
330 }
331 
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)332 void DistributedDBCloudCheckSyncTest::InsertUserTableRecord(const std::string &tableName,
333     int64_t recordCounts, int64_t begin)
334 {
335     ASSERT_NE(db_, nullptr);
336     for (int64_t i = begin; i < begin + recordCounts; ++i) {
337         string sql = "INSERT OR REPLACE INTO " + tableName
338             + " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local"
339             + std::to_string(i) + "', '155.10',  'text', '21');";
340         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
341     }
342 }
343 
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)344 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize,
345     bool assetIsNull)
346 {
347     InsertCloudTableRecord(tableName_, begin, count, photoSize, assetIsNull);
348 }
349 
InsertCloudTableRecord(const std::string & tableName,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)350 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count,
351     int64_t photoSize, bool assetIsNull)
352 {
353     std::vector<uint8_t> photo(photoSize, 'v');
354     std::vector<VBucket> record1;
355     std::vector<VBucket> extend1;
356     std::vector<VBucket> record2;
357     std::vector<VBucket> extend2;
358     Timestamp now = TimeHelper::GetSysCurrentTime();
359     for (int64_t i = begin; i < begin + count; ++i) {
360         VBucket data;
361         data.insert_or_assign("id", std::to_string(i));
362         data.insert_or_assign("name", "Cloud" + std::to_string(i));
363         data.insert_or_assign("height", 166.0); // 166.0 is random double value
364         data.insert_or_assign("married", false);
365         data.insert_or_assign("photo", photo);
366         data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
367         Asset asset = g_cloudAsset;
368         asset.name = asset.name + std::to_string(i);
369         assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
370         record1.push_back(data);
371         VBucket log;
372         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
373             now / CloudDbConstant::TEN_THOUSAND + i));
374         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
375             now / CloudDbConstant::TEN_THOUSAND + i));
376         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
377         extend1.push_back(log);
378 
379         std::vector<Asset> assets;
380         data.insert_or_assign("height", 180.3); // 180.3 is random double value
381         for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
382             asset.name = g_cloudAsset.name + std::to_string(j);
383             assets.push_back(asset);
384         }
385         data.erase("assert");
386         data.erase("married");
387         assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
388         record2.push_back(data);
389         extend2.push_back(log);
390     }
391     ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName, std::move(record1), extend1), DBStatus::OK);
392     std::this_thread::sleep_for(std::chrono::milliseconds(count));
393 }
394 
DeleteUserTableRecord(int64_t id)395 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t id)
396 {
397     ASSERT_NE(db_, nullptr);
398     string sql = "DELETE FROM " + tableName_ + " WHERE id ='" + std::to_string(id) + "';";
399     ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
400 }
401 
DeleteUserTableRecord(int64_t begin,int64_t end)402 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t begin, int64_t end)
403 {
404     ASSERT_NE(db_, nullptr);
405     std::string sql = "DELETE FROM " + tableName_ + " WHERE id IN (";
406     for (int64_t i = begin; i <= end; ++i) {
407         sql += "'" + std::to_string(i) + "',";
408     }
409     if (sql.back() == ',') {
410         sql.pop_back();
411     }
412     sql += ");";
413     ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
414 }
415 
DeleteCloudTableRecord(int64_t gid)416 void DistributedDBCloudCheckSyncTest::DeleteCloudTableRecord(int64_t gid)
417 {
418     VBucket idMap;
419     idMap.insert_or_assign("#_gid", std::to_string(gid));
420     ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
421 }
422 
CheckCloudTableCount(const std::string & tableName,int64_t expectCount)423 void DistributedDBCloudCheckSyncTest::CheckCloudTableCount(const std::string &tableName, int64_t expectCount)
424 {
425     VBucket extend;
426     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
427     int64_t realCount = 0;
428     std::vector<VBucket> data;
429     virtualCloudDb_->Query(tableName, extend, data);
430     for (size_t j = 0; j < data.size(); ++j) {
431         auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
432         if (entry != data[j].end() && std::get<bool>(entry->second)) {
433             continue;
434         }
435         realCount++;
436     }
437     EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
438 }
439 
CheckSyncCount(const Info actualInfo,const Info expectInfo)440 bool DistributedDBCloudCheckSyncTest::CheckSyncCount(const Info actualInfo, const Info expectInfo)
441 {
442     if (actualInfo.batchIndex != expectInfo.batchIndex) {
443         return false;
444     }
445     if (actualInfo.total != expectInfo.total) {
446         return false;
447     }
448     if (actualInfo.successCount != expectInfo.successCount) {
449         return false;
450     }
451     if (actualInfo.failCount != expectInfo.failCount) {
452         return false;
453     }
454     return true;
455 }
456 
CheckSyncProcessInner(SyncProcess & actualSyncProcess,SyncProcess & expectSyncProcess)457 bool DistributedDBCloudCheckSyncTest::CheckSyncProcessInner(SyncProcess &actualSyncProcess,
458     SyncProcess &expectSyncProcess)
459 {
460     for (const auto &itInner : actualSyncProcess.tableProcess) {
461         std::string tableName = itInner.first;
462         if (expectSyncProcess.tableProcess.find(tableName) == expectSyncProcess.tableProcess.end()) {
463             return false;
464         }
465         TableProcessInfo actualTableProcessInfo = itInner.second;
466         TableProcessInfo expectTableProcessInfo = expectSyncProcess.tableProcess.find(tableName)->second;
467         if (!CheckSyncCount(actualTableProcessInfo.downLoadInfo, expectTableProcessInfo.downLoadInfo)) {
468             return false;
469         }
470         if (!CheckSyncCount(actualTableProcessInfo.upLoadInfo, expectTableProcessInfo.upLoadInfo)) {
471             return false;
472         }
473     }
474     return true;
475 }
476 
CheckSyncProcess(std::vector<std::map<std::string,SyncProcess>> & actualSyncProcess,vector<SyncProcess> & expectSyncProcessV)477 bool DistributedDBCloudCheckSyncTest::CheckSyncProcess(
478     std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess, vector<SyncProcess> &expectSyncProcessV)
479 {
480     vector<map<string, SyncProcess>> expectSyncProcess;
481     for (auto syncProcess : expectSyncProcessV) {
482         map<string, SyncProcess> expectSyncProcessMap = {{"CLOUD", syncProcess}};
483         expectSyncProcess.emplace_back(expectSyncProcessMap);
484     }
485     for (int i = 0; i < (int) actualSyncProcess.size(); i++) {
486         map<string, SyncProcess> actualSyncProcessMap = actualSyncProcess[i];
487         map<string, SyncProcess> expectSyncProcessMap = expectSyncProcess[i];
488         for (auto &it : actualSyncProcessMap) {
489             string mapKey = it.first;
490             if (expectSyncProcessMap.find(mapKey) == expectSyncProcessMap.end()) {
491                 return false;
492             }
493             SyncProcess actualSyncProcess = it.second;
494             SyncProcess expectSyncProcess = expectSyncProcessMap.find(mapKey)->second;
495             if (!CheckSyncProcessInner(actualSyncProcess, expectSyncProcess)) {
496                 return false;
497             }
498         }
499     }
500     return true;
501 }
502 
PriorityAndNormalSync(const Query & normalQuery,const Query & priorityQuery,RelationalStoreDelegate * delegate,std::vector<std::map<std::string,SyncProcess>> & prioritySyncProcess,bool isCheckProcess)503 void DistributedDBCloudCheckSyncTest::PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
504     RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
505     bool isCheckProcess)
506 {
507     std::mutex dataMutex;
508     std::condition_variable cv;
509     bool normalFinish = false;
510     bool priorityFinish = false;
511     auto normalCallback = [&cv, &dataMutex, &normalFinish, &priorityFinish, &prioritySyncProcess, &isCheckProcess](
512         const std::map<std::string, SyncProcess> &process) {
513         auto foundFinishedProcess = std::find_if(process.begin(), process.end(), [](const auto &item) {
514             return item.second.process == DistributedDB::FINISHED;
515         });
516         if (foundFinishedProcess != process.end()) {
517             normalFinish = true;
518             if (isCheckProcess) {
519                 ASSERT_EQ(priorityFinish, true);
520             }
521             cv.notify_one();
522         }
523         prioritySyncProcess.emplace_back(process);
524     };
525     auto priorityCallback = [&cv, &priorityFinish, &prioritySyncProcess](
526         const std::map<std::string, SyncProcess> &process) {
527         auto it = std::find_if(process.begin(), process.end(), [](const auto &item) {
528             return item.second.process == DistributedDB::FINISHED;
529         });
530         if (it != process.end()) {
531             priorityFinish = true;
532             cv.notify_one();
533         }
534         prioritySyncProcess.emplace_back(process);
535     };
536     CloudSyncOption option;
537     PrepareOption(option, normalQuery, false);
538     virtualCloudDb_->SetBlockTime(500); // 500 ms
539     ASSERT_EQ(delegate->Sync(option, normalCallback), OK);
540     PrepareOption(option, priorityQuery, true);
541     std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50 ms
542     ASSERT_EQ(delegate->Sync(option, priorityCallback), OK);
543     std::unique_lock<std::mutex> uniqueLock(dataMutex);
544     cv.wait(uniqueLock, [&normalFinish]() {
545         return normalFinish;
546     });
547 }
548 
DeleteCloudDBData(int64_t begin,int64_t count)549 void DistributedDBCloudCheckSyncTest::DeleteCloudDBData(int64_t begin, int64_t count)
550 {
551     for (int64_t i = begin; i < begin + count; i++) {
552         VBucket idMap;
553         idMap.insert_or_assign("#_gid", std::to_string(i));
554         ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
555     }
556 }
557 
SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> & count)558 void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count)
559 {
560     virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
561         count++;
562         if (count == 1) { // taskid1
563             std::this_thread::sleep_for(std::chrono::seconds(1));
564         }
565         if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
566             CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
567         }
568         if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
569             CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records after last sync
570         }
571         if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
572             CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
573         }
574     });
575 }
576 
SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> & count)577 void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count)
578 {
579     virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
580         count++;
581         if (count == 1) { // taskid1
582             std::this_thread::sleep_for(std::chrono::seconds(1));
583         }
584         if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
585             CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
586         }
587         if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
588             CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
589         }
590         if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
591             CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
592         }
593     });
594 }
595 
InitLogicDeleteDataEnv(int64_t dataCount,bool prioritySync)596 void DistributedDBCloudCheckSyncTest::InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync)
597 {
598     // prepare data
599     InsertUserTableRecord(tableName_, dataCount);
600     // sync
601     Query query = Query::Select().FromTable({ tableName_ });
602     BlockSync(query, delegate_, g_actualDBStatus);
603     // delete cloud data
604     for (int i = 0; i < dataCount; ++i) {
605         DeleteCloudTableRecord(i);
606     }
607     // sync again
608     BlockSync(query, delegate_, g_actualDBStatus);
609 }
610 
CheckLocalCount(int64_t expectCount)611 void DistributedDBCloudCheckSyncTest::CheckLocalCount(int64_t expectCount)
612 {
613     // check local data
614     int dataCnt = -1;
615     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
616     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
617         dataCnt = sqlite3_column_int(stmt, 0);
618         return E_OK;
619     });
620     EXPECT_EQ(dataCnt, expectCount);
621 }
622 
CheckLogCleaned(int64_t expectCount)623 void DistributedDBCloudCheckSyncTest::CheckLogCleaned(int64_t expectCount)
624 {
625     std::string sql1 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
626         " where device = 'cloud';";
627     EXPECT_EQ(sqlite3_exec(db_, sql1.c_str(), QueryCountCallback,
628         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
629     std::string sql2 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where cloud_gid "
630         " is not null and cloud_gid != '';";
631     EXPECT_EQ(sqlite3_exec(db_, sql2.c_str(), QueryCountCallback,
632         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
633     std::string sql3 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
634         " where flag & 0x02 != 0;";
635     EXPECT_EQ(sqlite3_exec(db_, sql3.c_str(), QueryCountCallback,
636         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
637 }
638 
CheckUploadInfo(const Info & actualUploadInfo,const Info & expectUploadInfo)639 void DistributedDBCloudCheckSyncTest::CheckUploadInfo(const Info &actualUploadInfo, const Info &expectUploadInfo)
640 {
641     EXPECT_EQ(actualUploadInfo.batchIndex, expectUploadInfo.batchIndex);
642     EXPECT_EQ(actualUploadInfo.total, expectUploadInfo.total);
643     EXPECT_EQ(actualUploadInfo.successCount, expectUploadInfo.successCount);
644     EXPECT_EQ(actualUploadInfo.failCount, expectUploadInfo.failCount);
645     EXPECT_EQ(actualUploadInfo.insertCount, expectUploadInfo.insertCount);
646     EXPECT_EQ(actualUploadInfo.updateCount, expectUploadInfo.updateCount);
647     EXPECT_EQ(actualUploadInfo.deleteCount, expectUploadInfo.deleteCount);
648 }
649 
CheckDownloadInfo(const Info & actualDownloadInfo,const Info & expectDownloadInfo)650 void DistributedDBCloudCheckSyncTest::CheckDownloadInfo(const Info &actualDownloadInfo, const Info &expectDownloadInfo)
651 {
652     EXPECT_EQ(actualDownloadInfo.batchIndex, expectDownloadInfo.batchIndex);
653     EXPECT_EQ(actualDownloadInfo.total, expectDownloadInfo.total);
654     EXPECT_EQ(actualDownloadInfo.successCount, expectDownloadInfo.successCount);
655     EXPECT_EQ(actualDownloadInfo.failCount, expectDownloadInfo.failCount);
656     EXPECT_EQ(actualDownloadInfo.insertCount, expectDownloadInfo.insertCount);
657     EXPECT_EQ(actualDownloadInfo.updateCount, expectDownloadInfo.updateCount);
658     EXPECT_EQ(actualDownloadInfo.deleteCount, expectDownloadInfo.deleteCount);
659 }
660 
661 /**
662  * @tc.name: CloudSyncTest001
663  * @tc.desc: sync with device sync query
664  * @tc.type: FUNC
665  * @tc.require:
666  * @tc.author: zhangqiquan
667  */
668 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest001, TestSize.Level0)
669 {
670     // prepare data
671     const int actualCount = 10;
672     InsertUserTableRecord(tableName_, actualCount);
673     // sync twice
674     Query query = Query::Select().FromTable({ tableName_ });
675     BlockSync(query, delegate_, g_actualDBStatus);
676     BlockSync(query, delegate_, g_actualDBStatus);
677     // remove cloud data
678     delegate_->RemoveDeviceData("CLOUD", ClearMode::FLAG_AND_DATA);
679     // check local data
680     int dataCnt = -1;
681     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anon28bcdbc61002(sqlite3_stmt *stmt) 682     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
683         dataCnt = sqlite3_column_int(stmt, 0);
684         return E_OK;
685     });
686     EXPECT_EQ(dataCnt, 0);
687 }
688 
689 /**
690  * @tc.name: CloudSyncTest002
691  * @tc.desc: sync with same data in one batch
692  * @tc.type: FUNC
693  * @tc.require:
694  * @tc.author: zhangqiquan
695  */
696 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest002, TestSize.Level0)
697 {
698     // prepare data
699     const int actualCount = 1;
700     InsertUserTableRecord(tableName_, actualCount);
701     // sync twice
702     Query query = Query::Select().FromTable({ tableName_ });
703     BlockSync(query, delegate_, g_actualDBStatus);
704     // cloud delete id=0 and insert id=0 but its gid is 1
705     // local delete id=0
706     DeleteCloudTableRecord(0); // cloud gid is 0
707     InsertCloudTableRecord(0, actualCount, 0, false); // 0 is id
708     DeleteUserTableRecord(0); // 0 is id
709     BlockSync(query, delegate_, g_actualDBStatus);
710     bool deleteStatus = true;
711     EXPECT_EQ(virtualCloudDb_->GetDataStatus("1", deleteStatus), OK);
712     EXPECT_EQ(deleteStatus, false);
713 }
714 
715 /**
716  * @tc.name: CloudSyncTest003
717  * @tc.desc: local data is delete before sync, then sync, cloud data will insert into local
718  * @tc.type: FUNC
719  * @tc.require:
720  * @tc.author: zhangshijie
721  */
722 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest003, TestSize.Level0)
723 {
724     // prepare data
725     const int actualCount = 1;
726     InsertUserTableRecord(tableName_, actualCount);
727 
728     InsertCloudTableRecord(0, actualCount, 0, false);
729     // delete local data
730     DeleteUserTableRecord(0);
731     Query query = Query::Select().FromTable({ tableName_ });
732     BlockSync(query, delegate_, g_actualDBStatus);
733 
734     // check local data, cloud date will insert into local
735     int dataCnt = -1;
736     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anon28bcdbc61102(sqlite3_stmt *stmt) 737     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
738         dataCnt = sqlite3_column_int(stmt, 0);
739         return E_OK;
740     });
741     EXPECT_EQ(dataCnt, actualCount);
742 }
743 
744 /**
745  * @tc.name: CloudSyncTest004
746  * @tc.desc: sync after insert failed
747  * @tc.type: FUNC
748  * @tc.require:
749  * @tc.author: zhangqiquan
750  */
751 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest004, TestSize.Level0)
752 {
753     // prepare data
754     const int actualCount = 1;
755     InsertUserTableRecord(tableName_, actualCount);
756     // sync twice
757     Query query = Query::Select().FromTable({ tableName_ });
758     LOGW("Block Sync");
759     virtualCloudDb_->SetInsertFailed(1);
760     BlockSync(query, delegate_, g_actualDBStatus);
761     // delete local data
762     DeleteUserTableRecord(0); // 0 is id
763     LOGW("Block Sync");
764     // sync again and this record with be synced to cloud
765     BlockSync(query, delegate_, g_actualDBStatus);
766     bool deleteStatus = true;
767     EXPECT_EQ(virtualCloudDb_->GetDataStatus("0", deleteStatus), OK);
768     EXPECT_EQ(deleteStatus, true);
769 }
770 
771 /**
772  * @tc.name: CloudSyncTest005
773  * @tc.desc: check device in process after sync
774  * @tc.type: FUNC
775  * @tc.require:
776  * @tc.author: liaoyonghuang
777  */
778 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest005, TestSize.Level0)
779 {
780     /**
781      * @tc.steps:step1. init data and sync
782      * @tc.expected: step1. ok.
783      */
784     const int localCount = 20; // 20 is count of local
785     const int cloudCount = 10; // 10 is count of cloud
786     InsertUserTableRecord(tableName_, localCount);
787     std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
788     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
789     InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
790 
791     /**
792      * @tc.steps:step2. check device name in process
793      * @tc.expected: step2. ok.
794      */
795     Query query = Query::Select().FromTable({tableName_});
__anon28bcdbc61202(const std::map<std::string, SyncProcess> &syncProcess) 796     auto callback = [](const std::map<std::string, SyncProcess> &syncProcess) {
797         EXPECT_TRUE(syncProcess.find("CLOUD") != syncProcess.end());
798     };
799     BlockCompensatedSync(query, delegate_, OK, callback);
800 }
801 
InitDataAndSync()802 void DistributedDBCloudCheckSyncTest::InitDataAndSync()
803 {
804     const int localCount = 120; // 120 is count of local
805     const int cloudCount = 100; // 100 is count of cloud
806     InsertUserTableRecord(tableName_, localCount, 0);
807     InsertUserTableRecord(tableWithoutPrimaryName_, cloudCount, 0);
808     InsertCloudTableRecord(tableWithoutPrimaryName_, 80, cloudCount, 0, false); // 80 is begin sync number
809 }
810 
811 /**
812  * @tc.name: CloudSyncTest006
813  * @tc.desc: check reDownload when common sync pause.
814  * @tc.type: FUNC
815  * @tc.require:
816  * @tc.author: luoguo
817  */
818 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest006, TestSize.Level0)
819 {
820     /**
821      * @tc.steps:step1. init data and sync
822      * @tc.expected: step1. ok.
823      */
824     InitDataAndSync();
825 
826     /**
827      * @tc.steps:step2. common sync will pause
828      * @tc.expected: step2. ok.
829      */
830     std::vector<std::string> tableNames = {tableName_, tableWithoutPrimaryName_};
831     Query normalQuery = Query::Select().FromTable({tableNames});
832     std::vector<std::string> idValue = {"0", "1", "2"};
833     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
834     CloudSyncOption option;
835     CloudSyncOption priorityOption;
836     PrepareOption(option, normalQuery, false);
837     PrepareOption(priorityOption, priorityQuery, true);
838     bool isUpload = false;
839     uint32_t blockTime = 2000;
__anon28bcdbc61302(const std::string &tableName, VBucket &extend) 840     virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
841         if (isUpload == false) {
842             isUpload = true;
843             std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
844         }
845     });
846     bool isFinished = false;
847     bool priorityFinish = false;
__anon28bcdbc61402(const std::map<std::string, SyncProcess> &process) 848     auto normalCallback = [&isFinished, &priorityFinish](const std::map<std::string, SyncProcess> &process) {
849         for (const auto &item : process) {
850             if (item.second.process == DistributedDB::FINISHED) {
851                 isFinished = true;
852                 ASSERT_EQ(priorityFinish, true);
853             }
854         }
855     };
856     ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
857 
858     /**
859      * @tc.steps:step3. wait common upload and priority sync.
860      * @tc.expected: step3. ok.
861      */
862     while (isUpload == false) {
863         std::this_thread::sleep_for(std::chrono::milliseconds(50));
864     }
__anon28bcdbc61502(const std::map<std::string, SyncProcess> &process) 865     auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
866         for (const auto &item : process) {
867             if (item.second.process == DistributedDB::FINISHED) {
868                 priorityFinish = true;
869             }
870         }
871     };
872     ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
873     while (isFinished == false || priorityFinish == false) {
874         std::this_thread::sleep_for(std::chrono::milliseconds(50));
875     }
876 
877     /**
878      * @tc.steps:step4. wait common sync and priority sync finish, check query Times.
879      * @tc.expected: step4. ok.
880      */
881     uint32_t times = virtualCloudDb_->GetQueryTimes(tableName_);
882     ASSERT_EQ(times, 3u);
883     virtualCloudDb_->ForkUpload(nullptr);
884 }
885 
886 /**
887  * @tc.name: CloudSyncTest007
888  * @tc.desc: check process info when version conflict sync process.
889  * @tc.type: FUNC
890  * @tc.require:
891  * @tc.author: luoguo
892  */
893 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest007, TestSize.Level0)
894 {
895     /**
896      * @tc.steps:step1. init data and sync
897      * @tc.expected: step1. ok.
898      */
899     const int localCount = 60;
900     InsertUserTableRecord(tableName_, localCount, 0);
901     Query query = Query::Select().FromTable({tableName_});
902     BlockSync(query, delegate_, g_actualDBStatus);
903 
904     /**
905      * @tc.steps:step2. delete 30 - 59 records in user table, and set callback func.
906      * @tc.expected: step2. ok.
907      */
908     DeleteUserTableRecord(30, 59);
909     bool isUpload = false;
__anon28bcdbc61602(const std::string &tableName, VBucket &extend) 910     virtualCloudDb_->ForkUpload([&isUpload](const std::string &tableName, VBucket &extend) {
911         if (isUpload == false) {
912             isUpload = true;
913             std::this_thread::sleep_for(std::chrono::milliseconds(2000));
914         }
915     });
916     bool isFinished = false;
917     std::map<std::string, TableProcessInfo> retSyncProcess;
__anon28bcdbc61702(const std::map<std::string, SyncProcess> &process) 918     auto normalCallback = [&isFinished, &retSyncProcess](const std::map<std::string, SyncProcess> &process) {
919         for (const auto &item : process) {
920             if (item.second.process == DistributedDB::FINISHED) {
921                 isFinished = true;
922                 ASSERT_EQ(process.empty(), false);
923                 auto lastProcess = process.rbegin();
924                 retSyncProcess = lastProcess->second.tableProcess;
925             }
926         }
927     };
928 
929     /**
930      * @tc.steps:step3. sync.
931      * @tc.expected: step3. ok.
932      */
933     std::vector<std::string> tableNames = {tableName_};
934     Query normalQuery = Query::Select().FromTable({tableNames});
935     CloudSyncOption option;
936     PrepareOption(option, normalQuery, false);
937     ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
938 
939     /**
940      * @tc.steps:step4. wait upload process and delete 30 record in cloud table.
941      * @tc.expected: step4. ok.
942      */
943     while (isUpload == false) {
944         std::this_thread::sleep_for(std::chrono::milliseconds(50));
945     }
946     DeleteCloudTableRecord(30);
947 
948     /**
949      * @tc.steps:step5. wait sync process end and check data.
950      * @tc.expected: step5. ok.
951      */
952     while (isFinished == false) {
953         std::this_thread::sleep_for(std::chrono::milliseconds(50));
954     }
955     ASSERT_EQ(retSyncProcess.empty(), false);
956     auto taskInfo = retSyncProcess.rbegin();
957     ASSERT_EQ(taskInfo->second.upLoadInfo.total, 30u);
958     virtualCloudDb_->ForkUpload(nullptr);
959 }
960 
961 /**
962  * @tc.name: CloudSyncTest008
963  * @tc.desc: test when normal sync interrupted by priority sync, process info should be consistent
964  * @tc.type: FUNC
965  * @tc.require:
966  * @tc.author: suyuchen
967  */
968 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest008, TestSize.Level0)
969 {
970     /**
971      * @tc.steps: step1. insert 35 records to user table
972      * @tc.expected: step1. OK.
973      */
974     const int localCount = 35;
975     InsertUserTableRecord(tableName_, localCount, 0);
976     Query query = Query::Select().FromTable({tableName_});
977 
978     /**
979      * @tc.steps: step2. Set CLOUD_VERSION_CONFLICT when normal sync task upload
980      * @tc.expected: step2. OK.
981      */
982     int recordIndex = 0;
983     virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anon28bcdbc61802(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 984         vector<VirtualCloudDb::CloudData> &cloudDataVec) {
985         recordIndex++;
986         if (recordIndex == 20) {
987             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
988             std::this_thread::sleep_for(std::chrono::seconds(1));
989             return CLOUD_VERSION_CONFLICT;
990         }
991         return OK;
992     });
993 
994     /**
995      * @tc.steps: step3. set callback function for normal sync
996      * @tc.expected: step3. OK.
997      */
998     std::map<std::string, TableProcessInfo> retSyncProcess;
__anon28bcdbc61902(const std::map<std::string, SyncProcess> &process) 999     auto normalCallback = [&retSyncProcess](const std::map<std::string, SyncProcess> &process) {
1000         for (const auto &item : process) {
1001             if (item.second.process == DistributedDB::FINISHED) {
1002                 ASSERT_EQ(process.empty(), false);
1003                 auto lastProcess = process.rbegin();
1004                 retSyncProcess = lastProcess->second.tableProcess;
1005             }
1006         }
1007     };
1008 
1009     /**
1010      * @tc.steps: step4. start normal sync
1011      * @tc.expected: step4. OK.
1012      */
1013     CloudSyncOption option;
1014     PrepareOption(option, query, false);
1015     ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
__anon28bcdbc61a02() 1016     std::thread syncThread1([&]() {
1017         BlockSync(query, delegate_, g_actualDBStatus);
1018     });
1019 
1020     /**
1021      * @tc.steps: step5. start priority sync
1022      * @tc.expected: step5. OK.
1023      */
1024     std::this_thread::sleep_for(std::chrono::milliseconds(200));
__anon28bcdbc61b02() 1025     std::thread syncThread2([&]() {
1026         BlockSync(query, delegate_, g_actualDBStatus, true);
1027     });
1028     syncThread1.join();
1029     syncThread2.join();
1030 
1031     /**
1032      * @tc.steps: step6. Check notification of normal sync
1033      * @tc.expected: step6. OK.
1034      */
1035     ASSERT_EQ(retSyncProcess.empty(), false);
1036     auto taskInfo = retSyncProcess.rbegin();
1037     ASSERT_EQ(taskInfo->second.upLoadInfo.total, 35u);
1038     ASSERT_EQ(taskInfo->second.upLoadInfo.successCount, 35u);
1039     ASSERT_EQ(taskInfo->second.upLoadInfo.failCount, 0u);
1040 
1041     virtualCloudDb_->ForkInsertConflict(nullptr);
1042 }
1043 
1044 /**
1045  * @tc.name: CloudSyncTest009
1046  * @tc.desc: reopen database and sync
1047  * @tc.type: FUNC
1048  * @tc.require:
1049  * @tc.author: wangxiangdong
1050  */
1051 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest009, TestSize.Level0)
1052 {
1053     /**
1054      * @tc.steps: step1. insert 1 record to user table
1055      * @tc.expected: step1. OK.
1056      */
1057     const int actualCount = 1;
1058     InsertUserTableRecord(tableName_, actualCount);
1059     /**
1060      * @tc.steps: step2. sync data to cloud
1061      * @tc.expected: step2. OK.
1062      */
1063     Query query = Query::Select().FromTable({ tableName_ });
1064     BlockSync(query, delegate_, g_actualDBStatus);
1065     CheckCloudTableCount(tableName_, 1);
1066     /**
1067      * @tc.steps: step3. drop data table then close db
1068      * @tc.expected: step3. OK.
1069      */
1070     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1071     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1072     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1073     delegate_ = nullptr;
1074     /**
1075      * @tc.steps: step4. recreate data table and reopen database
1076      * @tc.expected: step4. OK.
1077      */
1078     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1079     RelationalStoreDelegate::Option option;
1080     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1081     ASSERT_NE(delegate_, nullptr);
1082     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1083     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1084     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1085     DataBaseSchema dataBaseSchema = GetSchema();
1086     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1087     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1088     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1089     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1090     /**
1091      * @tc.steps: step5. sync and cloud data should be deleted
1092      * @tc.expected: step5. OK.
1093      */
1094     BlockSync(query, delegate_, g_actualDBStatus);
1095     CheckCloudTableCount(tableName_, 0);
1096 }
1097 
1098 /**
1099  * @tc.name: CloudSyncTest010
1100  * @tc.desc: reopen database, recreate table with less columns and sync
1101  * @tc.type: FUNC
1102  * @tc.require:
1103  * @tc.author: wangxiangdong
1104  */
1105 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest010, TestSize.Level0)
1106 {
1107     /**
1108      * @tc.steps: step1. insert 1 record to user table
1109      * @tc.expected: step1. OK.
1110      */
1111     const int actualCount = 1;
1112     InsertUserTableRecord(tableName_, actualCount);
1113     /**
1114      * @tc.steps: step2. sync data to cloud
1115      * @tc.expected: step2. OK.
1116      */
1117     Query query = Query::Select().FromTable({ tableName_ });
1118     BlockSync(query, delegate_, g_actualDBStatus);
1119     CheckCloudTableCount(tableName_, 1);
1120     /**
1121      * @tc.steps: step3. drop data table then close db
1122      * @tc.expected: step3. OK.
1123      */
1124     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1125     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1126     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1127     delegate_ = nullptr;
1128     /**
1129      * @tc.steps: step4. recreate data table and reopen database
1130      * @tc.expected: step4. OK.
1131      */
1132     std::string createSql = "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(id INT PRIMARY KEY);";
1133     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, createSql), DBStatus::OK);
1134     RelationalStoreDelegate::Option option;
1135     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1136     ASSERT_NE(delegate_, nullptr);
1137     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::SCHEMA_MISMATCH);
1138     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1139     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1140     DataBaseSchema dataBaseSchema = GetSchema();
1141     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1142     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1143     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1144     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1145     /**
1146      * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
1147      * @tc.expected: step5. OK.
1148      */
1149     BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
1150     CheckCloudTableCount(tableName_, 1);
1151 }
1152 
1153 /**
1154  * @tc.name: CloudSyncTest011
1155  * @tc.desc: reopen database, do not recreate table and sync
1156  * @tc.type: FUNC
1157  * @tc.require:
1158  * @tc.author: wangxiangdong
1159  */
1160 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest011, TestSize.Level0)
1161 {
1162     /**
1163      * @tc.steps: step1. insert 1 record to user table
1164      * @tc.expected: step1. OK.
1165      */
1166     const int actualCount = 1;
1167     InsertUserTableRecord(tableName_, actualCount);
1168     /**
1169      * @tc.steps: step2. sync data to cloud
1170      * @tc.expected: step2. OK.
1171      */
1172     Query query = Query::Select().FromTable({ tableName_ });
1173     BlockSync(query, delegate_, g_actualDBStatus);
1174     CheckCloudTableCount(tableName_, 1);
1175     /**
1176      * @tc.steps: step3. drop data table then close db
1177      * @tc.expected: step3. OK.
1178      */
1179     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1180     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1181     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1182     delegate_ = nullptr;
1183     /**
1184      * @tc.steps: step4. reopen database
1185      * @tc.expected: step4. OK.
1186      */
1187     RelationalStoreDelegate::Option option;
1188     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1189     ASSERT_NE(delegate_, nullptr);
1190     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1191     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1192     DataBaseSchema dataBaseSchema = GetSchema();
1193     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1194     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1195     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1196     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1197     /**
1198      * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
1199      * @tc.expected: step5. OK.
1200      */
1201     BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
1202 }
1203 
1204 /**
1205  * @tc.name: CloudSyncTest012
1206  * @tc.desc: insert data before re-SetDistributedTable and sync is ok
1207  * @tc.type: FUNC
1208  * @tc.require:
1209  * @tc.author: wangxiangdong
1210  */
1211 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest012, TestSize.Level0)
1212 {
1213     /**
1214      * @tc.steps: step1. insert 1 record to user table
1215      * @tc.expected: step1. OK.
1216      */
1217     const int actualCount = 1;
1218     InsertUserTableRecord(tableName_, actualCount);
1219     /**
1220      * @tc.steps: step2. sync data to cloud
1221      * @tc.expected: step2. OK.
1222      */
1223     Query query = Query::Select().FromTable({ tableName_ });
1224     BlockSync(query, delegate_, g_actualDBStatus);
1225     CheckCloudTableCount(tableName_, 1);
1226     /**
1227      * @tc.steps: step3. drop data table then close db
1228      * @tc.expected: step3. OK.
1229      */
1230     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1231     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1232     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1233     delegate_ = nullptr;
1234     /**
1235      * @tc.steps: step4. recreate data table and reopen database
1236      * @tc.expected: step4. OK.
1237      */
1238     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1239     RelationalStoreDelegate::Option option;
1240     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1241     ASSERT_NE(delegate_, nullptr);
1242     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1243     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1244     DataBaseSchema dataBaseSchema = GetSchema();
1245     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1246     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1247     ASSERT_TRUE(communicatorAggregator_ != nullptr);
1248     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1249     /**
1250      * @tc.steps: step5. insert data to new table
1251      * @tc.expected: step5. OK.
1252      */
1253     int begin = 1;
1254     InsertUserTableRecord(tableName_, actualCount, begin);
1255     /**
1256      * @tc.steps: step6. sync and cloud data should be deleted
1257      * @tc.expected: step6. OK.
1258      */
1259     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1260     BlockSync(query, delegate_, g_actualDBStatus);
1261     CheckCloudTableCount(tableName_, 1);
1262 }
1263 
1264 /**
1265  * @tc.name: CloudSyncObserverTest001
1266  * @tc.desc: test cloud sync multi observer
1267  * @tc.type: FUNC
1268  * @tc.require:
1269  * @tc.author: zhangshijie
1270  */
1271 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncObserverTest001, TestSize.Level0)
1272 {
1273     // prepare data
1274     const int actualCount = 10;
1275     InsertUserTableRecord(tableName_, actualCount);
1276 
1277     /**
1278      * @tc.steps:step1. open two delegate with two observer.
1279      * @tc.expected: step1. ok.
1280      */
1281     RelationalStoreDelegate::Option option;
1282     auto observer1 = new (std::nothrow) RelationalStoreObserverUnitTest();
1283     ASSERT_NE(observer1, nullptr);
1284     option.observer = observer1;
1285     RelationalStoreDelegate *delegate1 = nullptr;
1286     EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
1287     ASSERT_NE(delegate1, nullptr);
1288 
1289     auto observer2 = new (std::nothrow) RelationalStoreObserverUnitTest();
1290     ASSERT_NE(observer2, nullptr);
1291     option.observer = observer2;
1292     RelationalStoreDelegate *delegate2 = nullptr;
1293     EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate2), DBStatus::OK);
1294     ASSERT_NE(delegate2, nullptr);
1295 
1296     /**
1297      * @tc.steps:step2. insert 1-10 cloud data, start.
1298      * @tc.expected: step2. ok.
1299      */
1300     InsertCloudTableRecord(0, actualCount, actualCount, false);
1301     Query query = Query::Select().FromTable({ tableName_ });
1302     BlockSync(query, delegate_, g_actualDBStatus);
1303 
1304     /**
1305      * @tc.steps:step3. check observer.
1306      * @tc.expected: step3. ok.
1307      */
1308     EXPECT_EQ(observer1->GetCloudCallCount(), 1u);
1309     EXPECT_EQ(observer2->GetCloudCallCount(), 1u);
1310 
1311     /**
1312      * @tc.steps:step4. insert 11-20 cloud data, start.
1313      * @tc.expected: step4. ok.
1314      */
1315     delegate2->UnRegisterObserver();
1316     observer2->ResetCloudSyncToZero();
1317     int64_t begin = 11;
1318     InsertCloudTableRecord(begin, actualCount, actualCount, false);
1319     BlockSync(query, delegate_, g_actualDBStatus);
1320 
1321     /**
1322      * @tc.steps:step5. check observer.
1323      * @tc.expected: step5. ok.
1324      */
1325     EXPECT_EQ(observer1->GetCloudCallCount(), 2u); // 2 is observer1 triggered times
1326     EXPECT_EQ(observer2->GetCloudCallCount(), 0u);
1327 
1328     delete observer1;
1329     observer1 = nullptr;
1330     EXPECT_EQ(mgr_->CloseStore(delegate1), DBStatus::OK);
1331 
1332     delete observer2;
1333     observer2 = nullptr;
1334     EXPECT_EQ(mgr_->CloseStore(delegate2), DBStatus::OK);
1335 }
1336 
1337 /**
1338  * @tc.name: CloudPrioritySyncTest001
1339  * @tc.desc: use priority sync interface when query in or from table
1340  * @tc.type: FUNC
1341  * @tc.require:
1342  * @tc.author: chenchaohao
1343  */
1344 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest001, TestSize.Level0)
1345 {
1346     /**
1347      * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1348      * @tc.expected: step1. ok.
1349      */
1350     const int actualCount = 10; // 10 is count of records
1351     InsertUserTableRecord(tableName_, actualCount);
1352     std::vector<std::string> idValue = {"0", "1", "2"};
1353     Query query = Query::Select().From(tableName_).In("id", idValue);
1354 
1355     /**
1356      * @tc.steps:step2. check ParserQueryNodes
1357      * @tc.expected: step2. ok.
1358      */
__anon28bcdbc61c02(const std::string &tableName, VBucket &extend) 1359     virtualCloudDb_->ForkQuery([this, &idValue](const std::string &tableName, VBucket &extend) {
1360         EXPECT_EQ(tableName_, tableName);
1361         if (extend.find(CloudDbConstant::QUERY_FIELD) == extend.end()) {
1362             return;
1363         }
1364         Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1365         DBStatus status = OK;
1366         auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1367         EXPECT_EQ(status, OK);
1368         ASSERT_EQ(queryNodes.size(), 1u);
1369         EXPECT_EQ(queryNodes[0].type, QueryNodeType::IN);
1370         EXPECT_EQ(queryNodes[0].fieldName, "id");
1371         ASSERT_EQ(queryNodes[0].fieldValue.size(), idValue.size());
1372         for (size_t i = 0u; i < idValue.size(); i++) {
1373             std::string val = std::get<std::string>(queryNodes[0].fieldValue[i]);
1374             EXPECT_EQ(val, idValue[i]);
1375         }
1376     });
1377     BlockPrioritySync(query, delegate_, true, OK);
1378     virtualCloudDb_->ForkQuery(nullptr);
1379     CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1380 
1381     /**
1382      * @tc.steps:step3. use priority sync interface but not priority.
1383      * @tc.expected: step3. ok.
1384      */
1385     query = Query::Select().FromTable({ tableName_ });
1386     BlockPrioritySync(query, delegate_, false, OK);
1387     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1388 
1389     /**
1390      * @tc.steps:step4. insert user table record and query from table, then priority sync.
1391      * @tc.expected: step4. ok.
1392      */
1393     InsertUserTableRecord(tableName_, actualCount, actualCount);
1394     BlockPrioritySync(query, delegate_, true, OK);
1395     CheckCloudTableCount(tableName_, 20); // 20 is count of cloud records
1396 }
1397 
1398 
1399 /**
1400  * @tc.name: CloudPrioritySyncTest002
1401  * @tc.desc: priority sync in some abnormal query situations
1402  * @tc.type: FUNC
1403  * @tc.require:
1404  * @tc.author: chenchaohao
1405  */
1406 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest002, TestSize.Level0)
1407 {
1408     /**
1409      * @tc.steps:step1. insert user table record.
1410      * @tc.expected: step1. ok.
1411      */
1412     const int actualCount = 1; // 1 is count of records
1413     InsertUserTableRecord(tableName_, actualCount);
1414 
1415     /**
1416      * @tc.steps:step2. query select tablename then priority sync.
1417      * @tc.expected: step2. invalid.
1418      */
1419     Query query = Query::Select(tableName_);
1420     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1421     CheckCloudTableCount(tableName_, 0);
1422 
1423     /**
1424      * @tc.steps:step3. query select without from then priority sync.
1425      * @tc.expected: step3. invalid.
1426      */
1427     query = Query::Select();
1428     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1429     CheckCloudTableCount(tableName_, 0);
1430 
1431     /**
1432      * @tc.steps:step4. query select and from without in then priority sync.
1433      * @tc.expected: step4. invalid.
1434      */
1435     query = Query::Select().From(tableName_);
1436     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1437     CheckCloudTableCount(tableName_, 0);
1438 
1439     /**
1440      * @tc.steps:step5. query select and fromtable then priority sync.
1441      * @tc.expected: step5. not support.
1442      */
1443     query = Query::Select().From(tableName_).FromTable({tableName_});
1444     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1445     CheckCloudTableCount(tableName_, 0);
1446 
1447     /**
1448      * @tc.steps:step6. query select and from with other predicates then priority sync.
1449      * @tc.expected: step6. not support.
1450      */
1451     query = Query::Select().From(tableName_).IsNotNull("id");
1452     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1453     CheckCloudTableCount(tableName_, 0);
1454 
1455     /**
1456      * @tc.steps:step7. query select and from with in and other predicates then priority sync.
1457      * @tc.expected: step7 not support.
1458      */
1459     std::vector<std::string> idValue = {"0"};
1460     query = Query::Select().From(tableName_).IsNotNull("id").In("id", idValue);
1461     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1462     CheckCloudTableCount(tableName_, 0);
1463 
1464     /**
1465      * @tc.steps:step8. query select and from with in non-primary key then priority sync.
1466      * @tc.expected: step8. not support.
1467      */
1468     std::vector<std::string> heightValue = {"155.10"};
1469     query = Query::Select().From(tableName_).In("height", heightValue);
1470     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1471     CheckCloudTableCount(tableName_, 0);
1472 
1473     /**
1474      * @tc.steps:step9. query in count greater than 100.
1475      * @tc.expected: step9. over max limits.
1476      */
1477     idValue.resize(101); // 101 > 100
1478     query = Query::Select().From(tableName_).In("id", idValue);
1479     BlockPrioritySync(query, delegate_, true, OVER_MAX_LIMITS);
1480     CheckCloudTableCount(tableName_, 0);
1481 }
1482 
1483 /**
1484  * @tc.name: CloudPrioritySyncTest003
1485  * @tc.desc: priority sync when normal syncing
1486  * @tc.type: FUNC
1487  * @tc.require:
1488  * @tc.author: chenchaohao
1489  */
1490 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest003, TestSize.Level0)
1491 {
1492     /**
1493      * @tc.steps:step1. insert user table record.
1494      * @tc.expected: step1. ok.
1495      */
1496     const int actualCount = 10; // 10 is count of records
1497     InsertUserTableRecord(tableName_, actualCount);
1498 
1499     /**
1500      * @tc.steps:step2. begin normal sync and priority sync.
1501      * @tc.expected: step2. ok.
1502      */
1503     Query normalQuery = Query::Select().FromTable({tableName_});
1504     std::vector<std::string> idValue = {"0", "1", "2"};
1505     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
1506     std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
1507     PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, true);
1508     EXPECT_EQ(virtualCloudDb_->GetLockCount(), 2);
1509     virtualCloudDb_->Reset();
1510     EXPECT_EQ(virtualCloudDb_->GetLockCount(), 0);
1511     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1512 }
1513 
1514 /**
1515  * @tc.name: CloudPrioritySyncTest004
1516  * @tc.desc: non-primarykey table priority sync
1517  * @tc.type: FUNC
1518  * @tc.require:
1519  * @tc.author: chenchaohao
1520  */
1521 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest004, TestSize.Level0)
1522 {
1523     /**
1524      * @tc.steps:step1. insert user non-primarykey table record.
1525      * @tc.expected: step1. ok.
1526      */
1527     const int actualCount = 10; // 10 is count of records
1528     InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1529 
1530     /**
1531      * @tc.steps:step2. begin priority sync.
1532      * @tc.expected: step2. not support.
1533      */
1534     std::vector<std::string> idValue = {"0", "1", "2"};
1535     Query query = Query::Select().From(tableWithoutPrimaryName_).In("id", idValue);
1536     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1537     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1538 
1539     /**
1540      * @tc.steps:step3. begin priority sync when in rowid.
1541      * @tc.expected: step3. invalid.
1542      */
1543     std::vector<int64_t> rowidValue = {0, 1, 2}; // 0,1,2 are rowid value
1544     query = Query::Select().From(tableWithoutPrimaryName_).In("rowid", rowidValue);
1545     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1546     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1547 }
1548 
1549 /**
1550  * @tc.name: CloudPrioritySyncTest005
1551  * @tc.desc: priority sync but don't have records
1552  * @tc.type: FUNC
1553  * @tc.require:
1554  * @tc.author: chenchaohao
1555  */
1556 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest005, TestSize.Level0)
1557 {
1558     /**
1559      * @tc.steps:step1. insert user non-primarykey table record.
1560      * @tc.expected: step1. ok.
1561      */
1562     const int actualCount = 10; // 10 is count of records
1563     InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1564 
1565     /**
1566      * @tc.steps:step2. begin DistributedDBCloudCheckSyncTest priority sync and check records.
1567      * @tc.expected: step2. ok.
1568      */
1569     std::vector<std::string> idValue = {"0", "1", "2"};
1570     Query query = Query::Select().From(tableName_).In("id", idValue);
1571     BlockPrioritySync(query, delegate_, true, OK);
1572     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1573     CheckCloudTableCount(tableName_, 0);
1574 }
1575 
1576 /**
1577  * @tc.name: CloudPrioritySyncTest006
1578  * @tc.desc: priority sync tasks greater than limit
1579  * @tc.type: FUNC
1580  * @tc.require:
1581  * @tc.author: chenchaohao
1582  */
1583 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest006, TestSize.Level0)
1584 {
1585     /**
1586      * @tc.steps:step1. insert user table record.
1587      * @tc.expected: step1. ok.
1588      */
1589     const int actualCount = 10; // 10 is count of records
1590     InsertUserTableRecord(tableName_, actualCount);
1591 
1592     /**
1593      * @tc.steps:step2. begin 32 priority sync tasks and then begin 1 priority sync task.
1594      * @tc.expected: step2. ok.
1595      */
1596     std::vector<std::string> idValue = {"0", "1", "2"};
1597     Query query = Query::Select().From(tableName_).In("id", idValue);
1598     std::mutex dataMutex;
1599     std::condition_variable cv;
1600     std::mutex callbackMutex;
1601     std::condition_variable callbackCv;
1602     bool finish = false;
1603     size_t finishCount = 0u;
__anon28bcdbc61d02(const std::string &tableName, VBucket &extend) 1604     virtualCloudDb_->ForkQuery([&cv, &finish, &dataMutex](const std::string &tableName, VBucket &extend) {
1605         std::unique_lock<std::mutex> uniqueLock(dataMutex);
1606         cv.wait(uniqueLock, [&finish]() {
1607             return finish;
1608         });
1609     });
__anon28bcdbc61f02(const std::map<std::string, SyncProcess> &process) 1610     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1611         for (const auto &item: process) {
1612             if (item.second.process == DistributedDB::FINISHED) {
1613                 {
1614                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1615                     finishCount++;
1616                 }
1617                 callbackCv.notify_one();
1618             }
1619         }
1620     };
1621     CloudSyncOption option;
1622     PrepareOption(option, query, true);
1623     for (int i = 0; i < 32; i++) { // 32 is count of sync tasks
1624         ASSERT_EQ(delegate_->Sync(option, callback), OK);
1625     }
1626     ASSERT_EQ(delegate_->Sync(option, nullptr), BUSY);
1627     {
1628         std::lock_guard<std::mutex> autoLock(dataMutex);
1629         finish = true;
1630     }
1631     cv.notify_all();
1632     virtualCloudDb_->ForkQuery(nullptr);
1633     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon28bcdbc62002() 1634     callbackCv.wait(callbackLock, [&finishCount]() {
1635         return (finishCount == 32u); // 32 is count of finished sync tasks
1636     });
1637     CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1638 }
1639 
1640 /**
1641  * @tc.name: CloudPrioritySyncTest007
1642  * @tc.desc: priority normal priority normal when different query
1643  * @tc.type: FUNC
1644  * @tc.require:
1645  * @tc.author: chenchaohao
1646  */
1647 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest007, TestSize.Level0)
1648 {
1649     /**
1650      * @tc.steps:step1. insert user table record.
1651      * @tc.expected: step1. ok.
1652      */
1653     const int actualCount = 10; // 10 is count of records
1654     InsertUserTableRecord(tableName_, actualCount);
1655 
1656     /**
1657      * @tc.steps:step2. set callback to check during sync.
1658      * @tc.expected: step2. ok.
1659      */
1660     std::atomic<int> count = 0;
1661     SetForkQueryForCloudPrioritySyncTest007(count);
1662 
1663     /**
1664      * @tc.steps:step3. perform priority normal priority normal sync.
1665      * @tc.expected: step3. ok.
1666      */
1667     std::vector<std::string> idValue = {"0"};
1668     Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1669     CloudSyncOption option;
1670     PrepareOption(option, priorytyQuery, true);
1671     option.lockAction = static_cast<LockAction>(0xff); // lock all
1672     std::mutex callbackMutex;
1673     std::condition_variable callbackCv;
1674     size_t finishCount = 0u;
__anon28bcdbc62102(const std::map<std::string, SyncProcess> &process) 1675     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1676         for (const auto &item: process) {
1677             if (item.second.process == DistributedDB::FINISHED) {
1678                 {
1679                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1680                     finishCount++;
1681                 }
1682                 callbackCv.notify_one();
1683             }
1684         }
1685     };
1686     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1687     Query normalQuery = Query::Select().FromTable({tableName_});
1688     PrepareOption(option, normalQuery, false);
1689     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1690     idValue = {"1"};
1691     priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1692     PrepareOption(option, priorytyQuery, true);
1693     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1694     PrepareOption(option, normalQuery, false);
1695     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1696     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon28bcdbc62202() 1697     callbackCv.wait(callbackLock, [&finishCount]() {
1698         return (finishCount == 4u); // 4 is count of finished sync tasks
1699     });
1700     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1701 }
1702 
1703 /**
1704  * @tc.name: CloudPrioritySyncTest008
 * @tc.desc: priority normal priority normal when same query
1706  * @tc.type: FUNC
1707  * @tc.require:
1708  * @tc.author: chenchaohao
1709  */
1710 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest008, TestSize.Level0)
1711 {
1712     /**
1713      * @tc.steps:step1. insert user table record.
1714      * @tc.expected: step1. ok.
1715      */
1716     const int actualCount = 10; // 10 is count of records
1717     InsertUserTableRecord(tableName_, actualCount);
1718 
1719     /**
1720      * @tc.steps:step2. set callback to check during sync.
1721      * @tc.expected: step2. ok.
1722      */
1723     std::atomic<int> count = 0;
1724     SetForkQueryForCloudPrioritySyncTest008(count);
1725 
1726     /**
1727      * @tc.steps:step3. perform priority normal priority normal sync.
1728      * @tc.expected: step3. ok.
1729      */
1730     std::vector<std::string> idValue = {"0"};
1731     Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1732     CloudSyncOption option;
1733     option.lockAction = static_cast<LockAction>(0xff); // lock all
1734     PrepareOption(option, priorytyQuery, true);
1735     std::mutex callbackMutex;
1736     std::condition_variable callbackCv;
1737     size_t finishCount = 0u;
__anon28bcdbc62302(const std::map<std::string, SyncProcess> &process) 1738     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1739         for (const auto &item: process) {
1740             if (item.second.process == DistributedDB::FINISHED) {
1741                 {
1742                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1743                     finishCount++;
1744                 }
1745                 callbackCv.notify_one();
1746             }
1747         }
1748     };
1749     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1750     Query normalQuery = Query::Select().FromTable({tableName_});
1751     PrepareOption(option, normalQuery, false);
1752     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1753     priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1754     PrepareOption(option, priorytyQuery, true);
1755     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1756     PrepareOption(option, normalQuery, false);
1757     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1758     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon28bcdbc62402() 1759     callbackCv.wait(callbackLock, [&finishCount]() {
1760         return (finishCount == 4u); // 4 is count of finished sync tasks
1761     });
1762     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1763 }
1764 
1765 /**
1766  * @tc.name: CloudPrioritySyncTest009
1767  * @tc.desc: use priority sync interface when query equal to from table
1768  * @tc.type: FUNC
1769  * @tc.require:
1770  * @tc.author: zhangqiquan
1771  */
1772 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest009, TestSize.Level0)
1773 {
1774     /**
1775      * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1776      * @tc.expected: step1. ok.
1777      */
1778     const int actualCount = 5; // 5 is count of records
1779     InsertUserTableRecord(tableName_, actualCount);
1780     Query query = Query::Select().From(tableName_).BeginGroup().EqualTo("id", "0").Or().EqualTo("id", "1").EndGroup();
1781 
1782     /**
1783      * @tc.steps:step2. check ParserQueryNodes
1784      * @tc.expected: step2. ok.
1785      */
__anon28bcdbc62502(const std::string &tableName, VBucket &extend) 1786     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
1787         EXPECT_EQ(tableName_, tableName);
1788         Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1789         DBStatus status = OK;
1790         auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1791         EXPECT_EQ(status, OK);
1792         ASSERT_EQ(queryNodes.size(), 5u); // 5 is query nodes count
1793     });
1794     BlockPrioritySync(query, delegate_, true, OK);
1795     virtualCloudDb_->ForkQuery(nullptr);
1796     CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records
1797 }
1798 
1799 /**
1800  * @tc.name: CloudPrioritySyncTest010
1801  * @tc.desc: priority sync after cloud delete
1802  * @tc.type: FUNC
1803  * @tc.require:
1804  * @tc.author: chenchaohao
1805  */
1806 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest010, TestSize.Level0)
1807 {
1808     /**
1809      * @tc.steps:step1. insert user table record.
1810      * @tc.expected: step1. ok.
1811      */
1812     const int actualCount = 10; // 10 is count of records
1813     InsertUserTableRecord(tableName_, actualCount);
1814 
1815     /**
1816      * @tc.steps:step2. normal sync and then delete cloud records.
1817      * @tc.expected: step2. ok.
1818      */
1819     Query query = Query::Select().FromTable({tableName_});
1820     BlockSync(query, delegate_, g_actualDBStatus);
1821     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after sync
1822     DeleteCloudDBData(0, 3); // delete 0 1 2 record in cloud
1823     CheckCloudTableCount(tableName_, 7); // 7 is count of cloud records after delete
1824     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1825 
1826     /**
1827      * @tc.steps:step3. priory sync and set query then check user table records.
1828      * @tc.expected: step3. ok.
1829      */
1830     std::vector<std::string> idValue = {"3", "4", "5"};
1831     query = Query::Select().From(tableName_).In("id", idValue);
1832     BlockPrioritySync(query, delegate_, true, OK);
1833     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records after sync
1834     idValue = {"0", "1", "2"};
1835     query = Query::Select().From(tableName_).In("id", idValue);
1836     BlockPrioritySync(query, delegate_, true, OK);
1837     CheckUserTableResult(db_, tableName_, 7); // 7 is count of user records after sync
1838 }
1839 
1840 /**
1841  * @tc.name: CloudPrioritySyncTest011
1842  * @tc.desc: priority sync after cloud insert
1843  * @tc.type: FUNC
1844  * @tc.require:
1845  * @tc.author: chenchaohao
1846  */
1847 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest011, TestSize.Level0)
1848 {
1849     /**
1850      * @tc.steps:step1. insert cloud table record.
1851      * @tc.expected: step1. ok.
1852      */
1853     const int actualCount = 10; // 10 is count of records
1854     InsertCloudTableRecord(0, actualCount, actualCount, false);
1855     std::vector<std::string> idValue = {"0", "1", "2"};
1856     Query query = Query::Select().From(tableName_).In("id", idValue);
1857     std::atomic<int> count = 0;
1858 
1859     /**
1860      * @tc.steps:step2. check user records when query.
1861      * @tc.expected: step1. ok.
1862      */
__anon28bcdbc62602(const std::string &, VBucket &) 1863     virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
1864         count++;
1865         if (count == 1) { // taskid1
1866             std::this_thread::sleep_for(std::chrono::seconds(1));
1867         }
1868         if (count == 2) { // taskid2
1869             CheckUserTableResult(db_, tableName_, 3); // 3 is count of user records after first sync
1870         }
1871     });
1872     CloudSyncOption option;
1873     PrepareOption(option, query, true);
1874     std::mutex callbackMutex;
1875     std::condition_variable callbackCv;
1876     size_t finishCount = 0u;
__anon28bcdbc62702(const std::map<std::string, SyncProcess> &process) 1877     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1878         for (const auto &item: process) {
1879             if (item.second.process == DistributedDB::FINISHED) {
1880                 {
1881                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1882                     finishCount++;
1883                 }
1884                 callbackCv.notify_one();
1885             }
1886         }
1887     };
1888 
1889     /**
1890      * @tc.steps:step3. begin sync and check user record.
1891      * @tc.expected: step3. ok.
1892      */
1893     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1894     idValue = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
1895     query = Query::Select().From(tableName_).In("id", idValue);
1896     PrepareOption(option, query, true);
1897     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1898     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon28bcdbc62802() 1899     callbackCv.wait(callbackLock, [&finishCount]() {
1900         return (finishCount == 2u); // 2 is count of finished sync tasks
1901     });
1902     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1903 }
1904 
1905 /**
1906  * @tc.name: CloudPrioritySyncTest012
1907  * @tc.desc: priority or normal sync when waittime > 300s or < -1
1908  * @tc.type: FUNC
1909  * @tc.require:
1910  * @tc.author: chenchaohao
1911  */
1912 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest012, TestSize.Level0)
1913 {
1914     /**
1915      * @tc.steps:step1. insert cloud table record.
1916      * @tc.expected: step1. ok.
1917      */
1918     const int actualCount = 10; // 10 is count of records
1919     InsertCloudTableRecord(0, actualCount, actualCount, false);
1920     std::vector<std::string> idValue = {"0", "1", "2"};
1921     Query query = Query::Select().From(tableName_).In("id", idValue);
1922 
1923     /**
1924      * @tc.steps:step2. set waittime < -1 then begin sync.
1925      * @tc.expected: step2. invalid.
1926      */
1927     CloudSyncOption option;
1928     PrepareOption(option, query, true);
1929     option.waitTime = -2; // -2 < -1;
1930     ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1931     CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1932 
1933     /**
1934      * @tc.steps:step3. set waittime > 300s then begin sync.
1935      * @tc.expected: step3. invalid.
1936      */
1937 
1938     option.waitTime = 300001; // 300001 > 300s
1939     ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1940     CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1941 }
1942 
1943 /**
1944  * @tc.name: CloudPrioritySyncTest013
1945  * @tc.desc: priority sync in some abnormal composite pk query situations
1946  * @tc.type: FUNC
1947  * @tc.require:
1948  * @tc.author: chenchaohao
1949  */
1950 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest013, TestSize.Level0)
1951 {
1952     /**
1953      * @tc.steps:step1. insert user table record.
1954      * @tc.expected: step1. ok.
1955      */
1956     const int actualCount = 1; // 1 is count of records
1957     InsertUserTableRecord(tableName_, actualCount);
1958 
1959     /**
1960      * @tc.steps:step2. query only begingroup then priority sync.
1961      * @tc.expected: step2. invalid.
1962      */
1963     Query query = Query::Select().From(tableName_).BeginGroup();
1964     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1965     CheckCloudTableCount(tableName_, 0);
1966 
1967     /**
1968      * @tc.steps:step3. query only endgroup then priority sync.
1969      * @tc.expected: step3. invalid.
1970      */
1971     query = Query::Select().From(tableName_).EndGroup();
1972     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1973     CheckCloudTableCount(tableName_, 0);
1974 
1975     /**
1976      * @tc.steps:step4. query only begingroup and endgroup then priority sync.
1977      * @tc.expected: step4. invalid.
1978      */
1979     query = Query::Select().From(tableName_).BeginGroup().EndGroup();
1980     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1981     CheckCloudTableCount(tableName_, 0);
1982 
1983     /**
1984      * @tc.steps:step5. query and from table then priority sync.
1985      * @tc.expected: step5. invalid.
1986      */
1987     query = Query::Select().And().From(tableName_);
1988     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1989     CheckCloudTableCount(tableName_, 0);
1990 
1991     /**
1992      * @tc.steps:step6. query or from table then priority sync.
1993      * @tc.expected: step6. invalid.
1994      */
1995     query = Query::Select().Or().From(tableName_);
1996     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1997     CheckCloudTableCount(tableName_, 0);
1998 
1999     /**
2000      * @tc.steps:step7. query begingroup from table then priority sync.
2001      * @tc.expected: step7 invalid.
2002      */
2003     query = Query::Select().BeginGroup().From(tableName_);
2004     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2005     CheckCloudTableCount(tableName_, 0);
2006 
2007     /**
2008      * @tc.steps:step8. query endgroup from table then priority sync.
2009      * @tc.expected: step8 invalid.
2010      */
2011     query = Query::Select().EndGroup().From(tableName_);
2012     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2013     CheckCloudTableCount(tableName_, 0);
2014 
2015     /**
2016      * @tc.steps:step9. query and in then priority sync.
2017      * @tc.expected: step9. invalid.
2018      */
2019     std::vector<std::string> idValue = {"0"};
2020     query = Query::Select().From(tableName_).And().In("id", idValue);
2021     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2022     CheckCloudTableCount(tableName_, 0);
2023 
2024     /**
2025      * @tc.steps:step10. query when the table name does not exit then priority sync.
2026      * @tc.expected: step10. schema mismatch.
2027      */
2028     query = Query::Select().From("tableName").And().In("id", idValue);
2029     BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
2030     CheckCloudTableCount(tableName_, 0);
2031 
2032     /**
2033      * @tc.steps:step11. query when the table name does not exit then priority sync.
2034      * @tc.expected: step11. schema mismatch.
2035      */
2036     query = Query::Select().From("tableName").In("id", idValue);
2037     BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
2038     CheckCloudTableCount(tableName_, 0);
2039 
2040     /**
2041      * @tc.steps:step12. query when the table name does not exit then sync.
2042      * @tc.expected: step12. schema mismatch.
2043      */
2044     query = Query::Select().FromTable({"tableName"});
2045     BlockPrioritySync(query, delegate_, false, SCHEMA_MISMATCH);
2046     CheckCloudTableCount(tableName_, 0);
2047 }
2048 
CheckUploadInfoAfterSync(int recordCount,SyncProcess & normalLast)2049 void DistributedDBCloudCheckSyncTest::CheckUploadInfoAfterSync(int recordCount, SyncProcess &normalLast)
2050 {
2051     uint32_t uintRecordCount = static_cast<uint32_t>(recordCount);
2052     const Info expectUploadInfo = {2u, uintRecordCount, uintRecordCount, 0u, uintRecordCount, 0u, 0u};
2053     for (const auto &table : normalLast.tableProcess) {
2054         CheckUploadInfo(table.second.upLoadInfo, expectUploadInfo);
2055         EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
2056     }
2057 }
2058 
2059 /**
2060  * @tc.name: CloudPrioritySyncTest014
2061  * @tc.desc: Check the uploadInfo after the normal sync is paused by the priority sync
2062  * @tc.type: FUNC
2063  * @tc.require:
2064  * @tc.author: suyue
2065  */
2066 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest014, TestSize.Level0)
2067 {
2068     /**
2069      * @tc.steps:step1. insert data and sync pause.
2070      * @tc.expected: step1. ok.
2071      */
2072     const int recordCount = 50; // 50 is count of data records
2073     InsertUserTableRecord(tableName_, recordCount, 0);
2074     Query normalQuery = Query::Select().FromTable({tableName_});
2075     CloudSyncOption normalOption;
2076     PrepareOption(normalOption, normalQuery, false);
2077     bool isUpload = false;
2078     uint32_t blockTime = 1000;
__anon28bcdbc62902(const std::string &tableName, VBucket &extend) 2079     virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
2080         if (isUpload == false) {
2081             isUpload = true;
2082             std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
2083         }
2084     });
2085     bool isFinish = false;
2086     bool priorityFinish = false;
2087     SyncProcess normalLast;
__anon28bcdbc62a02(const std::map<std::string, SyncProcess> &process) 2088     auto normalCallback = [&isFinish, &priorityFinish, &normalLast](const std::map<std::string, SyncProcess> &process) {
2089         for (const auto &item : process) {
2090             if (item.second.process == DistributedDB::FINISHED) {
2091                 isFinish = true;
2092                 ASSERT_EQ(priorityFinish, true);
2093                 normalLast = item.second;
2094             }
2095         }
2096     };
2097     ASSERT_EQ(delegate_->Sync(normalOption, normalCallback), OK);
2098 
2099     /**
2100      * @tc.steps:step2. priority sync.
2101      * @tc.expected: step2. ok.
2102      */
2103     while (isUpload == false) {
2104         std::this_thread::sleep_for(std::chrono::milliseconds(50));
2105     }
2106     std::vector<std::string> idValues = {"0", "1", "2", "3", "4"};
2107     Query priorityQuery = Query::Select().From(tableName_).In("id", idValues);
2108     CloudSyncOption priorityOption;
2109     PrepareOption(priorityOption, priorityQuery, true);
__anon28bcdbc62b02(const std::map<std::string, SyncProcess> &process) 2110     auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
2111         for (const auto &item : process) {
2112             if (item.second.process == DistributedDB::FINISHED) {
2113                 priorityFinish = true;
2114             }
2115         }
2116     };
2117     ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
2118     while (isFinish == false || priorityFinish == false) {
2119         std::this_thread::sleep_for(std::chrono::milliseconds(50));
2120     }
2121 
2122     /**
2123      * @tc.steps:step3. check uploadInfo after sync finished.
2124      * @tc.expected: step3. ok.
2125      */
2126     CheckUploadInfoAfterSync(recordCount, normalLast);
2127     virtualCloudDb_->ForkUpload(nullptr);
2128 }
2129 
2130 /**
2131  * @tc.name: CloudPrioritySyncTest015
2132  * @tc.desc: Check the uploadInfo and the downloadInfo after the normal sync is paused by the priority sync
2133  * @tc.type: FUNC
2134  * @tc.require:
2135  * @tc.author: caihaoting
2136  */
2137 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest015, TestSize.Level0)
2138 {
2139     /**
2140      * @tc.steps:step1. insert data and sync pause.
2141      * @tc.expected: step1. ok.
2142      */
2143     const int localCount = 10; // 10 is count of local data records
2144     const int cloudCount = 50; // 50 is count of cloud data records
2145     InsertUserTableRecord(tableName_, localCount, 0);
2146     InsertCloudTableRecord(20, cloudCount, 0, false); // 20 is begin number
2147     uint32_t blockTime = 500; // 500ms
2148     virtualCloudDb_->SetBlockTime(blockTime);
2149     Query normalQuery = Query::Select().FromTable({tableName_});
2150     CloudSyncOption normalOption;
2151     PrepareOption(normalOption, normalQuery, false);
2152     bool isFinish = false;
2153     bool priorityFinish = false;
2154     SyncProcess normalLast;
__anon28bcdbc62c02(const std::map<std::string, SyncProcess> &process) 2155     auto normalCallback = [&isFinish, &priorityFinish, &normalLast](const std::map<std::string, SyncProcess> &process) {
2156         for (const auto &item : process) {
2157             if (item.second.process == DistributedDB::FINISHED) {
2158                 isFinish = true;
2159                 ASSERT_EQ(priorityFinish, true);
2160                 normalLast = item.second;
2161             }
2162         }
2163     };
2164     ASSERT_EQ(delegate_->Sync(normalOption, normalCallback), OK);
2165 
2166     /**
2167      * @tc.steps:step2. priority sync.
2168      * @tc.expected: step2. ok.
2169      */
2170     std::vector<std::string> idValues = {"10", "11", "12", "13", "14"};
2171     Query priorityQuery = Query::Select().From(tableName_).In("id", idValues);
2172     CloudSyncOption priorityOption;
2173     PrepareOption(priorityOption, priorityQuery, true);
__anon28bcdbc62d02(const std::map<std::string, SyncProcess> &process) 2174     auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
2175         for (const auto &item : process) {
2176             if (item.second.process == DistributedDB::FINISHED) {
2177                 priorityFinish = true;
2178             }
2179         }
2180     };
2181     ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
2182     while (isFinish == false || priorityFinish == false) {
2183         std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
2184     }
2185 
2186     /**
2187      * @tc.steps:step3. check uploadInfo and downloadInfo after sync finished.
2188      * @tc.expected: step3. ok.
2189      */
2190     uint32_t uintLocalCount = static_cast<uint32_t>(localCount);
2191     uint32_t uintCloudCount = static_cast<uint32_t>(cloudCount);
2192     const Info expectUploadInfo = {1u, uintLocalCount, uintLocalCount, 0u, uintLocalCount, 0u, 0u};
2193     const Info expectDownloadInfo = {1u, uintCloudCount, uintCloudCount, 0u, uintCloudCount, 0u, 0u};
2194     for (const auto &table : normalLast.tableProcess) {
2195         CheckUploadInfo(table.second.upLoadInfo, expectUploadInfo);
2196         CheckDownloadInfo(table.second.downLoadInfo, expectDownloadInfo);
2197         EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
2198     }
2199     CheckUserTableResult(db_, tableName_, 60);
2200 }
2201 
2202 /**
2203  * @tc.name: CloudPrioritySyncTest016
2204  * @tc.desc: priority sync when normal syncing
2205  * @tc.type: FUNC
2206  * @tc.require:
2207  * @tc.author: wangxiangdong
2208  */
2209 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest016, TestSize.Level0)
2210 {
2211     /**
2212      * @tc.steps:step1. insert cloud table record.
2213      * @tc.expected: step1. ok.
2214      */
2215     const int actualCount = 60; // 60 is count of records
2216     InsertCloudTableRecord(0, actualCount, 0, false);
2217     InsertUserTableRecord(tableName_, 10);
2218 
2219     /**
2220      * @tc.steps:step2. begin normal sync and priority sync.
2221      * @tc.expected: step2. ok.
2222      */
2223     Query normalQuery = Query::Select().FromTable({tableName_});
2224     std::vector<std::string> idValue = {"0", "1", "2"};
2225     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
2226     std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
2227     PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, false);
2228     virtualCloudDb_->Reset();
2229     CheckCloudTableCount(tableName_, 60); // 10 is count of cloud records
2230     /**
2231      * @tc.steps:step3. check sync process result.
2232      * @tc.expected: step3. ok.
2233      */
2234     std::vector<DistributedDB::SyncProcess> expectSyncResult = {
2235                 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 60, 60, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2236                 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 3, 3, 0, 0, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2237                 {FINISHED, OK, {{tableName_, {FINISHED, {1, 3, 3, 0, 0, 0, 0}, {1, 3, 3, 0, 0, 3, 0}}}}},
2238                 {PROCESSING, OK, {{tableName_, {PROCESSING, {2, 63, 63, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2239                 {FINISHED, OK, {{tableName_, {FINISHED, {2, 63, 63, 0, 50, 0, 0}, {1, 7, 7, 0, 0, 7, 0}}}}}
2240         };
2241     EXPECT_EQ(CheckSyncProcess(prioritySyncProcess, expectSyncResult), true);
2242 }
2243 
2244 /**
2245  * @tc.name: LogicDeleteSyncTest001
2246  * @tc.desc: sync with logic delete
2247  * @tc.type: FUNC
2248  * @tc.require:
2249  * @tc.author: zhangqiquan
2250  */
2251 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest001, TestSize.Level0)
2252 {
2253     bool logicDelete = true;
2254     auto data = static_cast<PragmaData>(&logicDelete);
2255     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2256     int actualCount = 10;
2257     InitLogicDeleteDataEnv(actualCount, true);
2258     CheckLocalCount(actualCount);
2259     std::string device = "";
2260     ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_AND_DATA), DBStatus::OK);
2261     CheckLocalCount(actualCount);
2262 }
2263 
2264 /**
2265  * @tc.name: LogicDeleteSyncTest002
2266  * @tc.desc: sync without logic delete
2267  * @tc.type: FUNC
2268  * @tc.require:
2269  * @tc.author: zhangqiquan
2270  */
2271 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest002, TestSize.Level0)
2272 {
2273     bool logicDelete = false;
2274     auto data = static_cast<PragmaData>(&logicDelete);
2275     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2276     int actualCount = 10;
2277     InitLogicDeleteDataEnv(actualCount);
2278     CheckLocalCount(0);
2279 }
2280 
2281 /**
2282  * @tc.name: LogicDeleteSyncTest003
2283  * @tc.desc: sync with logic delete and check observer
2284  * @tc.type: FUNC
2285  * @tc.require:
2286  * @tc.author: bty
2287  */
2288 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest003, TestSize.Level0)
2289 {
2290     /**
2291      * @tc.steps:step1. register observer.
2292      * @tc.expected: step1. ok.
2293      */
2294     RelationalStoreDelegate::Option option;
2295     auto observer = new (std::nothrow) RelationalStoreObserverUnitTest();
2296     ASSERT_NE(observer, nullptr);
2297     observer->SetCallbackDetailsType(static_cast<uint32_t>(CallbackDetailsType::DETAILED));
2298     EXPECT_EQ(delegate_->RegisterObserver(observer), OK);
2299     ChangedData expectData;
2300     expectData.tableName = tableName_;
2301     expectData.type = ChangedDataType::DATA;
2302     expectData.field.push_back(std::string("id"));
2303     const int count = 10;
2304     for (int64_t i = 0; i < count; ++i) {
2305         expectData.primaryData[ChangeType::OP_DELETE].push_back({std::to_string(i)});
2306     }
2307     expectData.properties = { .isTrackedDataChange = true };
2308     observer->SetExpectedResult(expectData);
2309 
2310     /**
2311      * @tc.steps:step2. set tracker table
2312      * @tc.expected: step2. ok.
2313      */
2314     TrackerSchema trackerSchema;
2315     trackerSchema.tableName = tableName_;
2316     trackerSchema.trackerColNames = { "id" };
2317     EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
2318 
2319     /**
2320      * @tc.steps:step3. set logic delete and sync
2321      * @tc.expected: step3. ok.
2322      */
2323     bool logicDelete = true;
2324     auto data = static_cast<PragmaData>(&logicDelete);
2325     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2326     int actualCount = 10;
2327     InitLogicDeleteDataEnv(actualCount);
2328     CheckLocalCount(actualCount);
2329     EXPECT_EQ(observer->IsAllChangedDataEq(), true);
2330     observer->ClearChangedData();
2331 
2332     /**
2333      * @tc.steps:step4. unSetTrackerTable and sync
2334      * @tc.expected: step4. ok.
2335      */
2336     expectData.properties = { .isTrackedDataChange = false };
2337     observer->SetExpectedResult(expectData);
2338     trackerSchema.trackerColNames = {};
2339     EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
2340     InsertUserTableRecord(tableName_, actualCount);
2341     BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
2342     for (int i = 0; i < actualCount + actualCount; ++i) {
2343         DeleteCloudTableRecord(i);
2344     }
2345     BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
2346     EXPECT_EQ(observer->IsAllChangedDataEq(), true);
2347 
2348     EXPECT_EQ(delegate_->UnRegisterObserver(observer), OK);
2349     delete observer;
2350     observer = nullptr;
2351 }
2352 
2353 /**
2354  * @tc.name: LogicDeleteSyncTest004
2355  * @tc.desc: test removedevicedata in mode FLAG_ONLY when sync with logic delete
2356  * @tc.type: FUNC
2357  * @tc.require:
2358  * @tc.author: chenchaohao
2359  */
2360 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest004, TestSize.Level0)
2361 {
2362     /**
2363      * @tc.steps:step1. set logic delete
2364      * @tc.expected: step1. ok.
2365      */
2366     bool logicDelete = true;
2367     auto data = static_cast<PragmaData>(&logicDelete);
2368     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2369 
2370     /**
2371      * @tc.steps:step2. cloud delete data then sync, check removedevicedata
2372      * @tc.expected: step2. ok.
2373      */
2374     int actualCount = 10;
2375     InitLogicDeleteDataEnv(actualCount);
2376     CheckLocalCount(actualCount);
2377     std::string device = "";
2378     ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
2379     CheckLocalCount(actualCount);
2380     CheckLogCleaned(0);
2381 }
2382 
2383 /**
2384  * @tc.name: LogicDeleteSyncTest005
2385  * @tc.desc: test pragma when set cmd is not logic delete
2386  * @tc.type: FUNC
2387  * @tc.require:
2388  * @tc.author: chenchaohao
2389  */
2390 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest005, TestSize.Level0)
2391 {
2392     /**
2393      * @tc.steps:step1. set cmd is auto sync
2394      * @tc.expected: step1. ok.
2395      */
2396     bool logicDelete = true;
2397     auto data = static_cast<PragmaData>(&logicDelete);
2398     EXPECT_EQ(delegate_->Pragma(AUTO_SYNC, data), DBStatus::NOT_SUPPORT);
2399 }
2400 
2401 /**
2402  * @tc.name: LogicDeleteSyncTest006
2403  * @tc.desc: sync with logic delete after lock table.
2404  * @tc.type: FUNC
2405  * @tc.require:
2406  * @tc.author: liaoyonghuang
2407  */
2408 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest006, TestSize.Level0)
2409 {
2410     /**
2411      * @tc.steps:step1. set logic delete
2412      * @tc.expected: step1. ok.
2413      */
2414     bool logicDelete = true;
2415     auto data = static_cast<PragmaData>(&logicDelete);
2416     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2417 
2418     /**
2419      * @tc.steps:step2. insert user table record and sync.
2420      * @tc.expected: step2. ok.
2421      */
2422     int dataCount = 10;
2423     InsertUserTableRecord(tableName_, dataCount);
2424     Query query = Query::Select().FromTable({ tableName_ });
2425     BlockSync(query, delegate_, g_actualDBStatus);
2426 
2427     /**
2428      * @tc.steps:step3. Lock log table, and delete data from cloud table.
2429      * @tc.expected: step3. ok.
2430      */
2431     std::vector<std::vector<uint8_t>> hashKey;
2432     CloudDBSyncUtilsTest::GetHashKey(tableName_, " 1=1 ", db_, hashKey);
2433     Lock(tableName_, hashKey, db_);
2434     for (int i = 0; i < dataCount; ++i) {
2435         DeleteCloudTableRecord(i);
2436     }
2437     /**
2438      * @tc.steps:step4. sync.
2439      * @tc.expected: step4. ok.
2440      */
2441     std::vector<DBStatus> actualDBStatus;
2442     BlockSync(query, delegate_, actualDBStatus);
2443     for (auto status : actualDBStatus) {
2444         EXPECT_EQ(status, OK);
2445     }
2446 }
2447 
2448 /**
2449  * @tc.name: LogicDeleteSyncTest008
2450  * @tc.desc: Test sync when data with flag 0x800 locally but there is updated data on the cloud.
2451  * @tc.type: FUNC
2452  * @tc.require:
2453  * @tc.author: liaoyonghuang
2454  */
2455 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest008, TestSize.Level0)
2456 {
2457     /**
2458      * @tc.steps:step1. Insert user table record with flag 0x800. Insert cloud table record.
2459      * @tc.expected: step1. ok.
2460      */
2461     int dataCount = 10;
2462     uint32_t logicDeleteCount = 4;
2463     InsertUserTableRecord(tableName_, dataCount);
2464     std::string sql = "update " + DBCommon::GetLogTableName(tableName_) +
2465         " set flag = flag | 0x800 where data_key <= " + std::to_string(logicDeleteCount);
2466     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2467     InsertCloudTableRecord(0, dataCount, 0, false);
2468     sql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where flag & 0x800=0x800";
2469     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2470         reinterpret_cast<void *>(logicDeleteCount), nullptr), SQLITE_OK);
2471     /**
2472      * @tc.steps:step2. Do sync.
2473      * @tc.expected: step2. ok.
2474      */
2475     Query query = Query::Select().FromTable({ tableName_ });
2476     BlockSync(query, delegate_, g_actualDBStatus);
2477     /**
2478      * @tc.steps:step3. Check data flag in local DB.
2479      * @tc.expected: step3. No data flag is 0x800.
2480      */
2481     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2482         reinterpret_cast<void *>(0), nullptr), SQLITE_OK);
2483 }
2484 
2485 /**
2486  * @tc.name: LockActionTest001
2487  * @tc.desc: InitCompensatedSyncTaskInfo and check lockAction.
2488  * @tc.type: FUNC
2489  * @tc.require:
2490  * @tc.author: wangxiangdong
2491  */
2492 HWTEST_F(DistributedDBCloudCheckSyncTest, LockActionTest001, TestSize.Level0)
2493 {
2494     /**
2495      * @tc.steps:step1. InitCompensatedSyncTaskInfo and check.
2496      * @tc.expected: step1. ok.
2497      */
2498     CloudSyncOption option;
2499     option.devices = { "CLOUD" };
2500     option.mode = SYNC_MODE_CLOUD_MERGE;
2501     option.query = Query::Select().FromTable({ tableName_ });
2502     option.waitTime = g_syncWaitTime;
2503     auto action = static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE)
2504                       | static_cast<uint32_t>(LockAction::DELETE);
2505     option.lockAction = static_cast<LockAction>(action);
2506     option.priorityTask = true;
2507     option.compensatedSyncOnly = true;
2508     const SyncProcessCallback onProcess;
2509     CloudSyncer::CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
2510     EXPECT_EQ(taskInfo.lockAction, option.lockAction);
2511 }
2512 
2513 /**
2514  * @tc.name: LogicCreateRepeatedTableNameTest001
2515  * @tc.desc: test create repeated table name with different cases
2516  * @tc.type: FUNC
2517  * @tc.require:
2518  * @tc.author: wangxiangdong
2519  */
2520 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicCreateRepeatedTableNameTest001, TestSize.Level0)
2521 {
2522     /**
2523      * @tc.steps:step1. CreateDistributedTable with same name but different cases.
2524      * @tc.expected: step1. operate successfully.
2525      */
2526     DBStatus createStatus = delegate_->CreateDistributedTable(lowerTableName_, CLOUD_COOPERATION);
2527     ASSERT_EQ(createStatus, DBStatus::OK);
2528 }
2529 
2530 /**
2531  * @tc.name: SaveCursorTest001
2532  * @tc.desc: test whether cloud cursor is saved when first sync
2533  * @tc.type: FUNC
2534  * @tc.require:
2535  * @tc.author: chenchaohao
2536  */
2537 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest001, TestSize.Level0)
2538 {
2539     /**
2540      * @tc.steps:step1. insert cloud records
2541      * @tc.expected: step1. OK
2542      */
2543     const int actualCount = 10;
2544     InsertCloudTableRecord(0, actualCount, 0, false);
2545 
2546     /**
2547      * @tc.steps:step2. check cursor when first sync
2548      * @tc.expected: step2. OK
2549      */
__anon28bcdbc62e02(const std::string &tableName, VBucket &extend) 2550     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2551         EXPECT_EQ(tableName_, tableName);
2552         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2553         EXPECT_EQ(cursor, "0");
2554     });
2555     Query query = Query::Select().FromTable({ tableName_ });
2556     BlockSync(query, delegate_, g_actualDBStatus);
2557     CheckLocalCount(actualCount);
2558 }
2559 
2560 /**
2561  * @tc.name: SaveCursorTest002
2562  * @tc.desc: test whether cloud cursor is saved when first download failed
2563  * @tc.type: FUNC
2564  * @tc.require:
2565  * @tc.author: chenchaohao
2566  */
2567 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest002, TestSize.Level0)
2568 {
2569     /**
2570      * @tc.steps:step1. insert cloud records
2571      * @tc.expected: step1. OK
2572      */
2573     const int actualCount = 10;
2574     InsertCloudTableRecord(0, actualCount, 0, false);
2575 
2576     /**
2577      * @tc.steps:step2. set download failed
2578      * @tc.expected: step2. OK
2579      */
2580     virtualCloudDb_->SetCloudError(true);
2581     Query query = Query::Select().FromTable({ tableName_ });
2582     BlockPrioritySync(query, delegate_, false, OK);
2583     CheckLocalCount(0);
2584 
2585     /**
2586      * @tc.steps:step3. check cursor when query
2587      * @tc.expected: step3. OK
2588      */
2589     virtualCloudDb_->SetCloudError(false);
__anon28bcdbc62f02(const std::string &tableName, VBucket &extend) 2590     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2591         EXPECT_EQ(tableName_, tableName);
2592         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2593         EXPECT_EQ(cursor, "0");
2594     });
2595     BlockSync(query, delegate_, g_actualDBStatus);
2596     CheckLocalCount(actualCount);
2597 }
2598 
2599 /**
2600  * @tc.name: SaveCursorTest003
2601  * @tc.desc: test whether cloud cursor is saved when first upload failed
2602  * @tc.type: FUNC
2603  * @tc.require:
2604  * @tc.author: chenchaohao
2605  */
2606 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest003, TestSize.Level0)
2607 {
2608     /**
2609      * @tc.steps:step1. insert local records
2610      * @tc.expected: step1. OK
2611      */
2612     const int actualCount = 10;
2613     InsertUserTableRecord(tableName_, actualCount);
2614 
2615     /**
2616      * @tc.steps:step2. set upload failed
2617      * @tc.expected: step2. OK
2618      */
2619     virtualCloudDb_->SetCloudError(true);
2620     Query query = Query::Select().FromTable({ tableName_ });
2621     BlockPrioritySync(query, delegate_, false, OK);
2622     CheckCloudTableCount(tableName_, 0);
2623 
2624     /**
2625      * @tc.steps:step3. check cursor when query
2626      * @tc.expected: step3. OK
2627      */
2628     virtualCloudDb_->SetCloudError(false);
__anon28bcdbc63002(const std::string &tableName, VBucket &extend) 2629     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2630         EXPECT_EQ(tableName_, tableName);
2631         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2632         EXPECT_EQ(cursor, "0");
2633     });
2634     BlockSync(query, delegate_, g_actualDBStatus);
2635     CheckCloudTableCount(tableName_, actualCount);
2636 }
2637 
2638 /**
2639  * @tc.name: RangeQuerySyncTest001
2640  * @tc.desc: Test sync that has option parameter with range query.
2641  * @tc.type: FUNC
2642  * @tc.require:
2643  * @tc.author: chenchaohao
2644  */
2645 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest001, TestSize.Level0)
2646 {
2647     /**
2648      * @tc.steps:step1. insert user table record.
2649      * @tc.expected: step1. ok.
2650      */
2651     CloudSyncOption option;
2652     option.devices = { "CLOUD" };
2653     option.mode = SYNC_MODE_CLOUD_MERGE;
2654     option.waitTime = g_syncWaitTime;
2655     Query query = Query::Select().From(tableName_).Range({}, {});
2656     option.query = query;
2657 
2658     /**
2659      * @tc.steps:step2. test normal sync with range query.
2660      * @tc.expected: step2. not support.
2661      */
2662     option.priorityTask = false;
2663     ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2664 
2665     /**
2666      * @tc.steps:step3. test Priority sync with range query.
2667      * @tc.expected: step3. not support.
2668      */
2669     option.priorityTask = true;
2670     ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2671 }
2672 
2673 /*
2674  * @tc.name: RangeQuerySyncTest002
2675  * @tc.desc: Test sync that has not option parameter with range query.
2676  * @tc.type: FUNC
2677  * @tc.require:
2678  * @tc.author: mazhao
2679  */
2680 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest002, TestSize.Level1)
2681 {
2682     Query query = Query::Select().FromTable({ tableName_ }).Range({}, {});
2683     ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_FORCE_PULL, query, nullptr, g_syncWaitTime),
2684         DBStatus::NOT_SUPPORT);
2685 }
2686 
2687 /*
2688  * @tc.name: SameDataSync001
2689  * @tc.desc: Test query same data in one batch.
2690  * @tc.type: FUNC
2691  * @tc.require:
2692  * @tc.author: zqq
2693  */
2694 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync001, TestSize.Level0)
2695 {
2696     /**
2697      * @tc.steps:step1. insert cloud records, cloud has two batch id:0-4
2698      * @tc.expected: step1. OK
2699      */
2700     const int actualCount = 5;
2701     InsertCloudTableRecord(0, actualCount, 0, false);
2702     InsertCloudTableRecord(0, actualCount, 0, false);
2703     /**
2704      * @tc.steps:step2. call sync, local has one batch id:0-4
2705      * @tc.expected: step2. OK
2706      */
2707     Query query = Query::Select().FromTable({ tableName_ });
2708     BlockSync(query, delegate_, g_actualDBStatus);
2709     CheckLocalCount(actualCount);
2710 }
2711 
2712 /*
2713  * @tc.name: SameDataSync002
2714  * @tc.desc: Test sync when there are two data with the same primary key on the cloud.
2715  * @tc.type: FUNC
2716  * @tc.require:
2717  * @tc.author: liaoyonghuang
2718  */
2719 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync002, TestSize.Level1)
2720 {
2721     /**
2722      * @tc.steps:step1. insert local 1 record and sync to cloud.
2723      * @tc.expected: step1. OK
2724      */
2725     const int actualCount = 1;
2726     InsertUserTableRecord(tableName_, actualCount);
2727     Query query = Query::Select().FromTable({ tableName_ });
2728     BlockSync(query, delegate_, g_actualDBStatus);
2729 
2730     /**
2731      * @tc.steps:step2. insert 2 records with the same primary key.
2732      * @tc.expected: step2. OK
2733      */
2734     std::vector<VBucket> record;
2735     std::vector<VBucket> extend;
2736     Timestamp now = TimeHelper::GetSysCurrentTime();
2737     VBucket data;
2738     std::vector<uint8_t> photo(0, 'v');
2739     data.insert_or_assign("id", std::string("0"));
2740     data.insert_or_assign("name", std::string("Cloud"));
2741     data.insert_or_assign("height", 166.0); // 166.0 is random double value
2742     data.insert_or_assign("married", false);
2743     data.insert_or_assign("photo", photo);
2744     data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
2745     record.push_back(data);
2746     data.insert_or_assign("age", static_cast<int64_t>(14L)); // 14 is random age
2747     record.push_back(data);
2748     VBucket log;
2749     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
2750         now / CloudDbConstant::TEN_THOUSAND));
2751     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
2752         now / CloudDbConstant::TEN_THOUSAND));
2753     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
2754     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("1"));
2755     extend.push_back(log);
2756     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("2"));
2757     extend.push_back(log);
2758     ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend), DBStatus::OK);
2759 
2760     /**
2761      * @tc.steps:step3. sync from cloud and check record.
2762      * @tc.expected: step3. The record with age of 14 has been updated locally.
2763      */
2764     BlockSync(query, delegate_, g_actualDBStatus);
2765     std::string sql = "SELECT age FROM " + tableName_ + " where id=0;";
2766     int64_t actualAge = 0;
2767     int64_t expectAge = 14L;
__anon28bcdbc63102(sqlite3_stmt *stmt) 2768     RelationalTestUtils::ExecSql(db_, sql, nullptr, [&actualAge](sqlite3_stmt *stmt) {
2769         actualAge = sqlite3_column_int(stmt, 0);
2770         return E_OK;
2771     });
2772     EXPECT_EQ(actualAge, expectAge);
2773 }
2774 
2775 /*
2776  * @tc.name: CreateDistributedTable001
2777  * @tc.desc: Test create distributed table when table not empty.
2778  * @tc.type: FUNC
2779  * @tc.require:
2780  * @tc.author: zqq
2781  */
2782 HWTEST_F(DistributedDBCloudCheckSyncTest, CreateDistributedTable001, TestSize.Level0)
2783 {
2784     const std::string table = "CreateDistributedTable001";
2785     const std::string createSQL =
2786         "CREATE TABLE IF NOT EXISTS " + table + "(" \
2787         "id TEXT PRIMARY KEY," \
2788         "name TEXT," \
2789         "height REAL ," \
2790         "photo BLOB," \
2791         "age INT);";
2792     ASSERT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
2793     int actualCount = 10;
2794     InsertUserTableRecord(table, actualCount);
2795     InsertCloudTableRecord(table, 0, actualCount, 0, true);
2796     ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
2797     DataBaseSchema dataBaseSchema = GetSchema();
2798     TableSchema schema = dataBaseSchema.tables.at(0);
2799     schema.name = table;
2800     schema.sharedTableName = "";
2801     dataBaseSchema.tables.push_back(schema);
2802     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
2803     /**
2804      * @tc.steps:step2. call sync, local has one batch id:0-4
2805      * @tc.expected: step2. OK
2806      */
2807     Query query = Query::Select().FromTable({ table });
2808     BlockSync(query, delegate_, g_actualDBStatus);
2809     CheckCloudTableCount(table, actualCount);
2810 }
2811 
2812 /*
2813  * @tc.name: CloseDbTest001
2814  * @tc.desc: Test process of db close during sync
2815  * @tc.type: FUNC
2816  * @tc.require:
2817  * @tc.author: bty
2818  */
2819 HWTEST_F(DistributedDBCloudCheckSyncTest, CloseDbTest001, TestSize.Level1)
2820 {
2821     /**
2822      * @tc.steps:step1. insert user table record.
2823      * @tc.expected: step1. ok.
2824      */
2825     const int actualCount = 10; // 10 is count of records
2826     InsertUserTableRecord(tableName_, actualCount);
2827 
2828     /**
2829      * @tc.steps:step2. wait for 2 seconds during the query to close the database.
2830      * @tc.expected: step2. ok.
2831      */
2832     std::mutex callMutex;
2833     int callCount = 0;
__anon28bcdbc63202(const std::string &, VBucket &) 2834     virtualCloudDb_->ForkQuery([](const std::string &, VBucket &) {
2835         std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2836     });
2837     const auto callback = [&callCount, &callMutex](
__anon28bcdbc63302( const std::map<std::string, SyncProcess> &) 2838         const std::map<std::string, SyncProcess> &) {
2839         {
2840             std::lock_guard<std::mutex> autoLock(callMutex);
2841             callCount++;
2842         }
2843     };
2844     Query query = Query::Select().FromTable({ tableName_ });
2845     ASSERT_EQ(delegate_->Sync({ "CLOUD" }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
2846     std::this_thread::sleep_for(std::chrono::seconds(1)); // block notify 1s
2847     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
2848     delegate_ = nullptr;
2849     mgr_ = nullptr;
2850 
2851     /**
2852      * @tc.steps:step3. wait for 2 seconds to check the process call count.
2853      * @tc.expected: step3. ok.
2854      */
2855     std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2856     EXPECT_EQ(callCount, 0L);
2857 }
2858 
2859 /*
2860  * @tc.name: ConsistentFlagTest001
2861  * @tc.desc: Test the consistency flag of no asset table
2862  * @tc.type: FUNC
2863  * @tc.require:
2864  * @tc.author: bty
2865  */
2866 HWTEST_F(DistributedDBCloudCheckSyncTest, ConsistentFlagTest001, TestSize.Level1)
2867 {
2868     /**
2869      * @tc.steps:step1. init data and sync
2870      * @tc.expected: step1. ok.
2871      */
2872     const int localCount = 20; // 20 is count of local
2873     const int cloudCount = 10; // 10 is count of cloud
2874     InsertUserTableRecord(tableName_, localCount);
2875     InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2876     Query query = Query::Select().FromTable({ tableName_ });
2877     BlockSync(query, delegate_, g_actualDBStatus);
2878 
2879     /**
2880      * @tc.steps:step2. check the 0x20 bit of flag after sync
2881      * @tc.expected: step2. ok.
2882      */
2883     std::string querySql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
2884         " where flag&0x20=0;";
2885     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2886         reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2887 
2888     /**
2889      * @tc.steps:step3. delete local data and check
2890      * @tc.expected: step3. ok.
2891      */
2892     std::string sql = "delete from " + tableName_ + " where id = '1';";
2893     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2894     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2895         reinterpret_cast<void *>(localCount - 1), nullptr), SQLITE_OK);
2896 
2897     /**
2898      * @tc.steps:step4. check the 0x20 bit of flag after sync
2899      * @tc.expected: step4. ok.
2900      */
2901     BlockSync(query, delegate_, g_actualDBStatus);
2902     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2903         reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2904 }
2905 
SyncDataStatusTest(bool isCompensatedSyncOnly)2906 void DistributedDBCloudCheckSyncTest::SyncDataStatusTest(bool isCompensatedSyncOnly)
2907 {
2908     /**
2909      * @tc.steps:step1. init data and sync
2910      * @tc.expected: step1. ok.
2911      */
2912     const int localCount = 20; // 20 is count of local
2913     const int cloudCount = 10; // 10 is count of cloud
2914     InsertUserTableRecord(tableName_, localCount);
2915     std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
2916     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2917     sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 2 where data_key in (2,12);";
2918     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2919     sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 3 where data_key in (3,13);";
2920     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2921     std::this_thread::sleep_for(std::chrono::milliseconds(1));
2922     InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2923     Query query = Query::Select().FromTable({tableName_});
2924 
2925     /**
2926      * @tc.steps:step2. check count
2927      * @tc.expected: step2. ok.
2928      */
2929     int64_t syncCount = 2;
2930     BlockPrioritySync(query, delegate_, false, OK, isCompensatedSyncOnly);
2931     if (!isCompensatedSyncOnly) {
2932         std::this_thread::sleep_for(std::chrono::seconds(1)); // wait compensated sync finish
2933     }
2934     std::string preSql = "select count(*) from " + DBCommon::GetLogTableName(tableName_);
2935     std::string querySql = preSql + " where status=0 and data_key in (1,11) and cloud_gid !='';";
2936     CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2937     if (isCompensatedSyncOnly) {
2938         querySql = preSql + " where status=2 and data_key in (2,12) and cloud_gid ='';";
2939         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2940         querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid ='';";
2941         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2942         querySql = preSql + " where status=0 and cloud_gid ='';";
2943         int unSyncCount = 14; // 14 is the num of unSync data with status 0
2944         CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
2945     } else {
2946         // gid 12、13 are upload insert, lock to lock_change
2947         querySql = preSql + " where status=3 and data_key in (2,12) and cloud_gid !='';";
2948         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2949         querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid !='';";
2950         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2951         querySql = preSql + " where status=0 and cloud_gid !='';";
2952         int unSyncCount = 16; // 16 is the num of sync finish
2953         CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
2954     }
2955 }
2956 
2957 /*
2958  * @tc.name: SyncDataStatusTest001
2959  * @tc.desc: Test the status after compensated sync the no asset table
2960  * @tc.type: FUNC
2961  * @tc.require:
2962  * @tc.author: bty
2963  */
2964 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest001, TestSize.Level1)
2965 {
2966     SyncDataStatusTest(true);
2967 }
2968 
2969 /*
2970  * @tc.name: SyncDataStatusTest002
2971  * @tc.desc: Test the status after normal sync the no asset table
2972  * @tc.type: FUNC
2973  * @tc.require:
2974  * @tc.author: bty
2975  */
2976 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest002, TestSize.Level1)
2977 {
2978     SyncDataStatusTest(false);
2979 }
2980 }
2981 #endif
2982