1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "cloud/cloud_sync_utils.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "cloud_syncer.h"
22 #include "db_common.h"
23 #include "distributeddb_data_generate_unit_test.h"
24 #include "log_print.h"
25 #include "relational_store_client.h"
26 #include "relational_store_delegate.h"
27 #include "relational_store_instance.h"
28 #include "relational_store_manager.h"
29 #include "relational_sync_able_storage.h"
30 #include "runtime_config.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36
37 namespace {
38 using namespace testing::ext;
39 using namespace DistributedDB;
40 using namespace DistributedDBUnitTest;
// DDL for the test table that has a TEXT primary key.
const char *g_createSQL =
    "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";
// DDL for a table with the same columns but no primary key.
const char *g_createNonPrimaryKeySQL =
    "CREATE TABLE IF NOT EXISTS NonPrimaryKeyTable("
    "id TEXT,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";
// Value written to CloudSyncOption::waitTime by the sync helpers in this file.
const int64_t g_syncWaitTime = 60;
56
// Template asset: InsertCloudTableRecord() copies it and appends the record id to `name`.
const Asset g_cloudAsset = {
    .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
    .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
};

// Shared sink handed to BlockSync(), which appends the errCode of every process entry it is notified about.
std::vector<DBStatus> g_actualDBStatus;
63
CreateUserDBAndTable(sqlite3 * & db)64 void CreateUserDBAndTable(sqlite3 *&db)
65 {
66 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
67 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
68 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createNonPrimaryKeySQL), SQLITE_OK);
69 }
70
PrepareOption(CloudSyncOption & option,const Query & query,bool isPriorityTask,bool isCompensatedSyncOnly=false)71 void PrepareOption(CloudSyncOption &option, const Query &query, bool isPriorityTask, bool isCompensatedSyncOnly = false)
72 {
73 option.devices = { "CLOUD" };
74 option.mode = SYNC_MODE_CLOUD_MERGE;
75 option.query = query;
76 option.waitTime = g_syncWaitTime;
77 option.priorityTask = isPriorityTask;
78 option.compensatedSyncOnly = isCompensatedSyncOnly;
79 }
80
BlockSync(const Query & query,RelationalStoreDelegate * delegate,std::vector<DBStatus> & actualDBStatus,bool prioritySync=false)81 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, std::vector<DBStatus> &actualDBStatus,
82 bool prioritySync = false)
83 {
84 std::mutex dataMutex;
85 std::condition_variable cv;
86 bool finish = false;
87 auto callback = [&actualDBStatus, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
88 for (const auto &item: process) {
89 actualDBStatus.push_back(item.second.errCode);
90 if (item.second.process == DistributedDB::FINISHED) {
91 {
92 std::lock_guard<std::mutex> autoLock(dataMutex);
93 finish = true;
94 }
95 cv.notify_one();
96 }
97 }
98 };
99 CloudSyncOption option;
100 option.devices = { "CLOUD" };
101 option.mode = SYNC_MODE_CLOUD_MERGE;
102 option.query = query;
103 option.waitTime = g_syncWaitTime;
104 option.priorityTask = prioritySync;
105 ASSERT_EQ(delegate->Sync(option, callback), OK);
106 std::unique_lock<std::mutex> uniqueLock(dataMutex);
107 cv.wait(uniqueLock, [&finish]() {
108 return finish;
109 });
110 }
111
BlockPrioritySync(const Query & query,RelationalStoreDelegate * delegate,bool isPriority,DBStatus expectResult,bool isCompensatedSyncOnly=false)112 void BlockPrioritySync(const Query &query, RelationalStoreDelegate *delegate, bool isPriority, DBStatus expectResult,
113 bool isCompensatedSyncOnly = false)
114 {
115 std::mutex dataMutex;
116 std::condition_variable cv;
117 bool finish = false;
118 auto callback = [&cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
119 for (const auto &item: process) {
120 if (item.second.process == DistributedDB::FINISHED) {
121 {
122 std::lock_guard<std::mutex> autoLock(dataMutex);
123 finish = true;
124 }
125 cv.notify_one();
126 }
127 }
128 };
129 CloudSyncOption option;
130 PrepareOption(option, query, isPriority, isCompensatedSyncOnly);
131 ASSERT_EQ(delegate->Sync(option, callback), expectResult);
132 if (expectResult == OK) {
133 std::unique_lock<std::mutex> uniqueLock(dataMutex);
134 cv.wait(uniqueLock, [&finish]() {
135 return finish;
136 });
137 }
138 }
139
BlockCompensatedSync(const Query & query,RelationalStoreDelegate * delegate,DBStatus expectResult,const std::function<void (const std::map<std::string,SyncProcess> & syncProcess)> & processCallback)140 void BlockCompensatedSync(const Query &query, RelationalStoreDelegate *delegate, DBStatus expectResult,
141 const std::function<void(const std::map<std::string, SyncProcess> &syncProcess)> &processCallback)
142 {
143 std::mutex dataMutex;
144 std::condition_variable cv;
145 bool finish = false;
146 auto callback = [&processCallback, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
147 for (const auto &item: process) {
148 if (item.second.process == DistributedDB::FINISHED) {
149 {
150 std::lock_guard<std::mutex> autoLock(dataMutex);
151 finish = true;
152 }
153 cv.notify_one();
154 }
155 }
156 if (processCallback != nullptr) {
157 processCallback(process);
158 }
159 };
160 CloudSyncOption option;
161 PrepareOption(option, query, false, true);
162 ASSERT_EQ(delegate->Sync(option, callback), expectResult);
163 if (expectResult == OK) {
164 std::unique_lock<std::mutex> uniqueLock(dataMutex);
165 cv.wait(uniqueLock, [&finish]() {
166 return finish;
167 });
168 }
169 }
170
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)171 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
172 {
173 if (count != 1) {
174 return 0;
175 }
176 auto expectCount = reinterpret_cast<int64_t>(data);
177 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
178 return 0;
179 }
180
CheckUserTableResult(sqlite3 * & db,const std::string & tableName,int64_t expectCount)181 void CheckUserTableResult(sqlite3 *&db, const std::string &tableName, int64_t expectCount)
182 {
183 string query = "select count(*) from " + tableName + ";";
184 EXPECT_EQ(sqlite3_exec(db, query.c_str(), QueryCountCallback,
185 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
186 }
187
// Test fixture: each test gets a fresh SQLite database, an opened relational store delegate and a
// virtual cloud DB / asset loader attached to it (see SetUp/TearDown).
class DistributedDBCloudCheckSyncTest : public testing::Test {
public:
    static void SetUpTestCase();
    static void TearDownTestCase();
    void SetUp() override;
    void TearDown() override;
protected:
    // Environment helpers.
    void InitTestDir();
    DataBaseSchema GetSchema();
    void CloseDb();
    void InitDataAndSync();
    // Local / cloud data manipulation helpers.
    void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
    void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull);
    void InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count, int64_t photoSize,
        bool assetIsNull);
    void DeleteUserTableRecord(int64_t id);
    void DeleteUserTableRecord(int64_t begin, int64_t end);
    void DeleteCloudTableRecord(int64_t gid);
    // Verification helpers.
    void CheckCloudTableCount(const std::string &tableName, int64_t expectCount);
    bool CheckSyncCount(const Info actualInfo, const Info expectInfo);
    bool CheckSyncProcessInner(SyncProcess &actualSyncProcess, SyncProcess &expectSyncProcess);
    bool CheckSyncProcess(std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess,
        vector<SyncProcess> &expectSyncProcessV);
    void PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
        RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
        bool isCheckProcess);
    void DeleteCloudDBData(int64_t begin, int64_t count);
    void SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count);
    void SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count);
    void InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync = false);
    void CheckLocalCount(int64_t expectCount);
    void CheckLogCleaned(int64_t expectCount);
    void CheckUploadInfo(const Info &actualUploadInfo, const Info &expectUploadInfo);
    void CheckDownloadInfo(const Info &actualDownloadInfo, const Info &expectDownloadInfo);
    void SyncDataStatusTest(bool isCompensatedSyncOnly);
    void CheckUploadInfoAfterSync(int recordCount, SyncProcess &normalLast);
    std::string testDir_;   // set once by InitTestDir()
    std::string storePath_; // testDir_ + "/" + STORE_ID_1 + ".db"
    sqlite3 *db_ = nullptr; // raw handle opened in SetUp(), closed in TearDown()
    RelationalStoreDelegate *delegate_ = nullptr; // opened via mgr_ in SetUp(), closed in CloseDb()
    std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
    std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
    std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
    std::string tableName_ = "DistributedDBCloudCheckSyncTest";
    std::string tableNameShared_ = "DistributedDBCloudCheckSyncTest_shared";
    std::string tableWithoutPrimaryName_ = "NonPrimaryKeyTable";
    std::string tableWithoutPrimaryNameShared_ = "NonPrimaryKeyTable_shared";
    std::string lowerTableName_ = "distributeddbCloudCheckSyncTest";
    // Handed to RuntimeContext in SetUp(); the fixture only clears its own pointer in TearDown().
    VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
};
238
SetUpTestCase()239 void DistributedDBCloudCheckSyncTest::SetUpTestCase()
240 {
241 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
242 }
243
TearDownTestCase()244 void DistributedDBCloudCheckSyncTest::TearDownTestCase()
245 {}
246
SetUp()247 void DistributedDBCloudCheckSyncTest::SetUp()
248 {
249 DistributedDBToolsUnitTest::PrintTestCaseInfo();
250 InitTestDir();
251 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
252 LOGE("rm test db files error.");
253 }
254 DistributedDBToolsUnitTest::PrintTestCaseInfo();
255 LOGD("Test dir is %s", testDir_.c_str());
256 db_ = RelationalTestUtils::CreateDataBase(storePath_);
257 ASSERT_NE(db_, nullptr);
258 CreateUserDBAndTable(db_);
259 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
260 RelationalStoreDelegate::Option option;
261 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
262 ASSERT_NE(delegate_, nullptr);
263 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
264 ASSERT_EQ(delegate_->CreateDistributedTable(tableWithoutPrimaryName_, CLOUD_COOPERATION), DBStatus::OK);
265 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
266 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
267 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
268 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
269 DataBaseSchema dataBaseSchema = GetSchema();
270 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
271 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
272 ASSERT_TRUE(communicatorAggregator_ != nullptr);
273 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
274 }
275
TearDown()276 void DistributedDBCloudCheckSyncTest::TearDown()
277 {
278 virtualCloudDb_->ForkQuery(nullptr);
279 virtualCloudDb_->SetCloudError(false);
280 CloseDb();
281 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
282 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
283 LOGE("rm test db files error.");
284 }
285 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
286 communicatorAggregator_ = nullptr;
287 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
288 }
289
InitTestDir()290 void DistributedDBCloudCheckSyncTest::InitTestDir()
291 {
292 if (!testDir_.empty()) {
293 return;
294 }
295 DistributedDBToolsUnitTest::TestDirInit(testDir_);
296 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
297 LOGI("The test db is:%s", testDir_.c_str());
298 }
299
GetSchema()300 DataBaseSchema DistributedDBCloudCheckSyncTest::GetSchema()
301 {
302 DataBaseSchema schema;
303 TableSchema tableSchema;
304 tableSchema.name = tableName_;
305 tableSchema.sharedTableName = tableNameShared_;
306 tableSchema.fields = {
307 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
308 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
309 };
310 TableSchema tableWithoutPrimaryKeySchema;
311 tableWithoutPrimaryKeySchema.name = tableWithoutPrimaryName_;
312 tableWithoutPrimaryKeySchema.sharedTableName = tableWithoutPrimaryNameShared_;
313 tableWithoutPrimaryKeySchema.fields = {
314 {"id", TYPE_INDEX<std::string>}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
315 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
316 };
317 schema.tables.push_back(tableSchema);
318 schema.tables.push_back(tableWithoutPrimaryKeySchema);
319 return schema;
320 }
321
CloseDb()322 void DistributedDBCloudCheckSyncTest::CloseDb()
323 {
324 virtualCloudDb_ = nullptr;
325 if (mgr_ != nullptr) {
326 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
327 delegate_ = nullptr;
328 mgr_ = nullptr;
329 }
330 }
331
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)332 void DistributedDBCloudCheckSyncTest::InsertUserTableRecord(const std::string &tableName,
333 int64_t recordCounts, int64_t begin)
334 {
335 ASSERT_NE(db_, nullptr);
336 for (int64_t i = begin; i < begin + recordCounts; ++i) {
337 string sql = "INSERT OR REPLACE INTO " + tableName
338 + " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local"
339 + std::to_string(i) + "', '155.10', 'text', '21');";
340 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
341 }
342 }
343
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)344 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize,
345 bool assetIsNull)
346 {
347 InsertCloudTableRecord(tableName_, begin, count, photoSize, assetIsNull);
348 }
349
InsertCloudTableRecord(const std::string & tableName,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)350 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count,
351 int64_t photoSize, bool assetIsNull)
352 {
353 std::vector<uint8_t> photo(photoSize, 'v');
354 std::vector<VBucket> record1;
355 std::vector<VBucket> extend1;
356 std::vector<VBucket> record2;
357 std::vector<VBucket> extend2;
358 Timestamp now = TimeHelper::GetSysCurrentTime();
359 for (int64_t i = begin; i < begin + count; ++i) {
360 VBucket data;
361 data.insert_or_assign("id", std::to_string(i));
362 data.insert_or_assign("name", "Cloud" + std::to_string(i));
363 data.insert_or_assign("height", 166.0); // 166.0 is random double value
364 data.insert_or_assign("married", false);
365 data.insert_or_assign("photo", photo);
366 data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
367 Asset asset = g_cloudAsset;
368 asset.name = asset.name + std::to_string(i);
369 assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
370 record1.push_back(data);
371 VBucket log;
372 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
373 now / CloudDbConstant::TEN_THOUSAND + i));
374 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
375 now / CloudDbConstant::TEN_THOUSAND + i));
376 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
377 extend1.push_back(log);
378
379 std::vector<Asset> assets;
380 data.insert_or_assign("height", 180.3); // 180.3 is random double value
381 for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
382 asset.name = g_cloudAsset.name + std::to_string(j);
383 assets.push_back(asset);
384 }
385 data.erase("assert");
386 data.erase("married");
387 assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
388 record2.push_back(data);
389 extend2.push_back(log);
390 }
391 ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName, std::move(record1), extend1), DBStatus::OK);
392 std::this_thread::sleep_for(std::chrono::milliseconds(count));
393 }
394
DeleteUserTableRecord(int64_t id)395 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t id)
396 {
397 ASSERT_NE(db_, nullptr);
398 string sql = "DELETE FROM " + tableName_ + " WHERE id ='" + std::to_string(id) + "';";
399 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
400 }
401
DeleteUserTableRecord(int64_t begin,int64_t end)402 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t begin, int64_t end)
403 {
404 ASSERT_NE(db_, nullptr);
405 std::string sql = "DELETE FROM " + tableName_ + " WHERE id IN (";
406 for (int64_t i = begin; i <= end; ++i) {
407 sql += "'" + std::to_string(i) + "',";
408 }
409 if (sql.back() == ',') {
410 sql.pop_back();
411 }
412 sql += ");";
413 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
414 }
415
DeleteCloudTableRecord(int64_t gid)416 void DistributedDBCloudCheckSyncTest::DeleteCloudTableRecord(int64_t gid)
417 {
418 VBucket idMap;
419 idMap.insert_or_assign("#_gid", std::to_string(gid));
420 ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
421 }
422
CheckCloudTableCount(const std::string & tableName,int64_t expectCount)423 void DistributedDBCloudCheckSyncTest::CheckCloudTableCount(const std::string &tableName, int64_t expectCount)
424 {
425 VBucket extend;
426 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
427 int64_t realCount = 0;
428 std::vector<VBucket> data;
429 virtualCloudDb_->Query(tableName, extend, data);
430 for (size_t j = 0; j < data.size(); ++j) {
431 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
432 if (entry != data[j].end() && std::get<bool>(entry->second)) {
433 continue;
434 }
435 realCount++;
436 }
437 EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
438 }
439
CheckSyncCount(const Info actualInfo,const Info expectInfo)440 bool DistributedDBCloudCheckSyncTest::CheckSyncCount(const Info actualInfo, const Info expectInfo)
441 {
442 if (actualInfo.batchIndex != expectInfo.batchIndex) {
443 return false;
444 }
445 if (actualInfo.total != expectInfo.total) {
446 return false;
447 }
448 if (actualInfo.successCount != expectInfo.successCount) {
449 return false;
450 }
451 if (actualInfo.failCount != expectInfo.failCount) {
452 return false;
453 }
454 return true;
455 }
456
CheckSyncProcessInner(SyncProcess & actualSyncProcess,SyncProcess & expectSyncProcess)457 bool DistributedDBCloudCheckSyncTest::CheckSyncProcessInner(SyncProcess &actualSyncProcess,
458 SyncProcess &expectSyncProcess)
459 {
460 for (const auto &itInner : actualSyncProcess.tableProcess) {
461 std::string tableName = itInner.first;
462 if (expectSyncProcess.tableProcess.find(tableName) == expectSyncProcess.tableProcess.end()) {
463 return false;
464 }
465 TableProcessInfo actualTableProcessInfo = itInner.second;
466 TableProcessInfo expectTableProcessInfo = expectSyncProcess.tableProcess.find(tableName)->second;
467 if (!CheckSyncCount(actualTableProcessInfo.downLoadInfo, expectTableProcessInfo.downLoadInfo)) {
468 return false;
469 }
470 if (!CheckSyncCount(actualTableProcessInfo.upLoadInfo, expectTableProcessInfo.upLoadInfo)) {
471 return false;
472 }
473 }
474 return true;
475 }
476
CheckSyncProcess(std::vector<std::map<std::string,SyncProcess>> & actualSyncProcess,vector<SyncProcess> & expectSyncProcessV)477 bool DistributedDBCloudCheckSyncTest::CheckSyncProcess(
478 std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess, vector<SyncProcess> &expectSyncProcessV)
479 {
480 vector<map<string, SyncProcess>> expectSyncProcess;
481 for (auto syncProcess : expectSyncProcessV) {
482 map<string, SyncProcess> expectSyncProcessMap = {{"CLOUD", syncProcess}};
483 expectSyncProcess.emplace_back(expectSyncProcessMap);
484 }
485 for (int i = 0; i < (int) actualSyncProcess.size(); i++) {
486 map<string, SyncProcess> actualSyncProcessMap = actualSyncProcess[i];
487 map<string, SyncProcess> expectSyncProcessMap = expectSyncProcess[i];
488 for (auto &it : actualSyncProcessMap) {
489 string mapKey = it.first;
490 if (expectSyncProcessMap.find(mapKey) == expectSyncProcessMap.end()) {
491 return false;
492 }
493 SyncProcess actualSyncProcess = it.second;
494 SyncProcess expectSyncProcess = expectSyncProcessMap.find(mapKey)->second;
495 if (!CheckSyncProcessInner(actualSyncProcess, expectSyncProcess)) {
496 return false;
497 }
498 }
499 }
500 return true;
501 }
502
PriorityAndNormalSync(const Query & normalQuery,const Query & priorityQuery,RelationalStoreDelegate * delegate,std::vector<std::map<std::string,SyncProcess>> & prioritySyncProcess,bool isCheckProcess)503 void DistributedDBCloudCheckSyncTest::PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
504 RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
505 bool isCheckProcess)
506 {
507 std::mutex dataMutex;
508 std::condition_variable cv;
509 bool normalFinish = false;
510 bool priorityFinish = false;
511 auto normalCallback = [&cv, &dataMutex, &normalFinish, &priorityFinish, &prioritySyncProcess, &isCheckProcess](
512 const std::map<std::string, SyncProcess> &process) {
513 auto foundFinishedProcess = std::find_if(process.begin(), process.end(), [](const auto &item) {
514 return item.second.process == DistributedDB::FINISHED;
515 });
516 if (foundFinishedProcess != process.end()) {
517 normalFinish = true;
518 if (isCheckProcess) {
519 ASSERT_EQ(priorityFinish, true);
520 }
521 cv.notify_one();
522 }
523 prioritySyncProcess.emplace_back(process);
524 };
525 auto priorityCallback = [&cv, &priorityFinish, &prioritySyncProcess](
526 const std::map<std::string, SyncProcess> &process) {
527 auto it = std::find_if(process.begin(), process.end(), [](const auto &item) {
528 return item.second.process == DistributedDB::FINISHED;
529 });
530 if (it != process.end()) {
531 priorityFinish = true;
532 cv.notify_one();
533 }
534 prioritySyncProcess.emplace_back(process);
535 };
536 CloudSyncOption option;
537 PrepareOption(option, normalQuery, false);
538 virtualCloudDb_->SetBlockTime(500); // 500 ms
539 ASSERT_EQ(delegate->Sync(option, normalCallback), OK);
540 PrepareOption(option, priorityQuery, true);
541 std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50 ms
542 ASSERT_EQ(delegate->Sync(option, priorityCallback), OK);
543 std::unique_lock<std::mutex> uniqueLock(dataMutex);
544 cv.wait(uniqueLock, [&normalFinish]() {
545 return normalFinish;
546 });
547 }
548
DeleteCloudDBData(int64_t begin,int64_t count)549 void DistributedDBCloudCheckSyncTest::DeleteCloudDBData(int64_t begin, int64_t count)
550 {
551 for (int64_t i = begin; i < begin + count; i++) {
552 VBucket idMap;
553 idMap.insert_or_assign("#_gid", std::to_string(i));
554 ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
555 }
556 }
557
/**
 * Installs a query hook on the virtual cloud DB for CloudPrioritySyncTest007.
 *
 * The hook increments `count` on every invocation, delays the first invocation by one second and
 * verifies the number of cloud records at invocations 3, 6 and 9. CheckCloudTableCount() itself
 * queries the cloud DB, so each check re-enters the hook and advances `count` once more; `count`
 * is deliberately re-read before every comparison.
 *
 * @param count  invocation counter owned by the caller; it must stay alive while the hook is installed.
 */
void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count)
{
    virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
        count++;
        if (count == 1) { // taskid1
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
        }
        if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records after last sync
        }
        if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
        }
    });
}
576
/**
 * Installs a query hook on the virtual cloud DB for CloudPrioritySyncTest008.
 *
 * Same scheme as the Test007 hook: `count` is incremented on every invocation, the first invocation
 * is delayed by one second and the number of cloud records is verified at invocations 3, 6 and 9.
 * The only difference is that invocation 6 expects 1 record instead of 2. CheckCloudTableCount()
 * queries the cloud DB and therefore re-enters the hook, advancing `count` once more.
 *
 * @param count  invocation counter owned by the caller; it must stay alive while the hook is installed.
 */
void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count)
{
    virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
        count++;
        if (count == 1) { // taskid1
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
        }
        if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
        }
        if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
        }
    });
}
595
InitLogicDeleteDataEnv(int64_t dataCount,bool prioritySync)596 void DistributedDBCloudCheckSyncTest::InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync)
597 {
598 // prepare data
599 InsertUserTableRecord(tableName_, dataCount);
600 // sync
601 Query query = Query::Select().FromTable({ tableName_ });
602 BlockSync(query, delegate_, g_actualDBStatus);
603 // delete cloud data
604 for (int i = 0; i < dataCount; ++i) {
605 DeleteCloudTableRecord(i);
606 }
607 // sync again
608 BlockSync(query, delegate_, g_actualDBStatus);
609 }
610
CheckLocalCount(int64_t expectCount)611 void DistributedDBCloudCheckSyncTest::CheckLocalCount(int64_t expectCount)
612 {
613 // check local data
614 int dataCnt = -1;
615 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
616 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
617 dataCnt = sqlite3_column_int(stmt, 0);
618 return E_OK;
619 });
620 EXPECT_EQ(dataCnt, expectCount);
621 }
622
CheckLogCleaned(int64_t expectCount)623 void DistributedDBCloudCheckSyncTest::CheckLogCleaned(int64_t expectCount)
624 {
625 std::string sql1 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
626 " where device = 'cloud';";
627 EXPECT_EQ(sqlite3_exec(db_, sql1.c_str(), QueryCountCallback,
628 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
629 std::string sql2 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where cloud_gid "
630 " is not null and cloud_gid != '';";
631 EXPECT_EQ(sqlite3_exec(db_, sql2.c_str(), QueryCountCallback,
632 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
633 std::string sql3 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
634 " where flag & 0x02 != 0;";
635 EXPECT_EQ(sqlite3_exec(db_, sql3.c_str(), QueryCountCallback,
636 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
637 }
638
CheckUploadInfo(const Info & actualUploadInfo,const Info & expectUploadInfo)639 void DistributedDBCloudCheckSyncTest::CheckUploadInfo(const Info &actualUploadInfo, const Info &expectUploadInfo)
640 {
641 EXPECT_EQ(actualUploadInfo.batchIndex, expectUploadInfo.batchIndex);
642 EXPECT_EQ(actualUploadInfo.total, expectUploadInfo.total);
643 EXPECT_EQ(actualUploadInfo.successCount, expectUploadInfo.successCount);
644 EXPECT_EQ(actualUploadInfo.failCount, expectUploadInfo.failCount);
645 EXPECT_EQ(actualUploadInfo.insertCount, expectUploadInfo.insertCount);
646 EXPECT_EQ(actualUploadInfo.updateCount, expectUploadInfo.updateCount);
647 EXPECT_EQ(actualUploadInfo.deleteCount, expectUploadInfo.deleteCount);
648 }
649
CheckDownloadInfo(const Info & actualDownloadInfo,const Info & expectDownloadInfo)650 void DistributedDBCloudCheckSyncTest::CheckDownloadInfo(const Info &actualDownloadInfo, const Info &expectDownloadInfo)
651 {
652 EXPECT_EQ(actualDownloadInfo.batchIndex, expectDownloadInfo.batchIndex);
653 EXPECT_EQ(actualDownloadInfo.total, expectDownloadInfo.total);
654 EXPECT_EQ(actualDownloadInfo.successCount, expectDownloadInfo.successCount);
655 EXPECT_EQ(actualDownloadInfo.failCount, expectDownloadInfo.failCount);
656 EXPECT_EQ(actualDownloadInfo.insertCount, expectDownloadInfo.insertCount);
657 EXPECT_EQ(actualDownloadInfo.updateCount, expectDownloadInfo.updateCount);
658 EXPECT_EQ(actualDownloadInfo.deleteCount, expectDownloadInfo.deleteCount);
659 }
660
661 /**
662 * @tc.name: CloudSyncTest001
663 * @tc.desc: sync with device sync query
664 * @tc.type: FUNC
665 * @tc.require:
666 * @tc.author: zhangqiquan
667 */
668 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest001, TestSize.Level0)
669 {
670 // prepare data
671 const int actualCount = 10;
672 InsertUserTableRecord(tableName_, actualCount);
673 // sync twice
674 Query query = Query::Select().FromTable({ tableName_ });
675 BlockSync(query, delegate_, g_actualDBStatus);
676 BlockSync(query, delegate_, g_actualDBStatus);
677 // remove cloud data
678 delegate_->RemoveDeviceData("CLOUD", ClearMode::FLAG_AND_DATA);
679 // check local data
680 int dataCnt = -1;
681 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anon28bcdbc61002(sqlite3_stmt *stmt) 682 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
683 dataCnt = sqlite3_column_int(stmt, 0);
684 return E_OK;
685 });
686 EXPECT_EQ(dataCnt, 0);
687 }
688
689 /**
690 * @tc.name: CloudSyncTest002
691 * @tc.desc: sync with same data in one batch
692 * @tc.type: FUNC
693 * @tc.require:
694 * @tc.author: zhangqiquan
695 */
696 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest002, TestSize.Level0)
697 {
698 // prepare data
699 const int actualCount = 1;
700 InsertUserTableRecord(tableName_, actualCount);
701 // sync twice
702 Query query = Query::Select().FromTable({ tableName_ });
703 BlockSync(query, delegate_, g_actualDBStatus);
704 // cloud delete id=0 and insert id=0 but its gid is 1
705 // local delete id=0
706 DeleteCloudTableRecord(0); // cloud gid is 0
707 InsertCloudTableRecord(0, actualCount, 0, false); // 0 is id
708 DeleteUserTableRecord(0); // 0 is id
709 BlockSync(query, delegate_, g_actualDBStatus);
710 bool deleteStatus = true;
711 EXPECT_EQ(virtualCloudDb_->GetDataStatus("1", deleteStatus), OK);
712 EXPECT_EQ(deleteStatus, false);
713 }
714
715 /**
716 * @tc.name: CloudSyncTest003
717 * @tc.desc: local data is delete before sync, then sync, cloud data will insert into local
718 * @tc.type: FUNC
719 * @tc.require:
720 * @tc.author: zhangshijie
721 */
722 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest003, TestSize.Level0)
723 {
724 // prepare data
725 const int actualCount = 1;
726 InsertUserTableRecord(tableName_, actualCount);
727
728 InsertCloudTableRecord(0, actualCount, 0, false);
729 // delete local data
730 DeleteUserTableRecord(0);
731 Query query = Query::Select().FromTable({ tableName_ });
732 BlockSync(query, delegate_, g_actualDBStatus);
733
734 // check local data, cloud date will insert into local
735 int dataCnt = -1;
736 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anon28bcdbc61102(sqlite3_stmt *stmt) 737 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
738 dataCnt = sqlite3_column_int(stmt, 0);
739 return E_OK;
740 });
741 EXPECT_EQ(dataCnt, actualCount);
742 }
743
744 /**
745 * @tc.name: CloudSyncTest004
746 * @tc.desc: sync after insert failed
747 * @tc.type: FUNC
748 * @tc.require:
749 * @tc.author: zhangqiquan
750 */
751 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest004, TestSize.Level0)
752 {
753 // prepare data
754 const int actualCount = 1;
755 InsertUserTableRecord(tableName_, actualCount);
756 // sync twice
757 Query query = Query::Select().FromTable({ tableName_ });
758 LOGW("Block Sync");
759 virtualCloudDb_->SetInsertFailed(1);
760 BlockSync(query, delegate_, g_actualDBStatus);
761 // delete local data
762 DeleteUserTableRecord(0); // 0 is id
763 LOGW("Block Sync");
764 // sync again and this record with be synced to cloud
765 BlockSync(query, delegate_, g_actualDBStatus);
766 bool deleteStatus = true;
767 EXPECT_EQ(virtualCloudDb_->GetDataStatus("0", deleteStatus), OK);
768 EXPECT_EQ(deleteStatus, true);
769 }
770
771 /**
772 * @tc.name: CloudSyncTest005
773 * @tc.desc: check device in process after sync
774 * @tc.type: FUNC
775 * @tc.require:
776 * @tc.author: liaoyonghuang
777 */
778 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest005, TestSize.Level0)
779 {
780 /**
781 * @tc.steps:step1. init data and sync
782 * @tc.expected: step1. ok.
783 */
784 const int localCount = 20; // 20 is count of local
785 const int cloudCount = 10; // 10 is count of cloud
786 InsertUserTableRecord(tableName_, localCount);
787 std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
788 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
789 InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
790
791 /**
792 * @tc.steps:step2. check device name in process
793 * @tc.expected: step2. ok.
794 */
795 Query query = Query::Select().FromTable({tableName_});
__anon28bcdbc61202(const std::map<std::string, SyncProcess> &syncProcess) 796 auto callback = [](const std::map<std::string, SyncProcess> &syncProcess) {
797 EXPECT_TRUE(syncProcess.find("CLOUD") != syncProcess.end());
798 };
799 BlockCompensatedSync(query, delegate_, OK, callback);
800 }
801
InitDataAndSync()802 void DistributedDBCloudCheckSyncTest::InitDataAndSync()
803 {
804 const int localCount = 120; // 120 is count of local
805 const int cloudCount = 100; // 100 is count of cloud
806 InsertUserTableRecord(tableName_, localCount, 0);
807 InsertUserTableRecord(tableWithoutPrimaryName_, cloudCount, 0);
808 InsertCloudTableRecord(tableWithoutPrimaryName_, 80, cloudCount, 0, false); // 80 is begin sync number
809 }
810
811 /**
812 * @tc.name: CloudSyncTest006
813 * @tc.desc: check reDownload when common sync pause.
814 * @tc.type: FUNC
815 * @tc.require:
816 * @tc.author: luoguo
817 */
818 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest006, TestSize.Level0)
819 {
820 /**
821 * @tc.steps:step1. init data and sync
822 * @tc.expected: step1. ok.
823 */
824 InitDataAndSync();
825
826 /**
827 * @tc.steps:step2. common sync will pause
828 * @tc.expected: step2. ok.
829 */
830 std::vector<std::string> tableNames = {tableName_, tableWithoutPrimaryName_};
831 Query normalQuery = Query::Select().FromTable({tableNames});
832 std::vector<std::string> idValue = {"0", "1", "2"};
833 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
834 CloudSyncOption option;
835 CloudSyncOption priorityOption;
836 PrepareOption(option, normalQuery, false);
837 PrepareOption(priorityOption, priorityQuery, true);
838 bool isUpload = false;
839 uint32_t blockTime = 2000;
__anon28bcdbc61302(const std::string &tableName, VBucket &extend) 840 virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
841 if (isUpload == false) {
842 isUpload = true;
843 std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
844 }
845 });
846 bool isFinished = false;
847 bool priorityFinish = false;
__anon28bcdbc61402(const std::map<std::string, SyncProcess> &process) 848 auto normalCallback = [&isFinished, &priorityFinish](const std::map<std::string, SyncProcess> &process) {
849 for (const auto &item : process) {
850 if (item.second.process == DistributedDB::FINISHED) {
851 isFinished = true;
852 ASSERT_EQ(priorityFinish, true);
853 }
854 }
855 };
856 ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
857
858 /**
859 * @tc.steps:step3. wait common upload and priority sync.
860 * @tc.expected: step3. ok.
861 */
862 while (isUpload == false) {
863 std::this_thread::sleep_for(std::chrono::milliseconds(50));
864 }
__anon28bcdbc61502(const std::map<std::string, SyncProcess> &process) 865 auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
866 for (const auto &item : process) {
867 if (item.second.process == DistributedDB::FINISHED) {
868 priorityFinish = true;
869 }
870 }
871 };
872 ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
873 while (isFinished == false || priorityFinish == false) {
874 std::this_thread::sleep_for(std::chrono::milliseconds(50));
875 }
876
877 /**
878 * @tc.steps:step4. wait common sync and priority sync finish, check query Times.
879 * @tc.expected: step4. ok.
880 */
881 uint32_t times = virtualCloudDb_->GetQueryTimes(tableName_);
882 ASSERT_EQ(times, 3u);
883 virtualCloudDb_->ForkUpload(nullptr);
884 }
885
886 /**
887 * @tc.name: CloudSyncTest007
888 * @tc.desc: check process info when version conflict sync process.
889 * @tc.type: FUNC
890 * @tc.require:
891 * @tc.author: luoguo
892 */
893 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest007, TestSize.Level0)
894 {
895 /**
896 * @tc.steps:step1. init data and sync
897 * @tc.expected: step1. ok.
898 */
899 const int localCount = 60;
900 InsertUserTableRecord(tableName_, localCount, 0);
901 Query query = Query::Select().FromTable({tableName_});
902 BlockSync(query, delegate_, g_actualDBStatus);
903
904 /**
905 * @tc.steps:step2. delete 30 - 59 records in user table, and set callback func.
906 * @tc.expected: step2. ok.
907 */
908 DeleteUserTableRecord(30, 59);
909 bool isUpload = false;
__anon28bcdbc61602(const std::string &tableName, VBucket &extend) 910 virtualCloudDb_->ForkUpload([&isUpload](const std::string &tableName, VBucket &extend) {
911 if (isUpload == false) {
912 isUpload = true;
913 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
914 }
915 });
916 bool isFinished = false;
917 std::map<std::string, TableProcessInfo> retSyncProcess;
__anon28bcdbc61702(const std::map<std::string, SyncProcess> &process) 918 auto normalCallback = [&isFinished, &retSyncProcess](const std::map<std::string, SyncProcess> &process) {
919 for (const auto &item : process) {
920 if (item.second.process == DistributedDB::FINISHED) {
921 isFinished = true;
922 ASSERT_EQ(process.empty(), false);
923 auto lastProcess = process.rbegin();
924 retSyncProcess = lastProcess->second.tableProcess;
925 }
926 }
927 };
928
929 /**
930 * @tc.steps:step3. sync.
931 * @tc.expected: step3. ok.
932 */
933 std::vector<std::string> tableNames = {tableName_};
934 Query normalQuery = Query::Select().FromTable({tableNames});
935 CloudSyncOption option;
936 PrepareOption(option, normalQuery, false);
937 ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
938
939 /**
940 * @tc.steps:step4. wait upload process and delete 30 record in cloud table.
941 * @tc.expected: step4. ok.
942 */
943 while (isUpload == false) {
944 std::this_thread::sleep_for(std::chrono::milliseconds(50));
945 }
946 DeleteCloudTableRecord(30);
947
948 /**
949 * @tc.steps:step5. wait sync process end and check data.
950 * @tc.expected: step5. ok.
951 */
952 while (isFinished == false) {
953 std::this_thread::sleep_for(std::chrono::milliseconds(50));
954 }
955 ASSERT_EQ(retSyncProcess.empty(), false);
956 auto taskInfo = retSyncProcess.rbegin();
957 ASSERT_EQ(taskInfo->second.upLoadInfo.total, 30u);
958 virtualCloudDb_->ForkUpload(nullptr);
959 }
960
961 /**
962 * @tc.name: CloudSyncTest008
963 * @tc.desc: test when normal sync interrupted by priority sync, process info should be consistent
964 * @tc.type: FUNC
965 * @tc.require:
966 * @tc.author: suyuchen
967 */
968 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest008, TestSize.Level0)
969 {
970 /**
971 * @tc.steps: step1. insert 35 records to user table
972 * @tc.expected: step1. OK.
973 */
974 const int localCount = 35;
975 InsertUserTableRecord(tableName_, localCount, 0);
976 Query query = Query::Select().FromTable({tableName_});
977
978 /**
979 * @tc.steps: step2. Set CLOUD_VERSION_CONFLICT when normal sync task upload
980 * @tc.expected: step2. OK.
981 */
982 int recordIndex = 0;
983 virtualCloudDb_->ForkInsertConflict([&recordIndex](const std::string &tableName, VBucket &extend, VBucket &record,
__anon28bcdbc61802(const std::string &tableName, VBucket &extend, VBucket &record, vector<VirtualCloudDb::CloudData> &cloudDataVec) 984 vector<VirtualCloudDb::CloudData> &cloudDataVec) {
985 recordIndex++;
986 if (recordIndex == 20) {
987 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
988 std::this_thread::sleep_for(std::chrono::seconds(1));
989 return CLOUD_VERSION_CONFLICT;
990 }
991 return OK;
992 });
993
994 /**
995 * @tc.steps: step3. set callback function for normal sync
996 * @tc.expected: step3. OK.
997 */
998 std::map<std::string, TableProcessInfo> retSyncProcess;
__anon28bcdbc61902(const std::map<std::string, SyncProcess> &process) 999 auto normalCallback = [&retSyncProcess](const std::map<std::string, SyncProcess> &process) {
1000 for (const auto &item : process) {
1001 if (item.second.process == DistributedDB::FINISHED) {
1002 ASSERT_EQ(process.empty(), false);
1003 auto lastProcess = process.rbegin();
1004 retSyncProcess = lastProcess->second.tableProcess;
1005 }
1006 }
1007 };
1008
1009 /**
1010 * @tc.steps: step4. start normal sync
1011 * @tc.expected: step4. OK.
1012 */
1013 CloudSyncOption option;
1014 PrepareOption(option, query, false);
1015 ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
__anon28bcdbc61a02() 1016 std::thread syncThread1([&]() {
1017 BlockSync(query, delegate_, g_actualDBStatus);
1018 });
1019
1020 /**
1021 * @tc.steps: step5. start priority sync
1022 * @tc.expected: step5. OK.
1023 */
1024 std::this_thread::sleep_for(std::chrono::milliseconds(200));
__anon28bcdbc61b02() 1025 std::thread syncThread2([&]() {
1026 BlockSync(query, delegate_, g_actualDBStatus, true);
1027 });
1028 syncThread1.join();
1029 syncThread2.join();
1030
1031 /**
1032 * @tc.steps: step6. Check notification of normal sync
1033 * @tc.expected: step6. OK.
1034 */
1035 ASSERT_EQ(retSyncProcess.empty(), false);
1036 auto taskInfo = retSyncProcess.rbegin();
1037 ASSERT_EQ(taskInfo->second.upLoadInfo.total, 35u);
1038 ASSERT_EQ(taskInfo->second.upLoadInfo.successCount, 35u);
1039 ASSERT_EQ(taskInfo->second.upLoadInfo.failCount, 0u);
1040
1041 virtualCloudDb_->ForkInsertConflict(nullptr);
1042 }
1043
1044 /**
1045 * @tc.name: CloudSyncTest009
1046 * @tc.desc: reopen database and sync
1047 * @tc.type: FUNC
1048 * @tc.require:
1049 * @tc.author: wangxiangdong
1050 */
1051 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest009, TestSize.Level0)
1052 {
1053 /**
1054 * @tc.steps: step1. insert 1 record to user table
1055 * @tc.expected: step1. OK.
1056 */
1057 const int actualCount = 1;
1058 InsertUserTableRecord(tableName_, actualCount);
1059 /**
1060 * @tc.steps: step2. sync data to cloud
1061 * @tc.expected: step2. OK.
1062 */
1063 Query query = Query::Select().FromTable({ tableName_ });
1064 BlockSync(query, delegate_, g_actualDBStatus);
1065 CheckCloudTableCount(tableName_, 1);
1066 /**
1067 * @tc.steps: step3. drop data table then close db
1068 * @tc.expected: step3. OK.
1069 */
1070 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1071 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1072 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1073 delegate_ = nullptr;
1074 /**
1075 * @tc.steps: step4. recreate data table and reopen database
1076 * @tc.expected: step4. OK.
1077 */
1078 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1079 RelationalStoreDelegate::Option option;
1080 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1081 ASSERT_NE(delegate_, nullptr);
1082 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1083 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1084 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1085 DataBaseSchema dataBaseSchema = GetSchema();
1086 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1087 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1088 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1089 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1090 /**
1091 * @tc.steps: step5. sync and cloud data should be deleted
1092 * @tc.expected: step5. OK.
1093 */
1094 BlockSync(query, delegate_, g_actualDBStatus);
1095 CheckCloudTableCount(tableName_, 0);
1096 }
1097
1098 /**
1099 * @tc.name: CloudSyncTest010
1100 * @tc.desc: reopen database, recreate table with less columns and sync
1101 * @tc.type: FUNC
1102 * @tc.require:
1103 * @tc.author: wangxiangdong
1104 */
1105 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest010, TestSize.Level0)
1106 {
1107 /**
1108 * @tc.steps: step1. insert 1 record to user table
1109 * @tc.expected: step1. OK.
1110 */
1111 const int actualCount = 1;
1112 InsertUserTableRecord(tableName_, actualCount);
1113 /**
1114 * @tc.steps: step2. sync data to cloud
1115 * @tc.expected: step2. OK.
1116 */
1117 Query query = Query::Select().FromTable({ tableName_ });
1118 BlockSync(query, delegate_, g_actualDBStatus);
1119 CheckCloudTableCount(tableName_, 1);
1120 /**
1121 * @tc.steps: step3. drop data table then close db
1122 * @tc.expected: step3. OK.
1123 */
1124 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1125 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1126 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1127 delegate_ = nullptr;
1128 /**
1129 * @tc.steps: step4. recreate data table and reopen database
1130 * @tc.expected: step4. OK.
1131 */
1132 std::string createSql = "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(id INT PRIMARY KEY);";
1133 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, createSql), DBStatus::OK);
1134 RelationalStoreDelegate::Option option;
1135 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1136 ASSERT_NE(delegate_, nullptr);
1137 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::SCHEMA_MISMATCH);
1138 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1139 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1140 DataBaseSchema dataBaseSchema = GetSchema();
1141 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1142 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1143 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1144 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1145 /**
1146 * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
1147 * @tc.expected: step5. OK.
1148 */
1149 BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
1150 CheckCloudTableCount(tableName_, 1);
1151 }
1152
1153 /**
1154 * @tc.name: CloudSyncTest011
1155 * @tc.desc: reopen database, do not recreate table and sync
1156 * @tc.type: FUNC
1157 * @tc.require:
1158 * @tc.author: wangxiangdong
1159 */
1160 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest011, TestSize.Level0)
1161 {
1162 /**
1163 * @tc.steps: step1. insert 1 record to user table
1164 * @tc.expected: step1. OK.
1165 */
1166 const int actualCount = 1;
1167 InsertUserTableRecord(tableName_, actualCount);
1168 /**
1169 * @tc.steps: step2. sync data to cloud
1170 * @tc.expected: step2. OK.
1171 */
1172 Query query = Query::Select().FromTable({ tableName_ });
1173 BlockSync(query, delegate_, g_actualDBStatus);
1174 CheckCloudTableCount(tableName_, 1);
1175 /**
1176 * @tc.steps: step3. drop data table then close db
1177 * @tc.expected: step3. OK.
1178 */
1179 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1180 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1181 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1182 delegate_ = nullptr;
1183 /**
1184 * @tc.steps: step4. reopen database
1185 * @tc.expected: step4. OK.
1186 */
1187 RelationalStoreDelegate::Option option;
1188 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1189 ASSERT_NE(delegate_, nullptr);
1190 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1191 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1192 DataBaseSchema dataBaseSchema = GetSchema();
1193 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1194 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1195 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1196 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1197 /**
1198 * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
1199 * @tc.expected: step5. OK.
1200 */
1201 BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
1202 }
1203
1204 /**
1205 * @tc.name: CloudSyncTest012
1206 * @tc.desc: insert data before re-SetDistributedTable and sync is ok
1207 * @tc.type: FUNC
1208 * @tc.require:
1209 * @tc.author: wangxiangdong
1210 */
1211 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest012, TestSize.Level0)
1212 {
1213 /**
1214 * @tc.steps: step1. insert 1 record to user table
1215 * @tc.expected: step1. OK.
1216 */
1217 const int actualCount = 1;
1218 InsertUserTableRecord(tableName_, actualCount);
1219 /**
1220 * @tc.steps: step2. sync data to cloud
1221 * @tc.expected: step2. OK.
1222 */
1223 Query query = Query::Select().FromTable({ tableName_ });
1224 BlockSync(query, delegate_, g_actualDBStatus);
1225 CheckCloudTableCount(tableName_, 1);
1226 /**
1227 * @tc.steps: step3. drop data table then close db
1228 * @tc.expected: step3. OK.
1229 */
1230 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
1231 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
1232 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
1233 delegate_ = nullptr;
1234 /**
1235 * @tc.steps: step4. recreate data table and reopen database
1236 * @tc.expected: step4. OK.
1237 */
1238 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
1239 RelationalStoreDelegate::Option option;
1240 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
1241 ASSERT_NE(delegate_, nullptr);
1242 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
1243 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
1244 DataBaseSchema dataBaseSchema = GetSchema();
1245 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1246 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1247 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1248 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1249 /**
1250 * @tc.steps: step5. insert data to new table
1251 * @tc.expected: step5. OK.
1252 */
1253 int begin = 1;
1254 InsertUserTableRecord(tableName_, actualCount, begin);
1255 /**
1256 * @tc.steps: step6. sync and cloud data should be deleted
1257 * @tc.expected: step6. OK.
1258 */
1259 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
1260 BlockSync(query, delegate_, g_actualDBStatus);
1261 CheckCloudTableCount(tableName_, 1);
1262 }
1263
1264 /**
1265 * @tc.name: CloudSyncObserverTest001
1266 * @tc.desc: test cloud sync multi observer
1267 * @tc.type: FUNC
1268 * @tc.require:
1269 * @tc.author: zhangshijie
1270 */
1271 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncObserverTest001, TestSize.Level0)
1272 {
1273 // prepare data
1274 const int actualCount = 10;
1275 InsertUserTableRecord(tableName_, actualCount);
1276
1277 /**
1278 * @tc.steps:step1. open two delegate with two observer.
1279 * @tc.expected: step1. ok.
1280 */
1281 RelationalStoreDelegate::Option option;
1282 auto observer1 = new (std::nothrow) RelationalStoreObserverUnitTest();
1283 ASSERT_NE(observer1, nullptr);
1284 option.observer = observer1;
1285 RelationalStoreDelegate *delegate1 = nullptr;
1286 EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
1287 ASSERT_NE(delegate1, nullptr);
1288
1289 auto observer2 = new (std::nothrow) RelationalStoreObserverUnitTest();
1290 ASSERT_NE(observer2, nullptr);
1291 option.observer = observer2;
1292 RelationalStoreDelegate *delegate2 = nullptr;
1293 EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate2), DBStatus::OK);
1294 ASSERT_NE(delegate2, nullptr);
1295
1296 /**
1297 * @tc.steps:step2. insert 1-10 cloud data, start.
1298 * @tc.expected: step2. ok.
1299 */
1300 InsertCloudTableRecord(0, actualCount, actualCount, false);
1301 Query query = Query::Select().FromTable({ tableName_ });
1302 BlockSync(query, delegate_, g_actualDBStatus);
1303
1304 /**
1305 * @tc.steps:step3. check observer.
1306 * @tc.expected: step3. ok.
1307 */
1308 EXPECT_EQ(observer1->GetCloudCallCount(), 1u);
1309 EXPECT_EQ(observer2->GetCloudCallCount(), 1u);
1310
1311 /**
1312 * @tc.steps:step4. insert 11-20 cloud data, start.
1313 * @tc.expected: step4. ok.
1314 */
1315 delegate2->UnRegisterObserver();
1316 observer2->ResetCloudSyncToZero();
1317 int64_t begin = 11;
1318 InsertCloudTableRecord(begin, actualCount, actualCount, false);
1319 BlockSync(query, delegate_, g_actualDBStatus);
1320
1321 /**
1322 * @tc.steps:step5. check observer.
1323 * @tc.expected: step5. ok.
1324 */
1325 EXPECT_EQ(observer1->GetCloudCallCount(), 2u); // 2 is observer1 triggered times
1326 EXPECT_EQ(observer2->GetCloudCallCount(), 0u);
1327
1328 delete observer1;
1329 observer1 = nullptr;
1330 EXPECT_EQ(mgr_->CloseStore(delegate1), DBStatus::OK);
1331
1332 delete observer2;
1333 observer2 = nullptr;
1334 EXPECT_EQ(mgr_->CloseStore(delegate2), DBStatus::OK);
1335 }
1336
1337 /**
1338 * @tc.name: CloudPrioritySyncTest001
1339 * @tc.desc: use priority sync interface when query in or from table
1340 * @tc.type: FUNC
1341 * @tc.require:
1342 * @tc.author: chenchaohao
1343 */
1344 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest001, TestSize.Level0)
1345 {
1346 /**
1347 * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1348 * @tc.expected: step1. ok.
1349 */
1350 const int actualCount = 10; // 10 is count of records
1351 InsertUserTableRecord(tableName_, actualCount);
1352 std::vector<std::string> idValue = {"0", "1", "2"};
1353 Query query = Query::Select().From(tableName_).In("id", idValue);
1354
1355 /**
1356 * @tc.steps:step2. check ParserQueryNodes
1357 * @tc.expected: step2. ok.
1358 */
__anon28bcdbc61c02(const std::string &tableName, VBucket &extend) 1359 virtualCloudDb_->ForkQuery([this, &idValue](const std::string &tableName, VBucket &extend) {
1360 EXPECT_EQ(tableName_, tableName);
1361 if (extend.find(CloudDbConstant::QUERY_FIELD) == extend.end()) {
1362 return;
1363 }
1364 Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1365 DBStatus status = OK;
1366 auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1367 EXPECT_EQ(status, OK);
1368 ASSERT_EQ(queryNodes.size(), 1u);
1369 EXPECT_EQ(queryNodes[0].type, QueryNodeType::IN);
1370 EXPECT_EQ(queryNodes[0].fieldName, "id");
1371 ASSERT_EQ(queryNodes[0].fieldValue.size(), idValue.size());
1372 for (size_t i = 0u; i < idValue.size(); i++) {
1373 std::string val = std::get<std::string>(queryNodes[0].fieldValue[i]);
1374 EXPECT_EQ(val, idValue[i]);
1375 }
1376 });
1377 BlockPrioritySync(query, delegate_, true, OK);
1378 virtualCloudDb_->ForkQuery(nullptr);
1379 CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1380
1381 /**
1382 * @tc.steps:step3. use priority sync interface but not priority.
1383 * @tc.expected: step3. ok.
1384 */
1385 query = Query::Select().FromTable({ tableName_ });
1386 BlockPrioritySync(query, delegate_, false, OK);
1387 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1388
1389 /**
1390 * @tc.steps:step4. insert user table record and query from table, then priority sync.
1391 * @tc.expected: step4. ok.
1392 */
1393 InsertUserTableRecord(tableName_, actualCount, actualCount);
1394 BlockPrioritySync(query, delegate_, true, OK);
1395 CheckCloudTableCount(tableName_, 20); // 20 is count of cloud records
1396 }
1397
1398
1399 /**
1400 * @tc.name: CloudPrioritySyncTest002
1401 * @tc.desc: priority sync in some abnormal query situations
1402 * @tc.type: FUNC
1403 * @tc.require:
1404 * @tc.author: chenchaohao
1405 */
1406 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest002, TestSize.Level0)
1407 {
1408 /**
1409 * @tc.steps:step1. insert user table record.
1410 * @tc.expected: step1. ok.
1411 */
1412 const int actualCount = 1; // 1 is count of records
1413 InsertUserTableRecord(tableName_, actualCount);
1414
1415 /**
1416 * @tc.steps:step2. query select tablename then priority sync.
1417 * @tc.expected: step2. invalid.
1418 */
1419 Query query = Query::Select(tableName_);
1420 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1421 CheckCloudTableCount(tableName_, 0);
1422
1423 /**
1424 * @tc.steps:step3. query select without from then priority sync.
1425 * @tc.expected: step3. invalid.
1426 */
1427 query = Query::Select();
1428 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1429 CheckCloudTableCount(tableName_, 0);
1430
1431 /**
1432 * @tc.steps:step4. query select and from without in then priority sync.
1433 * @tc.expected: step4. invalid.
1434 */
1435 query = Query::Select().From(tableName_);
1436 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1437 CheckCloudTableCount(tableName_, 0);
1438
1439 /**
1440 * @tc.steps:step5. query select and fromtable then priority sync.
1441 * @tc.expected: step5. not support.
1442 */
1443 query = Query::Select().From(tableName_).FromTable({tableName_});
1444 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1445 CheckCloudTableCount(tableName_, 0);
1446
1447 /**
1448 * @tc.steps:step6. query select and from with other predicates then priority sync.
1449 * @tc.expected: step6. not support.
1450 */
1451 query = Query::Select().From(tableName_).IsNotNull("id");
1452 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1453 CheckCloudTableCount(tableName_, 0);
1454
1455 /**
1456 * @tc.steps:step7. query select and from with in and other predicates then priority sync.
1457 * @tc.expected: step7 not support.
1458 */
1459 std::vector<std::string> idValue = {"0"};
1460 query = Query::Select().From(tableName_).IsNotNull("id").In("id", idValue);
1461 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1462 CheckCloudTableCount(tableName_, 0);
1463
1464 /**
1465 * @tc.steps:step8. query select and from with in non-primary key then priority sync.
1466 * @tc.expected: step8. not support.
1467 */
1468 std::vector<std::string> heightValue = {"155.10"};
1469 query = Query::Select().From(tableName_).In("height", heightValue);
1470 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1471 CheckCloudTableCount(tableName_, 0);
1472
1473 /**
1474 * @tc.steps:step9. query in count greater than 100.
1475 * @tc.expected: step9. over max limits.
1476 */
1477 idValue.resize(101); // 101 > 100
1478 query = Query::Select().From(tableName_).In("id", idValue);
1479 BlockPrioritySync(query, delegate_, true, OVER_MAX_LIMITS);
1480 CheckCloudTableCount(tableName_, 0);
1481 }
1482
1483 /**
1484 * @tc.name: CloudPrioritySyncTest003
1485 * @tc.desc: priority sync when normal syncing
1486 * @tc.type: FUNC
1487 * @tc.require:
1488 * @tc.author: chenchaohao
1489 */
1490 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest003, TestSize.Level0)
1491 {
1492 /**
1493 * @tc.steps:step1. insert user table record.
1494 * @tc.expected: step1. ok.
1495 */
1496 const int actualCount = 10; // 10 is count of records
1497 InsertUserTableRecord(tableName_, actualCount);
1498
1499 /**
1500 * @tc.steps:step2. begin normal sync and priority sync.
1501 * @tc.expected: step2. ok.
1502 */
1503 Query normalQuery = Query::Select().FromTable({tableName_});
1504 std::vector<std::string> idValue = {"0", "1", "2"};
1505 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
1506 std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
1507 PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, true);
1508 EXPECT_EQ(virtualCloudDb_->GetLockCount(), 2);
1509 virtualCloudDb_->Reset();
1510 EXPECT_EQ(virtualCloudDb_->GetLockCount(), 0);
1511 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1512 }
1513
1514 /**
1515 * @tc.name: CloudPrioritySyncTest004
1516 * @tc.desc: non-primarykey table priority sync
1517 * @tc.type: FUNC
1518 * @tc.require:
1519 * @tc.author: chenchaohao
1520 */
1521 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest004, TestSize.Level0)
1522 {
1523 /**
1524 * @tc.steps:step1. insert user non-primarykey table record.
1525 * @tc.expected: step1. ok.
1526 */
1527 const int actualCount = 10; // 10 is count of records
1528 InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1529
1530 /**
1531 * @tc.steps:step2. begin priority sync.
1532 * @tc.expected: step2. not support.
1533 */
1534 std::vector<std::string> idValue = {"0", "1", "2"};
1535 Query query = Query::Select().From(tableWithoutPrimaryName_).In("id", idValue);
1536 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1537 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1538
1539 /**
1540 * @tc.steps:step3. begin priority sync when in rowid.
1541 * @tc.expected: step3. invalid.
1542 */
1543 std::vector<int64_t> rowidValue = {0, 1, 2}; // 0,1,2 are rowid value
1544 query = Query::Select().From(tableWithoutPrimaryName_).In("rowid", rowidValue);
1545 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1546 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1547 }
1548
1549 /**
1550 * @tc.name: CloudPrioritySyncTest005
1551 * @tc.desc: priority sync but don't have records
1552 * @tc.type: FUNC
1553 * @tc.require:
1554 * @tc.author: chenchaohao
1555 */
1556 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest005, TestSize.Level0)
1557 {
1558 /**
1559 * @tc.steps:step1. insert user non-primarykey table record.
1560 * @tc.expected: step1. ok.
1561 */
1562 const int actualCount = 10; // 10 is count of records
1563 InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1564
1565 /**
1566 * @tc.steps:step2. begin DistributedDBCloudCheckSyncTest priority sync and check records.
1567 * @tc.expected: step2. ok.
1568 */
1569 std::vector<std::string> idValue = {"0", "1", "2"};
1570 Query query = Query::Select().From(tableName_).In("id", idValue);
1571 BlockPrioritySync(query, delegate_, true, OK);
1572 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1573 CheckCloudTableCount(tableName_, 0);
1574 }
1575
1576 /**
1577 * @tc.name: CloudPrioritySyncTest006
1578 * @tc.desc: priority sync tasks greater than limit
1579 * @tc.type: FUNC
1580 * @tc.require:
1581 * @tc.author: chenchaohao
1582 */
1583 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest006, TestSize.Level0)
1584 {
1585 /**
1586 * @tc.steps:step1. insert user table record.
1587 * @tc.expected: step1. ok.
1588 */
1589 const int actualCount = 10; // 10 is count of records
1590 InsertUserTableRecord(tableName_, actualCount);
1591
1592 /**
1593 * @tc.steps:step2. begin 32 priority sync tasks and then begin 1 priority sync task.
1594 * @tc.expected: step2. ok.
1595 */
1596 std::vector<std::string> idValue = {"0", "1", "2"};
1597 Query query = Query::Select().From(tableName_).In("id", idValue);
1598 std::mutex dataMutex;
1599 std::condition_variable cv;
1600 std::mutex callbackMutex;
1601 std::condition_variable callbackCv;
1602 bool finish = false;
1603 size_t finishCount = 0u;
__anon28bcdbc61d02(const std::string &tableName, VBucket &extend) 1604 virtualCloudDb_->ForkQuery([&cv, &finish, &dataMutex](const std::string &tableName, VBucket &extend) {
1605 std::unique_lock<std::mutex> uniqueLock(dataMutex);
1606 cv.wait(uniqueLock, [&finish]() {
1607 return finish;
1608 });
1609 });
__anon28bcdbc61f02(const std::map<std::string, SyncProcess> &process) 1610 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1611 for (const auto &item: process) {
1612 if (item.second.process == DistributedDB::FINISHED) {
1613 {
1614 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1615 finishCount++;
1616 }
1617 callbackCv.notify_one();
1618 }
1619 }
1620 };
1621 CloudSyncOption option;
1622 PrepareOption(option, query, true);
1623 for (int i = 0; i < 32; i++) { // 32 is count of sync tasks
1624 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1625 }
1626 ASSERT_EQ(delegate_->Sync(option, nullptr), BUSY);
1627 {
1628 std::lock_guard<std::mutex> autoLock(dataMutex);
1629 finish = true;
1630 }
1631 cv.notify_all();
1632 virtualCloudDb_->ForkQuery(nullptr);
1633 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon28bcdbc62002() 1634 callbackCv.wait(callbackLock, [&finishCount]() {
1635 return (finishCount == 32u); // 32 is count of finished sync tasks
1636 });
1637 CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1638 }
1639
1640 /**
1641 * @tc.name: CloudPrioritySyncTest007
1642 * @tc.desc: priority normal priority normal when different query
1643 * @tc.type: FUNC
1644 * @tc.require:
1645 * @tc.author: chenchaohao
1646 */
1647 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest007, TestSize.Level0)
1648 {
1649 /**
1650 * @tc.steps:step1. insert user table record.
1651 * @tc.expected: step1. ok.
1652 */
1653 const int actualCount = 10; // 10 is count of records
1654 InsertUserTableRecord(tableName_, actualCount);
1655
1656 /**
1657 * @tc.steps:step2. set callback to check during sync.
1658 * @tc.expected: step2. ok.
1659 */
1660 std::atomic<int> count = 0;
1661 SetForkQueryForCloudPrioritySyncTest007(count);
1662
1663 /**
1664 * @tc.steps:step3. perform priority normal priority normal sync.
1665 * @tc.expected: step3. ok.
1666 */
1667 std::vector<std::string> idValue = {"0"};
1668 Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1669 CloudSyncOption option;
1670 PrepareOption(option, priorytyQuery, true);
1671 option.lockAction = static_cast<LockAction>(0xff); // lock all
1672 std::mutex callbackMutex;
1673 std::condition_variable callbackCv;
1674 size_t finishCount = 0u;
__anon28bcdbc62102(const std::map<std::string, SyncProcess> &process) 1675 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1676 for (const auto &item: process) {
1677 if (item.second.process == DistributedDB::FINISHED) {
1678 {
1679 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1680 finishCount++;
1681 }
1682 callbackCv.notify_one();
1683 }
1684 }
1685 };
1686 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1687 Query normalQuery = Query::Select().FromTable({tableName_});
1688 PrepareOption(option, normalQuery, false);
1689 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1690 idValue = {"1"};
1691 priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1692 PrepareOption(option, priorytyQuery, true);
1693 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1694 PrepareOption(option, normalQuery, false);
1695 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1696 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon28bcdbc62202() 1697 callbackCv.wait(callbackLock, [&finishCount]() {
1698 return (finishCount == 4u); // 4 is count of finished sync tasks
1699 });
1700 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1701 }
1702
1703 /**
1704 * @tc.name: CloudPrioritySyncTest008
 * @tc.desc: priority normal priority normal when same query
1706 * @tc.type: FUNC
1707 * @tc.require:
1708 * @tc.author: chenchaohao
1709 */
1710 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest008, TestSize.Level0)
1711 {
1712 /**
1713 * @tc.steps:step1. insert user table record.
1714 * @tc.expected: step1. ok.
1715 */
1716 const int actualCount = 10; // 10 is count of records
1717 InsertUserTableRecord(tableName_, actualCount);
1718
1719 /**
1720 * @tc.steps:step2. set callback to check during sync.
1721 * @tc.expected: step2. ok.
1722 */
1723 std::atomic<int> count = 0;
1724 SetForkQueryForCloudPrioritySyncTest008(count);
1725
1726 /**
1727 * @tc.steps:step3. perform priority normal priority normal sync.
1728 * @tc.expected: step3. ok.
1729 */
1730 std::vector<std::string> idValue = {"0"};
1731 Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1732 CloudSyncOption option;
1733 option.lockAction = static_cast<LockAction>(0xff); // lock all
1734 PrepareOption(option, priorytyQuery, true);
1735 std::mutex callbackMutex;
1736 std::condition_variable callbackCv;
1737 size_t finishCount = 0u;
__anon28bcdbc62302(const std::map<std::string, SyncProcess> &process) 1738 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1739 for (const auto &item: process) {
1740 if (item.second.process == DistributedDB::FINISHED) {
1741 {
1742 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1743 finishCount++;
1744 }
1745 callbackCv.notify_one();
1746 }
1747 }
1748 };
1749 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1750 Query normalQuery = Query::Select().FromTable({tableName_});
1751 PrepareOption(option, normalQuery, false);
1752 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1753 priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1754 PrepareOption(option, priorytyQuery, true);
1755 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1756 PrepareOption(option, normalQuery, false);
1757 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1758 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon28bcdbc62402() 1759 callbackCv.wait(callbackLock, [&finishCount]() {
1760 return (finishCount == 4u); // 4 is count of finished sync tasks
1761 });
1762 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1763 }
1764
1765 /**
1766 * @tc.name: CloudPrioritySyncTest009
1767 * @tc.desc: use priority sync interface when query equal to from table
1768 * @tc.type: FUNC
1769 * @tc.require:
1770 * @tc.author: zhangqiquan
1771 */
1772 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest009, TestSize.Level0)
1773 {
1774 /**
1775 * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1776 * @tc.expected: step1. ok.
1777 */
1778 const int actualCount = 5; // 5 is count of records
1779 InsertUserTableRecord(tableName_, actualCount);
1780 Query query = Query::Select().From(tableName_).BeginGroup().EqualTo("id", "0").Or().EqualTo("id", "1").EndGroup();
1781
1782 /**
1783 * @tc.steps:step2. check ParserQueryNodes
1784 * @tc.expected: step2. ok.
1785 */
__anon28bcdbc62502(const std::string &tableName, VBucket &extend) 1786 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
1787 EXPECT_EQ(tableName_, tableName);
1788 Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1789 DBStatus status = OK;
1790 auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1791 EXPECT_EQ(status, OK);
1792 ASSERT_EQ(queryNodes.size(), 5u); // 5 is query nodes count
1793 });
1794 BlockPrioritySync(query, delegate_, true, OK);
1795 virtualCloudDb_->ForkQuery(nullptr);
1796 CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records
1797 }
1798
1799 /**
1800 * @tc.name: CloudPrioritySyncTest010
1801 * @tc.desc: priority sync after cloud delete
1802 * @tc.type: FUNC
1803 * @tc.require:
1804 * @tc.author: chenchaohao
1805 */
1806 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest010, TestSize.Level0)
1807 {
1808 /**
1809 * @tc.steps:step1. insert user table record.
1810 * @tc.expected: step1. ok.
1811 */
1812 const int actualCount = 10; // 10 is count of records
1813 InsertUserTableRecord(tableName_, actualCount);
1814
1815 /**
1816 * @tc.steps:step2. normal sync and then delete cloud records.
1817 * @tc.expected: step2. ok.
1818 */
1819 Query query = Query::Select().FromTable({tableName_});
1820 BlockSync(query, delegate_, g_actualDBStatus);
1821 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after sync
1822 DeleteCloudDBData(0, 3); // delete 0 1 2 record in cloud
1823 CheckCloudTableCount(tableName_, 7); // 7 is count of cloud records after delete
1824 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1825
1826 /**
1827 * @tc.steps:step3. priory sync and set query then check user table records.
1828 * @tc.expected: step3. ok.
1829 */
1830 std::vector<std::string> idValue = {"3", "4", "5"};
1831 query = Query::Select().From(tableName_).In("id", idValue);
1832 BlockPrioritySync(query, delegate_, true, OK);
1833 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records after sync
1834 idValue = {"0", "1", "2"};
1835 query = Query::Select().From(tableName_).In("id", idValue);
1836 BlockPrioritySync(query, delegate_, true, OK);
1837 CheckUserTableResult(db_, tableName_, 7); // 7 is count of user records after sync
1838 }
1839
1840 /**
1841 * @tc.name: CloudPrioritySyncTest011
1842 * @tc.desc: priority sync after cloud insert
1843 * @tc.type: FUNC
1844 * @tc.require:
1845 * @tc.author: chenchaohao
1846 */
1847 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest011, TestSize.Level0)
1848 {
1849 /**
1850 * @tc.steps:step1. insert cloud table record.
1851 * @tc.expected: step1. ok.
1852 */
1853 const int actualCount = 10; // 10 is count of records
1854 InsertCloudTableRecord(0, actualCount, actualCount, false);
1855 std::vector<std::string> idValue = {"0", "1", "2"};
1856 Query query = Query::Select().From(tableName_).In("id", idValue);
1857 std::atomic<int> count = 0;
1858
1859 /**
1860 * @tc.steps:step2. check user records when query.
1861 * @tc.expected: step1. ok.
1862 */
__anon28bcdbc62602(const std::string &, VBucket &) 1863 virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
1864 count++;
1865 if (count == 1) { // taskid1
1866 std::this_thread::sleep_for(std::chrono::seconds(1));
1867 }
1868 if (count == 2) { // taskid2
1869 CheckUserTableResult(db_, tableName_, 3); // 3 is count of user records after first sync
1870 }
1871 });
1872 CloudSyncOption option;
1873 PrepareOption(option, query, true);
1874 std::mutex callbackMutex;
1875 std::condition_variable callbackCv;
1876 size_t finishCount = 0u;
__anon28bcdbc62702(const std::map<std::string, SyncProcess> &process) 1877 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1878 for (const auto &item: process) {
1879 if (item.second.process == DistributedDB::FINISHED) {
1880 {
1881 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1882 finishCount++;
1883 }
1884 callbackCv.notify_one();
1885 }
1886 }
1887 };
1888
1889 /**
1890 * @tc.steps:step3. begin sync and check user record.
1891 * @tc.expected: step3. ok.
1892 */
1893 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1894 idValue = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
1895 query = Query::Select().From(tableName_).In("id", idValue);
1896 PrepareOption(option, query, true);
1897 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1898 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon28bcdbc62802() 1899 callbackCv.wait(callbackLock, [&finishCount]() {
1900 return (finishCount == 2u); // 2 is count of finished sync tasks
1901 });
1902 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1903 }
1904
1905 /**
1906 * @tc.name: CloudPrioritySyncTest012
1907 * @tc.desc: priority or normal sync when waittime > 300s or < -1
1908 * @tc.type: FUNC
1909 * @tc.require:
1910 * @tc.author: chenchaohao
1911 */
1912 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest012, TestSize.Level0)
1913 {
1914 /**
1915 * @tc.steps:step1. insert cloud table record.
1916 * @tc.expected: step1. ok.
1917 */
1918 const int actualCount = 10; // 10 is count of records
1919 InsertCloudTableRecord(0, actualCount, actualCount, false);
1920 std::vector<std::string> idValue = {"0", "1", "2"};
1921 Query query = Query::Select().From(tableName_).In("id", idValue);
1922
1923 /**
1924 * @tc.steps:step2. set waittime < -1 then begin sync.
1925 * @tc.expected: step2. invalid.
1926 */
1927 CloudSyncOption option;
1928 PrepareOption(option, query, true);
1929 option.waitTime = -2; // -2 < -1;
1930 ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1931 CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1932
1933 /**
1934 * @tc.steps:step3. set waittime > 300s then begin sync.
1935 * @tc.expected: step3. invalid.
1936 */
1937
1938 option.waitTime = 300001; // 300001 > 300s
1939 ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1940 CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1941 }
1942
1943 /**
1944 * @tc.name: CloudPrioritySyncTest013
1945 * @tc.desc: priority sync in some abnormal composite pk query situations
1946 * @tc.type: FUNC
1947 * @tc.require:
1948 * @tc.author: chenchaohao
1949 */
1950 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest013, TestSize.Level0)
1951 {
1952 /**
1953 * @tc.steps:step1. insert user table record.
1954 * @tc.expected: step1. ok.
1955 */
1956 const int actualCount = 1; // 1 is count of records
1957 InsertUserTableRecord(tableName_, actualCount);
1958
1959 /**
1960 * @tc.steps:step2. query only begingroup then priority sync.
1961 * @tc.expected: step2. invalid.
1962 */
1963 Query query = Query::Select().From(tableName_).BeginGroup();
1964 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1965 CheckCloudTableCount(tableName_, 0);
1966
1967 /**
1968 * @tc.steps:step3. query only endgroup then priority sync.
1969 * @tc.expected: step3. invalid.
1970 */
1971 query = Query::Select().From(tableName_).EndGroup();
1972 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1973 CheckCloudTableCount(tableName_, 0);
1974
1975 /**
1976 * @tc.steps:step4. query only begingroup and endgroup then priority sync.
1977 * @tc.expected: step4. invalid.
1978 */
1979 query = Query::Select().From(tableName_).BeginGroup().EndGroup();
1980 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1981 CheckCloudTableCount(tableName_, 0);
1982
1983 /**
1984 * @tc.steps:step5. query and from table then priority sync.
1985 * @tc.expected: step5. invalid.
1986 */
1987 query = Query::Select().And().From(tableName_);
1988 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1989 CheckCloudTableCount(tableName_, 0);
1990
1991 /**
1992 * @tc.steps:step6. query or from table then priority sync.
1993 * @tc.expected: step6. invalid.
1994 */
1995 query = Query::Select().Or().From(tableName_);
1996 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1997 CheckCloudTableCount(tableName_, 0);
1998
1999 /**
2000 * @tc.steps:step7. query begingroup from table then priority sync.
2001 * @tc.expected: step7 invalid.
2002 */
2003 query = Query::Select().BeginGroup().From(tableName_);
2004 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2005 CheckCloudTableCount(tableName_, 0);
2006
2007 /**
2008 * @tc.steps:step8. query endgroup from table then priority sync.
2009 * @tc.expected: step8 invalid.
2010 */
2011 query = Query::Select().EndGroup().From(tableName_);
2012 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
2013 CheckCloudTableCount(tableName_, 0);
2014
2015 /**
2016 * @tc.steps:step9. query and in then priority sync.
2017 * @tc.expected: step9. invalid.
2018 */
2019 std::vector<std::string> idValue = {"0"};
2020 query = Query::Select().From(tableName_).And().In("id", idValue);
2021 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
2022 CheckCloudTableCount(tableName_, 0);
2023
2024 /**
2025 * @tc.steps:step10. query when the table name does not exit then priority sync.
2026 * @tc.expected: step10. schema mismatch.
2027 */
2028 query = Query::Select().From("tableName").And().In("id", idValue);
2029 BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
2030 CheckCloudTableCount(tableName_, 0);
2031
2032 /**
2033 * @tc.steps:step11. query when the table name does not exit then priority sync.
2034 * @tc.expected: step11. schema mismatch.
2035 */
2036 query = Query::Select().From("tableName").In("id", idValue);
2037 BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
2038 CheckCloudTableCount(tableName_, 0);
2039
2040 /**
2041 * @tc.steps:step12. query when the table name does not exit then sync.
2042 * @tc.expected: step12. schema mismatch.
2043 */
2044 query = Query::Select().FromTable({"tableName"});
2045 BlockPrioritySync(query, delegate_, false, SCHEMA_MISMATCH);
2046 CheckCloudTableCount(tableName_, 0);
2047 }
2048
CheckUploadInfoAfterSync(int recordCount,SyncProcess & normalLast)2049 void DistributedDBCloudCheckSyncTest::CheckUploadInfoAfterSync(int recordCount, SyncProcess &normalLast)
2050 {
2051 uint32_t uintRecordCount = static_cast<uint32_t>(recordCount);
2052 const Info expectUploadInfo = {2u, uintRecordCount, uintRecordCount, 0u, uintRecordCount, 0u, 0u};
2053 for (const auto &table : normalLast.tableProcess) {
2054 CheckUploadInfo(table.second.upLoadInfo, expectUploadInfo);
2055 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
2056 }
2057 }
2058
2059 /**
2060 * @tc.name: CloudPrioritySyncTest014
2061 * @tc.desc: Check the uploadInfo after the normal sync is paused by the priority sync
2062 * @tc.type: FUNC
2063 * @tc.require:
2064 * @tc.author: suyue
2065 */
2066 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest014, TestSize.Level0)
2067 {
2068 /**
2069 * @tc.steps:step1. insert data and sync pause.
2070 * @tc.expected: step1. ok.
2071 */
2072 const int recordCount = 50; // 50 is count of data records
2073 InsertUserTableRecord(tableName_, recordCount, 0);
2074 Query normalQuery = Query::Select().FromTable({tableName_});
2075 CloudSyncOption normalOption;
2076 PrepareOption(normalOption, normalQuery, false);
2077 bool isUpload = false;
2078 uint32_t blockTime = 1000;
__anon28bcdbc62902(const std::string &tableName, VBucket &extend) 2079 virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
2080 if (isUpload == false) {
2081 isUpload = true;
2082 std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
2083 }
2084 });
2085 bool isFinish = false;
2086 bool priorityFinish = false;
2087 SyncProcess normalLast;
__anon28bcdbc62a02(const std::map<std::string, SyncProcess> &process) 2088 auto normalCallback = [&isFinish, &priorityFinish, &normalLast](const std::map<std::string, SyncProcess> &process) {
2089 for (const auto &item : process) {
2090 if (item.second.process == DistributedDB::FINISHED) {
2091 isFinish = true;
2092 ASSERT_EQ(priorityFinish, true);
2093 normalLast = item.second;
2094 }
2095 }
2096 };
2097 ASSERT_EQ(delegate_->Sync(normalOption, normalCallback), OK);
2098
2099 /**
2100 * @tc.steps:step2. priority sync.
2101 * @tc.expected: step2. ok.
2102 */
2103 while (isUpload == false) {
2104 std::this_thread::sleep_for(std::chrono::milliseconds(50));
2105 }
2106 std::vector<std::string> idValues = {"0", "1", "2", "3", "4"};
2107 Query priorityQuery = Query::Select().From(tableName_).In("id", idValues);
2108 CloudSyncOption priorityOption;
2109 PrepareOption(priorityOption, priorityQuery, true);
__anon28bcdbc62b02(const std::map<std::string, SyncProcess> &process) 2110 auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
2111 for (const auto &item : process) {
2112 if (item.second.process == DistributedDB::FINISHED) {
2113 priorityFinish = true;
2114 }
2115 }
2116 };
2117 ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
2118 while (isFinish == false || priorityFinish == false) {
2119 std::this_thread::sleep_for(std::chrono::milliseconds(50));
2120 }
2121
2122 /**
2123 * @tc.steps:step3. check uploadInfo after sync finished.
2124 * @tc.expected: step3. ok.
2125 */
2126 CheckUploadInfoAfterSync(recordCount, normalLast);
2127 virtualCloudDb_->ForkUpload(nullptr);
2128 }
2129
2130 /**
2131 * @tc.name: CloudPrioritySyncTest015
2132 * @tc.desc: Check the uploadInfo and the downloadInfo after the normal sync is paused by the priority sync
2133 * @tc.type: FUNC
2134 * @tc.require:
2135 * @tc.author: caihaoting
2136 */
2137 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest015, TestSize.Level0)
2138 {
2139 /**
2140 * @tc.steps:step1. insert data and sync pause.
2141 * @tc.expected: step1. ok.
2142 */
2143 const int localCount = 10; // 10 is count of local data records
2144 const int cloudCount = 50; // 50 is count of cloud data records
2145 InsertUserTableRecord(tableName_, localCount, 0);
2146 InsertCloudTableRecord(20, cloudCount, 0, false); // 20 is begin number
2147 uint32_t blockTime = 500; // 500ms
2148 virtualCloudDb_->SetBlockTime(blockTime);
2149 Query normalQuery = Query::Select().FromTable({tableName_});
2150 CloudSyncOption normalOption;
2151 PrepareOption(normalOption, normalQuery, false);
2152 bool isFinish = false;
2153 bool priorityFinish = false;
2154 SyncProcess normalLast;
__anon28bcdbc62c02(const std::map<std::string, SyncProcess> &process) 2155 auto normalCallback = [&isFinish, &priorityFinish, &normalLast](const std::map<std::string, SyncProcess> &process) {
2156 for (const auto &item : process) {
2157 if (item.second.process == DistributedDB::FINISHED) {
2158 isFinish = true;
2159 ASSERT_EQ(priorityFinish, true);
2160 normalLast = item.second;
2161 }
2162 }
2163 };
2164 ASSERT_EQ(delegate_->Sync(normalOption, normalCallback), OK);
2165
2166 /**
2167 * @tc.steps:step2. priority sync.
2168 * @tc.expected: step2. ok.
2169 */
2170 std::vector<std::string> idValues = {"10", "11", "12", "13", "14"};
2171 Query priorityQuery = Query::Select().From(tableName_).In("id", idValues);
2172 CloudSyncOption priorityOption;
2173 PrepareOption(priorityOption, priorityQuery, true);
__anon28bcdbc62d02(const std::map<std::string, SyncProcess> &process) 2174 auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
2175 for (const auto &item : process) {
2176 if (item.second.process == DistributedDB::FINISHED) {
2177 priorityFinish = true;
2178 }
2179 }
2180 };
2181 ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
2182 while (isFinish == false || priorityFinish == false) {
2183 std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
2184 }
2185
2186 /**
2187 * @tc.steps:step3. check uploadInfo and downloadInfo after sync finished.
2188 * @tc.expected: step3. ok.
2189 */
2190 uint32_t uintLocalCount = static_cast<uint32_t>(localCount);
2191 uint32_t uintCloudCount = static_cast<uint32_t>(cloudCount);
2192 const Info expectUploadInfo = {1u, uintLocalCount, uintLocalCount, 0u, uintLocalCount, 0u, 0u};
2193 const Info expectDownloadInfo = {1u, uintCloudCount, uintCloudCount, 0u, uintCloudCount, 0u, 0u};
2194 for (const auto &table : normalLast.tableProcess) {
2195 CheckUploadInfo(table.second.upLoadInfo, expectUploadInfo);
2196 CheckDownloadInfo(table.second.downLoadInfo, expectDownloadInfo);
2197 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
2198 }
2199 CheckUserTableResult(db_, tableName_, 60);
2200 }
2201
2202 /**
2203 * @tc.name: CloudPrioritySyncTest016
2204 * @tc.desc: priority sync when normal syncing
2205 * @tc.type: FUNC
2206 * @tc.require:
2207 * @tc.author: wangxiangdong
2208 */
2209 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest016, TestSize.Level0)
2210 {
2211 /**
2212 * @tc.steps:step1. insert cloud table record.
2213 * @tc.expected: step1. ok.
2214 */
2215 const int actualCount = 60; // 60 is count of records
2216 InsertCloudTableRecord(0, actualCount, 0, false);
2217 InsertUserTableRecord(tableName_, 10);
2218
2219 /**
2220 * @tc.steps:step2. begin normal sync and priority sync.
2221 * @tc.expected: step2. ok.
2222 */
2223 Query normalQuery = Query::Select().FromTable({tableName_});
2224 std::vector<std::string> idValue = {"0", "1", "2"};
2225 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
2226 std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
2227 PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, false);
2228 virtualCloudDb_->Reset();
2229 CheckCloudTableCount(tableName_, 60); // 10 is count of cloud records
2230 /**
2231 * @tc.steps:step3. check sync process result.
2232 * @tc.expected: step3. ok.
2233 */
2234 std::vector<DistributedDB::SyncProcess> expectSyncResult = {
2235 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 60, 60, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2236 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 3, 3, 0, 0, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2237 {FINISHED, OK, {{tableName_, {FINISHED, {1, 3, 3, 0, 0, 0, 0}, {1, 3, 3, 0, 0, 3, 0}}}}},
2238 {PROCESSING, OK, {{tableName_, {PROCESSING, {2, 63, 63, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
2239 {FINISHED, OK, {{tableName_, {FINISHED, {2, 63, 63, 0, 50, 0, 0}, {1, 7, 7, 0, 0, 7, 0}}}}}
2240 };
2241 EXPECT_EQ(CheckSyncProcess(prioritySyncProcess, expectSyncResult), true);
2242 }
2243
2244 /**
2245 * @tc.name: LogicDeleteSyncTest001
2246 * @tc.desc: sync with logic delete
2247 * @tc.type: FUNC
2248 * @tc.require:
2249 * @tc.author: zhangqiquan
2250 */
2251 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest001, TestSize.Level0)
2252 {
2253 bool logicDelete = true;
2254 auto data = static_cast<PragmaData>(&logicDelete);
2255 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2256 int actualCount = 10;
2257 InitLogicDeleteDataEnv(actualCount, true);
2258 CheckLocalCount(actualCount);
2259 std::string device = "";
2260 ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_AND_DATA), DBStatus::OK);
2261 CheckLocalCount(actualCount);
2262 }
2263
2264 /**
2265 * @tc.name: LogicDeleteSyncTest002
2266 * @tc.desc: sync without logic delete
2267 * @tc.type: FUNC
2268 * @tc.require:
2269 * @tc.author: zhangqiquan
2270 */
2271 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest002, TestSize.Level0)
2272 {
2273 bool logicDelete = false;
2274 auto data = static_cast<PragmaData>(&logicDelete);
2275 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2276 int actualCount = 10;
2277 InitLogicDeleteDataEnv(actualCount);
2278 CheckLocalCount(0);
2279 }
2280
2281 /**
2282 * @tc.name: LogicDeleteSyncTest003
2283 * @tc.desc: sync with logic delete and check observer
2284 * @tc.type: FUNC
2285 * @tc.require:
2286 * @tc.author: bty
2287 */
2288 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest003, TestSize.Level0)
2289 {
2290 /**
2291 * @tc.steps:step1. register observer.
2292 * @tc.expected: step1. ok.
2293 */
2294 RelationalStoreDelegate::Option option;
2295 auto observer = new (std::nothrow) RelationalStoreObserverUnitTest();
2296 ASSERT_NE(observer, nullptr);
2297 observer->SetCallbackDetailsType(static_cast<uint32_t>(CallbackDetailsType::DETAILED));
2298 EXPECT_EQ(delegate_->RegisterObserver(observer), OK);
2299 ChangedData expectData;
2300 expectData.tableName = tableName_;
2301 expectData.type = ChangedDataType::DATA;
2302 expectData.field.push_back(std::string("id"));
2303 const int count = 10;
2304 for (int64_t i = 0; i < count; ++i) {
2305 expectData.primaryData[ChangeType::OP_DELETE].push_back({std::to_string(i)});
2306 }
2307 expectData.properties = { .isTrackedDataChange = true };
2308 observer->SetExpectedResult(expectData);
2309
2310 /**
2311 * @tc.steps:step2. set tracker table
2312 * @tc.expected: step2. ok.
2313 */
2314 TrackerSchema trackerSchema;
2315 trackerSchema.tableName = tableName_;
2316 trackerSchema.trackerColNames = { "id" };
2317 EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
2318
2319 /**
2320 * @tc.steps:step3. set logic delete and sync
2321 * @tc.expected: step3. ok.
2322 */
2323 bool logicDelete = true;
2324 auto data = static_cast<PragmaData>(&logicDelete);
2325 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2326 int actualCount = 10;
2327 InitLogicDeleteDataEnv(actualCount);
2328 CheckLocalCount(actualCount);
2329 EXPECT_EQ(observer->IsAllChangedDataEq(), true);
2330 observer->ClearChangedData();
2331
2332 /**
2333 * @tc.steps:step4. unSetTrackerTable and sync
2334 * @tc.expected: step4. ok.
2335 */
2336 expectData.properties = { .isTrackedDataChange = false };
2337 observer->SetExpectedResult(expectData);
2338 trackerSchema.trackerColNames = {};
2339 EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
2340 InsertUserTableRecord(tableName_, actualCount);
2341 BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
2342 for (int i = 0; i < actualCount + actualCount; ++i) {
2343 DeleteCloudTableRecord(i);
2344 }
2345 BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
2346 EXPECT_EQ(observer->IsAllChangedDataEq(), true);
2347
2348 EXPECT_EQ(delegate_->UnRegisterObserver(observer), OK);
2349 delete observer;
2350 observer = nullptr;
2351 }
2352
2353 /**
2354 * @tc.name: LogicDeleteSyncTest004
2355 * @tc.desc: test removedevicedata in mode FLAG_ONLY when sync with logic delete
2356 * @tc.type: FUNC
2357 * @tc.require:
2358 * @tc.author: chenchaohao
2359 */
2360 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest004, TestSize.Level0)
2361 {
2362 /**
2363 * @tc.steps:step1. set logic delete
2364 * @tc.expected: step1. ok.
2365 */
2366 bool logicDelete = true;
2367 auto data = static_cast<PragmaData>(&logicDelete);
2368 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2369
2370 /**
2371 * @tc.steps:step2. cloud delete data then sync, check removedevicedata
2372 * @tc.expected: step2. ok.
2373 */
2374 int actualCount = 10;
2375 InitLogicDeleteDataEnv(actualCount);
2376 CheckLocalCount(actualCount);
2377 std::string device = "";
2378 ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
2379 CheckLocalCount(actualCount);
2380 CheckLogCleaned(0);
2381 }
2382
2383 /**
2384 * @tc.name: LogicDeleteSyncTest005
2385 * @tc.desc: test pragma when set cmd is not logic delete
2386 * @tc.type: FUNC
2387 * @tc.require:
2388 * @tc.author: chenchaohao
2389 */
2390 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest005, TestSize.Level0)
2391 {
2392 /**
2393 * @tc.steps:step1. set cmd is auto sync
2394 * @tc.expected: step1. ok.
2395 */
2396 bool logicDelete = true;
2397 auto data = static_cast<PragmaData>(&logicDelete);
2398 EXPECT_EQ(delegate_->Pragma(AUTO_SYNC, data), DBStatus::NOT_SUPPORT);
2399 }
2400
2401 /**
2402 * @tc.name: LogicDeleteSyncTest006
2403 * @tc.desc: sync with logic delete after lock table.
2404 * @tc.type: FUNC
2405 * @tc.require:
2406 * @tc.author: liaoyonghuang
2407 */
2408 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest006, TestSize.Level0)
2409 {
2410 /**
2411 * @tc.steps:step1. set logic delete
2412 * @tc.expected: step1. ok.
2413 */
2414 bool logicDelete = true;
2415 auto data = static_cast<PragmaData>(&logicDelete);
2416 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
2417
2418 /**
2419 * @tc.steps:step2. insert user table record and sync.
2420 * @tc.expected: step2. ok.
2421 */
2422 int dataCount = 10;
2423 InsertUserTableRecord(tableName_, dataCount);
2424 Query query = Query::Select().FromTable({ tableName_ });
2425 BlockSync(query, delegate_, g_actualDBStatus);
2426
2427 /**
2428 * @tc.steps:step3. Lock log table, and delete data from cloud table.
2429 * @tc.expected: step3. ok.
2430 */
2431 std::vector<std::vector<uint8_t>> hashKey;
2432 CloudDBSyncUtilsTest::GetHashKey(tableName_, " 1=1 ", db_, hashKey);
2433 Lock(tableName_, hashKey, db_);
2434 for (int i = 0; i < dataCount; ++i) {
2435 DeleteCloudTableRecord(i);
2436 }
2437 /**
2438 * @tc.steps:step4. sync.
2439 * @tc.expected: step4. ok.
2440 */
2441 std::vector<DBStatus> actualDBStatus;
2442 BlockSync(query, delegate_, actualDBStatus);
2443 for (auto status : actualDBStatus) {
2444 EXPECT_EQ(status, OK);
2445 }
2446 }
2447
2448 /**
2449 * @tc.name: LogicDeleteSyncTest008
2450 * @tc.desc: Test sync when data with flag 0x800 locally but there is updated data on the cloud.
2451 * @tc.type: FUNC
2452 * @tc.require:
2453 * @tc.author: liaoyonghuang
2454 */
2455 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest008, TestSize.Level0)
2456 {
2457 /**
2458 * @tc.steps:step1. Insert user table record with flag 0x800. Insert cloud table record.
2459 * @tc.expected: step1. ok.
2460 */
2461 int dataCount = 10;
2462 uint32_t logicDeleteCount = 4;
2463 InsertUserTableRecord(tableName_, dataCount);
2464 std::string sql = "update " + DBCommon::GetLogTableName(tableName_) +
2465 " set flag = flag | 0x800 where data_key <= " + std::to_string(logicDeleteCount);
2466 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2467 InsertCloudTableRecord(0, dataCount, 0, false);
2468 sql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where flag & 0x800=0x800";
2469 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2470 reinterpret_cast<void *>(logicDeleteCount), nullptr), SQLITE_OK);
2471 /**
2472 * @tc.steps:step2. Do sync.
2473 * @tc.expected: step2. ok.
2474 */
2475 Query query = Query::Select().FromTable({ tableName_ });
2476 BlockSync(query, delegate_, g_actualDBStatus);
2477 /**
2478 * @tc.steps:step3. Check data flag in local DB.
2479 * @tc.expected: step3. No data flag is 0x800.
2480 */
2481 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2482 reinterpret_cast<void *>(0), nullptr), SQLITE_OK);
2483 }
2484
2485 /**
2486 * @tc.name: LockActionTest001
2487 * @tc.desc: InitCompensatedSyncTaskInfo and check lockAction.
2488 * @tc.type: FUNC
2489 * @tc.require:
2490 * @tc.author: wangxiangdong
2491 */
2492 HWTEST_F(DistributedDBCloudCheckSyncTest, LockActionTest001, TestSize.Level0)
2493 {
2494 /**
2495 * @tc.steps:step1. InitCompensatedSyncTaskInfo and check.
2496 * @tc.expected: step1. ok.
2497 */
2498 CloudSyncOption option;
2499 option.devices = { "CLOUD" };
2500 option.mode = SYNC_MODE_CLOUD_MERGE;
2501 option.query = Query::Select().FromTable({ tableName_ });
2502 option.waitTime = g_syncWaitTime;
2503 auto action = static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE)
2504 | static_cast<uint32_t>(LockAction::DELETE);
2505 option.lockAction = static_cast<LockAction>(action);
2506 option.priorityTask = true;
2507 option.compensatedSyncOnly = true;
2508 const SyncProcessCallback onProcess;
2509 CloudSyncer::CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
2510 EXPECT_EQ(taskInfo.lockAction, option.lockAction);
2511 }
2512
2513 /**
2514 * @tc.name: LogicCreateRepeatedTableNameTest001
2515 * @tc.desc: test create repeated table name with different cases
2516 * @tc.type: FUNC
2517 * @tc.require:
2518 * @tc.author: wangxiangdong
2519 */
2520 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicCreateRepeatedTableNameTest001, TestSize.Level0)
2521 {
2522 /**
2523 * @tc.steps:step1. CreateDistributedTable with same name but different cases.
2524 * @tc.expected: step1. operate successfully.
2525 */
2526 DBStatus createStatus = delegate_->CreateDistributedTable(lowerTableName_, CLOUD_COOPERATION);
2527 ASSERT_EQ(createStatus, DBStatus::OK);
2528 }
2529
2530 /**
2531 * @tc.name: SaveCursorTest001
2532 * @tc.desc: test whether cloud cursor is saved when first sync
2533 * @tc.type: FUNC
2534 * @tc.require:
2535 * @tc.author: chenchaohao
2536 */
2537 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest001, TestSize.Level0)
2538 {
2539 /**
2540 * @tc.steps:step1. insert cloud records
2541 * @tc.expected: step1. OK
2542 */
2543 const int actualCount = 10;
2544 InsertCloudTableRecord(0, actualCount, 0, false);
2545
2546 /**
2547 * @tc.steps:step2. check cursor when first sync
2548 * @tc.expected: step2. OK
2549 */
__anon28bcdbc62e02(const std::string &tableName, VBucket &extend) 2550 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2551 EXPECT_EQ(tableName_, tableName);
2552 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2553 EXPECT_EQ(cursor, "0");
2554 });
2555 Query query = Query::Select().FromTable({ tableName_ });
2556 BlockSync(query, delegate_, g_actualDBStatus);
2557 CheckLocalCount(actualCount);
2558 }
2559
2560 /**
2561 * @tc.name: SaveCursorTest002
2562 * @tc.desc: test whether cloud cursor is saved when first download failed
2563 * @tc.type: FUNC
2564 * @tc.require:
2565 * @tc.author: chenchaohao
2566 */
2567 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest002, TestSize.Level0)
2568 {
2569 /**
2570 * @tc.steps:step1. insert cloud records
2571 * @tc.expected: step1. OK
2572 */
2573 const int actualCount = 10;
2574 InsertCloudTableRecord(0, actualCount, 0, false);
2575
2576 /**
2577 * @tc.steps:step2. set download failed
2578 * @tc.expected: step2. OK
2579 */
2580 virtualCloudDb_->SetCloudError(true);
2581 Query query = Query::Select().FromTable({ tableName_ });
2582 BlockPrioritySync(query, delegate_, false, OK);
2583 CheckLocalCount(0);
2584
2585 /**
2586 * @tc.steps:step3. check cursor when query
2587 * @tc.expected: step3. OK
2588 */
2589 virtualCloudDb_->SetCloudError(false);
__anon28bcdbc62f02(const std::string &tableName, VBucket &extend) 2590 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2591 EXPECT_EQ(tableName_, tableName);
2592 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2593 EXPECT_EQ(cursor, "0");
2594 });
2595 BlockSync(query, delegate_, g_actualDBStatus);
2596 CheckLocalCount(actualCount);
2597 }
2598
2599 /**
2600 * @tc.name: SaveCursorTest003
2601 * @tc.desc: test whether cloud cursor is saved when first upload failed
2602 * @tc.type: FUNC
2603 * @tc.require:
2604 * @tc.author: chenchaohao
2605 */
2606 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest003, TestSize.Level0)
2607 {
2608 /**
2609 * @tc.steps:step1. insert local records
2610 * @tc.expected: step1. OK
2611 */
2612 const int actualCount = 10;
2613 InsertUserTableRecord(tableName_, actualCount);
2614
2615 /**
2616 * @tc.steps:step2. set upload failed
2617 * @tc.expected: step2. OK
2618 */
2619 virtualCloudDb_->SetCloudError(true);
2620 Query query = Query::Select().FromTable({ tableName_ });
2621 BlockPrioritySync(query, delegate_, false, OK);
2622 CheckCloudTableCount(tableName_, 0);
2623
2624 /**
2625 * @tc.steps:step3. check cursor when query
2626 * @tc.expected: step3. OK
2627 */
2628 virtualCloudDb_->SetCloudError(false);
__anon28bcdbc63002(const std::string &tableName, VBucket &extend) 2629 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2630 EXPECT_EQ(tableName_, tableName);
2631 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2632 EXPECT_EQ(cursor, "0");
2633 });
2634 BlockSync(query, delegate_, g_actualDBStatus);
2635 CheckCloudTableCount(tableName_, actualCount);
2636 }
2637
2638 /**
2639 * @tc.name: RangeQuerySyncTest001
2640 * @tc.desc: Test sync that has option parameter with range query.
2641 * @tc.type: FUNC
2642 * @tc.require:
2643 * @tc.author: chenchaohao
2644 */
2645 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest001, TestSize.Level0)
2646 {
2647 /**
2648 * @tc.steps:step1. insert user table record.
2649 * @tc.expected: step1. ok.
2650 */
2651 CloudSyncOption option;
2652 option.devices = { "CLOUD" };
2653 option.mode = SYNC_MODE_CLOUD_MERGE;
2654 option.waitTime = g_syncWaitTime;
2655 Query query = Query::Select().From(tableName_).Range({}, {});
2656 option.query = query;
2657
2658 /**
2659 * @tc.steps:step2. test normal sync with range query.
2660 * @tc.expected: step2. not support.
2661 */
2662 option.priorityTask = false;
2663 ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2664
2665 /**
2666 * @tc.steps:step3. test Priority sync with range query.
2667 * @tc.expected: step3. not support.
2668 */
2669 option.priorityTask = true;
2670 ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2671 }
2672
2673 /*
2674 * @tc.name: RangeQuerySyncTest002
 * @tc.desc: Test the sync interface that takes no CloudSyncOption parameter with a range query.
2676 * @tc.type: FUNC
2677 * @tc.require:
2678 * @tc.author: mazhao
2679 */
2680 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest002, TestSize.Level1)
2681 {
2682 Query query = Query::Select().FromTable({ tableName_ }).Range({}, {});
2683 ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_FORCE_PULL, query, nullptr, g_syncWaitTime),
2684 DBStatus::NOT_SUPPORT);
2685 }
2686
2687 /*
2688 * @tc.name: SameDataSync001
2689 * @tc.desc: Test query same data in one batch.
2690 * @tc.type: FUNC
2691 * @tc.require:
2692 * @tc.author: zqq
2693 */
2694 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync001, TestSize.Level0)
2695 {
2696 /**
2697 * @tc.steps:step1. insert cloud records, cloud has two batch id:0-4
2698 * @tc.expected: step1. OK
2699 */
2700 const int actualCount = 5;
2701 InsertCloudTableRecord(0, actualCount, 0, false);
2702 InsertCloudTableRecord(0, actualCount, 0, false);
2703 /**
2704 * @tc.steps:step2. call sync, local has one batch id:0-4
2705 * @tc.expected: step2. OK
2706 */
2707 Query query = Query::Select().FromTable({ tableName_ });
2708 BlockSync(query, delegate_, g_actualDBStatus);
2709 CheckLocalCount(actualCount);
2710 }
2711
2712 /*
2713 * @tc.name: SameDataSync002
2714 * @tc.desc: Test sync when there are two data with the same primary key on the cloud.
2715 * @tc.type: FUNC
2716 * @tc.require:
2717 * @tc.author: liaoyonghuang
2718 */
2719 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync002, TestSize.Level1)
2720 {
2721 /**
2722 * @tc.steps:step1. insert local 1 record and sync to cloud.
2723 * @tc.expected: step1. OK
2724 */
2725 const int actualCount = 1;
2726 InsertUserTableRecord(tableName_, actualCount);
2727 Query query = Query::Select().FromTable({ tableName_ });
2728 BlockSync(query, delegate_, g_actualDBStatus);
2729
2730 /**
2731 * @tc.steps:step2. insert 2 records with the same primary key.
2732 * @tc.expected: step2. OK
2733 */
2734 std::vector<VBucket> record;
2735 std::vector<VBucket> extend;
2736 Timestamp now = TimeHelper::GetSysCurrentTime();
2737 VBucket data;
2738 std::vector<uint8_t> photo(0, 'v');
2739 data.insert_or_assign("id", std::string("0"));
2740 data.insert_or_assign("name", std::string("Cloud"));
2741 data.insert_or_assign("height", 166.0); // 166.0 is random double value
2742 data.insert_or_assign("married", false);
2743 data.insert_or_assign("photo", photo);
2744 data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
2745 record.push_back(data);
2746 data.insert_or_assign("age", static_cast<int64_t>(14L)); // 14 is random age
2747 record.push_back(data);
2748 VBucket log;
2749 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
2750 now / CloudDbConstant::TEN_THOUSAND));
2751 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
2752 now / CloudDbConstant::TEN_THOUSAND));
2753 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
2754 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("1"));
2755 extend.push_back(log);
2756 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("2"));
2757 extend.push_back(log);
2758 ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend), DBStatus::OK);
2759
2760 /**
2761 * @tc.steps:step3. sync from cloud and check record.
2762 * @tc.expected: step3. The record with age of 14 has been updated locally.
2763 */
2764 BlockSync(query, delegate_, g_actualDBStatus);
2765 std::string sql = "SELECT age FROM " + tableName_ + " where id=0;";
2766 int64_t actualAge = 0;
2767 int64_t expectAge = 14L;
__anon28bcdbc63102(sqlite3_stmt *stmt) 2768 RelationalTestUtils::ExecSql(db_, sql, nullptr, [&actualAge](sqlite3_stmt *stmt) {
2769 actualAge = sqlite3_column_int(stmt, 0);
2770 return E_OK;
2771 });
2772 EXPECT_EQ(actualAge, expectAge);
2773 }
2774
2775 /*
2776 * @tc.name: CreateDistributedTable001
2777 * @tc.desc: Test create distributed table when table not empty.
2778 * @tc.type: FUNC
2779 * @tc.require:
2780 * @tc.author: zqq
2781 */
2782 HWTEST_F(DistributedDBCloudCheckSyncTest, CreateDistributedTable001, TestSize.Level0)
2783 {
2784 const std::string table = "CreateDistributedTable001";
2785 const std::string createSQL =
2786 "CREATE TABLE IF NOT EXISTS " + table + "(" \
2787 "id TEXT PRIMARY KEY," \
2788 "name TEXT," \
2789 "height REAL ," \
2790 "photo BLOB," \
2791 "age INT);";
2792 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
2793 int actualCount = 10;
2794 InsertUserTableRecord(table, actualCount);
2795 InsertCloudTableRecord(table, 0, actualCount, 0, true);
2796 ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
2797 DataBaseSchema dataBaseSchema = GetSchema();
2798 TableSchema schema = dataBaseSchema.tables.at(0);
2799 schema.name = table;
2800 schema.sharedTableName = "";
2801 dataBaseSchema.tables.push_back(schema);
2802 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
2803 /**
2804 * @tc.steps:step2. call sync, local has one batch id:0-4
2805 * @tc.expected: step2. OK
2806 */
2807 Query query = Query::Select().FromTable({ table });
2808 BlockSync(query, delegate_, g_actualDBStatus);
2809 CheckCloudTableCount(table, actualCount);
2810 }
2811
2812 /*
2813 * @tc.name: CloseDbTest001
2814 * @tc.desc: Test process of db close during sync
2815 * @tc.type: FUNC
2816 * @tc.require:
2817 * @tc.author: bty
2818 */
2819 HWTEST_F(DistributedDBCloudCheckSyncTest, CloseDbTest001, TestSize.Level1)
2820 {
2821 /**
2822 * @tc.steps:step1. insert user table record.
2823 * @tc.expected: step1. ok.
2824 */
2825 const int actualCount = 10; // 10 is count of records
2826 InsertUserTableRecord(tableName_, actualCount);
2827
2828 /**
2829 * @tc.steps:step2. wait for 2 seconds during the query to close the database.
2830 * @tc.expected: step2. ok.
2831 */
2832 std::mutex callMutex;
2833 int callCount = 0;
__anon28bcdbc63202(const std::string &, VBucket &) 2834 virtualCloudDb_->ForkQuery([](const std::string &, VBucket &) {
2835 std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2836 });
2837 const auto callback = [&callCount, &callMutex](
__anon28bcdbc63302( const std::map<std::string, SyncProcess> &) 2838 const std::map<std::string, SyncProcess> &) {
2839 {
2840 std::lock_guard<std::mutex> autoLock(callMutex);
2841 callCount++;
2842 }
2843 };
2844 Query query = Query::Select().FromTable({ tableName_ });
2845 ASSERT_EQ(delegate_->Sync({ "CLOUD" }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
2846 std::this_thread::sleep_for(std::chrono::seconds(1)); // block notify 1s
2847 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
2848 delegate_ = nullptr;
2849 mgr_ = nullptr;
2850
2851 /**
2852 * @tc.steps:step3. wait for 2 seconds to check the process call count.
2853 * @tc.expected: step3. ok.
2854 */
2855 std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2856 EXPECT_EQ(callCount, 0L);
2857 }
2858
2859 /*
2860 * @tc.name: ConsistentFlagTest001
2861 * @tc.desc: Test the consistency flag of no asset table
2862 * @tc.type: FUNC
2863 * @tc.require:
2864 * @tc.author: bty
2865 */
2866 HWTEST_F(DistributedDBCloudCheckSyncTest, ConsistentFlagTest001, TestSize.Level1)
2867 {
2868 /**
2869 * @tc.steps:step1. init data and sync
2870 * @tc.expected: step1. ok.
2871 */
2872 const int localCount = 20; // 20 is count of local
2873 const int cloudCount = 10; // 10 is count of cloud
2874 InsertUserTableRecord(tableName_, localCount);
2875 InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2876 Query query = Query::Select().FromTable({ tableName_ });
2877 BlockSync(query, delegate_, g_actualDBStatus);
2878
2879 /**
2880 * @tc.steps:step2. check the 0x20 bit of flag after sync
2881 * @tc.expected: step2. ok.
2882 */
2883 std::string querySql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
2884 " where flag&0x20=0;";
2885 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2886 reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2887
2888 /**
2889 * @tc.steps:step3. delete local data and check
2890 * @tc.expected: step3. ok.
2891 */
2892 std::string sql = "delete from " + tableName_ + " where id = '1';";
2893 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2894 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2895 reinterpret_cast<void *>(localCount - 1), nullptr), SQLITE_OK);
2896
2897 /**
2898 * @tc.steps:step4. check the 0x20 bit of flag after sync
2899 * @tc.expected: step4. ok.
2900 */
2901 BlockSync(query, delegate_, g_actualDBStatus);
2902 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2903 reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2904 }
2905
SyncDataStatusTest(bool isCompensatedSyncOnly)2906 void DistributedDBCloudCheckSyncTest::SyncDataStatusTest(bool isCompensatedSyncOnly)
2907 {
2908 /**
2909 * @tc.steps:step1. init data and sync
2910 * @tc.expected: step1. ok.
2911 */
2912 const int localCount = 20; // 20 is count of local
2913 const int cloudCount = 10; // 10 is count of cloud
2914 InsertUserTableRecord(tableName_, localCount);
2915 std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
2916 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2917 sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 2 where data_key in (2,12);";
2918 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2919 sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 3 where data_key in (3,13);";
2920 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2921 std::this_thread::sleep_for(std::chrono::milliseconds(1));
2922 InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2923 Query query = Query::Select().FromTable({tableName_});
2924
2925 /**
2926 * @tc.steps:step2. check count
2927 * @tc.expected: step2. ok.
2928 */
2929 int64_t syncCount = 2;
2930 BlockPrioritySync(query, delegate_, false, OK, isCompensatedSyncOnly);
2931 if (!isCompensatedSyncOnly) {
2932 std::this_thread::sleep_for(std::chrono::seconds(1)); // wait compensated sync finish
2933 }
2934 std::string preSql = "select count(*) from " + DBCommon::GetLogTableName(tableName_);
2935 std::string querySql = preSql + " where status=0 and data_key in (1,11) and cloud_gid !='';";
2936 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2937 if (isCompensatedSyncOnly) {
2938 querySql = preSql + " where status=2 and data_key in (2,12) and cloud_gid ='';";
2939 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2940 querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid ='';";
2941 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2942 querySql = preSql + " where status=0 and cloud_gid ='';";
2943 int unSyncCount = 14; // 14 is the num of unSync data with status 0
2944 CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
2945 } else {
2946 // gid 12、13 are upload insert, lock to lock_change
2947 querySql = preSql + " where status=3 and data_key in (2,12) and cloud_gid !='';";
2948 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2949 querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid !='';";
2950 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2951 querySql = preSql + " where status=0 and cloud_gid !='';";
2952 int unSyncCount = 16; // 16 is the num of sync finish
2953 CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
2954 }
2955 }
2956
2957 /*
2958 * @tc.name: SyncDataStatusTest001
2959 * @tc.desc: Test the status after compensated sync the no asset table
2960 * @tc.type: FUNC
2961 * @tc.require:
2962 * @tc.author: bty
2963 */
2964 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest001, TestSize.Level1)
2965 {
2966 SyncDataStatusTest(true);
2967 }
2968
2969 /*
2970 * @tc.name: SyncDataStatusTest002
2971 * @tc.desc: Test the status after normal sync the no asset table
2972 * @tc.type: FUNC
2973 * @tc.require:
2974 * @tc.author: bty
2975 */
2976 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest002, TestSize.Level1)
2977 {
2978 SyncDataStatusTest(false);
2979 }
2980 }
2981 #endif
2982