• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rdb_data_generator.h"
17 
18 #include "distributeddb_tools_unit_test.h"
19 #include "log_print.h"
20 #include "res_finalizer.h"
21 namespace DistributedDBUnitTest {
22 using namespace DistributedDB;
InitDatabase(const DataBaseSchema & schema,sqlite3 & db)23 int RDBDataGenerator::InitDatabase(const DataBaseSchema &schema, sqlite3 &db)
24 {
25     int errCode = RelationalTestUtils::ExecSql(&db, "PRAGMA journal_mode=WAL;");
26     if (errCode != SQLITE_OK) {
27         return errCode;
28     }
29     for (const auto &table : schema.tables) {
30         errCode = InitTable(table, false, db);
31         if (errCode != SQLITE_OK) {
32             break;
33         }
34     }
35     return errCode;
36 }
37 
InitTable(const DistributedDB::TableSchema & table,bool notNullWithStr,sqlite3 & db)38 int RDBDataGenerator::InitTable(const DistributedDB::TableSchema &table, bool notNullWithStr, sqlite3 &db)
39 {
40     return InitTable(table, notNullWithStr, false, db);
41 }
42 
InitTable(const TableSchema & table,bool notNullWithStr,bool isAutoIncrement,sqlite3 & db)43 int RDBDataGenerator::InitTable(const TableSchema &table, bool notNullWithStr, bool isAutoIncrement, sqlite3 &db)
44 {
45     std::string sql = "CREATE TABLE IF NOT EXISTS " + table.name + "(";
46     for (const auto &field : table.fields) {
47         sql += "'" + field.colName + "' " + GetTypeText(field.type);
48         if (field.primary) {
49             sql += " PRIMARY KEY";
50             if (isAutoIncrement) {
51                 sql += " AUTOINCREMENT";
52             }
53         }
54         if (notNullWithStr && field.type == TYPE_INDEX<std::string>) {
55             sql += " NOT NULL ON CONFLICT IGNORE";
56         }
57         sql += ",";
58     }
59     sql.pop_back();
60     sql += ");";
61     int errCode = RelationalTestUtils::ExecSql(&db, sql);
62     if (errCode != SQLITE_OK) {
63         LOGE("execute sql failed %d, sql is %s", errCode, sql.c_str());
64     }
65     return errCode;
66 }
67 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const DistributedDB::DataBaseSchema & schema,const std::shared_ptr<DistributedDB::VirtualCloudDb> & virtualCloudDb)68 DistributedDB::DBStatus RDBDataGenerator::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
69     const DistributedDB::DataBaseSchema &schema,
70     const std::shared_ptr<DistributedDB::VirtualCloudDb> &virtualCloudDb)
71 {
72     for (const auto &table : schema.tables) {
73         auto [record, extend] = GenerateDataRecords(begin, count, gidStart, table.fields);
74         DBStatus res = virtualCloudDb->BatchInsertWithGid(table.name, std::move(record), extend);
75         if (res != DBStatus::OK) {
76             return res;
77         }
78     }
79     return DBStatus::OK;
80 }
81 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,const std::vector<Field> & fields)82 std::pair<std::vector<VBucket>, std::vector<VBucket>> RDBDataGenerator::GenerateDataRecords(int64_t begin,
83     int64_t count, int64_t gidStart, const std::vector<Field> &fields)
84 {
85     std::vector<VBucket> record;
86     std::vector<VBucket> extend;
87     Timestamp now = TimeHelper::GetSysCurrentTime();
88     auto time = static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND);
89     for (int64_t i = begin; i < begin + count; i++) {
90         VBucket data;
91         for (const auto &field : fields) {
92             FillColValueByType(i, field, data);
93         }
94         record.push_back(data);
95 
96         VBucket log;
97         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, time);
98         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, time);
99         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
100         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
101         extend.push_back(log);
102         time++;
103     }
104     return {record, extend};
105 }
106 
FillColValueByType(int64_t index,const Field & field,VBucket & vBucket)107 void RDBDataGenerator::FillColValueByType(int64_t index, const Field &field, VBucket &vBucket)
108 {
109     vBucket[field.colName] = GetColValueByType(index, field);
110 }
111 
GetColValueByType(int64_t index,const DistributedDB::Field & field)112 DistributedDB::Type RDBDataGenerator::GetColValueByType(int64_t index, const DistributedDB::Field &field)
113 {
114     Type value = Nil();
115     switch (field.type) {
116         case DistributedDB::TYPE_INDEX<int64_t>:
117             value = index;
118             break;
119         case DistributedDB::TYPE_INDEX<std::string>:
120             value = field.colName + "_" + std::to_string(index);
121             break;
122         case DistributedDB::TYPE_INDEX<DistributedDB::Assets>:
123             value = GenerateAssets(index, field);
124             break;
125         case DistributedDB::TYPE_INDEX<DistributedDB::Asset>:
126             value = GenerateAsset(index, field);
127             break;
128         case DistributedDB::TYPE_INDEX<double>:
129             value = static_cast<double>(index);
130             break;
131         case DistributedDB::TYPE_INDEX<bool>:
132             value = static_cast<bool>(index);
133             break;
134         case DistributedDB::TYPE_INDEX<Bytes>:
135             value = GenerateBytes(index);
136             break;
137     }
138     return value;
139 }
140 
GenerateAsset(int64_t index,const DistributedDB::Field & field)141 Asset RDBDataGenerator::GenerateAsset(int64_t index, const DistributedDB::Field &field)
142 {
143     Asset asset;
144     asset.name = field.colName + "_" + std::to_string(index);
145     asset.hash = "default_hash";
146     return asset;
147 }
148 
GenerateAssets(int64_t index,const Field & field)149 Assets RDBDataGenerator::GenerateAssets(int64_t index, const Field &field)
150 {
151     Assets assets;
152     assets.push_back(GenerateAsset(index, field));
153     return assets;
154 }
155 
InsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::DataBaseSchema & schema)156 int RDBDataGenerator::InsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
157     const DistributedDB::DataBaseSchema &schema)
158 {
159     for (const auto &table : schema.tables) {
160         int errCode = InsertLocalDBData(begin, count, db, table);
161         if (errCode != E_OK) {
162             return errCode;
163         }
164     }
165     return E_OK;
166 }
167 
InsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::TableSchema & schema)168 int RDBDataGenerator::InsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
169     const DistributedDB::TableSchema &schema)
170 {
171     if (schema.fields.empty()) {
172         return -E_INTERNAL_ERROR;
173     }
174     std::string sql = "INSERT OR REPLACE INTO " + schema.name + " VALUES(";
175     for (size_t i = 0; i < schema.fields.size(); ++i) {
176         sql += "?,";
177     }
178     sql.pop_back();
179     sql += ");";
180     for (int64_t i = begin; i < begin + count; i++) {
181         sqlite3_stmt *stmt = nullptr;
182         int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
183         if (errCode != E_OK) {
184             return errCode;
185         }
186         ResFinalizer resFinalizer([stmt]() {
187             sqlite3_stmt *sqlite3Stmt = stmt;
188             int ret = E_OK;
189             SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
190         });
191         errCode = BindOneRowStmt(i, stmt, 0, false, schema.fields);
192         if (errCode != E_OK) {
193             return errCode;
194         }
195         errCode = SQLiteUtils::StepNext(stmt);
196         if (errCode != -E_FINISHED) {
197             return errCode;
198         }
199     }
200     return E_OK;
201 }
202 
UpsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::TableSchema & schema)203 int RDBDataGenerator::UpsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
204     const DistributedDB::TableSchema &schema)
205 {
206     if (schema.fields.empty()) {
207         return -E_INTERNAL_ERROR;
208     }
209     std::string sql = GetUpsertSQL(schema);
210     for (int64_t i = begin; i < begin + count; i++) {
211         sqlite3_stmt *stmt = nullptr;
212         int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
213         if (errCode != E_OK) {
214             return errCode;
215         }
216         ResFinalizer resFinalizer([stmt]() {
217             sqlite3_stmt *sqlite3Stmt = stmt;
218             int ret = E_OK;
219             SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
220         });
221         errCode = BindOneRowStmt(i, stmt, 0, false, schema.fields);
222         if (errCode != E_OK) {
223             return errCode;
224         }
225         errCode = BindOneRowStmt(i, stmt, static_cast<int>(schema.fields.size()), true, schema.fields);
226         if (errCode != E_OK) {
227             return errCode;
228         }
229         errCode = SQLiteUtils::StepNext(stmt);
230         if (errCode != -E_FINISHED) {
231             return errCode;
232         }
233     }
234     return E_OK;
235 }
236 
UpdateLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const TableSchema & schema)237 int RDBDataGenerator::UpdateLocalDBData(int64_t begin, int64_t count, sqlite3 *db, const TableSchema &schema)
238 {
239     if (schema.fields.empty()) {
240         return -E_INTERNAL_ERROR;
241     }
242     std::string sql = GetUpdateSQL(schema);
243     for (int64_t i = begin; i < begin + count; i++) {
244         sqlite3_stmt *stmt = nullptr;
245         int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
246         if (errCode != E_OK) {
247             return errCode;
248         }
249         ResFinalizer resFinalizer([stmt]() {
250             sqlite3_stmt *sqlite3Stmt = stmt;
251             int ret = E_OK;
252             SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
253         });
254         errCode = BindOneRowUpdateStmt(i, stmt, 0, schema.fields);
255         if (errCode != E_OK) {
256             return errCode;
257         }
258         errCode = SQLiteUtils::StepNext(stmt);
259         if (errCode != -E_FINISHED) {
260             return errCode;
261         }
262     }
263     return E_OK;
264 }
265 
BindOneRowStmt(int64_t index,sqlite3_stmt * stmt,int cid,bool withoutPk,const std::vector<DistributedDB::Field> & fields)266 int RDBDataGenerator::BindOneRowStmt(int64_t index, sqlite3_stmt *stmt, int cid, bool withoutPk,
267     const std::vector<DistributedDB::Field> &fields)
268 {
269     for (const auto &field : fields) {
270         if (withoutPk && field.primary) {
271             continue;
272         }
273         cid++;
274         auto type = GetColValueByType(index, field);
275         int errCode = BindOneColStmt(cid, stmt, type);
276         if (errCode != E_OK) {
277             return errCode;
278         }
279     }
280     return E_OK;
281 }
282 
BindOneRowUpdateStmt(int64_t index,sqlite3_stmt * stmt,int cid,const std::vector<DistributedDB::Field> & fields)283 int RDBDataGenerator::BindOneRowUpdateStmt(int64_t index, sqlite3_stmt *stmt, int cid,
284     const std::vector<DistributedDB::Field> &fields)
285 {
286     std::vector<Field> pkFields;
287     for (const auto &field : fields) {
288         if (field.primary) {
289             pkFields.push_back(field);
290             continue;
291         }
292         cid++;
293         auto type = GetColValueByType(index, field);
294         int errCode = BindOneColStmt(cid, stmt, type);
295         if (errCode != E_OK) {
296             return errCode;
297         }
298     }
299     return BindOneRowStmt(index, stmt, cid, false, pkFields);
300 }
301 
BindOneColStmt(int cid,sqlite3_stmt * stmt,const DistributedDB::Type & type)302 int RDBDataGenerator::BindOneColStmt(int cid, sqlite3_stmt *stmt, const DistributedDB::Type &type)
303 {
304     switch (type.index()) {
305         case DistributedDB::TYPE_INDEX<int64_t>:
306             return SQLiteUtils::BindInt64ToStatement(stmt, cid, std::get<int64_t>(type));
307         case DistributedDB::TYPE_INDEX<std::string>:
308             return SQLiteUtils::BindTextToStatement(stmt, cid, std::get<std::string>(type));
309         case DistributedDB::TYPE_INDEX<DistributedDB::Assets>: {
310             auto assets = std::get<Assets>(type);
311             std::vector<uint8_t> blob;
312             int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blob);
313             if (errCode != E_OK) {
314                 return errCode;
315             }
316             return SQLiteUtils::BindBlobToStatement(stmt, cid, blob);
317         }
318         case DistributedDB::TYPE_INDEX<DistributedDB::Asset>: {
319             auto assets = std::get<Asset>(type);
320             std::vector<uint8_t> blob;
321             int errCode = RuntimeContext::GetInstance()->AssetToBlob(assets, blob);
322             if (errCode != E_OK) {
323                 return errCode;
324             }
325             return SQLiteUtils::BindBlobToStatement(stmt, cid, blob);
326         }
327     }
328     return E_OK;
329 }
330 
GetUpsertSQL(const DistributedDB::TableSchema & schema)331 std::string RDBDataGenerator::GetUpsertSQL(const DistributedDB::TableSchema &schema)
332 {
333     std::string sql = "INSERT INTO " + schema.name + "(";
334     for (const auto &field : schema.fields) {
335         sql += field.colName + ",";
336     }
337     sql.pop_back();
338     sql += ") VALUES(";
339     std::string pkFields;
340     std::string noPkFields;
341     for (const auto &field : schema.fields) {
342         sql += "?,";
343         if (field.primary) {
344             pkFields += field.colName + ",";
345         } else {
346             noPkFields += field.colName + "=?,";
347         }
348     }
349     pkFields.pop_back();
350     noPkFields.pop_back();
351     sql.pop_back();
352     sql += ") ON CONFLICT(" + pkFields + ") DO UPDATE SET " + noPkFields;
353     LOGI("upsert sql is %s", sql.c_str());
354     return sql;
355 }
356 
GetUpdateSQL(const TableSchema & schema)357 std::string RDBDataGenerator::GetUpdateSQL(const TableSchema &schema)
358 {
359     std::string pkFields;
360     std::string noPkFields;
361     for (const auto &field : schema.fields) {
362         if (field.primary) {
363             pkFields += "'" + field.colName + "'=?,";
364         } else {
365             noPkFields += "'" + field.colName + "'=?,";
366         }
367     }
368     pkFields.pop_back();
369     noPkFields.pop_back();
370     std::string sql = "UPDATE " + schema.name + " SET ";
371     sql += noPkFields + " WHERE " + pkFields;
372     LOGI("upsert sql is %s", sql.c_str());
373     return sql;
374 }
375 
InsertVirtualLocalDBData(int64_t begin,int64_t count,DistributedDB::RelationalVirtualDevice * device,const DistributedDB::TableSchema & schema,const bool isDelete)376 int RDBDataGenerator::InsertVirtualLocalDBData(int64_t begin, int64_t count,
377     DistributedDB::RelationalVirtualDevice *device, const DistributedDB::TableSchema &schema, const bool isDelete)
378 {
379     if (device == nullptr) {
380         return -E_INVALID_ARGS;
381     }
382     std::vector<VirtualRowData> rows;
383     for (int64_t index = begin; index < count; ++index) {
384         VirtualRowData virtualRowData;
385         for (const auto &field : schema.fields) {
386             auto type = GetColValueByType(index, field);
387             FillTypeIntoDataValue(field, type, virtualRowData);
388         }
389         virtualRowData.logInfo.timestamp = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
390         if (isDelete) {
391             virtualRowData.logInfo.flag = virtualRowData.logInfo.flag | DataItem::DELETE_FLAG;
392         }
393         rows.push_back(virtualRowData);
394     }
395     return device->PutData(schema.name, rows);
396 }
397 
ParseSchema(const DistributedDB::DataBaseSchema & schema,bool syncOnlyPk)398 DistributedDB::DistributedSchema RDBDataGenerator::ParseSchema(const DistributedDB::DataBaseSchema &schema,
399     bool syncOnlyPk)
400 {
401     DistributedDB::DistributedSchema res;
402     for (const auto &item : schema.tables) {
403         DistributedTable table;
404         for (const auto &field : item.fields) {
405             DistributedField distributedField;
406             distributedField.isP2pSync = syncOnlyPk ? field.primary : true;
407             distributedField.colName = field.colName;
408             distributedField.isSpecified = field.primary;
409             table.fields.push_back(distributedField);
410         }
411         table.tableName = item.name;
412         res.tables.push_back(table);
413     }
414     return res;
415 }
416 
FillTypeIntoDataValue(const DistributedDB::Field & field,const DistributedDB::Type & type,DistributedDB::VirtualRowData & virtualRow)417 void RDBDataGenerator::FillTypeIntoDataValue(const DistributedDB::Field &field, const DistributedDB::Type &type,
418     DistributedDB::VirtualRowData &virtualRow)
419 {
420     DataValue dataValue;
421     std::string hash;
422     switch (type.index()) {
423         case DistributedDB::TYPE_INDEX<int64_t>:
424             if (field.primary) {
425                 hash = std::to_string(std::get<int64_t>(type));
426             }
427             dataValue = std::get<int64_t>(type);
428             break;
429         case DistributedDB::TYPE_INDEX<std::string>:
430             if (field.primary) {
431                 hash = std::get<std::string>(type);
432             }
433             dataValue = std::get<std::string>(type);
434             break;
435         default:
436             return;
437     }
438     if (field.primary) {
439         std::vector<uint8_t> blob;
440         DBCommon::StringToVector(hash, blob);
441         DBCommon::CalcValueHash(blob, virtualRow.logInfo.hashKey);
442     }
443     virtualRow.objectData.PutDataValue(field.colName, dataValue);
444 }
445 
PrepareVirtualDeviceEnv(const std::string & tableName,sqlite3 * db,DistributedDB::RelationalVirtualDevice * device)446 int RDBDataGenerator::PrepareVirtualDeviceEnv(const std::string &tableName, sqlite3 *db,
447     DistributedDB::RelationalVirtualDevice *device)
448 {
449     return PrepareVirtualDeviceEnv(tableName, tableName, db, device);
450 }
451 
PrepareVirtualDeviceEnv(const std::string & scanTable,const std::string & expectTable,sqlite3 * db,DistributedDB::RelationalVirtualDevice * device)452 int RDBDataGenerator::PrepareVirtualDeviceEnv(const std::string &scanTable, const std::string &expectTable, sqlite3 *db,
453     DistributedDB::RelationalVirtualDevice *device)
454 {
455     TableInfo tableInfo;
456     int errCode = SQLiteUtils::AnalysisSchema(db, scanTable, tableInfo);
457     if (errCode != E_OK) {
458         return errCode;
459     }
460     tableInfo.SetTableName(expectTable);
461     device->SetLocalFieldInfo(tableInfo.GetFieldInfos());
462     device->SetTableInfo(tableInfo);
463     return E_OK;
464 }
465 
FlipTableSchema(const DistributedDB::TableSchema & origin)466 DistributedDB::TableSchema RDBDataGenerator::FlipTableSchema(const DistributedDB::TableSchema &origin)
467 {
468     DistributedDB::TableSchema res;
469     res.name = origin.name;
470     for (const auto &item : origin.fields) {
471         res.fields.insert(res.fields.begin(), item);
472     }
473     return res;
474 }
475 
GenerateBytes(int64_t index)476 Bytes RDBDataGenerator::GenerateBytes(int64_t index)
477 {
478     std::string str = std::to_string(index);
479     return std::vector<uint8_t>(str.begin(), str.end());
480 }
481 }