1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "cloud/cloud_sync_utils.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "cloud_syncer.h"
22 #include "db_common.h"
23 #include "distributeddb_data_generate_unit_test.h"
24 #include "log_print.h"
25 #include "relational_store_client.h"
26 #include "relational_store_delegate.h"
27 #include "relational_store_instance.h"
28 #include "relational_store_manager.h"
29 #include "relational_sync_able_storage.h"
30 #include "runtime_config.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36
37 namespace {
38 using namespace testing::ext;
39 using namespace DistributedDB;
40 using namespace DistributedDBUnitTest;
41 const char *g_createSQL =
42 "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(" \
43 "id TEXT PRIMARY KEY," \
44 "name TEXT," \
45 "height REAL ," \
46 "photo BLOB," \
47 "age INT);";
48 const char *g_createNonPrimaryKeySQL =
49 "CREATE TABLE IF NOT EXISTS NonPrimaryKeyTable(" \
50 "id TEXT," \
51 "name TEXT," \
52 "height REAL ," \
53 "photo BLOB," \
54 "age INT);";
55 const int64_t g_syncWaitTime = 60;
56
57 const Asset g_cloudAsset = {
58 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
59 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
60 };
61
62 std::vector<DBStatus> g_actualDBStatus;
63
CreateUserDBAndTable(sqlite3 * & db)64 void CreateUserDBAndTable(sqlite3 *&db)
65 {
66 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
67 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
68 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createNonPrimaryKeySQL), SQLITE_OK);
69 }
70
PrepareOption(CloudSyncOption & option,const Query & query,bool isPriorityTask,bool isCompensatedSyncOnly=false)71 void PrepareOption(CloudSyncOption &option, const Query &query, bool isPriorityTask, bool isCompensatedSyncOnly = false)
72 {
73 option.devices = { "CLOUD" };
74 option.mode = SYNC_MODE_CLOUD_MERGE;
75 option.query = query;
76 option.waitTime = g_syncWaitTime;
77 option.priorityTask = isPriorityTask;
78 option.compensatedSyncOnly = isCompensatedSyncOnly;
79 }
80
BlockSync(const Query & query,RelationalStoreDelegate * delegate,std::vector<DBStatus> & actualDBStatus,bool prioritySync=false)81 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, std::vector<DBStatus> &actualDBStatus,
82 bool prioritySync = false)
83 {
84 std::mutex dataMutex;
85 std::condition_variable cv;
86 bool finish = false;
87 auto callback = [&actualDBStatus, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
88 for (const auto &item: process) {
89 actualDBStatus.push_back(item.second.errCode);
90 if (item.second.process == DistributedDB::FINISHED) {
91 {
92 std::lock_guard<std::mutex> autoLock(dataMutex);
93 finish = true;
94 }
95 cv.notify_one();
96 }
97 }
98 };
99 CloudSyncOption option;
100 option.devices = { "CLOUD" };
101 option.mode = SYNC_MODE_CLOUD_MERGE;
102 option.query = query;
103 option.waitTime = g_syncWaitTime;
104 option.priorityTask = prioritySync;
105 ASSERT_EQ(delegate->Sync(option, callback), OK);
106 std::unique_lock<std::mutex> uniqueLock(dataMutex);
107 cv.wait(uniqueLock, [&finish]() {
108 return finish;
109 });
110 }
111
BlockPrioritySync(const Query & query,RelationalStoreDelegate * delegate,bool isPriority,DBStatus expectResult,bool isCompensatedSyncOnly=false)112 void BlockPrioritySync(const Query &query, RelationalStoreDelegate *delegate, bool isPriority, DBStatus expectResult,
113 bool isCompensatedSyncOnly = false)
114 {
115 std::mutex dataMutex;
116 std::condition_variable cv;
117 bool finish = false;
118 auto callback = [&cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
119 for (const auto &item: process) {
120 if (item.second.process == DistributedDB::FINISHED) {
121 {
122 std::lock_guard<std::mutex> autoLock(dataMutex);
123 finish = true;
124 }
125 cv.notify_one();
126 }
127 }
128 };
129 CloudSyncOption option;
130 PrepareOption(option, query, isPriority, isCompensatedSyncOnly);
131 ASSERT_EQ(delegate->Sync(option, callback), expectResult);
132 if (expectResult == OK) {
133 std::unique_lock<std::mutex> uniqueLock(dataMutex);
134 cv.wait(uniqueLock, [&finish]() {
135 return finish;
136 });
137 }
138 }
139
BlockCompensatedSync(const Query & query,RelationalStoreDelegate * delegate,DBStatus expectResult,const std::function<void (const std::map<std::string,SyncProcess> & syncProcess)> & processCallback)140 void BlockCompensatedSync(const Query &query, RelationalStoreDelegate *delegate, DBStatus expectResult,
141 const std::function<void(const std::map<std::string, SyncProcess> &syncProcess)> &processCallback)
142 {
143 std::mutex dataMutex;
144 std::condition_variable cv;
145 bool finish = false;
146 auto callback = [&processCallback, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
147 for (const auto &item: process) {
148 if (item.second.process == DistributedDB::FINISHED) {
149 {
150 std::lock_guard<std::mutex> autoLock(dataMutex);
151 finish = true;
152 }
153 cv.notify_one();
154 }
155 }
156 if (processCallback != nullptr) {
157 processCallback(process);
158 }
159 };
160 CloudSyncOption option;
161 PrepareOption(option, query, false, true);
162 ASSERT_EQ(delegate->Sync(option, callback), expectResult);
163 if (expectResult == OK) {
164 std::unique_lock<std::mutex> uniqueLock(dataMutex);
165 cv.wait(uniqueLock, [&finish]() {
166 return finish;
167 });
168 }
169 }
170
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)171 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
172 {
173 if (count != 1) {
174 return 0;
175 }
176 auto expectCount = reinterpret_cast<int64_t>(data);
177 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
178 return 0;
179 }
180
CheckUserTableResult(sqlite3 * & db,const std::string & tableName,int64_t expectCount)181 void CheckUserTableResult(sqlite3 *&db, const std::string &tableName, int64_t expectCount)
182 {
183 string query = "select count(*) from " + tableName + ";";
184 EXPECT_EQ(sqlite3_exec(db, query.c_str(), QueryCountCallback,
185 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
186 }
187
188 class DistributedDBCloudCheckSyncTest : public testing::Test {
189 public:
190 static void SetUpTestCase();
191 static void TearDownTestCase();
192 void SetUp() override;
193 void TearDown() override;
194 protected:
195 void InitTestDir();
196 DataBaseSchema GetSchema();
197 void CloseDb();
198 void InitDataAndSync();
199 void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
200 void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull);
201 void InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count, int64_t photoSize,
202 bool assetIsNull);
203 void DeleteUserTableRecord(int64_t id);
204 void DeleteUserTableRecord(int64_t begin, int64_t end);
205 void DeleteCloudTableRecord(int64_t gid);
206 void CheckCloudTableCount(const std::string &tableName, int64_t expectCount);
207 bool CheckSyncCount(const Info actualInfo, const Info expectInfo);
208 bool CheckSyncProcessInner(SyncProcess &actualSyncProcess, SyncProcess &expectSyncProcess);
209 bool CheckSyncProcess(std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess,
210 vector<SyncProcess> &expectSyncProcessV);
211 void PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
212 RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
213 bool isCheckProcess);
214 void DeleteCloudDBData(int64_t begin, int64_t count);
215 void SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count);
216 void SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count);
217 void InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync = false);
218 void CheckLocalCount(int64_t expectCount);
219 void CheckLogCleaned(int64_t expectCount);
220 void CheckUploadInfo(const Info &actualUploadInfo, const Info &expectUploadInfo);
221 void CheckDownloadInfo(const Info &actualDownloadInfo, const Info &expectDownloadInfo);
222 void SyncDataStatusTest(bool isCompensatedSyncOnly);
223 void CheckUploadInfoAfterSync(int recordCount, SyncProcess &normalLast);
224 std::string testDir_;
225 std::string storePath_;
226 sqlite3 *db_ = nullptr;
227 RelationalStoreDelegate *delegate_ = nullptr;
228 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
229 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
230 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
231 std::string tableName_ = "DistributedDBCloudCheckSyncTest";
232 std::string tableNameShared_ = "DistributedDBCloudCheckSyncTest_shared";
233 std::string tableWithoutPrimaryName_ = "NonPrimaryKeyTable";
234 std::string tableWithoutPrimaryNameShared_ = "NonPrimaryKeyTable_shared";
235 std::string lowerTableName_ = "distributeddbCloudCheckSyncTest";
236 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
237 };
238
SetUpTestCase()239 void DistributedDBCloudCheckSyncTest::SetUpTestCase()
240 {
241 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
242 }
243
TearDownTestCase()244 void DistributedDBCloudCheckSyncTest::TearDownTestCase()
245 {}
246
SetUp()247 void DistributedDBCloudCheckSyncTest::SetUp()
248 {
249 DistributedDBToolsUnitTest::PrintTestCaseInfo();
250 InitTestDir();
251 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
252 LOGE("rm test db files error.");
253 }
254 DistributedDBToolsUnitTest::PrintTestCaseInfo();
255 LOGD("Test dir is %s", testDir_.c_str());
256 db_ = RelationalTestUtils::CreateDataBase(storePath_);
257 ASSERT_NE(db_, nullptr);
258 CreateUserDBAndTable(db_);
259 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
260 RelationalStoreDelegate::Option option;
261 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
262 ASSERT_NE(delegate_, nullptr);
263 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
264 ASSERT_EQ(delegate_->CreateDistributedTable(tableWithoutPrimaryName_, CLOUD_COOPERATION), DBStatus::OK);
265 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
266 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
267 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
268 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
269 DataBaseSchema dataBaseSchema = GetSchema();
270 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
271 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
272 ASSERT_TRUE(communicatorAggregator_ != nullptr);
273 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
274 }
275
TearDown()276 void DistributedDBCloudCheckSyncTest::TearDown()
277 {
278 virtualCloudDb_->ForkQuery(nullptr);
279 virtualCloudDb_->SetCloudError(false);
280 CloseDb();
281 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
282 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
283 LOGE("rm test db files error.");
284 }
285 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
286 communicatorAggregator_ = nullptr;
287 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
288 }
289
InitTestDir()290 void DistributedDBCloudCheckSyncTest::InitTestDir()
291 {
292 if (!testDir_.empty()) {
293 return;
294 }
295 DistributedDBToolsUnitTest::TestDirInit(testDir_);
296 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
297 LOGI("The test db is:%s", testDir_.c_str());
298 }
299
GetSchema()300 DataBaseSchema DistributedDBCloudCheckSyncTest::GetSchema()
301 {
302 DataBaseSchema schema;
303 TableSchema tableSchema;
304 tableSchema.name = tableName_;
305 tableSchema.sharedTableName = tableNameShared_;
306 tableSchema.fields = {
307 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
308 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
309 };
310 TableSchema tableWithoutPrimaryKeySchema;
311 tableWithoutPrimaryKeySchema.name = tableWithoutPrimaryName_;
312 tableWithoutPrimaryKeySchema.sharedTableName = tableWithoutPrimaryNameShared_;
313 tableWithoutPrimaryKeySchema.fields = {
314 {"id", TYPE_INDEX<std::string>}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
315 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
316 };
317 schema.tables.push_back(tableSchema);
318 schema.tables.push_back(tableWithoutPrimaryKeySchema);
319 return schema;
320 }
321
CloseDb()322 void DistributedDBCloudCheckSyncTest::CloseDb()
323 {
324 virtualCloudDb_ = nullptr;
325 if (mgr_ != nullptr) {
326 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
327 delegate_ = nullptr;
328 mgr_ = nullptr;
329 }
330 }
331
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)332 void DistributedDBCloudCheckSyncTest::InsertUserTableRecord(const std::string &tableName,
333 int64_t recordCounts, int64_t begin)
334 {
335 ASSERT_NE(db_, nullptr);
336 for (int64_t i = begin; i < begin + recordCounts; ++i) {
337 string sql = "INSERT OR REPLACE INTO " + tableName
338 + " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local"
339 + std::to_string(i) + "', '155.10', 'text', '21');";
340 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
341 }
342 }
343
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)344 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize,
345 bool assetIsNull)
346 {
347 InsertCloudTableRecord(tableName_, begin, count, photoSize, assetIsNull);
348 }
349
InsertCloudTableRecord(const std::string & tableName,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)350 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count,
351 int64_t photoSize, bool assetIsNull)
352 {
353 std::vector<uint8_t> photo(photoSize, 'v');
354 std::vector<VBucket> record1;
355 std::vector<VBucket> extend1;
356 std::vector<VBucket> record2;
357 std::vector<VBucket> extend2;
358 Timestamp now = TimeHelper::GetSysCurrentTime();
359 for (int64_t i = begin; i < begin + count; ++i) {
360 VBucket data;
361 data.insert_or_assign("id", std::to_string(i));
362 data.insert_or_assign("name", "Cloud" + std::to_string(i));
363 data.insert_or_assign("height", 166.0); // 166.0 is random double value
364 data.insert_or_assign("married", false);
365 data.insert_or_assign("photo", photo);
366 data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
367 Asset asset = g_cloudAsset;
368 asset.name = asset.name + std::to_string(i);
369 assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
370 record1.push_back(data);
371 VBucket log;
372 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
373 now / CloudDbConstant::TEN_THOUSAND + i));
374 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
375 now / CloudDbConstant::TEN_THOUSAND + i));
376 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
377 extend1.push_back(log);
378
379 std::vector<Asset> assets;
380 data.insert_or_assign("height", 180.3); // 180.3 is random double value
381 for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
382 asset.name = g_cloudAsset.name + std::to_string(j);
383 assets.push_back(asset);
384 }
385 data.erase("assert");
386 data.erase("married");
387 assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
388 record2.push_back(data);
389 extend2.push_back(log);
390 }
391 ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName, std::move(record1), extend1), DBStatus::OK);
392 std::this_thread::sleep_for(std::chrono::milliseconds(count));
393 }
394
DeleteUserTableRecord(int64_t id)395 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t id)
396 {
397 ASSERT_NE(db_, nullptr);
398 string sql = "DELETE FROM " + tableName_ + " WHERE id ='" + std::to_string(id) + "';";
399 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
400 }
401
DeleteUserTableRecord(int64_t begin,int64_t end)402 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t begin, int64_t end)
403 {
404 ASSERT_NE(db_, nullptr);
405 std::string sql = "DELETE FROM " + tableName_ + " WHERE id IN (";
406 for (int64_t i = begin; i <= end; ++i) {
407 sql += "'" + std::to_string(i) + "',";
408 }
409 if (sql.back() == ',') {
410 sql.pop_back();
411 }
412 sql += ");";
413 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
414 }
415
DeleteCloudTableRecord(int64_t gid)416 void DistributedDBCloudCheckSyncTest::DeleteCloudTableRecord(int64_t gid)
417 {
418 VBucket idMap;
419 idMap.insert_or_assign("#_gid", std::to_string(gid));
420 ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
421 }
422
CheckCloudTableCount(const std::string & tableName,int64_t expectCount)423 void DistributedDBCloudCheckSyncTest::CheckCloudTableCount(const std::string &tableName, int64_t expectCount)
424 {
425 VBucket extend;
426 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
427 int64_t realCount = 0;
428 std::vector<VBucket> data;
429 virtualCloudDb_->Query(tableName, extend, data);
430 for (size_t j = 0; j < data.size(); ++j) {
431 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
432 if (entry != data[j].end() && std::get<bool>(entry->second)) {
433 continue;
434 }
435 realCount++;
436 }
437 EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
438 }
439
CheckSyncCount(const Info actualInfo,const Info expectInfo)440 bool DistributedDBCloudCheckSyncTest::CheckSyncCount(const Info actualInfo, const Info expectInfo)
441 {
442 if (actualInfo.batchIndex != expectInfo.batchIndex) {
443 return false;
444 }
445 if (actualInfo.total != expectInfo.total) {
446 return false;
447 }
448 if (actualInfo.successCount != expectInfo.successCount) {
449 return false;
450 }
451 if (actualInfo.failCount != expectInfo.failCount) {
452 return false;
453 }
454 return true;
455 }
456
CheckSyncProcessInner(SyncProcess & actualSyncProcess,SyncProcess & expectSyncProcess)457 bool DistributedDBCloudCheckSyncTest::CheckSyncProcessInner(SyncProcess &actualSyncProcess,
458 SyncProcess &expectSyncProcess)
459 {
460 for (const auto &itInner : actualSyncProcess.tableProcess) {
461 std::string tableName = itInner.first;
462 if (expectSyncProcess.tableProcess.find(tableName) == expectSyncProcess.tableProcess.end()) {
463 return false;
464 }
465 TableProcessInfo actualTableProcessInfo = itInner.second;
466 TableProcessInfo expectTableProcessInfo = expectSyncProcess.tableProcess.find(tableName)->second;
467 if (!CheckSyncCount(actualTableProcessInfo.downLoadInfo, expectTableProcessInfo.downLoadInfo)) {
468 return false;
469 }
470 if (!CheckSyncCount(actualTableProcessInfo.upLoadInfo, expectTableProcessInfo.upLoadInfo)) {
471 return false;
472 }
473 }
474 return true;
475 }
476
CheckSyncProcess(std::vector<std::map<std::string,SyncProcess>> & actualSyncProcess,vector<SyncProcess> & expectSyncProcessV)477 bool DistributedDBCloudCheckSyncTest::CheckSyncProcess(
478 std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess, vector<SyncProcess> &expectSyncProcessV)
479 {
480 vector<map<string, SyncProcess>> expectSyncProcess;
481 for (auto syncProcess : expectSyncProcessV) {
482 map<string, SyncProcess> expectSyncProcessMap = {{"CLOUD", syncProcess}};
483 expectSyncProcess.emplace_back(expectSyncProcessMap);
484 }
485 for (int i = 0; i < (int) actualSyncProcess.size(); i++) {
486 map<string, SyncProcess> actualSyncProcessMap = actualSyncProcess[i];
487 map<string, SyncProcess> expectSyncProcessMap = expectSyncProcess[i];
488 for (auto &it : actualSyncProcessMap) {
489 string mapKey = it.first;
490 if (expectSyncProcessMap.find(mapKey) == expectSyncProcessMap.end()) {
491 return false;
492 }
493 SyncProcess actualSyncProcess = it.second;
494 SyncProcess expectSyncProcess = expectSyncProcessMap.find(mapKey)->second;
495 if (!CheckSyncProcessInner(actualSyncProcess, expectSyncProcess)) {
496 return false;
497 }
498 }
499 }
500 return true;
501 }
502
PriorityAndNormalSync(const Query & normalQuery,const Query & priorityQuery,RelationalStoreDelegate * delegate,std::vector<std::map<std::string,SyncProcess>> & prioritySyncProcess,bool isCheckProcess)503 void DistributedDBCloudCheckSyncTest::PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
504 RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
505 bool isCheckProcess)
506 {
507 std::mutex dataMutex;
508 std::condition_variable cv;
509 bool normalFinish = false;
510 bool priorityFinish = false;
511 auto normalCallback = [&cv, &dataMutex, &normalFinish, &priorityFinish, &prioritySyncProcess, &isCheckProcess](
512 const std::map<std::string, SyncProcess> &process) {
513 auto foundFinishedProcess = std::find_if(process.begin(), process.end(), [](const auto &item) {
514 return item.second.process == DistributedDB::FINISHED;
515 });
516 if (foundFinishedProcess != process.end()) {
517 normalFinish = true;
518 if (isCheckProcess) {
519 ASSERT_EQ(priorityFinish, true);
520 }
521 cv.notify_one();
522 }
523 prioritySyncProcess.emplace_back(process);
524 };
525 auto priorityCallback = [&cv, &priorityFinish, &prioritySyncProcess](
526 const std::map<std::string, SyncProcess> &process) {
527 auto it = std::find_if(process.begin(), process.end(), [](const auto &item) {
528 return item.second.process == DistributedDB::FINISHED;
529 });
530 if (it != process.end()) {
531 priorityFinish = true;
532 cv.notify_one();
533 }
534 prioritySyncProcess.emplace_back(process);
535 };
536 CloudSyncOption option;
537 PrepareOption(option, normalQuery, false);
538 virtualCloudDb_->SetBlockTime(500); // 500 ms
539 ASSERT_EQ(delegate->Sync(option, normalCallback), OK);
540 PrepareOption(option, priorityQuery, true);
541 std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50 ms
542 ASSERT_EQ(delegate->Sync(option, priorityCallback), OK);
543 std::unique_lock<std::mutex> uniqueLock(dataMutex);
544 cv.wait(uniqueLock, [&normalFinish]() {
545 return normalFinish;
546 });
547 }
548
DeleteCloudDBData(int64_t begin,int64_t count)549 void DistributedDBCloudCheckSyncTest::DeleteCloudDBData(int64_t begin, int64_t count)
550 {
551 for (int64_t i = begin; i < begin + count; i++) {
552 VBucket idMap;
553 idMap.insert_or_assign("#_gid", std::to_string(i));
554 ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
555 }
556 }
557
SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> & count)558 void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count)
559 {
560 virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
561 count++;
562 if (count == 1) { // taskid1
563 std::this_thread::sleep_for(std::chrono::seconds(1));
564 }
565 if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
566 CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
567 }
568 if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
569 CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records after last sync
570 }
571 if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
572 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
573 }
574 });
575 }
576
SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> & count)577 void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count)
578 {
579 virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
580 count++;
581 if (count == 1) { // taskid1
582 std::this_thread::sleep_for(std::chrono::seconds(1));
583 }
584 if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
585 CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
586 }
587 if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
588 CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
589 }
590 if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
591 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
592 }
593 });
594 }
595
InitLogicDeleteDataEnv(int64_t dataCount,bool prioritySync)596 void DistributedDBCloudCheckSyncTest::InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync)
597 {
598 // prepare data
599 InsertUserTableRecord(tableName_, dataCount);
600 // sync
601 Query query = Query::Select().FromTable({ tableName_ });
602 BlockSync(query, delegate_, g_actualDBStatus);
603 // delete cloud data
604 for (int i = 0; i < dataCount; ++i) {
605 DeleteCloudTableRecord(i);
606 }
607 // sync again
608 BlockSync(query, delegate_, g_actualDBStatus);
609 }
610
CheckLocalCount(int64_t expectCount)611 void DistributedDBCloudCheckSyncTest::CheckLocalCount(int64_t expectCount)
612 {
613 // check local data
614 int dataCnt = -1;
615 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
616 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
617 dataCnt = sqlite3_column_int(stmt, 0);
618 return E_OK;
619 });
620 EXPECT_EQ(dataCnt, expectCount);
621 }
622
CheckLogCleaned(int64_t expectCount)623 void DistributedDBCloudCheckSyncTest::CheckLogCleaned(int64_t expectCount)
624 {
625 std::string sql1 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
626 " where device = 'cloud';";
627 EXPECT_EQ(sqlite3_exec(db_, sql1.c_str(), QueryCountCallback,
628 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
629 std::string sql2 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where cloud_gid "
630 " is not null and cloud_gid != '';";
631 EXPECT_EQ(sqlite3_exec(db_, sql2.c_str(), QueryCountCallback,
632 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
633 std::string sql3 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
634 " where flag & 0x02 != 0;";
635 EXPECT_EQ(sqlite3_exec(db_, sql3.c_str(), QueryCountCallback,
636 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
637 }
638
CheckUploadInfo(const Info & actualUploadInfo,const Info & expectUploadInfo)639 void DistributedDBCloudCheckSyncTest::CheckUploadInfo(const Info &actualUploadInfo, const Info &expectUploadInfo)
640 {
641 EXPECT_EQ(actualUploadInfo.batchIndex, expectUploadInfo.batchIndex);
642 EXPECT_EQ(actualUploadInfo.total, expectUploadInfo.total);
643 EXPECT_EQ(actualUploadInfo.successCount, expectUploadInfo.successCount);
644 EXPECT_EQ(actualUploadInfo.failCount, expectUploadInfo.failCount);
645 EXPECT_EQ(actualUploadInfo.insertCount, expectUploadInfo.insertCount);
646 EXPECT_EQ(actualUploadInfo.updateCount, expectUploadInfo.updateCount);
647 EXPECT_EQ(actualUploadInfo.deleteCount, expectUploadInfo.deleteCount);
648 }
649
CheckDownloadInfo(const Info & actualDownloadInfo,const Info & expectDownloadInfo)650 void DistributedDBCloudCheckSyncTest::CheckDownloadInfo(const Info &actualDownloadInfo, const Info &expectDownloadInfo)
651 {
652 EXPECT_EQ(actualDownloadInfo.batchIndex, expectDownloadInfo.batchIndex);
653 EXPECT_EQ(actualDownloadInfo.total, expectDownloadInfo.total);
654 EXPECT_EQ(actualDownloadInfo.successCount, expectDownloadInfo.successCount);
655 EXPECT_EQ(actualDownloadInfo.failCount, expectDownloadInfo.failCount);
656 EXPECT_EQ(actualDownloadInfo.insertCount, expectDownloadInfo.insertCount);
657 EXPECT_EQ(actualDownloadInfo.updateCount, expectDownloadInfo.updateCount);
658 EXPECT_EQ(actualDownloadInfo.deleteCount, expectDownloadInfo.deleteCount);
659 }
660
661 /**
662 * @tc.name: CloudSyncTest001
663 * @tc.desc: sync with device sync query
664 * @tc.type: FUNC
665 * @tc.require:
666 * @tc.author: zhangqiquan
667 */
668 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest001, TestSize.Level1)
669 {
670 // prepare data
671 const int actualCount = 10;
672 InsertUserTableRecord(tableName_, actualCount);
673 // sync twice
674 Query query = Query::Select().FromTable({ tableName_ });
675 BlockSync(query, delegate_, g_actualDBStatus);
676 BlockSync(query, delegate_, g_actualDBStatus);
677 // remove cloud data
678 delegate_->RemoveDeviceData("CLOUD", ClearMode::FLAG_AND_DATA);
679 // check local data
680 int dataCnt = -1;
681 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anon6d2f4c681002(sqlite3_stmt *stmt) 682 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
683 dataCnt = sqlite3_column_int(stmt, 0);
684 return E_OK;
685 });
686 EXPECT_EQ(dataCnt, 0);
687 }
688
689 /**
690 * @tc.name: CloudSyncTest002
691 * @tc.desc: sync with same data in one batch
692 * @tc.type: FUNC
693 * @tc.require:
694 * @tc.author: zhangqiquan
695 */
696 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest002, TestSize.Level1)
697 {
698 // prepare data
699 const int actualCount = 1;
700 InsertUserTableRecord(tableName_, actualCount);
701 // sync twice
702 Query query = Query::Select().FromTable({ tableName_ });
703 BlockSync(query, delegate_, g_actualDBStatus);
704 // cloud delete id=0 and insert id=0 but its gid is 1
705 // local delete id=0
706 DeleteCloudTableRecord(0); // cloud gid is 0
707 InsertCloudTableRecord(0, actualCount, 0, false); // 0 is id
708 DeleteUserTableRecord(0); // 0 is id
709 BlockSync(query, delegate_, g_actualDBStatus);
710 bool deleteStatus = true;
711 EXPECT_EQ(virtualCloudDb_->GetDataStatus("1", deleteStatus), OK);
712 EXPECT_EQ(deleteStatus, false);
713 }
714
715 /**
716 * @tc.name: CloudSyncTest003
717 * @tc.desc: local data is delete before sync, then sync, cloud data will insert into local
718 * @tc.type: FUNC
719 * @tc.require:
720 * @tc.author: zhangshijie
721 */
722 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest003, TestSize.Level1)
723 {
724 // prepare data
725 const int actualCount = 1;
726 InsertUserTableRecord(tableName_, actualCount);
727
728 InsertCloudTableRecord(0, actualCount, 0, false);
729 // delete local data
730 DeleteUserTableRecord(0);
731 Query query = Query::Select().FromTable({ tableName_ });
732 BlockSync(query, delegate_, g_actualDBStatus);
733
734 // check local data, cloud date will insert into local
735 int dataCnt = -1;
736 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anon6d2f4c681102(sqlite3_stmt *stmt) 737 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
738 dataCnt = sqlite3_column_int(stmt, 0);
739 return E_OK;
740 });
741 EXPECT_EQ(dataCnt, actualCount);
742 }
743
744 /**
745 * @tc.name: CloudSyncTest004
746 * @tc.desc: sync after insert failed
747 * @tc.type: FUNC
748 * @tc.require:
749 * @tc.author: zhangqiquan
750 */
751 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest004, TestSize.Level1)
752 {
753 // prepare data
754 const int actualCount = 1;
755 InsertUserTableRecord(tableName_, actualCount);
756 // sync twice
757 Query query = Query::Select().FromTable({ tableName_ });
758 LOGW("Block Sync");
759 virtualCloudDb_->SetInsertFailed(1);
760 BlockSync(query, delegate_, g_actualDBStatus);
761 // delete local data
762 DeleteUserTableRecord(0); // 0 is id
763 LOGW("Block Sync");
764 // sync again and this record with be synced to cloud
765 BlockSync(query, delegate_, g_actualDBStatus);
766 bool deleteStatus = true;
767 EXPECT_EQ(virtualCloudDb_->GetDataStatus("0", deleteStatus), OK);
768 EXPECT_EQ(deleteStatus, true);
769 }
770
771 /**
772 * @tc.name: CloudSyncTest005
773 * @tc.desc: check device in process after sync
774 * @tc.type: FUNC
775 * @tc.require:
776 * @tc.author: liaoyonghuang
777 */
778 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest005, TestSize.Level1)
779 {
780 /**
781 * @tc.steps:step1. init data and sync
782 * @tc.expected: step1. ok.
783 */
784 const int localCount = 20; // 20 is count of local
785 const int cloudCount = 10; // 10 is count of cloud
786 InsertUserTableRecord(tableName_, localCount);
787 std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
788 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
789 InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
790
791 /**
792 * @tc.steps:step2. check device name in process
793 * @tc.expected: step2. ok.
794 */
795 Query query = Query::Select().FromTable({tableName_});
__anon6d2f4c681202(const std::map<std::string, SyncProcess> &syncProcess) 796 auto callback = [](const std::map<std::string, SyncProcess> &syncProcess) {
797 EXPECT_TRUE(syncProcess.find("CLOUD") != syncProcess.end());
798 };
799 BlockCompensatedSync(query, delegate_, OK, callback);
800 }
801
InitDataAndSync()802 void DistributedDBCloudCheckSyncTest::InitDataAndSync()
803 {
804 const int localCount = 120; // 120 is count of local
805 const int cloudCount = 100; // 100 is count of cloud
806 InsertUserTableRecord(tableName_, localCount, 0);
807 InsertUserTableRecord(tableWithoutPrimaryName_, cloudCount, 0);
808 InsertCloudTableRecord(tableWithoutPrimaryName_, 80, cloudCount, 0, false); // 80 is begin sync number
809 }
810
811 /**
812 * @tc.name: CloudSyncTest006
813 * @tc.desc: check reDownload when common sync pause.
814 * @tc.type: FUNC
815 * @tc.require:
816 * @tc.author: luoguo
817 */
818 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest006, TestSize.Level1)
819 {
820 /**
821 * @tc.steps:step1. init data and sync
822 * @tc.expected: step1. ok.
823 */
824 InitDataAndSync();
825
826 /**
827 * @tc.steps:step2. common sync will pause
828 * @tc.expected: step2. ok.
829 */
830 std::vector<std::string> tableNames = {tableName_, tableWithoutPrimaryName_};
831 Query normalQuery = Query::Select().FromTable({tableNames});
832 std::vector<std::string> idValue = {"0", "1", "2"};
833 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
834 CloudSyncOption option;
835 CloudSyncOption priorityOption;
836 PrepareOption(option, normalQuery, false);
837 PrepareOption(priorityOption, priorityQuery, true);
838 bool isUpload = false;
839 uint32_t blockTime = 2000;
__anon6d2f4c681302(const std::string &tableName, VBucket &extend) 840 virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
841 if (isUpload == false) {
842 isUpload = true;
843 std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
844 }
845 });
846 bool isFinished = false;
847 bool priorityFinish = false;
__anon6d2f4c681402(const std::map<std::string, SyncProcess> &process) 848 auto normalCallback = [&isFinished, &priorityFinish](const std::map<std::string, SyncProcess> &process) {
849 for (const auto &item : process) {
850 if (item.second.process == DistributedDB::FINISHED) {
851 isFinished = true;
852 ASSERT_EQ(priorityFinish, true);
853 }
854 }
855 };
856 ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
857
858 /**
859 * @tc.steps:step3. wait common upload and priority sync.
860 * @tc.expected: step3. ok.
861 */
862 while (isUpload == false) {
863 std::this_thread::sleep_for(std::chrono::milliseconds(50));
864 }
__anon6d2f4c681502(const std::map<std::string, SyncProcess> &process) 865 auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
866 for (const auto &item : process) {
867 if (item.second.process == DistributedDB::FINISHED) {
868 priorityFinish = true;
869 }
870 }
871 };
872 ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
873 while (isFinished == false || priorityFinish == false) {
874 std::this_thread::sleep_for(std::chrono::milliseconds(50));
875 }
876
877 /**
878 * @tc.steps:step4. wait common sync and priority sync finish, check query Times.
879 * @tc.expected: step4. ok.
880 */
881 uint32_t times = virtualCloudDb_->GetQueryTimes(tableName_);
882 ASSERT_EQ(times, 3u);
883 virtualCloudDb_->ForkUpload(nullptr);
884 }
885
886 /**
887 * @tc.name: CloudSyncTest007
888 * @tc.desc: check process info when version conflict sync process.
889 * @tc.type: FUNC
890 * @tc.require:
891 * @tc.author: luoguo
892 */
893 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest007, TestSize.Level1)
894 {
895 /**
896 * @tc.steps:step1. init data and sync
897 * @tc.expected: step1. ok.
898 */
899 const int localCount = 60;
900 InsertUserTableRecord(tableName_, localCount, 0);
901 Query query = Query::Select().FromTable({tableName_});
902 BlockSync(query, delegate_, g_actualDBStatus);
903
904 /**
905 * @tc.steps:step2. delete 30 - 59 records in user table, and set callback func.
906 * @tc.expected: step2. ok.
907 */
908 DeleteUserTableRecord(30, 59);
909 bool isUpload = false;
__anon6d2f4c681602(const std::string &tableName, VBucket &extend) 910 virtualCloudDb_->ForkUpload([&isUpload](const std::string &tableName, VBucket &extend) {
911 if (isUpload == false) {
912 isUpload = true;
913 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
914 }
915 });
916 bool isFinished = false;
917 std::map<std::string, TableProcessInfo> retSyncProcess;
__anon6d2f4c681702(const std::map<std::string, SyncProcess> &process) 918 auto normalCallback = [&isFinished, &retSyncProcess](const std::map<std::string, SyncProcess> &process) {
919 for (const auto &item : process) {
920 if (item.second.process == DistributedDB::FINISHED) {
921 isFinished = true;
922 ASSERT_EQ(process.empty(), false);
923 auto lastProcess = process.rbegin();
924 retSyncProcess = lastProcess->second.tableProcess;
925 }
926 }
927 };
928
929 /**
930 * @tc.steps:step3. sync.
931 * @tc.expected: step3. ok.
932 */
933 std::vector<std::string> tableNames = {tableName_};
934 Query normalQuery = Query::Select().FromTable({tableNames});
935 CloudSyncOption option;
936 PrepareOption(option, normalQuery, false);
937 ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
938
939 /**
940 * @tc.steps:step4. wait upload process and delete 30 record in cloud table.
941 * @tc.expected: step4. ok.
942 */
943 while (isUpload == false) {
944 std::this_thread::sleep_for(std::chrono::milliseconds(50));
945 }
946 DeleteCloudTableRecord(30);
947
948 /**
949 * @tc.steps:step5. wait sync process end and check data.
950 * @tc.expected: step5. ok.
951 */
952 while (isFinished == false) {
953 std::this_thread::sleep_for(std::chrono::milliseconds(50));
954 }
955 ASSERT_EQ(retSyncProcess.empty(), false);
956 auto taskInfo = retSyncProcess.rbegin();
957 ASSERT_EQ(taskInfo->second.upLoadInfo.total, 30u);
958 virtualCloudDb_->ForkUpload(nullptr);
959 }
960
961 /**
962 * @tc.name: CloudSyncTest008
963 * @tc.desc: test when normal sync interrupted by priority sync, process info should be consistent
964 * @tc.type: FUNC
965 * @tc.require:
966 * @tc.author: suyuchen
967 */
968 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest008, TestSize.Level1)
969 {
970 /**
971 * @tc.steps: step1. insert 35 records to user table
972 * @tc.expected: step1. OK.
973 */
974 const int localCount = 35;
975 InsertUserTableRecord(tableName_, localCount, 0);
976 Query query = Query::Select().FromTable({tableName_});
977
978 /**
979 * @tc.steps: step2. Set CLOUD_VERSION_CONFLICT when normal sync task upload
980 * @tc.expected: step2. OK.
981 */
982 int recordIndex = 0;
983 virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6d2f4c681802(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 984 vector<VirtualCloudDb::CloudData> &cloudDataVec) {
985 recordIndex++;
986 if (recordIndex == 20) {
987 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
988 std::this_thread::sleep_for(std::chrono::seconds(1));
989 return CLOUD_VERSION_CONFLICT;
990 }
991 return OK;
992 });
993
994 /**
995 * @tc.steps: step3. set callback function for normal sync
996 * @tc.expected: step3. OK.
997 */
998 std::map<std::string, TableProcessInfo> retSyncProcess;
__anon6d2f4c681902(const std::map<std::string, SyncProcess> &process) 999 auto normalCallback = [&retSyncProcess](const std::map<std::string, SyncProcess> &process) {
1000 for (const auto &item : process) {
1001 if (item.second.process == DistributedDB::FINISHED) {
1002 ASSERT_EQ(process.empty(), false);
1003 auto lastProcess = process.rbegin();
1004 retSyncProcess = lastProcess->second.tableProcess;
1005 }
1006 }
1007 };
1008
1009 /**
1010 * @tc.steps: step4. start normal sync
1011 * @tc.expected: step4. OK.
1012 */
1013 CloudSyncOption option;
1014 PrepareOption(option, query, false);
1015 ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
__anon6d2f4c681a02() 1016 std::thread syncThread1([&]() {
1017 BlockSync(query, delegate_, g_actualDBStatus);
1018 });
1019
1020 /**
1021 * @tc.steps: step5. start priority sync
1022 * @tc.expected: step5. OK.
1023 */
1024 std::this_thread::sleep_for(std::chrono::milliseconds(200));
__anon6d2f4c681b02() 1025 std::thread syncThread2([&]() {
1026 BlockSync(query, delegate_, g_actualDBStatus, true);
1027 });
1028 syncThread1.join();
1029 syncThread2.join();
1030
1031 /**
1032 * @tc.steps: step6. Check notification of normal sync
1033 * @tc.expected: step6. OK.
1034 */
1035 ASSERT_EQ(retSyncProcess.empty(), false);
1036 auto taskInfo = retSyncProcess.rbegin();
1037 ASSERT_EQ(taskInfo->second.upLoadInfo.total, 35u);
1038 ASSERT_EQ(taskInfo->second.upLoadInfo.successCount, 35u);
1039 ASSERT_EQ(taskInfo->second.upLoadInfo.failCount, 0u);
1040
1041 virtualCloudDb_->ForkInsertConflict(nullptr);
1042 }
1043
1044 /**
1045 * @tc.name: CloudSyncTest009
1046 * @tc.desc: reopen database and sync
1047 * @tc.type: FUNC
1048 * @tc.require:
1049 * @tc.author: wangxiangdong
1050 */
1051 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest009, TestSize.Level1)
1052 {
1053 /**
1054 * @tc.steps: step1. insert 1 record to user table
1055 * @tc.expected: step1. OK.
1056 */
1057 const int actualCount = 1;
1058 InsertUserTableRecord(tableName_, actualCount);
1059 /**
1060 * @tc.steps: step2. sync data to cloud
1061 * @tc.expected: step2. OK.
1062 */
1063 Query query = Query::Select().FromTable({ tableName_ });
1064 BlockSync(query, delegate_, g_actualDBStatus);
1065 CheckCloudTableCount(tableName_, 1);
1066 /**
1067 * @tc.steps: step3. drop data table then close db
1068 * @tc.expected: step3. OK.
1069 */
1070 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1071 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1072 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1073 delegate_ = nullptr;
1074 /**
1075 * @tc.steps: step4. recreate data table and reopen database
1076 * @tc.expected: step4. OK.
1077 */
1078 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1079 RelationalStoreDelegate::Option option;
1080 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1081 ASSERT_NE(delegate_, nullptr);
1082 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1083 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1084 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1085 DataBaseSchema dataBaseSchema = GetSchema();
1086 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1087 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1088 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1089 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1090 /**
1091 * @tc.steps: step5. sync and cloud data should be deleted
1092 * @tc.expected: step5. OK.
1093 */
1094 BlockSync(query, delegate_, g_actualDBStatus);
1095 CheckCloudTableCount(tableName_, 0);
1096 }
1097
1098 /**
1099 * @tc.name: CloudSyncTest010
1100 * @tc.desc: reopen database, recreate table with less columns and sync
1101 * @tc.type: FUNC
1102 * @tc.require:
1103 * @tc.author: wangxiangdong
1104 */
1105 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest010, TestSize.Level1)
1106 {
1107 /**
1108 * @tc.steps: step1. insert 1 record to user table
1109 * @tc.expected: step1. OK.
1110 */
1111 const int actualCount = 1;
1112 InsertUserTableRecord(tableName_, actualCount);
1113 /**
1114 * @tc.steps: step2. sync data to cloud
1115 * @tc.expected: step2. OK.
1116 */
1117 Query query = Query::Select().FromTable({ tableName_ });
1118 BlockSync(query, delegate_, g_actualDBStatus);
1119 CheckCloudTableCount(tableName_, 1);
1120 /**
1121 * @tc.steps: step3. drop data table then close db
1122 * @tc.expected: step3. OK.
1123 */
1124 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1125 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1126 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1127 delegate_ = nullptr;
1128 /**
1129 * @tc.steps: step4. recreate data table and reopen database
1130 * @tc.expected: step4. OK.
1131 */
1132 std::string createSql = "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(id INT PRIMARY KEY);";
1133 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, createSql), DBStatus::OK);
1134 RelationalStoreDelegate::Option option;
1135 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1136 ASSERT_NE(delegate_, nullptr);
1137 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1138 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1139 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1140 DataBaseSchema dataBaseSchema = GetSchema();
1141 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1142 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1143 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1144 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1145 /**
1146 * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
1147 * @tc.expected: step5. OK.
1148 */
1149 BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
1150 CheckCloudTableCount(tableName_, 1);
1151 }
1152
1153 /**
1154 * @tc.name: CloudSyncTest011
1155 * @tc.desc: reopen database, do not recreate table and sync
1156 * @tc.type: FUNC
1157 * @tc.require:
1158 * @tc.author: wangxiangdong
1159 */
1160 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest011, TestSize.Level1)
1161 {
1162 /**
1163 * @tc.steps: step1. insert 1 record to user table
1164 * @tc.expected: step1. OK.
1165 */
1166 const int actualCount = 1;
1167 InsertUserTableRecord(tableName_, actualCount);
1168 /**
1169 * @tc.steps: step2. sync data to cloud
1170 * @tc.expected: step2. OK.
1171 */
1172 Query query = Query::Select().FromTable({ tableName_ });
1173 BlockSync(query, delegate_, g_actualDBStatus);
1174 CheckCloudTableCount(tableName_, 1);
1175 /**
1176 * @tc.steps: step3. drop data table then close db
1177 * @tc.expected: step3. OK.
1178 */
1179 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1180 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1181 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1182 delegate_ = nullptr;
1183 /**
1184 * @tc.steps: step4. reopen database
1185 * @tc.expected: step4. OK.
1186 */
1187 RelationalStoreDelegate::Option option;
1188 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1189 ASSERT_NE(delegate_, nullptr);
1190 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1191 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1192 DataBaseSchema dataBaseSchema = GetSchema();
1193 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1194 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1195 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1196 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1197 /**
1198 * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
1199 * @tc.expected: step5. OK.
1200 */
1201 BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
1202 }
1203
1204 /**
1205 * @tc.name: CloudSyncTest012
1206 * @tc.desc: insert data before re-SetDistributedTable and sync is ok
1207 * @tc.type: FUNC
1208 * @tc.require:
1209 * @tc.author: wangxiangdong
1210 */
1211 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest012, TestSize.Level1)
1212 {
1213 /**
1214 * @tc.steps: step1. insert 1 record to user table
1215 * @tc.expected: step1. OK.
1216 */
1217 const int actualCount = 1;
1218 InsertUserTableRecord(tableName_, actualCount);
1219 /**
1220 * @tc.steps: step2. sync data to cloud
1221 * @tc.expected: step2. OK.
1222 */
1223 Query query = Query::Select().FromTable({ tableName_ });
1224 BlockSync(query, delegate_, g_actualDBStatus);
1225 CheckCloudTableCount(tableName_, 1);
1226 /**
1227 * @tc.steps: step3. drop data table then close db
1228 * @tc.expected: step3. OK.
1229 */
1230 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1231 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1232 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1233 delegate_ = nullptr;
1234 /**
1235 * @tc.steps: step4. recreate data table and reopen database
1236 * @tc.expected: step4. OK.
1237 */
1238 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1239 RelationalStoreDelegate::Option option;
1240 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1241 ASSERT_NE(delegate_, nullptr);
1242 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1243 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1244 DataBaseSchema dataBaseSchema = GetSchema();
1245 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1246 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1247 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1248 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1249 /**
1250 * @tc.steps: step5. insert data to new table
1251 * @tc.expected: step5. OK.
1252 */
1253 int begin = 1;
1254 InsertUserTableRecord(tableName_, actualCount, begin);
1255 /**
1256 * @tc.steps: step6. sync and cloud data should be deleted
1257 * @tc.expected: step6. OK.
1258 */
1259 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1260 BlockSync(query, delegate_, g_actualDBStatus);
1261 CheckCloudTableCount(tableName_, 1);
1262 }
1263
1264 /**
1265 * @tc.name: CloudSyncTest013
1266 * @tc.desc: insert data before re-SetDistributedTable and sync is ok
1267 * @tc.type: FUNC
1268 * @tc.require:
1269 * @tc.author: tankaisheng
1270 */
1271 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest013, TestSize.Level0)
1272 {
1273 /**
1274 * @tc.steps: step1. insert 1 record to user table
1275 * @tc.expected: step1. OK.
1276 */
1277 const int actualCount = 10;
1278 InsertUserTableRecord(tableName_, actualCount);
1279 /**
1280 * @tc.steps: step2. sync data to cloud
1281 * @tc.expected: step2. OK.
1282 */
1283 Query query = Query::Select().FromTable({ tableName_ });
1284 BlockSync(query, delegate_, g_actualDBStatus);
1285 CheckCloudTableCount(tableName_, 10);
1286 /**
1287 * @tc.steps: step3. drop data table then close db
1288 * @tc.expected: step3. OK.
1289 */
1290 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1291 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1292 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1293 delegate_ = nullptr;
1294 /**
1295 * @tc.steps: step4. recreate data table and reopen database
1296 * @tc.expected: step4. OK.
1297 */
1298 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1299 RelationalStoreDelegate::Option option;
1300 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1301 ASSERT_NE(delegate_, nullptr);
1302 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1303 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1304 DataBaseSchema dataBaseSchema = GetSchema();
1305 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1306 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1307 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1308 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1309 /**
1310 * @tc.steps: step5. insert data to new table
1311 * @tc.expected: step5. OK.
1312 */
1313 int begin = 0;
1314 InsertUserTableRecord(tableName_, actualCount, begin);
1315 /**
1316 * @tc.steps: step6. sync and cloud data should be deleted
1317 * @tc.expected: step6. OK.
1318 */
1319 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1320 BlockSync(query, delegate_, g_actualDBStatus);
1321 CheckCloudTableCount(tableName_, 10);
1322 }
1323
1324 /**
1325 * @tc.name: CloudSyncObserverTest001
1326 * @tc.desc: test cloud sync multi observer
1327 * @tc.type: FUNC
1328 * @tc.require:
1329 * @tc.author: zhangshijie
1330 */
1331 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncObserverTest001, TestSize.Level1)
1332 {
1333 // prepare data
1334 const int actualCount = 10;
1335 InsertUserTableRecord(tableName_, actualCount);
1336
1337 /**
1338 * @tc.steps:step1. open two delegate with two observer.
1339 * @tc.expected: step1. ok.
1340 */
1341 RelationalStoreDelegate::Option option;
1342 auto observer1 = new (std::nothrow) RelationalStoreObserverUnitTest();
1343 ASSERT_NE(observer1, nullptr);
1344 option.observer = observer1;
1345 RelationalStoreDelegate *delegate1 = nullptr;
1346 EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
1347 ASSERT_NE(delegate1, nullptr);
1348
1349 auto observer2 = new (std::nothrow) RelationalStoreObserverUnitTest();
1350 ASSERT_NE(observer2, nullptr);
1351 option.observer = observer2;
1352 RelationalStoreDelegate *delegate2 = nullptr;
1353 EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate2), DBStatus::OK);
1354 ASSERT_NE(delegate2, nullptr);
1355
1356 /**
1357 * @tc.steps:step2. insert 1-10 cloud data, start.
1358 * @tc.expected: step2. ok.
1359 */
1360 InsertCloudTableRecord(0, actualCount, actualCount, false);
1361 Query query = Query::Select().FromTable({ tableName_ });
1362 BlockSync(query, delegate_, g_actualDBStatus);
1363
1364 /**
1365 * @tc.steps:step3. check observer.
1366 * @tc.expected: step3. ok.
1367 */
1368 EXPECT_EQ(observer1->GetCloudCallCount(), 1u);
1369 EXPECT_EQ(observer2->GetCloudCallCount(), 1u);
1370
1371 /**
1372 * @tc.steps:step4. insert 11-20 cloud data, start.
1373 * @tc.expected: step4. ok.
1374 */
1375 delegate2->UnRegisterObserver();
1376 observer2->ResetCloudSyncToZero();
1377 int64_t begin = 11;
1378 InsertCloudTableRecord(begin, actualCount, actualCount, false);
1379 BlockSync(query, delegate_, g_actualDBStatus);
1380
1381 /**
1382 * @tc.steps:step5. check observer.
1383 * @tc.expected: step5. ok.
1384 */
1385 EXPECT_EQ(observer1->GetCloudCallCount(), 2u); // 2 is observer1 triggered times
1386 EXPECT_EQ(observer2->GetCloudCallCount(), 0u);
1387
1388 delete observer1;
1389 observer1 = nullptr;
1390 EXPECT_EQ(mgr_->CloseStore(delegate1), DBStatus::OK);
1391
1392 delete observer2;
1393 observer2 = nullptr;
1394 EXPECT_EQ(mgr_->CloseStore(delegate2), DBStatus::OK);
1395 }
1396
1397 /**
1398 * @tc.name: CloudPrioritySyncTest001
1399 * @tc.desc: use priority sync interface when query in or from table
1400 * @tc.type: FUNC
1401 * @tc.require:
1402 * @tc.author: chenchaohao
1403 */
1404 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest001, TestSize.Level1)
1405 {
1406 /**
1407 * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1408 * @tc.expected: step1. ok.
1409 */
1410 const int actualCount = 10; // 10 is count of records
1411 InsertUserTableRecord(tableName_, actualCount);
1412 std::vector<std::string> idValue = {"0", "1", "2"};
1413 Query query = Query::Select().From(tableName_).In("id", idValue);
1414
1415 /**
1416 * @tc.steps:step2. check ParserQueryNodes
1417 * @tc.expected: step2. ok.
1418 */
__anon6d2f4c681c02(const std::string &tableName, VBucket &extend) 1419 virtualCloudDb_->ForkQuery([this, &idValue](const std::string &tableName, VBucket &extend) {
1420 EXPECT_EQ(tableName_, tableName);
1421 if (extend.find(CloudDbConstant::QUERY_FIELD) == extend.end()) {
1422 return;
1423 }
1424 Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1425 DBStatus status = OK;
1426 auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1427 EXPECT_EQ(status, OK);
1428 ASSERT_EQ(queryNodes.size(), 1u);
1429 EXPECT_EQ(queryNodes[0].type, QueryNodeType::IN);
1430 EXPECT_EQ(queryNodes[0].fieldName, "id");
1431 ASSERT_EQ(queryNodes[0].fieldValue.size(), idValue.size());
1432 for (size_t i = 0u; i < idValue.size(); i++) {
1433 std::string val = std::get<std::string>(queryNodes[0].fieldValue[i]);
1434 EXPECT_EQ(val, idValue[i]);
1435 }
1436 });
1437 BlockPrioritySync(query, delegate_, true, OK);
1438 virtualCloudDb_->ForkQuery(nullptr);
1439 CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1440
1441 /**
1442 * @tc.steps:step3. use priority sync interface but not priority.
1443 * @tc.expected: step3. ok.
1444 */
1445 query = Query::Select().FromTable({ tableName_ });
1446 BlockPrioritySync(query, delegate_, false, OK);
1447 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1448
1449 /**
1450 * @tc.steps:step4. insert user table record and query from table, then priority sync.
1451 * @tc.expected: step4. ok.
1452 */
1453 InsertUserTableRecord(tableName_, actualCount, actualCount);
1454 BlockPrioritySync(query, delegate_, true, OK);
1455 CheckCloudTableCount(tableName_, 20); // 20 is count of cloud records
1456 }
1457
1458
1459 /**
1460 * @tc.name: CloudPrioritySyncTest002
1461 * @tc.desc: priority sync in some abnormal query situations
1462 * @tc.type: FUNC
1463 * @tc.require:
1464 * @tc.author: chenchaohao
1465 */
1466 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest002, TestSize.Level1)
1467 {
1468 /**
1469 * @tc.steps:step1. insert user table record.
1470 * @tc.expected: step1. ok.
1471 */
1472 const int actualCount = 1; // 1 is count of records
1473 InsertUserTableRecord(tableName_, actualCount);
1474
1475 /**
1476 * @tc.steps:step2. query select tablename then priority sync.
1477 * @tc.expected: step2. invalid.
1478 */
1479 Query query = Query::Select(tableName_);
1480 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1481 CheckCloudTableCount(tableName_, 0);
1482
1483 /**
1484 * @tc.steps:step3. query select without from then priority sync.
1485 * @tc.expected: step3. invalid.
1486 */
1487 query = Query::Select();
1488 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1489 CheckCloudTableCount(tableName_, 0);
1490
1491 /**
1492 * @tc.steps:step4. query select and from without in then priority sync.
1493 * @tc.expected: step4. invalid.
1494 */
1495 query = Query::Select().From(tableName_);
1496 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1497 CheckCloudTableCount(tableName_, 0);
1498
1499 /**
1500 * @tc.steps:step5. query select and fromtable then priority sync.
1501 * @tc.expected: step5. not support.
1502 */
1503 query = Query::Select().From(tableName_).FromTable({tableName_});
1504 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1505 CheckCloudTableCount(tableName_, 0);
1506
1507 /**
1508 * @tc.steps:step6. query select and from with other predicates then priority sync.
1509 * @tc.expected: step6. not support.
1510 */
1511 query = Query::Select().From(tableName_).IsNotNull("id");
1512 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1513 CheckCloudTableCount(tableName_, 0);
1514
1515 /**
1516 * @tc.steps:step7. query select and from with in and other predicates then priority sync.
1517 * @tc.expected: step7 not support.
1518 */
1519 std::vector<std::string> idValue = {"0"};
1520 query = Query::Select().From(tableName_).IsNotNull("id").In("id", idValue);
1521 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1522 CheckCloudTableCount(tableName_, 0);
1523
1524 /**
1525 * @tc.steps:step8. query select and from with in non-primary key then priority sync.
1526 * @tc.expected: step8. not support.
1527 */
1528 std::vector<std::string> heightValue = {"155.10"};
1529 query = Query::Select().From(tableName_).In("height", heightValue);
1530 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1531 CheckCloudTableCount(tableName_, 0);
1532
1533 /**
1534 * @tc.steps:step9. query in count greater than 100.
1535 * @tc.expected: step9. over max limits.
1536 */
1537 idValue.resize(101); // 101 > 100
1538 query = Query::Select().From(tableName_).In("id", idValue);
1539 BlockPrioritySync(query, delegate_, true, OVER_MAX_LIMITS);
1540 CheckCloudTableCount(tableName_, 0);
1541 }
1542
1543 /**
1544 * @tc.name: CloudPrioritySyncTest003
1545 * @tc.desc: priority sync when normal syncing
1546 * @tc.type: FUNC
1547 * @tc.require:
1548 * @tc.author: chenchaohao
1549 */
1550 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest003, TestSize.Level1)
1551 {
1552 /**
1553 * @tc.steps:step1. insert user table record.
1554 * @tc.expected: step1. ok.
1555 */
1556 const int actualCount = 10; // 10 is count of records
1557 InsertUserTableRecord(tableName_, actualCount);
1558
1559 /**
1560 * @tc.steps:step2. begin normal sync and priority sync.
1561 * @tc.expected: step2. ok.
1562 */
1563 Query normalQuery = Query::Select().FromTable({tableName_});
1564 std::vector<std::string> idValue = {"0", "1", "2"};
1565 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
1566 std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
1567 PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, true);
1568 EXPECT_EQ(virtualCloudDb_->GetLockCount(), 2);
1569 virtualCloudDb_->Reset();
1570 EXPECT_EQ(virtualCloudDb_->GetLockCount(), 0);
1571 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1572 }
1573
1574 /**
1575 * @tc.name: CloudPrioritySyncTest004
1576 * @tc.desc: non-primarykey table priority sync
1577 * @tc.type: FUNC
1578 * @tc.require:
1579 * @tc.author: chenchaohao
1580 */
1581 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest004, TestSize.Level1)
1582 {
1583 /**
1584 * @tc.steps:step1. insert user non-primarykey table record.
1585 * @tc.expected: step1. ok.
1586 */
1587 const int actualCount = 10; // 10 is count of records
1588 InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1589
1590 /**
1591 * @tc.steps:step2. begin priority sync.
1592 * @tc.expected: step2. not support.
1593 */
1594 std::vector<std::string> idValue = {"0", "1", "2"};
1595 Query query = Query::Select().From(tableWithoutPrimaryName_).In("id", idValue);
1596 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1597 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1598
1599 /**
1600 * @tc.steps:step3. begin priority sync when in rowid.
1601 * @tc.expected: step3. invalid.
1602 */
1603 std::vector<int64_t> rowidValue = {0, 1, 2}; // 0,1,2 are rowid value
1604 query = Query::Select().From(tableWithoutPrimaryName_).In("rowid", rowidValue);
1605 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1606 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1607 }
1608
1609 /**
1610 * @tc.name: CloudPrioritySyncTest005
1611 * @tc.desc: priority sync but don't have records
1612 * @tc.type: FUNC
1613 * @tc.require:
1614 * @tc.author: chenchaohao
1615 */
1616 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest005, TestSize.Level1)
1617 {
1618 /**
1619 * @tc.steps:step1. insert user non-primarykey table record.
1620 * @tc.expected: step1. ok.
1621 */
1622 const int actualCount = 10; // 10 is count of records
1623 InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1624
1625 /**
1626 * @tc.steps:step2. begin DistributedDBCloudCheckSyncTest priority sync and check records.
1627 * @tc.expected: step2. ok.
1628 */
1629 std::vector<std::string> idValue = {"0", "1", "2"};
1630 Query query = Query::Select().From(tableName_).In("id", idValue);
1631 BlockPrioritySync(query, delegate_, true, OK);
1632 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1633 CheckCloudTableCount(tableName_, 0);
1634 }
1635
1636 /**
1637 * @tc.name: CloudPrioritySyncTest006
1638 * @tc.desc: priority sync tasks greater than limit
1639 * @tc.type: FUNC
1640 * @tc.require:
1641 * @tc.author: chenchaohao
1642 */
1643 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest006, TestSize.Level1)
1644 {
1645 /**
1646 * @tc.steps:step1. insert user table record.
1647 * @tc.expected: step1. ok.
1648 */
1649 const int actualCount = 10; // 10 is count of records
1650 InsertUserTableRecord(tableName_, actualCount);
1651
1652 /**
1653 * @tc.steps:step2. begin 32 priority sync tasks and then begin 1 priority sync task.
1654 * @tc.expected: step2. ok.
1655 */
1656 std::vector<std::string> idValue = {"0", "1", "2"};
1657 Query query = Query::Select().From(tableName_).In("id", idValue);
1658 std::mutex dataMutex;
1659 std::condition_variable cv;
1660 std::mutex callbackMutex;
1661 std::condition_variable callbackCv;
1662 bool finish = false;
1663 size_t finishCount = 0u;
__anon6d2f4c681d02(const std::string &tableName, VBucket &extend) 1664 virtualCloudDb_->ForkQuery([&cv, &finish, &dataMutex](const std::string &tableName, VBucket &extend) {
1665 std::unique_lock<std::mutex> uniqueLock(dataMutex);
1666 cv.wait(uniqueLock, [&finish]() {
1667 return finish;
1668 });
1669 });
__anon6d2f4c681f02(const std::map<std::string, SyncProcess> &process) 1670 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1671 for (const auto &item: process) {
1672 if (item.second.process == DistributedDB::FINISHED) {
1673 {
1674 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1675 finishCount++;
1676 }
1677 callbackCv.notify_one();
1678 }
1679 }
1680 };
1681 CloudSyncOption option;
1682 PrepareOption(option, query, true);
1683 for (int i = 0; i < 32; i++) { // 32 is count of sync tasks
1684 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1685 }
1686 ASSERT_EQ(delegate_->Sync(option, nullptr), BUSY);
1687 {
1688 std::lock_guard<std::mutex> autoLock(dataMutex);
1689 finish = true;
1690 }
1691 cv.notify_all();
1692 virtualCloudDb_->ForkQuery(nullptr);
1693 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon6d2f4c682002() 1694 callbackCv.wait(callbackLock, [&finishCount]() {
1695 return (finishCount == 32u); // 32 is count of finished sync tasks
1696 });
1697 CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1698 }
1699
1700 /**
1701 * @tc.name: CloudPrioritySyncTest007
1702 * @tc.desc: priority normal priority normal when different query
1703 * @tc.type: FUNC
1704 * @tc.require:
1705 * @tc.author: chenchaohao
1706 */
1707 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest007, TestSize.Level1)
1708 {
1709 /**
1710 * @tc.steps:step1. insert user table record.
1711 * @tc.expected: step1. ok.
1712 */
1713 const int actualCount = 10; // 10 is count of records
1714 InsertUserTableRecord(tableName_, actualCount);
1715
1716 /**
1717 * @tc.steps:step2. set callback to check during sync.
1718 * @tc.expected: step2. ok.
1719 */
1720 std::atomic<int> count = 0;
1721 SetForkQueryForCloudPrioritySyncTest007(count);
1722
1723 /**
1724 * @tc.steps:step3. perform priority normal priority normal sync.
1725 * @tc.expected: step3. ok.
1726 */
1727 std::vector<std::string> idValue = {"0"};
1728 Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1729 CloudSyncOption option;
1730 PrepareOption(option, priorytyQuery, true);
1731 option.lockAction = static_cast<LockAction>(0xff); // lock all
1732 std::mutex callbackMutex;
1733 std::condition_variable callbackCv;
1734 size_t finishCount = 0u;
__anon6d2f4c682102(const std::map<std::string, SyncProcess> &process) 1735 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1736 for (const auto &item: process) {
1737 if (item.second.process == DistributedDB::FINISHED) {
1738 {
1739 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1740 finishCount++;
1741 }
1742 callbackCv.notify_one();
1743 }
1744 }
1745 };
1746 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1747 Query normalQuery = Query::Select().FromTable({tableName_});
1748 PrepareOption(option, normalQuery, false);
1749 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1750 idValue = {"1"};
1751 priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1752 PrepareOption(option, priorytyQuery, true);
1753 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1754 PrepareOption(option, normalQuery, false);
1755 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1756 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon6d2f4c682202() 1757 callbackCv.wait(callbackLock, [&finishCount]() {
1758 return (finishCount == 4u); // 4 is count of finished sync tasks
1759 });
1760 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1761 }
1762
1763 /**
1764 * @tc.name: CloudPrioritySyncTest008
1765 * @tc.desc: priority normal priority normal when different query
1766 * @tc.type: FUNC
1767 * @tc.require:
1768 * @tc.author: chenchaohao
1769 */
1770 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest008, TestSize.Level1)
1771 {
1772 /**
1773 * @tc.steps:step1. insert user table record.
1774 * @tc.expected: step1. ok.
1775 */
1776 const int actualCount = 10; // 10 is count of records
1777 InsertUserTableRecord(tableName_, actualCount);
1778
1779 /**
1780 * @tc.steps:step2. set callback to check during sync.
1781 * @tc.expected: step2. ok.
1782 */
1783 std::atomic<int> count = 0;
1784 SetForkQueryForCloudPrioritySyncTest008(count);
1785
1786 /**
1787 * @tc.steps:step3. perform priority normal priority normal sync.
1788 * @tc.expected: step3. ok.
1789 */
1790 std::vector<std::string> idValue = {"0"};
1791 Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1792 CloudSyncOption option;
1793 option.lockAction = static_cast<LockAction>(0xff); // lock all
1794 PrepareOption(option, priorytyQuery, true);
1795 std::mutex callbackMutex;
1796 std::condition_variable callbackCv;
1797 size_t finishCount = 0u;
__anon6d2f4c682302(const std::map<std::string, SyncProcess> &process) 1798 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1799 for (const auto &item: process) {
1800 if (item.second.process == DistributedDB::FINISHED) {
1801 {
1802 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1803 finishCount++;
1804 }
1805 callbackCv.notify_one();
1806 }
1807 }
1808 };
1809 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1810 Query normalQuery = Query::Select().FromTable({tableName_});
1811 PrepareOption(option, normalQuery, false);
1812 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1813 priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1814 PrepareOption(option, priorytyQuery, true);
1815 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1816 PrepareOption(option, normalQuery, false);
1817 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1818 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon6d2f4c682402() 1819 callbackCv.wait(callbackLock, [&finishCount]() {
1820 return (finishCount == 4u); // 4 is count of finished sync tasks
1821 });
1822 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1823 }
1824
1825 /**
1826 * @tc.name: CloudPrioritySyncTest009
1827 * @tc.desc: use priority sync interface when query equal to from table
1828 * @tc.type: FUNC
1829 * @tc.require:
1830 * @tc.author: zhangqiquan
1831 */
1832 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest009, TestSize.Level1)
1833 {
1834 /**
1835 * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1836 * @tc.expected: step1. ok.
1837 */
1838 const int actualCount = 5; // 5 is count of records
1839 InsertUserTableRecord(tableName_, actualCount);
1840 Query query = Query::Select().From(tableName_).BeginGroup().EqualTo("id", "0").Or().EqualTo("id", "1").EndGroup();
1841
1842 /**
1843 * @tc.steps:step2. check ParserQueryNodes
1844 * @tc.expected: step2. ok.
1845 */
__anon6d2f4c682502(const std::string &tableName, VBucket &extend) 1846 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
1847 EXPECT_EQ(tableName_, tableName);
1848 Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1849 DBStatus status = OK;
1850 auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1851 EXPECT_EQ(status, OK);
1852 ASSERT_EQ(queryNodes.size(), 5u); // 5 is query nodes count
1853 });
1854 BlockPrioritySync(query, delegate_, true, OK);
1855 virtualCloudDb_->ForkQuery(nullptr);
1856 CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records
1857 }
1858
1859 /**
1860 * @tc.name: CloudPrioritySyncTest010
1861 * @tc.desc: priority sync after cloud delete
1862 * @tc.type: FUNC
1863 * @tc.require:
1864 * @tc.author: chenchaohao
1865 */
1866 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest010, TestSize.Level1)
1867 {
1868 /**
1869 * @tc.steps:step1. insert user table record.
1870 * @tc.expected: step1. ok.
1871 */
1872 const int actualCount = 10; // 10 is count of records
1873 InsertUserTableRecord(tableName_, actualCount);
1874
1875 /**
1876 * @tc.steps:step2. normal sync and then delete cloud records.
1877 * @tc.expected: step2. ok.
1878 */
1879 Query query = Query::Select().FromTable({tableName_});
1880 BlockSync(query, delegate_, g_actualDBStatus);
1881 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after sync
1882 DeleteCloudDBData(0, 3); // delete 0 1 2 record in cloud
1883 CheckCloudTableCount(tableName_, 7); // 7 is count of cloud records after delete
1884 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1885
1886 /**
1887 * @tc.steps:step3. priory sync and set query then check user table records.
1888 * @tc.expected: step3. ok.
1889 */
1890 std::vector<std::string> idValue = {"3", "4", "5"};
1891 query = Query::Select().From(tableName_).In("id", idValue);
1892 BlockPrioritySync(query, delegate_, true, OK);
1893 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records after sync
1894 idValue = {"0", "1", "2"};
1895 query = Query::Select().From(tableName_).In("id", idValue);
1896 BlockPrioritySync(query, delegate_, true, OK);
1897 CheckUserTableResult(db_, tableName_, 7); // 7 is count of user records after sync
1898 }
1899
1900 /**
1901 * @tc.name: CloudPrioritySyncTest011
1902 * @tc.desc: priority sync after cloud insert
1903 * @tc.type: FUNC
1904 * @tc.require:
1905 * @tc.author: chenchaohao
1906 */
1907 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest011, TestSize.Level1)
1908 {
1909 /**
1910 * @tc.steps:step1. insert cloud table record.
1911 * @tc.expected: step1. ok.
1912 */
1913 const int actualCount = 10; // 10 is count of records
1914 InsertCloudTableRecord(0, actualCount, actualCount, false);
1915 std::vector<std::string> idValue = {"0", "1", "2"};
1916 Query query = Query::Select().From(tableName_).In("id", idValue);
1917 std::atomic<int> count = 0;
1918
1919 /**
1920 * @tc.steps:step2. check user records when query.
1921 * @tc.expected: step1. ok.
1922 */
__anon6d2f4c682602(const std::string &, VBucket &) 1923 virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
1924 count++;
1925 if (count == 1) { // taskid1
1926 std::this_thread::sleep_for(std::chrono::seconds(1));
1927 }
1928 if (count == 2) { // taskid2
1929 CheckUserTableResult(db_, tableName_, 3); // 3 is count of user records after first sync
1930 }
1931 });
1932 CloudSyncOption option;
1933 PrepareOption(option, query, true);
1934 std::mutex callbackMutex;
1935 std::condition_variable callbackCv;
1936 size_t finishCount = 0u;
__anon6d2f4c682702(const std::map<std::string, SyncProcess> &process) 1937 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1938 for (const auto &item: process) {
1939 if (item.second.process == DistributedDB::FINISHED) {
1940 {
1941 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1942 finishCount++;
1943 }
1944 callbackCv.notify_one();
1945 }
1946 }
1947 };
1948
1949 /**
1950 * @tc.steps:step3. begin sync and check user record.
1951 * @tc.expected: step3. ok.
1952 */
1953 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1954 idValue = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
1955 query = Query::Select().From(tableName_).In("id", idValue);
1956 PrepareOption(option, query, true);
1957 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1958 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon6d2f4c682802() 1959 callbackCv.wait(callbackLock, [&finishCount]() {
1960 return (finishCount == 2u); // 2 is count of finished sync tasks
1961 });
1962 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1963 }
1964
1965 /**
1966 * @tc.name: CloudPrioritySyncTest012
1967 * @tc.desc: priority or normal sync when waittime > 300s or < -1
1968 * @tc.type: FUNC
1969 * @tc.require:
1970 * @tc.author: chenchaohao
1971 */
1972 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest012, TestSize.Level1)
1973 {
1974 /**
1975 * @tc.steps:step1. insert cloud table record.
1976 * @tc.expected: step1. ok.
1977 */
1978 const int actualCount = 10; // 10 is count of records
1979 InsertCloudTableRecord(0, actualCount, actualCount, false);
1980 std::vector<std::string> idValue = {"0", "1", "2"};
1981 Query query = Query::Select().From(tableName_).In("id", idValue);
1982
1983 /**
1984 * @tc.steps:step2. set waittime < -1 then begin sync.
1985 * @tc.expected: step2. invalid.
1986 */
1987 CloudSyncOption option;
1988 PrepareOption(option, query, true);
1989 option.waitTime = -2; // -2 < -1;
1990 ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1991 CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1992
1993 /**
1994 * @tc.steps:step3. set waittime > 300s then begin sync.
1995 * @tc.expected: step3. invalid.
1996 */
1997
1998 option.waitTime = 300001; // 300001 > 300s
1999 ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
2000 CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
2001 }
2002
2003 /**
2004 * @tc.name: CloudPrioritySyncTest013
2005 * @tc.desc: priority sync in some abnormal composite pk query situations
2006 * @tc.type: FUNC
2007 * @tc.require:
2008 * @tc.author: chenchaohao
2009 */
2010 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest013, TestSize.Level1)
2011 {
2012 /**
2013 * @tc.steps:step1. insert user table record.
2014 * @tc.expected: step1. ok.
2015 */
2016 const int actualCount = 1; // 1 is count of records
2017 InsertUserTableRecord(tableName_, actualCount);
2018
2019 /**
2020 * @tc.steps:step2. query only begingroup then priority sync.
2021 * @tc.expected: step2. invalid.
2022 */
2023 Query query = Query::Select().From(tableName_).BeginGroup();
2024 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2025 CheckCloudTableCount(tableName_, 0);
2026
2027 /**
2028 * @tc.steps:step3. query only endgroup then priority sync.
2029 * @tc.expected: step3. invalid.
2030 */
2031 query = Query::Select().From(tableName_).EndGroup();
2032 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2033 CheckCloudTableCount(tableName_, 0);
2034
2035 /**
2036 * @tc.steps:step4. query only begingroup and endgroup then priority sync.
2037 * @tc.expected: step4. invalid.
2038 */
2039 query = Query::Select().From(tableName_).BeginGroup().EndGroup();
2040 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2041 CheckCloudTableCount(tableName_, 0);
2042
2043 /**
2044 * @tc.steps:step5. query and from table then priority sync.
2045 * @tc.expected: step5. invalid.
2046 */
2047 query = Query::Select().And().From(tableName_);
2048 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2049 CheckCloudTableCount(tableName_, 0);
2050
2051 /**
2052 * @tc.steps:step6. query or from table then priority sync.
2053 * @tc.expected: step6. invalid.
2054 */
2055 query = Query::Select().Or().From(tableName_);
2056 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2057 CheckCloudTableCount(tableName_, 0);
2058
2059 /**
2060 * @tc.steps:step7. query begingroup from table then priority sync.
2061 * @tc.expected: step7 invalid.
2062 */
2063 query = Query::Select().BeginGroup().From(tableName_);
2064 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2065 CheckCloudTableCount(tableName_, 0);
2066
2067 /**
2068 * @tc.steps:step8. query endgroup from table then priority sync.
2069 * @tc.expected: step8 invalid.
2070 */
2071 query = Query::Select().EndGroup().From(tableName_);
2072 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2073 CheckCloudTableCount(tableName_, 0);
2074
2075 /**
2076 * @tc.steps:step9. query and in then priority sync.
2077 * @tc.expected: step9. invalid.
2078 */
2079 std::vector<std::string> idValue = {"0"};
2080 query = Query::Select().From(tableName_).And().In("id", idValue);
2081 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2082 CheckCloudTableCount(tableName_, 0);
2083
2084 /**
2085 * @tc.steps:step10. query when the table name does not exit then priority sync.
2086 * @tc.expected: step10. schema mismatch.
2087 */
2088 query = Query::Select().From("tableName").And().In("id", idValue);
2089 BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
2090 CheckCloudTableCount(tableName_, 0);
2091
2092 /**
2093 * @tc.steps:step11. query when the table name does not exit then priority sync.
2094 * @tc.expected: step11. schema mismatch.
2095 */
2096 query = Query::Select().From("tableName").In("id", idValue);
2097 BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
2098 CheckCloudTableCount(tableName_, 0);
2099
2100 /**
2101 * @tc.steps:step12. query when the table name does not exit then sync.
2102 * @tc.expected: step12. schema mismatch.
2103 */
2104 query = Query::Select().FromTable({"tableName"});
2105 BlockPrioritySync(query, delegate_, false, SCHEMA_MISMATCH);
2106 CheckCloudTableCount(tableName_, 0);
2107 }
2108
CheckUploadInfoAfterSync(int recordCount,SyncProcess & normalLast)2109 void DistributedDBCloudCheckSyncTest::CheckUploadInfoAfterSync(int recordCount, SyncProcess &normalLast)
2110 {
2111 uint32_t uintRecordCount = static_cast<uint32_t>(recordCount);
2112 const Info expectUploadInfo = {2u, uintRecordCount, uintRecordCount, 0u, uintRecordCount, 0u, 0u};
2113 for (const auto &table : normalLast.tableProcess) {
2114 CheckUploadInfo(table.second.upLoadInfo, expectUploadInfo);
2115 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
2116 }
2117 }
2118
2119 /**
2120 * @tc.name: CloudPrioritySyncTest014
2121 * @tc.desc: Check the uploadInfo after the normal sync is paused by the priority sync
2122 * @tc.type: FUNC
2123 * @tc.require:
2124 * @tc.author: suyue
2125 */
2126 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest014, TestSize.Level1)
2127 {
2128 /**
2129 * @tc.steps:step1. insert data and sync pause.
2130 * @tc.expected: step1. ok.
2131 */
2132 const int recordCount = 50; // 50 is count of data records
2133 InsertUserTableRecord(tableName_, recordCount, 0);
2134 Query normalQuery = Query::Select().FromTable({tableName_});
2135 CloudSyncOption normalOption;
2136 PrepareOption(normalOption, normalQuery, false);
2137 bool isUpload = false;
2138 uint32_t blockTime = 1000;
__anon6d2f4c682902(const std::string &tableName, VBucket &extend) 2139 virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
2140 if (isUpload == false) {
2141 isUpload = true;
2142 std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
2143 }
2144 });
2145 bool isFinish = false;
2146 bool priorityFinish = false;
2147 SyncProcess normalLast;
__anon6d2f4c682a02(const std::map<std::string, SyncProcess> &process) 2148 auto normalCallback = [&isFinish, &priorityFinish, &normalLast](const std::map<std::string, SyncProcess> &process) {
2149 for (const auto &item : process) {
2150 if (item.second.process == DistributedDB::FINISHED) {
2151 isFinish = true;
2152 ASSERT_EQ(priorityFinish, true);
2153 normalLast = item.second;
2154 }
2155 }
2156 };
2157 ASSERT_EQ(delegate_->Sync(normalOption, normalCallback), OK);
2158
2159 /**
2160 * @tc.steps:step2. priority sync.
2161 * @tc.expected: step2. ok.
2162 */
2163 while (isUpload == false) {
2164 std::this_thread::sleep_for(std::chrono::milliseconds(50));
2165 }
2166 std::vector<std::string> idValues = {"0", "1", "2", "3", "4"};
2167 Query priorityQuery = Query::Select().From(tableName_).In("id", idValues);
2168 CloudSyncOption priorityOption;
2169 PrepareOption(priorityOption, priorityQuery, true);
__anon6d2f4c682b02(const std::map<std::string, SyncProcess> &process) 2170 auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
2171 for (const auto &item : process) {
2172 if (item.second.process == DistributedDB::FINISHED) {
2173 priorityFinish = true;
2174 }
2175 }
2176 };
2177 ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
2178 while (isFinish == false || priorityFinish == false) {
2179 std::this_thread::sleep_for(std::chrono::milliseconds(50));
2180 }
2181
2182 /**
2183 * @tc.steps:step3. check uploadInfo after sync finished.
2184 * @tc.expected: step3. ok.
2185 */
2186 CheckUploadInfoAfterSync(recordCount, normalLast);
2187 virtualCloudDb_->ForkUpload(nullptr);
2188 }
2189
2190 /**
2191 * @tc.name: CloudPrioritySyncTest015
2192 * @tc.desc: Check the uploadInfo and the downloadInfo after the normal sync is paused by the priority sync
2193 * @tc.type: FUNC
2194 * @tc.require:
2195 * @tc.author: caihaoting
2196 */
2197 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest015, TestSize.Level1)
2198 {
2199 /**
2200 * @tc.steps:step1. insert data and sync pause.
2201 * @tc.expected: step1. ok.
2202 */
2203 const int localCount = 10; // 10 is count of local data records
2204 const int cloudCount = 50; // 50 is count of cloud data records
2205 InsertUserTableRecord(tableName_, localCount, 0);
2206 InsertCloudTableRecord(20, cloudCount, 0, false); // 20 is begin number
2207 uint32_t blockTime = 500; // 500ms
2208 virtualCloudDb_->SetBlockTime(blockTime);
2209 Query normalQuery = Query::Select().FromTable({tableName_});
2210 CloudSyncOption normalOption;
2211 PrepareOption(normalOption, normalQuery, false);
2212 bool isFinish = false;
2213 bool priorityFinish = false;
2214 SyncProcess normalLast;
__anon6d2f4c682c02(const std::map<std::string, SyncProcess> &process) 2215 auto normalCallback = [&isFinish, &priorityFinish, &normalLast](const std::map<std::string, SyncProcess> &process) {
2216 for (const auto &item : process) {
2217 if (item.second.process == DistributedDB::FINISHED) {
2218 isFinish = true;
2219 ASSERT_EQ(priorityFinish, true);
2220 normalLast = item.second;
2221 }
2222 }
2223 };
2224 ASSERT_EQ(delegate_->Sync(normalOption, normalCallback), OK);
2225
2226 /**
2227 * @tc.steps:step2. priority sync.
2228 * @tc.expected: step2. ok.
2229 */
2230 std::vector<std::string> idValues = {"10", "11", "12", "13", "14"};
2231 Query priorityQuery = Query::Select().From(tableName_).In("id", idValues);
2232 CloudSyncOption priorityOption;
2233 PrepareOption(priorityOption, priorityQuery, true);
__anon6d2f4c682d02(const std::map<std::string, SyncProcess> &process) 2234 auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
2235 for (const auto &item : process) {
2236 if (item.second.process == DistributedDB::FINISHED) {
2237 priorityFinish = true;
2238 }
2239 }
2240 };
2241 ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
2242 while (isFinish == false || priorityFinish == false) {
2243 std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
2244 }
2245
2246 /**
2247 * @tc.steps:step3. check uploadInfo and downloadInfo after sync finished.
2248 * @tc.expected: step3. ok.
2249 */
2250 uint32_t uintLocalCount = static_cast<uint32_t>(localCount);
2251 uint32_t uintCloudCount = static_cast<uint32_t>(cloudCount);
2252 const Info expectUploadInfo = {1u, uintLocalCount, uintLocalCount, 0u, uintLocalCount, 0u, 0u};
2253 const Info expectDownloadInfo = {1u, uintCloudCount, uintCloudCount, 0u, uintCloudCount, 0u, 0u};
2254 for (const auto &table : normalLast.tableProcess) {
2255 CheckUploadInfo(table.second.upLoadInfo, expectUploadInfo);
2256 CheckDownloadInfo(table.second.downLoadInfo, expectDownloadInfo);
2257 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
2258 }
2259 CheckUserTableResult(db_, tableName_, 60);
2260 }
2261
2262 /**
2263 * @tc.name: CloudPrioritySyncTest016
2264 * @tc.desc: priority sync when normal syncing
2265 * @tc.type: FUNC
2266 * @tc.require:
2267 * @tc.author: wangxiangdong
2268 */
2269 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest016, TestSize.Level1)
2270 {
2271 /**
2272 * @tc.steps:step1. insert cloud table record.
2273 * @tc.expected: step1. ok.
2274 */
2275 const int actualCount = 60; // 60 is count of records
2276 InsertCloudTableRecord(0, actualCount, 0, false);
2277 InsertUserTableRecord(tableName_, 10);
2278
2279 /**
2280 * @tc.steps:step2. begin normal sync and priority sync.
2281 * @tc.expected: step2. ok.
2282 */
2283 Query normalQuery = Query::Select().FromTable({tableName_});
2284 std::vector<std::string> idValue = {"0", "1", "2"};
2285 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
2286 std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
2287 PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, false);
2288 virtualCloudDb_->Reset();
2289 CheckCloudTableCount(tableName_, 60); // 10 is count of cloud records
2290 /**
2291 * @tc.steps:step3. check sync process result.
2292 * @tc.expected: step3. ok.
2293 */
2294 std::vector<DistributedDB::SyncProcess> expectSyncResult = {
2295 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 60, 60, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2296 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 3, 3, 0, 0, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2297 {FINISHED, OK, {{tableName_, {FINISHED, {1, 3, 3, 0, 0, 0, 0}, {1, 3, 3, 0, 0, 3, 0}}}}},
2298 {PROCESSING, OK, {{tableName_, {PROCESSING, {2, 63, 63, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2299 {FINISHED, OK, {{tableName_, {FINISHED, {2, 63, 63, 0, 50, 0, 0}, {1, 7, 7, 0, 0, 7, 0}}}}}
2300 };
2301 EXPECT_EQ(CheckSyncProcess(prioritySyncProcess, expectSyncResult), true);
2302 }
2303
2304 /**
2305 * @tc.name: LogicDeleteSyncTest001
2306 * @tc.desc: sync with logic delete
2307 * @tc.type: FUNC
2308 * @tc.require:
2309 * @tc.author: zhangqiquan
2310 */
2311 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest001, TestSize.Level1)
2312 {
2313 bool logicDelete = true;
2314 auto data = static_cast<PragmaData>(&logicDelete);
2315 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2316 int actualCount = 10;
2317 InitLogicDeleteDataEnv(actualCount, true);
2318 CheckLocalCount(actualCount);
2319 std::string device = "";
2320 ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_AND_DATA), DBStatus::OK);
2321 CheckLocalCount(actualCount);
2322 }
2323
2324 /**
2325 * @tc.name: LogicDeleteSyncTest002
2326 * @tc.desc: sync without logic delete
2327 * @tc.type: FUNC
2328 * @tc.require:
2329 * @tc.author: zhangqiquan
2330 */
2331 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest002, TestSize.Level1)
2332 {
2333 bool logicDelete = false;
2334 auto data = static_cast<PragmaData>(&logicDelete);
2335 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2336 int actualCount = 10;
2337 InitLogicDeleteDataEnv(actualCount);
2338 CheckLocalCount(0);
2339 }
2340
2341 /**
2342 * @tc.name: LogicDeleteSyncTest003
2343 * @tc.desc: sync with logic delete and check observer
2344 * @tc.type: FUNC
2345 * @tc.require:
2346 * @tc.author: bty
2347 */
2348 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest003, TestSize.Level1)
2349 {
2350 /**
2351 * @tc.steps:step1. register observer.
2352 * @tc.expected: step1. ok.
2353 */
2354 RelationalStoreDelegate::Option option;
2355 auto observer = new (std::nothrow) RelationalStoreObserverUnitTest();
2356 ASSERT_NE(observer, nullptr);
2357 observer->SetCallbackDetailsType(static_cast<uint32_t>(CallbackDetailsType::DETAILED));
2358 EXPECT_EQ(delegate_->RegisterObserver(observer), OK);
2359 ChangedData expectData;
2360 expectData.tableName = tableName_;
2361 expectData.type = ChangedDataType::DATA;
2362 expectData.field.push_back(std::string("id"));
2363 const int count = 10;
2364 for (int64_t i = 0; i < count; ++i) {
2365 expectData.primaryData[ChangeType::OP_DELETE].push_back({std::to_string(i)});
2366 }
2367 expectData.properties = { .isTrackedDataChange = true };
2368 observer->SetExpectedResult(expectData);
2369
2370 /**
2371 * @tc.steps:step2. set tracker table
2372 * @tc.expected: step2. ok.
2373 */
2374 TrackerSchema trackerSchema;
2375 trackerSchema.tableName = tableName_;
2376 trackerSchema.trackerColNames = { "id" };
2377 EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
2378
2379 /**
2380 * @tc.steps:step3. set logic delete and sync
2381 * @tc.expected: step3. ok.
2382 */
2383 bool logicDelete = true;
2384 auto data = static_cast<PragmaData>(&logicDelete);
2385 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2386 int actualCount = 10;
2387 InitLogicDeleteDataEnv(actualCount);
2388 CheckLocalCount(actualCount);
2389 EXPECT_EQ(observer->IsAllChangedDataEq(), true);
2390 observer->ClearChangedData();
2391
2392 /**
2393 * @tc.steps:step4. unSetTrackerTable and sync
2394 * @tc.expected: step4. ok.
2395 */
2396 expectData.properties = { .isTrackedDataChange = false };
2397 observer->SetExpectedResult(expectData);
2398 trackerSchema.trackerColNames = {};
2399 EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
2400 InsertUserTableRecord(tableName_, actualCount);
2401 BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
2402 for (int i = 0; i < actualCount + actualCount; ++i) {
2403 DeleteCloudTableRecord(i);
2404 }
2405 BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
2406 EXPECT_EQ(observer->IsAllChangedDataEq(), true);
2407
2408 EXPECT_EQ(delegate_->UnRegisterObserver(observer), OK);
2409 delete observer;
2410 observer = nullptr;
2411 }
2412
2413 /**
2414 * @tc.name: LogicDeleteSyncTest004
2415 * @tc.desc: test removedevicedata in mode FLAG_ONLY when sync with logic delete
2416 * @tc.type: FUNC
2417 * @tc.require:
2418 * @tc.author: chenchaohao
2419 */
2420 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest004, TestSize.Level1)
2421 {
2422 /**
2423 * @tc.steps:step1. set logic delete
2424 * @tc.expected: step1. ok.
2425 */
2426 bool logicDelete = true;
2427 auto data = static_cast<PragmaData>(&logicDelete);
2428 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2429
2430 /**
2431 * @tc.steps:step2. cloud delete data then sync, check removedevicedata
2432 * @tc.expected: step2. ok.
2433 */
2434 int actualCount = 10;
2435 InitLogicDeleteDataEnv(actualCount);
2436 CheckLocalCount(actualCount);
2437 std::string device = "";
2438 ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
2439 CheckLocalCount(actualCount);
2440 CheckLogCleaned(0);
2441 }
2442
2443 /**
2444 * @tc.name: LogicDeleteSyncTest005
2445 * @tc.desc: test pragma when set cmd is not logic delete
2446 * @tc.type: FUNC
2447 * @tc.require:
2448 * @tc.author: chenchaohao
2449 */
2450 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest005, TestSize.Level0)
2451 {
2452 /**
2453 * @tc.steps:step1. set cmd is auto sync
2454 * @tc.expected: step1. ok.
2455 */
2456 bool logicDelete = true;
2457 auto data = static_cast<PragmaData>(&logicDelete);
2458 EXPECT_EQ(delegate_->Pragma(AUTO_SYNC, data), DBStatus::NOT_SUPPORT);
2459 }
2460
2461 /**
2462 * @tc.name: LogicDeleteSyncTest006
2463 * @tc.desc: sync with logic delete after lock table.
2464 * @tc.type: FUNC
2465 * @tc.require:
2466 * @tc.author: liaoyonghuang
2467 */
2468 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest006, TestSize.Level1)
2469 {
2470 /**
2471 * @tc.steps:step1. set logic delete
2472 * @tc.expected: step1. ok.
2473 */
2474 bool logicDelete = true;
2475 auto data = static_cast<PragmaData>(&logicDelete);
2476 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2477
2478 /**
2479 * @tc.steps:step2. insert user table record and sync.
2480 * @tc.expected: step2. ok.
2481 */
2482 int dataCount = 10;
2483 InsertUserTableRecord(tableName_, dataCount);
2484 Query query = Query::Select().FromTable({ tableName_ });
2485 BlockSync(query, delegate_, g_actualDBStatus);
2486
2487 /**
2488 * @tc.steps:step3. Lock log table, and delete data from cloud table.
2489 * @tc.expected: step3. ok.
2490 */
2491 std::vector<std::vector<uint8_t>> hashKey;
2492 CloudDBSyncUtilsTest::GetHashKey(tableName_, " 1=1 ", db_, hashKey);
2493 Lock(tableName_, hashKey, db_);
2494 for (int i = 0; i < dataCount; ++i) {
2495 DeleteCloudTableRecord(i);
2496 }
2497 /**
2498 * @tc.steps:step4. sync.
2499 * @tc.expected: step4. ok.
2500 */
2501 std::vector<DBStatus> actualDBStatus;
2502 BlockSync(query, delegate_, actualDBStatus);
2503 for (auto status : actualDBStatus) {
2504 EXPECT_EQ(status, OK);
2505 }
2506 }
2507
2508 /**
2509 * @tc.name: LogicDeleteSyncTest008
2510 * @tc.desc: Test sync when data with flag 0x800 locally but there is updated data on the cloud.
2511 * @tc.type: FUNC
2512 * @tc.require:
2513 * @tc.author: liaoyonghuang
2514 */
2515 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest008, TestSize.Level1)
2516 {
2517 /**
2518 * @tc.steps:step1. Insert user table record with flag 0x800. Insert cloud table record.
2519 * @tc.expected: step1. ok.
2520 */
2521 int dataCount = 10;
2522 uint32_t logicDeleteCount = 4;
2523 InsertUserTableRecord(tableName_, dataCount);
2524 std::string sql = "update " + DBCommon::GetLogTableName(tableName_) +
2525 " set flag = flag | 0x800 where data_key <= " + std::to_string(logicDeleteCount);
2526 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2527 InsertCloudTableRecord(0, dataCount, 0, false);
2528 sql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where flag & 0x800=0x800";
2529 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2530 reinterpret_cast<void *>(logicDeleteCount), nullptr), SQLITE_OK);
2531 /**
2532 * @tc.steps:step2. Do sync.
2533 * @tc.expected: step2. ok.
2534 */
2535 Query query = Query::Select().FromTable({ tableName_ });
2536 BlockSync(query, delegate_, g_actualDBStatus);
2537 /**
2538 * @tc.steps:step3. Check data flag in local DB.
2539 * @tc.expected: step3. No data flag is 0x800.
2540 */
2541 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2542 reinterpret_cast<void *>(0), nullptr), SQLITE_OK);
2543 }
2544
2545 /**
2546 * @tc.name: LockActionTest001
2547 * @tc.desc: InitCompensatedSyncTaskInfo and check lockAction.
2548 * @tc.type: FUNC
2549 * @tc.require:
2550 * @tc.author: wangxiangdong
2551 */
2552 HWTEST_F(DistributedDBCloudCheckSyncTest, LockActionTest001, TestSize.Level0)
2553 {
2554 /**
2555 * @tc.steps:step1. InitCompensatedSyncTaskInfo and check.
2556 * @tc.expected: step1. ok.
2557 */
2558 CloudSyncOption option;
2559 option.devices = { "CLOUD" };
2560 option.mode = SYNC_MODE_CLOUD_MERGE;
2561 option.query = Query::Select().FromTable({ tableName_ });
2562 option.waitTime = g_syncWaitTime;
2563 auto action = static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE)
2564 | static_cast<uint32_t>(LockAction::DELETE);
2565 option.lockAction = static_cast<LockAction>(action);
2566 option.priorityTask = true;
2567 option.compensatedSyncOnly = true;
2568 const SyncProcessCallback onProcess;
2569 CloudSyncer::CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
2570 EXPECT_EQ(taskInfo.lockAction, option.lockAction);
2571 }
2572
2573 /**
2574 * @tc.name: LogicCreateRepeatedTableNameTest001
2575 * @tc.desc: test create repeated table name with different cases
2576 * @tc.type: FUNC
2577 * @tc.require:
2578 * @tc.author: wangxiangdong
2579 */
2580 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicCreateRepeatedTableNameTest001, TestSize.Level0)
2581 {
2582 /**
2583 * @tc.steps:step1. CreateDistributedTable with same name but different cases.
2584 * @tc.expected: step1. operate successfully.
2585 */
2586 DBStatus createStatus = delegate_->CreateDistributedTable(lowerTableName_, CLOUD_COOPERATION);
2587 ASSERT_EQ(createStatus, DBStatus::OK);
2588 }
2589
2590 /**
2591 * @tc.name: SaveCursorTest001
2592 * @tc.desc: test whether cloud cursor is saved when first sync
2593 * @tc.type: FUNC
2594 * @tc.require:
2595 * @tc.author: chenchaohao
2596 */
2597 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest001, TestSize.Level1)
2598 {
2599 /**
2600 * @tc.steps:step1. insert cloud records
2601 * @tc.expected: step1. OK
2602 */
2603 const int actualCount = 10;
2604 InsertCloudTableRecord(0, actualCount, 0, false);
2605
2606 /**
2607 * @tc.steps:step2. check cursor when first sync
2608 * @tc.expected: step2. OK
2609 */
__anon6d2f4c682e02(const std::string &tableName, VBucket &extend) 2610 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2611 EXPECT_EQ(tableName_, tableName);
2612 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2613 EXPECT_EQ(cursor, "0");
2614 });
2615 Query query = Query::Select().FromTable({ tableName_ });
2616 BlockSync(query, delegate_, g_actualDBStatus);
2617 CheckLocalCount(actualCount);
2618 }
2619
2620 /**
2621 * @tc.name: SaveCursorTest002
2622 * @tc.desc: test whether cloud cursor is saved when first download failed
2623 * @tc.type: FUNC
2624 * @tc.require:
2625 * @tc.author: chenchaohao
2626 */
2627 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest002, TestSize.Level1)
2628 {
2629 /**
2630 * @tc.steps:step1. insert cloud records
2631 * @tc.expected: step1. OK
2632 */
2633 const int actualCount = 10;
2634 InsertCloudTableRecord(0, actualCount, 0, false);
2635
2636 /**
2637 * @tc.steps:step2. set download failed
2638 * @tc.expected: step2. OK
2639 */
2640 virtualCloudDb_->SetCloudError(true);
2641 Query query = Query::Select().FromTable({ tableName_ });
2642 BlockPrioritySync(query, delegate_, false, OK);
2643 CheckLocalCount(0);
2644
2645 /**
2646 * @tc.steps:step3. check cursor when query
2647 * @tc.expected: step3. OK
2648 */
2649 virtualCloudDb_->SetCloudError(false);
__anon6d2f4c682f02(const std::string &tableName, VBucket &extend) 2650 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2651 EXPECT_EQ(tableName_, tableName);
2652 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2653 EXPECT_EQ(cursor, "0");
2654 });
2655 BlockSync(query, delegate_, g_actualDBStatus);
2656 CheckLocalCount(actualCount);
2657 }
2658
2659 /**
2660 * @tc.name: SaveCursorTest003
2661 * @tc.desc: test whether cloud cursor is saved when first upload failed
2662 * @tc.type: FUNC
2663 * @tc.require:
2664 * @tc.author: chenchaohao
2665 */
2666 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest003, TestSize.Level1)
2667 {
2668 /**
2669 * @tc.steps:step1. insert local records
2670 * @tc.expected: step1. OK
2671 */
2672 const int actualCount = 10;
2673 InsertUserTableRecord(tableName_, actualCount);
2674
2675 /**
2676 * @tc.steps:step2. set upload failed
2677 * @tc.expected: step2. OK
2678 */
2679 virtualCloudDb_->SetCloudError(true);
2680 Query query = Query::Select().FromTable({ tableName_ });
2681 BlockPrioritySync(query, delegate_, false, OK);
2682 CheckCloudTableCount(tableName_, 0);
2683
2684 /**
2685 * @tc.steps:step3. check cursor when query
2686 * @tc.expected: step3. OK
2687 */
2688 virtualCloudDb_->SetCloudError(false);
__anon6d2f4c683002(const std::string &tableName, VBucket &extend) 2689 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2690 EXPECT_EQ(tableName_, tableName);
2691 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2692 EXPECT_EQ(cursor, "0");
2693 });
2694 BlockSync(query, delegate_, g_actualDBStatus);
2695 CheckCloudTableCount(tableName_, actualCount);
2696 }
2697
2698 /**
2699 * @tc.name: RangeQuerySyncTest001
2700 * @tc.desc: Test sync that has option parameter with range query.
2701 * @tc.type: FUNC
2702 * @tc.require:
2703 * @tc.author: chenchaohao
2704 */
2705 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest001, TestSize.Level1)
2706 {
2707 /**
2708 * @tc.steps:step1. insert user table record.
2709 * @tc.expected: step1. ok.
2710 */
2711 CloudSyncOption option;
2712 option.devices = { "CLOUD" };
2713 option.mode = SYNC_MODE_CLOUD_MERGE;
2714 option.waitTime = g_syncWaitTime;
2715 Query query = Query::Select().From(tableName_).Range({}, {});
2716 option.query = query;
2717
2718 /**
2719 * @tc.steps:step2. test normal sync with range query.
2720 * @tc.expected: step2. not support.
2721 */
2722 option.priorityTask = false;
2723 ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2724
2725 /**
2726 * @tc.steps:step3. test Priority sync with range query.
2727 * @tc.expected: step3. not support.
2728 */
2729 option.priorityTask = true;
2730 ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2731 }
2732
2733 /*
2734 * @tc.name: RangeQuerySyncTest002
2735 * @tc.desc: Test sync that has not option parameter with range query.
2736 * @tc.type: FUNC
2737 * @tc.require:
2738 * @tc.author: mazhao
2739 */
2740 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest002, TestSize.Level1)
2741 {
2742 Query query = Query::Select().FromTable({ tableName_ }).Range({}, {});
2743 ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_FORCE_PULL, query, nullptr, g_syncWaitTime),
2744 DBStatus::NOT_SUPPORT);
2745 }
2746
2747 /*
2748 * @tc.name: SameDataSync001
2749 * @tc.desc: Test query same data in one batch.
2750 * @tc.type: FUNC
2751 * @tc.require:
2752 * @tc.author: zqq
2753 */
2754 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync001, TestSize.Level1)
2755 {
2756 /**
2757 * @tc.steps:step1. insert cloud records, cloud has two batch id:0-4
2758 * @tc.expected: step1. OK
2759 */
2760 const int actualCount = 5;
2761 InsertCloudTableRecord(0, actualCount, 0, false);
2762 InsertCloudTableRecord(0, actualCount, 0, false);
2763 /**
2764 * @tc.steps:step2. call sync, local has one batch id:0-4
2765 * @tc.expected: step2. OK
2766 */
2767 Query query = Query::Select().FromTable({ tableName_ });
2768 BlockSync(query, delegate_, g_actualDBStatus);
2769 CheckLocalCount(actualCount);
2770 }
2771
2772 /*
2773 * @tc.name: SameDataSync002
2774 * @tc.desc: Test sync when there are two data with the same primary key on the cloud.
2775 * @tc.type: FUNC
2776 * @tc.require:
2777 * @tc.author: liaoyonghuang
2778 */
2779 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync002, TestSize.Level1)
2780 {
2781 /**
2782 * @tc.steps:step1. insert local 1 record and sync to cloud.
2783 * @tc.expected: step1. OK
2784 */
2785 const int actualCount = 1;
2786 InsertUserTableRecord(tableName_, actualCount);
2787 Query query = Query::Select().FromTable({ tableName_ });
2788 BlockSync(query, delegate_, g_actualDBStatus);
2789
2790 /**
2791 * @tc.steps:step2. insert 2 records with the same primary key.
2792 * @tc.expected: step2. OK
2793 */
2794 std::vector<VBucket> record;
2795 std::vector<VBucket> extend;
2796 Timestamp now = TimeHelper::GetSysCurrentTime();
2797 VBucket data;
2798 std::vector<uint8_t> photo(0, 'v');
2799 data.insert_or_assign("id", std::string("0"));
2800 data.insert_or_assign("name", std::string("Cloud"));
2801 data.insert_or_assign("height", 166.0); // 166.0 is random double value
2802 data.insert_or_assign("married", false);
2803 data.insert_or_assign("photo", photo);
2804 data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
2805 record.push_back(data);
2806 data.insert_or_assign("age", static_cast<int64_t>(14L)); // 14 is random age
2807 record.push_back(data);
2808 VBucket log;
2809 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
2810 now / CloudDbConstant::TEN_THOUSAND));
2811 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
2812 now / CloudDbConstant::TEN_THOUSAND));
2813 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
2814 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("1"));
2815 extend.push_back(log);
2816 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("2"));
2817 extend.push_back(log);
2818 ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend), DBStatus::OK);
2819
2820 /**
2821 * @tc.steps:step3. sync from cloud and check record.
2822 * @tc.expected: step3. The record with age of 14 has been updated locally.
2823 */
2824 BlockSync(query, delegate_, g_actualDBStatus);
2825 std::string sql = "SELECT age FROM " + tableName_ + " where id=0;";
2826 int64_t actualAge = 0;
2827 int64_t expectAge = 14L;
__anon6d2f4c683102(sqlite3_stmt *stmt) 2828 RelationalTestUtils::ExecSql(db_, sql, nullptr, [&actualAge](sqlite3_stmt *stmt) {
2829 actualAge = sqlite3_column_int(stmt, 0);
2830 return E_OK;
2831 });
2832 EXPECT_EQ(actualAge, expectAge);
2833 }
2834
2835 /*
2836 * @tc.name: CreateDistributedTable001
2837 * @tc.desc: Test create distributed table when table not empty.
2838 * @tc.type: FUNC
2839 * @tc.require:
2840 * @tc.author: zqq
2841 */
2842 HWTEST_F(DistributedDBCloudCheckSyncTest, CreateDistributedTable001, TestSize.Level1)
2843 {
2844 const std::string table = "CreateDistributedTable001";
2845 const std::string createSQL =
2846 "CREATE TABLE IF NOT EXISTS " + table + "(" \
2847 "id TEXT PRIMARY KEY," \
2848 "name TEXT," \
2849 "height REAL ," \
2850 "photo BLOB," \
2851 "age INT);";
2852 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
2853 int actualCount = 10;
2854 InsertUserTableRecord(table, actualCount);
2855 InsertCloudTableRecord(table, 0, actualCount, 0, true);
2856 ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
2857 DataBaseSchema dataBaseSchema = GetSchema();
2858 TableSchema schema = dataBaseSchema.tables.at(0);
2859 schema.name = table;
2860 schema.sharedTableName = "";
2861 dataBaseSchema.tables.push_back(schema);
2862 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
2863 /**
2864 * @tc.steps:step2. call sync, local has one batch id:0-4
2865 * @tc.expected: step2. OK
2866 */
2867 Query query = Query::Select().FromTable({ table });
2868 BlockSync(query, delegate_, g_actualDBStatus);
2869 CheckCloudTableCount(table, actualCount);
2870 }
2871
2872 /*
2873 * @tc.name: CloseDbTest001
2874 * @tc.desc: Test process of db close during sync
2875 * @tc.type: FUNC
2876 * @tc.require:
2877 * @tc.author: bty
2878 */
2879 HWTEST_F(DistributedDBCloudCheckSyncTest, CloseDbTest001, TestSize.Level1)
2880 {
2881 /**
2882 * @tc.steps:step1. insert user table record.
2883 * @tc.expected: step1. ok.
2884 */
2885 const int actualCount = 10; // 10 is count of records
2886 InsertUserTableRecord(tableName_, actualCount);
2887
2888 /**
2889 * @tc.steps:step2. wait for 2 seconds during the query to close the database.
2890 * @tc.expected: step2. ok.
2891 */
2892 std::mutex callMutex;
2893 int callCount = 0;
__anon6d2f4c683202(const std::string &, VBucket &) 2894 virtualCloudDb_->ForkQuery([](const std::string &, VBucket &) {
2895 std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2896 });
2897 const auto callback = [&callCount, &callMutex](
__anon6d2f4c683302( const std::map<std::string, SyncProcess> &) 2898 const std::map<std::string, SyncProcess> &) {
2899 {
2900 std::lock_guard<std::mutex> autoLock(callMutex);
2901 callCount++;
2902 }
2903 };
2904 Query query = Query::Select().FromTable({ tableName_ });
2905 ASSERT_EQ(delegate_->Sync({ "CLOUD" }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
2906 std::this_thread::sleep_for(std::chrono::seconds(1)); // block notify 1s
2907 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
2908 delegate_ = nullptr;
2909 mgr_ = nullptr;
2910
2911 /**
2912 * @tc.steps:step3. wait for 2 seconds to check the process call count.
2913 * @tc.expected: step3. ok.
2914 */
2915 std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2916 EXPECT_EQ(callCount, 0L);
2917 }
2918
2919 /*
2920 * @tc.name: ConsistentFlagTest001
2921 * @tc.desc: Test the consistency flag of no asset table
2922 * @tc.type: FUNC
2923 * @tc.require:
2924 * @tc.author: bty
2925 */
2926 HWTEST_F(DistributedDBCloudCheckSyncTest, ConsistentFlagTest001, TestSize.Level1)
2927 {
2928 /**
2929 * @tc.steps:step1. init data and sync
2930 * @tc.expected: step1. ok.
2931 */
2932 const int localCount = 20; // 20 is count of local
2933 const int cloudCount = 10; // 10 is count of cloud
2934 InsertUserTableRecord(tableName_, localCount);
2935 InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2936 Query query = Query::Select().FromTable({ tableName_ });
2937 BlockSync(query, delegate_, g_actualDBStatus);
2938
2939 /**
2940 * @tc.steps:step2. check the 0x20 bit of flag after sync
2941 * @tc.expected: step2. ok.
2942 */
2943 std::string querySql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
2944 " where flag&0x20=0;";
2945 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2946 reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2947
2948 /**
2949 * @tc.steps:step3. delete local data and check
2950 * @tc.expected: step3. ok.
2951 */
2952 std::string sql = "delete from " + tableName_ + " where id = '1';";
2953 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2954 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2955 reinterpret_cast<void *>(localCount - 1), nullptr), SQLITE_OK);
2956
2957 /**
2958 * @tc.steps:step4. check the 0x20 bit of flag after sync
2959 * @tc.expected: step4. ok.
2960 */
2961 BlockSync(query, delegate_, g_actualDBStatus);
2962 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2963 reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2964 }
2965
SyncDataStatusTest(bool isCompensatedSyncOnly)2966 void DistributedDBCloudCheckSyncTest::SyncDataStatusTest(bool isCompensatedSyncOnly)
2967 {
2968 /**
2969 * @tc.steps:step1. init data and sync
2970 * @tc.expected: step1. ok.
2971 */
2972 const int localCount = 20; // 20 is count of local
2973 const int cloudCount = 10; // 10 is count of cloud
2974 InsertUserTableRecord(tableName_, localCount);
2975 std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
2976 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2977 sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 2 where data_key in (2,12);";
2978 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2979 sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 3 where data_key in (3,13);";
2980 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2981 std::this_thread::sleep_for(std::chrono::milliseconds(1));
2982 InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2983 Query query = Query::Select().FromTable({tableName_});
2984
2985 /**
2986 * @tc.steps:step2. check count
2987 * @tc.expected: step2. ok.
2988 */
2989 int64_t syncCount = 2;
2990 BlockPrioritySync(query, delegate_, false, OK, isCompensatedSyncOnly);
2991 if (!isCompensatedSyncOnly) {
2992 std::this_thread::sleep_for(std::chrono::seconds(1)); // wait compensated sync finish
2993 }
2994 std::string preSql = "select count(*) from " + DBCommon::GetLogTableName(tableName_);
2995 std::string querySql = preSql + " where status=0 and data_key in (1,11) and cloud_gid !='';";
2996 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2997 if (isCompensatedSyncOnly) {
2998 querySql = preSql + " where status=2 and data_key in (2,12) and cloud_gid ='';";
2999 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
3000 querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid ='';";
3001 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
3002 querySql = preSql + " where status=0 and cloud_gid ='';";
3003 int unSyncCount = 14; // 14 is the num of unSync data with status 0
3004 CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
3005 } else {
3006 // gid 12、13 are upload insert, lock to lock_change
3007 querySql = preSql + " where status=3 and data_key in (2,12) and cloud_gid !='';";
3008 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
3009 querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid !='';";
3010 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
3011 querySql = preSql + " where status=0 and cloud_gid !='';";
3012 int unSyncCount = 16; // 16 is the num of sync finish
3013 CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
3014 }
3015 }
3016
3017 /*
3018 * @tc.name: SyncDataStatusTest001
3019 * @tc.desc: Test the status after compensated sync the no asset table
3020 * @tc.type: FUNC
3021 * @tc.require:
3022 * @tc.author: bty
3023 */
3024 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest001, TestSize.Level1)
3025 {
3026 SyncDataStatusTest(true);
3027 }
3028
3029 /*
3030 * @tc.name: SyncDataStatusTest002
3031 * @tc.desc: Test the status after normal sync the no asset table
3032 * @tc.type: FUNC
3033 * @tc.require:
3034 * @tc.author: bty
3035 */
3036 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest002, TestSize.Level1)
3037 {
3038 SyncDataStatusTest(false);
3039 }
3040 }
3041 #endif
3042