1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_data_generator.h"
17
18 #include "distributeddb_tools_unit_test.h"
19 #include "log_print.h"
20 #include "res_finalizer.h"
21 namespace DistributedDBUnitTest {
22 using namespace DistributedDB;
InitDatabase(const DataBaseSchema & schema,sqlite3 & db)23 int RDBDataGenerator::InitDatabase(const DataBaseSchema &schema, sqlite3 &db)
24 {
25 int errCode = RelationalTestUtils::ExecSql(&db, "PRAGMA journal_mode=WAL;");
26 if (errCode != SQLITE_OK) {
27 return errCode;
28 }
29 for (const auto &table : schema.tables) {
30 errCode = InitTable(table, false, db);
31 if (errCode != SQLITE_OK) {
32 break;
33 }
34 }
35 return errCode;
36 }
37
InitTable(const DistributedDB::TableSchema & table,bool notNullWithStr,sqlite3 & db)38 int RDBDataGenerator::InitTable(const DistributedDB::TableSchema &table, bool notNullWithStr, sqlite3 &db)
39 {
40 return InitTable(table, notNullWithStr, false, db);
41 }
42
InitTable(const TableSchema & table,bool notNullWithStr,bool isAutoIncrement,sqlite3 & db)43 int RDBDataGenerator::InitTable(const TableSchema &table, bool notNullWithStr, bool isAutoIncrement, sqlite3 &db)
44 {
45 std::string sql = "CREATE TABLE IF NOT EXISTS " + table.name + "(";
46 for (const auto &field : table.fields) {
47 sql += "'" + field.colName + "' " + GetTypeText(field.type);
48 if (field.primary) {
49 sql += " PRIMARY KEY";
50 if (isAutoIncrement) {
51 sql += " AUTOINCREMENT";
52 }
53 }
54 if (notNullWithStr && field.type == TYPE_INDEX<std::string>) {
55 sql += " NOT NULL ON CONFLICT IGNORE";
56 }
57 sql += ",";
58 }
59 sql.pop_back();
60 sql += ");";
61 int errCode = RelationalTestUtils::ExecSql(&db, sql);
62 if (errCode != SQLITE_OK) {
63 LOGE("execute sql failed %d, sql is %s", errCode, sql.c_str());
64 }
65 return errCode;
66 }
67
// Generates `count` records starting at index `begin` for every table of the schema and
// inserts them, together with their log (extend) rows, into the virtual cloud database.
//   gidStart - offset added to the record index to form the cloud gid.
// Returns INVALID_ARGS when no cloud database is supplied, the first non-OK status reported
// by BatchInsertWithGid, or OK when every table was filled.
DistributedDB::DBStatus RDBDataGenerator::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
    const DistributedDB::DataBaseSchema &schema,
    const std::shared_ptr<DistributedDB::VirtualCloudDb> &virtualCloudDb)
{
    // Reject a missing cloud database instead of dereferencing a null pointer below.
    if (virtualCloudDb == nullptr) {
        return DBStatus::INVALID_ARGS;
    }
    for (const auto &table : schema.tables) {
        auto [record, extend] = GenerateDataRecords(begin, count, gidStart, table.fields);
        DBStatus res = virtualCloudDb->BatchInsertWithGid(table.name, std::move(record), extend);
        if (res != DBStatus::OK) {
            return res;
        }
    }
    return DBStatus::OK;
}
81
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,const std::vector<Field> & fields)82 std::pair<std::vector<VBucket>, std::vector<VBucket>> RDBDataGenerator::GenerateDataRecords(int64_t begin,
83 int64_t count, int64_t gidStart, const std::vector<Field> &fields)
84 {
85 std::vector<VBucket> record;
86 std::vector<VBucket> extend;
87 Timestamp now = TimeHelper::GetSysCurrentTime();
88 auto time = static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND);
89 for (int64_t i = begin; i < begin + count; i++) {
90 VBucket data;
91 for (const auto &field : fields) {
92 FillColValueByType(i, field, data);
93 }
94 record.push_back(data);
95
96 VBucket log;
97 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, time);
98 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, time);
99 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
100 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
101 extend.push_back(log);
102 time++;
103 }
104 return {record, extend};
105 }
106
FillColValueByType(int64_t index,const Field & field,VBucket & vBucket)107 void RDBDataGenerator::FillColValueByType(int64_t index, const Field &field, VBucket &vBucket)
108 {
109 vBucket[field.colName] = GetColValueByType(index, field);
110 }
111
// Derives a deterministic value for `field` from the row index:
//   int64_t -> index                 std::string  -> "<colName>_<index>"
//   double  -> index as double       bool         -> index converted to bool
//   Bytes   -> GenerateBytes(index)  Asset/Assets -> GenerateAsset / GenerateAssets
// Any other field type yields Nil.
DistributedDB::Type RDBDataGenerator::GetColValueByType(int64_t index, const DistributedDB::Field &field)
{
    switch (field.type) {
        case DistributedDB::TYPE_INDEX<int64_t>:
            return index;
        case DistributedDB::TYPE_INDEX<std::string>:
            return field.colName + "_" + std::to_string(index);
        case DistributedDB::TYPE_INDEX<DistributedDB::Assets>:
            return GenerateAssets(index, field);
        case DistributedDB::TYPE_INDEX<DistributedDB::Asset>:
            return GenerateAsset(index, field);
        case DistributedDB::TYPE_INDEX<double>:
            return static_cast<double>(index);
        case DistributedDB::TYPE_INDEX<bool>:
            return static_cast<bool>(index);
        case DistributedDB::TYPE_INDEX<Bytes>:
            return GenerateBytes(index);
    }
    // No generator is defined for this field type.
    return Nil();
}
140
GenerateAsset(int64_t index,const DistributedDB::Field & field)141 Asset RDBDataGenerator::GenerateAsset(int64_t index, const DistributedDB::Field &field)
142 {
143 Asset asset;
144 asset.name = field.colName + "_" + std::to_string(index);
145 asset.hash = "default_hash";
146 return asset;
147 }
148
GenerateAssets(int64_t index,const Field & field)149 Assets RDBDataGenerator::GenerateAssets(int64_t index, const Field &field)
150 {
151 Assets assets;
152 assets.push_back(GenerateAsset(index, field));
153 return assets;
154 }
155
InsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::DataBaseSchema & schema)156 int RDBDataGenerator::InsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
157 const DistributedDB::DataBaseSchema &schema)
158 {
159 for (const auto &table : schema.tables) {
160 int errCode = InsertLocalDBData(begin, count, db, table);
161 if (errCode != E_OK) {
162 return errCode;
163 }
164 }
165 return E_OK;
166 }
167
InsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::TableSchema & schema)168 int RDBDataGenerator::InsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
169 const DistributedDB::TableSchema &schema)
170 {
171 if (schema.fields.empty()) {
172 return -E_INTERNAL_ERROR;
173 }
174 std::string sql = "INSERT OR REPLACE INTO " + schema.name + " VALUES(";
175 for (size_t i = 0; i < schema.fields.size(); ++i) {
176 sql += "?,";
177 }
178 sql.pop_back();
179 sql += ");";
180 for (int64_t i = begin; i < begin + count; i++) {
181 sqlite3_stmt *stmt = nullptr;
182 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
183 if (errCode != E_OK) {
184 return errCode;
185 }
186 ResFinalizer resFinalizer([stmt]() {
187 sqlite3_stmt *sqlite3Stmt = stmt;
188 int ret = E_OK;
189 SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
190 });
191 errCode = BindOneRowStmt(i, stmt, 0, false, schema.fields);
192 if (errCode != E_OK) {
193 return errCode;
194 }
195 errCode = SQLiteUtils::StepNext(stmt);
196 if (errCode != -E_FINISHED) {
197 return errCode;
198 }
199 }
200 return E_OK;
201 }
202
UpsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::TableSchema & schema)203 int RDBDataGenerator::UpsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
204 const DistributedDB::TableSchema &schema)
205 {
206 if (schema.fields.empty()) {
207 return -E_INTERNAL_ERROR;
208 }
209 std::string sql = GetUpsertSQL(schema);
210 for (int64_t i = begin; i < begin + count; i++) {
211 sqlite3_stmt *stmt = nullptr;
212 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
213 if (errCode != E_OK) {
214 return errCode;
215 }
216 ResFinalizer resFinalizer([stmt]() {
217 sqlite3_stmt *sqlite3Stmt = stmt;
218 int ret = E_OK;
219 SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
220 });
221 errCode = BindOneRowStmt(i, stmt, 0, false, schema.fields);
222 if (errCode != E_OK) {
223 return errCode;
224 }
225 errCode = BindOneRowStmt(i, stmt, static_cast<int>(schema.fields.size()), true, schema.fields);
226 if (errCode != E_OK) {
227 return errCode;
228 }
229 errCode = SQLiteUtils::StepNext(stmt);
230 if (errCode != -E_FINISHED) {
231 return errCode;
232 }
233 }
234 return E_OK;
235 }
236
UpdateLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const TableSchema & schema)237 int RDBDataGenerator::UpdateLocalDBData(int64_t begin, int64_t count, sqlite3 *db, const TableSchema &schema)
238 {
239 if (schema.fields.empty()) {
240 return -E_INTERNAL_ERROR;
241 }
242 std::string sql = GetUpdateSQL(schema);
243 for (int64_t i = begin; i < begin + count; i++) {
244 sqlite3_stmt *stmt = nullptr;
245 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
246 if (errCode != E_OK) {
247 return errCode;
248 }
249 ResFinalizer resFinalizer([stmt]() {
250 sqlite3_stmt *sqlite3Stmt = stmt;
251 int ret = E_OK;
252 SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
253 });
254 errCode = BindOneRowUpdateStmt(i, stmt, 0, schema.fields);
255 if (errCode != E_OK) {
256 return errCode;
257 }
258 errCode = SQLiteUtils::StepNext(stmt);
259 if (errCode != -E_FINISHED) {
260 return errCode;
261 }
262 }
263 return E_OK;
264 }
265
BindOneRowStmt(int64_t index,sqlite3_stmt * stmt,int cid,bool withoutPk,const std::vector<DistributedDB::Field> & fields)266 int RDBDataGenerator::BindOneRowStmt(int64_t index, sqlite3_stmt *stmt, int cid, bool withoutPk,
267 const std::vector<DistributedDB::Field> &fields)
268 {
269 for (const auto &field : fields) {
270 if (withoutPk && field.primary) {
271 continue;
272 }
273 cid++;
274 auto type = GetColValueByType(index, field);
275 int errCode = BindOneColStmt(cid, stmt, type);
276 if (errCode != E_OK) {
277 return errCode;
278 }
279 }
280 return E_OK;
281 }
282
BindOneRowUpdateStmt(int64_t index,sqlite3_stmt * stmt,int cid,const std::vector<DistributedDB::Field> & fields)283 int RDBDataGenerator::BindOneRowUpdateStmt(int64_t index, sqlite3_stmt *stmt, int cid,
284 const std::vector<DistributedDB::Field> &fields)
285 {
286 std::vector<Field> pkFields;
287 for (const auto &field : fields) {
288 if (field.primary) {
289 pkFields.push_back(field);
290 continue;
291 }
292 cid++;
293 auto type = GetColValueByType(index, field);
294 int errCode = BindOneColStmt(cid, stmt, type);
295 if (errCode != E_OK) {
296 return errCode;
297 }
298 }
299 return BindOneRowStmt(index, stmt, cid, false, pkFields);
300 }
301
BindOneColStmt(int cid,sqlite3_stmt * stmt,const DistributedDB::Type & type)302 int RDBDataGenerator::BindOneColStmt(int cid, sqlite3_stmt *stmt, const DistributedDB::Type &type)
303 {
304 switch (type.index()) {
305 case DistributedDB::TYPE_INDEX<int64_t>:
306 return SQLiteUtils::BindInt64ToStatement(stmt, cid, std::get<int64_t>(type));
307 case DistributedDB::TYPE_INDEX<std::string>:
308 return SQLiteUtils::BindTextToStatement(stmt, cid, std::get<std::string>(type));
309 case DistributedDB::TYPE_INDEX<DistributedDB::Assets>: {
310 auto assets = std::get<Assets>(type);
311 std::vector<uint8_t> blob;
312 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blob);
313 if (errCode != E_OK) {
314 return errCode;
315 }
316 return SQLiteUtils::BindBlobToStatement(stmt, cid, blob);
317 }
318 case DistributedDB::TYPE_INDEX<DistributedDB::Asset>: {
319 auto assets = std::get<Asset>(type);
320 std::vector<uint8_t> blob;
321 int errCode = RuntimeContext::GetInstance()->AssetToBlob(assets, blob);
322 if (errCode != E_OK) {
323 return errCode;
324 }
325 return SQLiteUtils::BindBlobToStatement(stmt, cid, blob);
326 }
327 }
328 return E_OK;
329 }
330
GetUpsertSQL(const DistributedDB::TableSchema & schema)331 std::string RDBDataGenerator::GetUpsertSQL(const DistributedDB::TableSchema &schema)
332 {
333 std::string sql = "INSERT INTO " + schema.name + "(";
334 for (const auto &field : schema.fields) {
335 sql += field.colName + ",";
336 }
337 sql.pop_back();
338 sql += ") VALUES(";
339 std::string pkFields;
340 std::string noPkFields;
341 for (const auto &field : schema.fields) {
342 sql += "?,";
343 if (field.primary) {
344 pkFields += field.colName + ",";
345 } else {
346 noPkFields += field.colName + "=?,";
347 }
348 }
349 pkFields.pop_back();
350 noPkFields.pop_back();
351 sql.pop_back();
352 sql += ") ON CONFLICT(" + pkFields + ") DO UPDATE SET " + noPkFields;
353 LOGI("upsert sql is %s", sql.c_str());
354 return sql;
355 }
356
GetUpdateSQL(const TableSchema & schema)357 std::string RDBDataGenerator::GetUpdateSQL(const TableSchema &schema)
358 {
359 std::string pkFields;
360 std::string noPkFields;
361 for (const auto &field : schema.fields) {
362 if (field.primary) {
363 pkFields += "'" + field.colName + "'=?,";
364 } else {
365 noPkFields += "'" + field.colName + "'=?,";
366 }
367 }
368 pkFields.pop_back();
369 noPkFields.pop_back();
370 std::string sql = "UPDATE " + schema.name + " SET ";
371 sql += noPkFields + " WHERE " + pkFields;
372 LOGI("upsert sql is %s", sql.c_str());
373 return sql;
374 }
375
InsertVirtualLocalDBData(int64_t begin,int64_t count,DistributedDB::RelationalVirtualDevice * device,const DistributedDB::TableSchema & schema,const bool isDelete)376 int RDBDataGenerator::InsertVirtualLocalDBData(int64_t begin, int64_t count,
377 DistributedDB::RelationalVirtualDevice *device, const DistributedDB::TableSchema &schema, const bool isDelete)
378 {
379 if (device == nullptr) {
380 return -E_INVALID_ARGS;
381 }
382 std::vector<VirtualRowData> rows;
383 for (int64_t index = begin; index < count; ++index) {
384 VirtualRowData virtualRowData;
385 for (const auto &field : schema.fields) {
386 auto type = GetColValueByType(index, field);
387 FillTypeIntoDataValue(field, type, virtualRowData);
388 }
389 virtualRowData.logInfo.timestamp = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
390 if (isDelete) {
391 virtualRowData.logInfo.flag = virtualRowData.logInfo.flag | DataItem::DELETE_FLAG;
392 }
393 rows.push_back(virtualRowData);
394 }
395 return device->PutData(schema.name, rows);
396 }
397
// Converts a database schema into a distributed schema with one distributed table per table
// and one distributed field per column.
// Every primary key column is marked as specified. A column takes part in P2P sync when it is
// a primary key, or unconditionally when syncOnlyPk is false.
DistributedDB::DistributedSchema RDBDataGenerator::ParseSchema(const DistributedDB::DataBaseSchema &schema,
    bool syncOnlyPk)
{
    DistributedDB::DistributedSchema result;
    for (const auto &tableSchema : schema.tables) {
        DistributedTable distributedTable;
        distributedTable.tableName = tableSchema.name;
        for (const auto &column : tableSchema.fields) {
            DistributedField entry;
            entry.colName = column.colName;
            entry.isSpecified = column.primary;
            entry.isP2pSync = !syncOnlyPk || column.primary;
            distributedTable.fields.push_back(entry);
        }
        result.tables.push_back(distributedTable);
    }
    return result;
}
416
FillTypeIntoDataValue(const DistributedDB::Field & field,const DistributedDB::Type & type,DistributedDB::VirtualRowData & virtualRow)417 void RDBDataGenerator::FillTypeIntoDataValue(const DistributedDB::Field &field, const DistributedDB::Type &type,
418 DistributedDB::VirtualRowData &virtualRow)
419 {
420 DataValue dataValue;
421 std::string hash;
422 switch (type.index()) {
423 case DistributedDB::TYPE_INDEX<int64_t>:
424 if (field.primary) {
425 hash = std::to_string(std::get<int64_t>(type));
426 }
427 dataValue = std::get<int64_t>(type);
428 break;
429 case DistributedDB::TYPE_INDEX<std::string>:
430 if (field.primary) {
431 hash = std::get<std::string>(type);
432 }
433 dataValue = std::get<std::string>(type);
434 break;
435 default:
436 return;
437 }
438 if (field.primary) {
439 std::vector<uint8_t> blob;
440 DBCommon::StringToVector(hash, blob);
441 DBCommon::CalcValueHash(blob, virtualRow.logInfo.hashKey);
442 }
443 virtualRow.objectData.PutDataValue(field.colName, dataValue);
444 }
445
PrepareVirtualDeviceEnv(const std::string & tableName,sqlite3 * db,DistributedDB::RelationalVirtualDevice * device)446 int RDBDataGenerator::PrepareVirtualDeviceEnv(const std::string &tableName, sqlite3 *db,
447 DistributedDB::RelationalVirtualDevice *device)
448 {
449 return PrepareVirtualDeviceEnv(tableName, tableName, db, device);
450 }
451
PrepareVirtualDeviceEnv(const std::string & scanTable,const std::string & expectTable,sqlite3 * db,DistributedDB::RelationalVirtualDevice * device)452 int RDBDataGenerator::PrepareVirtualDeviceEnv(const std::string &scanTable, const std::string &expectTable, sqlite3 *db,
453 DistributedDB::RelationalVirtualDevice *device)
454 {
455 TableInfo tableInfo;
456 int errCode = SQLiteUtils::AnalysisSchema(db, scanTable, tableInfo);
457 if (errCode != E_OK) {
458 return errCode;
459 }
460 tableInfo.SetTableName(expectTable);
461 device->SetLocalFieldInfo(tableInfo.GetFieldInfos());
462 device->SetTableInfo(tableInfo);
463 return E_OK;
464 }
465
// Returns a copy of the table schema that keeps the name of `origin` and lists its fields
// in reverse order.
DistributedDB::TableSchema RDBDataGenerator::FlipTableSchema(const DistributedDB::TableSchema &origin)
{
    DistributedDB::TableSchema res;
    res.name = origin.name;
    // Copy back-to-front in one pass; inserting each field at the front shifted all
    // previously copied fields every time.
    res.fields.assign(origin.fields.rbegin(), origin.fields.rend());
    return res;
}
475
GenerateBytes(int64_t index)476 Bytes RDBDataGenerator::GenerateBytes(int64_t index)
477 {
478 std::string str = std::to_string(index);
479 return std::vector<uint8_t>(str.begin(), str.end());
480 }
481 }