1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_data_generator.h"
17
18 #include "distributeddb_tools_unit_test.h"
19 #include "log_print.h"
20 #include "res_finalizer.h"
21 namespace DistributedDBUnitTest {
22 using namespace DistributedDB;
InitDatabase(const DataBaseSchema & schema,sqlite3 & db)23 int RDBDataGenerator::InitDatabase(const DataBaseSchema &schema, sqlite3 &db)
24 {
25 int errCode = RelationalTestUtils::ExecSql(&db, "PRAGMA journal_mode=WAL;");
26 if (errCode != SQLITE_OK) {
27 return errCode;
28 }
29 for (const auto &table : schema.tables) {
30 errCode = InitTable(table, false, db);
31 if (errCode != SQLITE_OK) {
32 break;
33 }
34 }
35 return errCode;
36 }
37
InitTable(const DistributedDB::TableSchema & table,bool notNullWithStr,sqlite3 & db)38 int RDBDataGenerator::InitTable(const DistributedDB::TableSchema &table, bool notNullWithStr, sqlite3 &db)
39 {
40 return InitTable(table, notNullWithStr, false, db);
41 }
42
InitTable(const TableSchema & table,bool notNullWithStr,bool isAutoIncrement,sqlite3 & db)43 int RDBDataGenerator::InitTable(const TableSchema &table, bool notNullWithStr, bool isAutoIncrement, sqlite3 &db)
44 {
45 std::string sql = "CREATE TABLE IF NOT EXISTS " + table.name + "(";
46 for (const auto &field : table.fields) {
47 sql += "'" + field.colName + "' " + GetTypeText(field.type);
48 if (field.primary) {
49 sql += " PRIMARY KEY";
50 if (isAutoIncrement) {
51 sql += " AUTOINCREMENT";
52 }
53 }
54 if (notNullWithStr && field.type == TYPE_INDEX<std::string>) {
55 sql += " NOT NULL ON CONFLICT IGNORE";
56 }
57 sql += ",";
58 }
59 sql.pop_back();
60 sql += ");";
61 int errCode = RelationalTestUtils::ExecSql(&db, sql);
62 if (errCode != SQLITE_OK) {
63 LOGE("execute sql failed %d, sql is %s", errCode, sql.c_str());
64 }
65 return errCode;
66 }
67
GetTypeText(int type)68 std::string RDBDataGenerator::GetTypeText(int type)
69 {
70 switch (type) {
71 case DistributedDB::TYPE_INDEX<int64_t>:
72 return "INTEGER";
73 case DistributedDB::TYPE_INDEX<std::string>:
74 return "TEXT";
75 case DistributedDB::TYPE_INDEX<DistributedDB::Assets>:
76 return "ASSETS";
77 case DistributedDB::TYPE_INDEX<DistributedDB::Asset>:
78 return "ASSET";
79 default:
80 return "";
81 }
82 }
83
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const DistributedDB::DataBaseSchema & schema,const std::shared_ptr<DistributedDB::VirtualCloudDb> & virtualCloudDb)84 DistributedDB::DBStatus RDBDataGenerator::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
85 const DistributedDB::DataBaseSchema &schema,
86 const std::shared_ptr<DistributedDB::VirtualCloudDb> &virtualCloudDb)
87 {
88 for (const auto &table : schema.tables) {
89 auto [record, extend] = GenerateDataRecords(begin, count, gidStart, table.fields);
90 DBStatus res = virtualCloudDb->BatchInsertWithGid(table.name, std::move(record), extend);
91 if (res != DBStatus::OK) {
92 return res;
93 }
94 }
95 return DBStatus::OK;
96 }
97
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,const std::vector<Field> & fields)98 std::pair<std::vector<VBucket>, std::vector<VBucket>> RDBDataGenerator::GenerateDataRecords(int64_t begin,
99 int64_t count, int64_t gidStart, const std::vector<Field> &fields)
100 {
101 std::vector<VBucket> record;
102 std::vector<VBucket> extend;
103 Timestamp now = TimeHelper::GetSysCurrentTime();
104 auto time = static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND);
105 for (int64_t i = begin; i < begin + count; i++) {
106 VBucket data;
107 for (const auto &field : fields) {
108 FillColValueByType(i, field, data);
109 }
110 record.push_back(data);
111
112 VBucket log;
113 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, time);
114 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, time);
115 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
116 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
117 extend.push_back(log);
118 time++;
119 }
120 return {record, extend};
121 }
122
FillColValueByType(int64_t index,const Field & field,VBucket & vBucket)123 void RDBDataGenerator::FillColValueByType(int64_t index, const Field &field, VBucket &vBucket)
124 {
125 vBucket[field.colName] = GetColValueByType(index, field);
126 }
127
GetColValueByType(int64_t index,const DistributedDB::Field & field)128 DistributedDB::Type RDBDataGenerator::GetColValueByType(int64_t index, const DistributedDB::Field &field)
129 {
130 Type value = Nil();
131 switch (field.type) {
132 case DistributedDB::TYPE_INDEX<int64_t>:
133 value = index;
134 break;
135 case DistributedDB::TYPE_INDEX<std::string>:
136 value = field.colName + "_" + std::to_string(index);
137 break;
138 case DistributedDB::TYPE_INDEX<DistributedDB::Assets>:
139 value = GenerateAssets(index, field);
140 break;
141 case DistributedDB::TYPE_INDEX<DistributedDB::Asset>:
142 value = GenerateAsset(index, field);
143 break;
144 }
145 return value;
146 }
147
GenerateAsset(int64_t index,const DistributedDB::Field & field)148 Asset RDBDataGenerator::GenerateAsset(int64_t index, const DistributedDB::Field &field)
149 {
150 Asset asset;
151 asset.name = field.colName + "_" + std::to_string(index);
152 asset.hash = "default_hash";
153 return asset;
154 }
155
GenerateAssets(int64_t index,const Field & field)156 Assets RDBDataGenerator::GenerateAssets(int64_t index, const Field &field)
157 {
158 Assets assets;
159 assets.push_back(GenerateAsset(index, field));
160 return assets;
161 }
162
InsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::DataBaseSchema & schema)163 int RDBDataGenerator::InsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
164 const DistributedDB::DataBaseSchema &schema)
165 {
166 for (const auto &table : schema.tables) {
167 int errCode = InsertLocalDBData(begin, count, db, table);
168 if (errCode != E_OK) {
169 return errCode;
170 }
171 }
172 return E_OK;
173 }
174
InsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::TableSchema & schema)175 int RDBDataGenerator::InsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
176 const DistributedDB::TableSchema &schema)
177 {
178 if (schema.fields.empty()) {
179 return -E_INTERNAL_ERROR;
180 }
181 std::string sql = "INSERT OR REPLACE INTO " + schema.name + " VALUES(";
182 for (size_t i = 0; i < schema.fields.size(); ++i) {
183 sql += "?,";
184 }
185 sql.pop_back();
186 sql += ");";
187 for (int64_t i = begin; i < begin + count; i++) {
188 sqlite3_stmt *stmt = nullptr;
189 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
190 if (errCode != E_OK) {
191 return errCode;
192 }
193 ResFinalizer resFinalizer([stmt]() {
194 sqlite3_stmt *sqlite3Stmt = stmt;
195 int ret = E_OK;
196 SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
197 });
198 errCode = BindOneRowStmt(i, stmt, 0, false, schema.fields);
199 if (errCode != E_OK) {
200 return errCode;
201 }
202 errCode = SQLiteUtils::StepNext(stmt);
203 if (errCode != -E_FINISHED) {
204 return errCode;
205 }
206 }
207 return E_OK;
208 }
209
UpsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::TableSchema & schema)210 int RDBDataGenerator::UpsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
211 const DistributedDB::TableSchema &schema)
212 {
213 if (schema.fields.empty()) {
214 return -E_INTERNAL_ERROR;
215 }
216 std::string sql = GetUpsertSQL(schema);
217 for (int64_t i = begin; i < begin + count; i++) {
218 sqlite3_stmt *stmt = nullptr;
219 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
220 if (errCode != E_OK) {
221 return errCode;
222 }
223 ResFinalizer resFinalizer([stmt]() {
224 sqlite3_stmt *sqlite3Stmt = stmt;
225 int ret = E_OK;
226 SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
227 });
228 errCode = BindOneRowStmt(i, stmt, 0, false, schema.fields);
229 if (errCode != E_OK) {
230 return errCode;
231 }
232 errCode = BindOneRowStmt(i, stmt, static_cast<int>(schema.fields.size()), true, schema.fields);
233 if (errCode != E_OK) {
234 return errCode;
235 }
236 errCode = SQLiteUtils::StepNext(stmt);
237 if (errCode != -E_FINISHED) {
238 return errCode;
239 }
240 }
241 return E_OK;
242 }
243
UpdateLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const TableSchema & schema)244 int RDBDataGenerator::UpdateLocalDBData(int64_t begin, int64_t count, sqlite3 *db, const TableSchema &schema)
245 {
246 if (schema.fields.empty()) {
247 return -E_INTERNAL_ERROR;
248 }
249 std::string sql = GetUpdateSQL(schema);
250 for (int64_t i = begin; i < begin + count; i++) {
251 sqlite3_stmt *stmt = nullptr;
252 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
253 if (errCode != E_OK) {
254 return errCode;
255 }
256 ResFinalizer resFinalizer([stmt]() {
257 sqlite3_stmt *sqlite3Stmt = stmt;
258 int ret = E_OK;
259 SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
260 });
261 errCode = BindOneRowUpdateStmt(i, stmt, 0, schema.fields);
262 if (errCode != E_OK) {
263 return errCode;
264 }
265 errCode = SQLiteUtils::StepNext(stmt);
266 if (errCode != -E_FINISHED) {
267 return errCode;
268 }
269 }
270 return E_OK;
271 }
272
BindOneRowStmt(int64_t index,sqlite3_stmt * stmt,int cid,bool withoutPk,const std::vector<DistributedDB::Field> & fields)273 int RDBDataGenerator::BindOneRowStmt(int64_t index, sqlite3_stmt *stmt, int cid, bool withoutPk,
274 const std::vector<DistributedDB::Field> &fields)
275 {
276 for (const auto &field : fields) {
277 if (withoutPk && field.primary) {
278 continue;
279 }
280 cid++;
281 auto type = GetColValueByType(index, field);
282 int errCode = BindOneColStmt(cid, stmt, type);
283 if (errCode != E_OK) {
284 return errCode;
285 }
286 }
287 return E_OK;
288 }
289
BindOneRowUpdateStmt(int64_t index,sqlite3_stmt * stmt,int cid,const std::vector<DistributedDB::Field> & fields)290 int RDBDataGenerator::BindOneRowUpdateStmt(int64_t index, sqlite3_stmt *stmt, int cid,
291 const std::vector<DistributedDB::Field> &fields)
292 {
293 std::vector<Field> pkFields;
294 for (const auto &field : fields) {
295 if (field.primary) {
296 pkFields.push_back(field);
297 continue;
298 }
299 cid++;
300 auto type = GetColValueByType(index, field);
301 int errCode = BindOneColStmt(cid, stmt, type);
302 if (errCode != E_OK) {
303 return errCode;
304 }
305 }
306 return BindOneRowStmt(index, stmt, cid, false, pkFields);
307 }
308
BindOneColStmt(int cid,sqlite3_stmt * stmt,const DistributedDB::Type & type)309 int RDBDataGenerator::BindOneColStmt(int cid, sqlite3_stmt *stmt, const DistributedDB::Type &type)
310 {
311 switch (type.index()) {
312 case DistributedDB::TYPE_INDEX<int64_t>:
313 return SQLiteUtils::BindInt64ToStatement(stmt, cid, std::get<int64_t>(type));
314 case DistributedDB::TYPE_INDEX<std::string>:
315 return SQLiteUtils::BindTextToStatement(stmt, cid, std::get<std::string>(type));
316 case DistributedDB::TYPE_INDEX<DistributedDB::Assets>: {
317 auto assets = std::get<Assets>(type);
318 std::vector<uint8_t> blob;
319 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blob);
320 if (errCode != E_OK) {
321 return errCode;
322 }
323 return SQLiteUtils::BindBlobToStatement(stmt, cid, blob);
324 }
325 case DistributedDB::TYPE_INDEX<DistributedDB::Asset>: {
326 auto assets = std::get<Asset>(type);
327 std::vector<uint8_t> blob;
328 int errCode = RuntimeContext::GetInstance()->AssetToBlob(assets, blob);
329 if (errCode != E_OK) {
330 return errCode;
331 }
332 return SQLiteUtils::BindBlobToStatement(stmt, cid, blob);
333 }
334 }
335 return E_OK;
336 }
337
GetUpsertSQL(const DistributedDB::TableSchema & schema)338 std::string RDBDataGenerator::GetUpsertSQL(const DistributedDB::TableSchema &schema)
339 {
340 std::string sql = "INSERT INTO " + schema.name + "(";
341 for (const auto &field : schema.fields) {
342 sql += field.colName + ",";
343 }
344 sql.pop_back();
345 sql += ") VALUES(";
346 std::string pkFields;
347 std::string noPkFields;
348 for (const auto &field : schema.fields) {
349 sql += "?,";
350 if (field.primary) {
351 pkFields += field.colName + ",";
352 } else {
353 noPkFields += field.colName + "=?,";
354 }
355 }
356 pkFields.pop_back();
357 noPkFields.pop_back();
358 sql.pop_back();
359 sql += ") ON CONFLICT(" + pkFields + ") DO UPDATE SET " + noPkFields;
360 LOGI("upsert sql is %s", sql.c_str());
361 return sql;
362 }
363
GetUpdateSQL(const TableSchema & schema)364 std::string RDBDataGenerator::GetUpdateSQL(const TableSchema &schema)
365 {
366 std::string pkFields;
367 std::string noPkFields;
368 for (const auto &field : schema.fields) {
369 if (field.primary) {
370 pkFields += "'" + field.colName + "'=?,";
371 } else {
372 noPkFields += "'" + field.colName + "'=?,";
373 }
374 }
375 pkFields.pop_back();
376 noPkFields.pop_back();
377 std::string sql = "UPDATE " + schema.name + " SET ";
378 sql += noPkFields + " WHERE " + pkFields;
379 LOGI("upsert sql is %s", sql.c_str());
380 return sql;
381 }
382
InsertVirtualLocalDBData(int64_t begin,int64_t count,DistributedDB::RelationalVirtualDevice * device,const DistributedDB::TableSchema & schema)383 int RDBDataGenerator::InsertVirtualLocalDBData(int64_t begin, int64_t count,
384 DistributedDB::RelationalVirtualDevice *device, const DistributedDB::TableSchema &schema)
385 {
386 if (device == nullptr) {
387 return -E_INVALID_ARGS;
388 }
389 std::vector<VirtualRowData> rows;
390 for (int64_t index = begin; index < count; ++index) {
391 VirtualRowData virtualRowData;
392 for (const auto &field : schema.fields) {
393 auto type = GetColValueByType(index, field);
394 FillTypeIntoDataValue(field, type, virtualRowData);
395 }
396 virtualRowData.logInfo.timestamp = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
397 rows.push_back(virtualRowData);
398 }
399 return device->PutData(schema.name, rows);
400 }
401
ParseSchema(const DistributedDB::DataBaseSchema & schema,bool syncOnlyPk)402 DistributedDB::DistributedSchema RDBDataGenerator::ParseSchema(const DistributedDB::DataBaseSchema &schema,
403 bool syncOnlyPk)
404 {
405 DistributedDB::DistributedSchema res;
406 for (const auto &item : schema.tables) {
407 DistributedTable table;
408 for (const auto &field : item.fields) {
409 DistributedField distributedField;
410 distributedField.isP2pSync = syncOnlyPk ? field.primary : true;
411 distributedField.colName = field.colName;
412 distributedField.isSpecified = field.primary;
413 table.fields.push_back(distributedField);
414 }
415 table.tableName = item.name;
416 res.tables.push_back(table);
417 }
418 return res;
419 }
420
FillTypeIntoDataValue(const DistributedDB::Field & field,const DistributedDB::Type & type,DistributedDB::VirtualRowData & virtualRow)421 void RDBDataGenerator::FillTypeIntoDataValue(const DistributedDB::Field &field, const DistributedDB::Type &type,
422 DistributedDB::VirtualRowData &virtualRow)
423 {
424 DataValue dataValue;
425 std::string hash;
426 switch (type.index()) {
427 case DistributedDB::TYPE_INDEX<int64_t>:
428 if (field.primary) {
429 hash = std::to_string(std::get<int64_t>(type));
430 }
431 dataValue = std::get<int64_t>(type);
432 break;
433 case DistributedDB::TYPE_INDEX<std::string>:
434 if (field.primary) {
435 hash = std::get<std::string>(type);
436 }
437 dataValue = std::get<std::string>(type);
438 break;
439 default:
440 return;
441 }
442 if (field.primary) {
443 std::vector<uint8_t> blob;
444 DBCommon::StringToVector(hash, blob);
445 DBCommon::CalcValueHash(blob, virtualRow.logInfo.hashKey);
446 }
447 virtualRow.objectData.PutDataValue(field.colName, dataValue);
448 }
449
PrepareVirtualDeviceEnv(const std::string & tableName,sqlite3 * db,DistributedDB::RelationalVirtualDevice * device)450 int RDBDataGenerator::PrepareVirtualDeviceEnv(const std::string &tableName, sqlite3 *db,
451 DistributedDB::RelationalVirtualDevice *device)
452 {
453 return PrepareVirtualDeviceEnv(tableName, tableName, db, device);
454 }
455
PrepareVirtualDeviceEnv(const std::string & scanTable,const std::string & expectTable,sqlite3 * db,DistributedDB::RelationalVirtualDevice * device)456 int RDBDataGenerator::PrepareVirtualDeviceEnv(const std::string &scanTable, const std::string &expectTable, sqlite3 *db,
457 DistributedDB::RelationalVirtualDevice *device)
458 {
459 TableInfo tableInfo;
460 int errCode = SQLiteUtils::AnalysisSchema(db, scanTable, tableInfo);
461 if (errCode != E_OK) {
462 return errCode;
463 }
464 tableInfo.SetTableName(expectTable);
465 device->SetLocalFieldInfo(tableInfo.GetFieldInfos());
466 device->SetTableInfo(tableInfo);
467 return E_OK;
468 }
469
FlipTableSchema(const DistributedDB::TableSchema & origin)470 DistributedDB::TableSchema RDBDataGenerator::FlipTableSchema(const DistributedDB::TableSchema &origin)
471 {
472 DistributedDB::TableSchema res;
473 res.name = origin.name;
474 for (const auto &item : origin.fields) {
475 res.fields.insert(res.fields.begin(), item);
476 }
477 return res;
478 }
479 }