• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rdb_data_generator.h"
17 
18 #include "distributeddb_tools_unit_test.h"
19 #include "log_print.h"
20 #include "res_finalizer.h"
21 namespace DistributedDBUnitTest {
22 using namespace DistributedDB;
InitDatabase(const DataBaseSchema & schema,sqlite3 & db)23 int RDBDataGenerator::InitDatabase(const DataBaseSchema &schema, sqlite3 &db)
24 {
25     int errCode = RelationalTestUtils::ExecSql(&db, "PRAGMA journal_mode=WAL;");
26     if (errCode != SQLITE_OK) {
27         return errCode;
28     }
29     for (const auto &table : schema.tables) {
30         errCode = InitTable(table, false, db);
31         if (errCode != SQLITE_OK) {
32             break;
33         }
34     }
35     return errCode;
36 }
37 
InitTable(const DistributedDB::TableSchema & table,bool notNullWithStr,sqlite3 & db)38 int RDBDataGenerator::InitTable(const DistributedDB::TableSchema &table, bool notNullWithStr, sqlite3 &db)
39 {
40     return InitTable(table, notNullWithStr, false, db);
41 }
42 
InitTable(const TableSchema & table,bool notNullWithStr,bool isAutoIncrement,sqlite3 & db)43 int RDBDataGenerator::InitTable(const TableSchema &table, bool notNullWithStr, bool isAutoIncrement, sqlite3 &db)
44 {
45     std::string sql = "CREATE TABLE IF NOT EXISTS " + table.name + "(";
46     for (const auto &field : table.fields) {
47         sql += "'" + field.colName + "' " + GetTypeText(field.type);
48         if (field.primary) {
49             sql += " PRIMARY KEY";
50             if (isAutoIncrement) {
51                 sql += " AUTOINCREMENT";
52             }
53         }
54         if (notNullWithStr && field.type == TYPE_INDEX<std::string>) {
55             sql += " NOT NULL ON CONFLICT IGNORE";
56         }
57         sql += ",";
58     }
59     sql.pop_back();
60     sql += ");";
61     int errCode = RelationalTestUtils::ExecSql(&db, sql);
62     if (errCode != SQLITE_OK) {
63         LOGE("execute sql failed %d, sql is %s", errCode, sql.c_str());
64     }
65     return errCode;
66 }
67 
GetTypeText(int type)68 std::string RDBDataGenerator::GetTypeText(int type)
69 {
70     switch (type) {
71         case DistributedDB::TYPE_INDEX<int64_t>:
72             return "INTEGER";
73         case DistributedDB::TYPE_INDEX<std::string>:
74             return "TEXT";
75         case DistributedDB::TYPE_INDEX<DistributedDB::Assets>:
76             return "ASSETS";
77         case DistributedDB::TYPE_INDEX<DistributedDB::Asset>:
78             return "ASSET";
79         default:
80             return "";
81     }
82 }
83 
/**
 * Generate records for every table of the schema and insert them into the virtual cloud database.
 * The same [begin, begin + count) index range and gidStart are used for each table.
 *
 * @param begin          First row index to generate.
 * @param count          Number of rows to generate per table.
 * @param gidStart       Offset added to the row index to form the gid.
 * @param schema         Database schema listing the tables to fill.
 * @param virtualCloudDb Cloud database that receives the records.
 * @return DBStatus::OK when all tables were filled, otherwise the status of the first failed insert.
 */
DistributedDB::DBStatus RDBDataGenerator::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
    const DistributedDB::DataBaseSchema &schema,
    const std::shared_ptr<DistributedDB::VirtualCloudDb> &virtualCloudDb)
{
    DBStatus status = DBStatus::OK;
    for (const auto &tableSchema : schema.tables) {
        // first: data records, second: matching extend (log) records.
        auto generated = GenerateDataRecords(begin, count, gidStart, tableSchema.fields);
        status = virtualCloudDb->BatchInsertWithGid(tableSchema.name, std::move(generated.first),
            generated.second);
        if (status != DBStatus::OK) {
            break;
        }
    }
    return status;
}
97 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,const std::vector<Field> & fields)98 std::pair<std::vector<VBucket>, std::vector<VBucket>> RDBDataGenerator::GenerateDataRecords(int64_t begin,
99     int64_t count, int64_t gidStart, const std::vector<Field> &fields)
100 {
101     std::vector<VBucket> record;
102     std::vector<VBucket> extend;
103     Timestamp now = TimeHelper::GetSysCurrentTime();
104     auto time = static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND);
105     for (int64_t i = begin; i < begin + count; i++) {
106         VBucket data;
107         for (const auto &field : fields) {
108             FillColValueByType(i, field, data);
109         }
110         record.push_back(data);
111 
112         VBucket log;
113         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, time);
114         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, time);
115         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
116         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
117         extend.push_back(log);
118         time++;
119     }
120     return {record, extend};
121 }
122 
FillColValueByType(int64_t index,const Field & field,VBucket & vBucket)123 void RDBDataGenerator::FillColValueByType(int64_t index, const Field &field, VBucket &vBucket)
124 {
125     vBucket[field.colName] = GetColValueByType(index, field);
126 }
127 
/**
 * Produce the deterministic test value of a field for the given row index.
 *
 * - int64_t columns hold the index itself;
 * - text columns hold "<colName>_<index>";
 * - asset / assets columns hold the result of GenerateAsset / GenerateAssets;
 * - every other type yields Nil.
 *
 * @param index Row index the value is derived from.
 * @param field Field describing the column name and type.
 * @return The generated value.
 */
DistributedDB::Type RDBDataGenerator::GetColValueByType(int64_t index, const DistributedDB::Field &field)
{
    switch (field.type) {
        case DistributedDB::TYPE_INDEX<DistributedDB::Asset>:
            return GenerateAsset(index, field);
        case DistributedDB::TYPE_INDEX<DistributedDB::Assets>:
            return GenerateAssets(index, field);
        case DistributedDB::TYPE_INDEX<std::string>:
            return field.colName + "_" + std::to_string(index);
        case DistributedDB::TYPE_INDEX<int64_t>:
            return index;
    }
    // No generator for this type.
    return Nil();
}
147 
GenerateAsset(int64_t index,const DistributedDB::Field & field)148 Asset RDBDataGenerator::GenerateAsset(int64_t index, const DistributedDB::Field &field)
149 {
150     Asset asset;
151     asset.name = field.colName + "_" + std::to_string(index);
152     asset.hash = "default_hash";
153     return asset;
154 }
155 
GenerateAssets(int64_t index,const Field & field)156 Assets RDBDataGenerator::GenerateAssets(int64_t index, const Field &field)
157 {
158     Assets assets;
159     assets.push_back(GenerateAsset(index, field));
160     return assets;
161 }
162 
InsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::DataBaseSchema & schema)163 int RDBDataGenerator::InsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
164     const DistributedDB::DataBaseSchema &schema)
165 {
166     for (const auto &table : schema.tables) {
167         int errCode = InsertLocalDBData(begin, count, db, table);
168         if (errCode != E_OK) {
169             return errCode;
170         }
171     }
172     return E_OK;
173 }
174 
InsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::TableSchema & schema)175 int RDBDataGenerator::InsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
176     const DistributedDB::TableSchema &schema)
177 {
178     if (schema.fields.empty()) {
179         return -E_INTERNAL_ERROR;
180     }
181     std::string sql = "INSERT OR REPLACE INTO " + schema.name + " VALUES(";
182     for (size_t i = 0; i < schema.fields.size(); ++i) {
183         sql += "?,";
184     }
185     sql.pop_back();
186     sql += ");";
187     for (int64_t i = begin; i < begin + count; i++) {
188         sqlite3_stmt *stmt = nullptr;
189         int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
190         if (errCode != E_OK) {
191             return errCode;
192         }
193         ResFinalizer resFinalizer([stmt]() {
194             sqlite3_stmt *sqlite3Stmt = stmt;
195             int ret = E_OK;
196             SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
197         });
198         errCode = BindOneRowStmt(i, stmt, 0, false, schema.fields);
199         if (errCode != E_OK) {
200             return errCode;
201         }
202         errCode = SQLiteUtils::StepNext(stmt);
203         if (errCode != -E_FINISHED) {
204             return errCode;
205         }
206     }
207     return E_OK;
208 }
209 
UpsertLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const DistributedDB::TableSchema & schema)210 int RDBDataGenerator::UpsertLocalDBData(int64_t begin, int64_t count, sqlite3 *db,
211     const DistributedDB::TableSchema &schema)
212 {
213     if (schema.fields.empty()) {
214         return -E_INTERNAL_ERROR;
215     }
216     std::string sql = GetUpsertSQL(schema);
217     for (int64_t i = begin; i < begin + count; i++) {
218         sqlite3_stmt *stmt = nullptr;
219         int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
220         if (errCode != E_OK) {
221             return errCode;
222         }
223         ResFinalizer resFinalizer([stmt]() {
224             sqlite3_stmt *sqlite3Stmt = stmt;
225             int ret = E_OK;
226             SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
227         });
228         errCode = BindOneRowStmt(i, stmt, 0, false, schema.fields);
229         if (errCode != E_OK) {
230             return errCode;
231         }
232         errCode = BindOneRowStmt(i, stmt, static_cast<int>(schema.fields.size()), true, schema.fields);
233         if (errCode != E_OK) {
234             return errCode;
235         }
236         errCode = SQLiteUtils::StepNext(stmt);
237         if (errCode != -E_FINISHED) {
238             return errCode;
239         }
240     }
241     return E_OK;
242 }
243 
UpdateLocalDBData(int64_t begin,int64_t count,sqlite3 * db,const TableSchema & schema)244 int RDBDataGenerator::UpdateLocalDBData(int64_t begin, int64_t count, sqlite3 *db, const TableSchema &schema)
245 {
246     if (schema.fields.empty()) {
247         return -E_INTERNAL_ERROR;
248     }
249     std::string sql = GetUpdateSQL(schema);
250     for (int64_t i = begin; i < begin + count; i++) {
251         sqlite3_stmt *stmt = nullptr;
252         int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
253         if (errCode != E_OK) {
254             return errCode;
255         }
256         ResFinalizer resFinalizer([stmt]() {
257             sqlite3_stmt *sqlite3Stmt = stmt;
258             int ret = E_OK;
259             SQLiteUtils::ResetStatement(sqlite3Stmt, true, ret);
260         });
261         errCode = BindOneRowUpdateStmt(i, stmt, 0, schema.fields);
262         if (errCode != E_OK) {
263             return errCode;
264         }
265         errCode = SQLiteUtils::StepNext(stmt);
266         if (errCode != -E_FINISHED) {
267             return errCode;
268         }
269     }
270     return E_OK;
271 }
272 
BindOneRowStmt(int64_t index,sqlite3_stmt * stmt,int cid,bool withoutPk,const std::vector<DistributedDB::Field> & fields)273 int RDBDataGenerator::BindOneRowStmt(int64_t index, sqlite3_stmt *stmt, int cid, bool withoutPk,
274     const std::vector<DistributedDB::Field> &fields)
275 {
276     for (const auto &field : fields) {
277         if (withoutPk && field.primary) {
278             continue;
279         }
280         cid++;
281         auto type = GetColValueByType(index, field);
282         int errCode = BindOneColStmt(cid, stmt, type);
283         if (errCode != E_OK) {
284             return errCode;
285         }
286     }
287     return E_OK;
288 }
289 
BindOneRowUpdateStmt(int64_t index,sqlite3_stmt * stmt,int cid,const std::vector<DistributedDB::Field> & fields)290 int RDBDataGenerator::BindOneRowUpdateStmt(int64_t index, sqlite3_stmt *stmt, int cid,
291     const std::vector<DistributedDB::Field> &fields)
292 {
293     std::vector<Field> pkFields;
294     for (const auto &field : fields) {
295         if (field.primary) {
296             pkFields.push_back(field);
297             continue;
298         }
299         cid++;
300         auto type = GetColValueByType(index, field);
301         int errCode = BindOneColStmt(cid, stmt, type);
302         if (errCode != E_OK) {
303             return errCode;
304         }
305     }
306     return BindOneRowStmt(index, stmt, cid, false, pkFields);
307 }
308 
BindOneColStmt(int cid,sqlite3_stmt * stmt,const DistributedDB::Type & type)309 int RDBDataGenerator::BindOneColStmt(int cid, sqlite3_stmt *stmt, const DistributedDB::Type &type)
310 {
311     switch (type.index()) {
312         case DistributedDB::TYPE_INDEX<int64_t>:
313             return SQLiteUtils::BindInt64ToStatement(stmt, cid, std::get<int64_t>(type));
314         case DistributedDB::TYPE_INDEX<std::string>:
315             return SQLiteUtils::BindTextToStatement(stmt, cid, std::get<std::string>(type));
316         case DistributedDB::TYPE_INDEX<DistributedDB::Assets>: {
317             auto assets = std::get<Assets>(type);
318             std::vector<uint8_t> blob;
319             int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blob);
320             if (errCode != E_OK) {
321                 return errCode;
322             }
323             return SQLiteUtils::BindBlobToStatement(stmt, cid, blob);
324         }
325         case DistributedDB::TYPE_INDEX<DistributedDB::Asset>: {
326             auto assets = std::get<Asset>(type);
327             std::vector<uint8_t> blob;
328             int errCode = RuntimeContext::GetInstance()->AssetToBlob(assets, blob);
329             if (errCode != E_OK) {
330                 return errCode;
331             }
332             return SQLiteUtils::BindBlobToStatement(stmt, cid, blob);
333         }
334     }
335     return E_OK;
336 }
337 
GetUpsertSQL(const DistributedDB::TableSchema & schema)338 std::string RDBDataGenerator::GetUpsertSQL(const DistributedDB::TableSchema &schema)
339 {
340     std::string sql = "INSERT INTO " + schema.name + "(";
341     for (const auto &field : schema.fields) {
342         sql += field.colName + ",";
343     }
344     sql.pop_back();
345     sql += ") VALUES(";
346     std::string pkFields;
347     std::string noPkFields;
348     for (const auto &field : schema.fields) {
349         sql += "?,";
350         if (field.primary) {
351             pkFields += field.colName + ",";
352         } else {
353             noPkFields += field.colName + "=?,";
354         }
355     }
356     pkFields.pop_back();
357     noPkFields.pop_back();
358     sql.pop_back();
359     sql += ") ON CONFLICT(" + pkFields + ") DO UPDATE SET " + noPkFields;
360     LOGI("upsert sql is %s", sql.c_str());
361     return sql;
362 }
363 
GetUpdateSQL(const TableSchema & schema)364 std::string RDBDataGenerator::GetUpdateSQL(const TableSchema &schema)
365 {
366     std::string pkFields;
367     std::string noPkFields;
368     for (const auto &field : schema.fields) {
369         if (field.primary) {
370             pkFields += "'" + field.colName + "'=?,";
371         } else {
372             noPkFields += "'" + field.colName + "'=?,";
373         }
374     }
375     pkFields.pop_back();
376     noPkFields.pop_back();
377     std::string sql = "UPDATE " + schema.name + " SET ";
378     sql += noPkFields + " WHERE " + pkFields;
379     LOGI("upsert sql is %s", sql.c_str());
380     return sql;
381 }
382 
InsertVirtualLocalDBData(int64_t begin,int64_t count,DistributedDB::RelationalVirtualDevice * device,const DistributedDB::TableSchema & schema)383 int RDBDataGenerator::InsertVirtualLocalDBData(int64_t begin, int64_t count,
384     DistributedDB::RelationalVirtualDevice *device, const DistributedDB::TableSchema &schema)
385 {
386     if (device == nullptr) {
387         return -E_INVALID_ARGS;
388     }
389     std::vector<VirtualRowData> rows;
390     for (int64_t index = begin; index < count; ++index) {
391         VirtualRowData virtualRowData;
392         for (const auto &field : schema.fields) {
393             auto type = GetColValueByType(index, field);
394             FillTypeIntoDataValue(field, type, virtualRowData);
395         }
396         virtualRowData.logInfo.timestamp = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
397         rows.push_back(virtualRowData);
398     }
399     return device->PutData(schema.name, rows);
400 }
401 
/**
 * Convert a database schema into a distributed schema.
 *
 * Every field keeps its column name; isSpecified mirrors the field's primary flag, and
 * isP2pSync is true for all fields unless syncOnlyPk is set, in which case only primary
 * key fields are marked for sync.
 *
 * @param schema     Database schema to convert.
 * @param syncOnlyPk When true, only primary key fields get isP2pSync = true.
 * @return The distributed schema with one table per input table, in the same order.
 */
DistributedDB::DistributedSchema RDBDataGenerator::ParseSchema(const DistributedDB::DataBaseSchema &schema,
    bool syncOnlyPk)
{
    DistributedDB::DistributedSchema parsed;
    for (const auto &tableSchema : schema.tables) {
        DistributedTable distributedTable;
        distributedTable.tableName = tableSchema.name;
        for (const auto &column : tableSchema.fields) {
            DistributedField distributedField;
            distributedField.colName = column.colName;
            distributedField.isSpecified = column.primary;
            distributedField.isP2pSync = !syncOnlyPk || column.primary;
            distributedTable.fields.push_back(distributedField);
        }
        parsed.tables.push_back(distributedTable);
    }
    return parsed;
}
420 
FillTypeIntoDataValue(const DistributedDB::Field & field,const DistributedDB::Type & type,DistributedDB::VirtualRowData & virtualRow)421 void RDBDataGenerator::FillTypeIntoDataValue(const DistributedDB::Field &field, const DistributedDB::Type &type,
422     DistributedDB::VirtualRowData &virtualRow)
423 {
424     DataValue dataValue;
425     std::string hash;
426     switch (type.index()) {
427         case DistributedDB::TYPE_INDEX<int64_t>:
428             if (field.primary) {
429                 hash = std::to_string(std::get<int64_t>(type));
430             }
431             dataValue = std::get<int64_t>(type);
432             break;
433         case DistributedDB::TYPE_INDEX<std::string>:
434             if (field.primary) {
435                 hash = std::get<std::string>(type);
436             }
437             dataValue = std::get<std::string>(type);
438             break;
439         default:
440             return;
441     }
442     if (field.primary) {
443         std::vector<uint8_t> blob;
444         DBCommon::StringToVector(hash, blob);
445         DBCommon::CalcValueHash(blob, virtualRow.logInfo.hashKey);
446     }
447     virtualRow.objectData.PutDataValue(field.colName, dataValue);
448 }
449 
PrepareVirtualDeviceEnv(const std::string & tableName,sqlite3 * db,DistributedDB::RelationalVirtualDevice * device)450 int RDBDataGenerator::PrepareVirtualDeviceEnv(const std::string &tableName, sqlite3 *db,
451     DistributedDB::RelationalVirtualDevice *device)
452 {
453     return PrepareVirtualDeviceEnv(tableName, tableName, db, device);
454 }
455 
PrepareVirtualDeviceEnv(const std::string & scanTable,const std::string & expectTable,sqlite3 * db,DistributedDB::RelationalVirtualDevice * device)456 int RDBDataGenerator::PrepareVirtualDeviceEnv(const std::string &scanTable, const std::string &expectTable, sqlite3 *db,
457     DistributedDB::RelationalVirtualDevice *device)
458 {
459     TableInfo tableInfo;
460     int errCode = SQLiteUtils::AnalysisSchema(db, scanTable, tableInfo);
461     if (errCode != E_OK) {
462         return errCode;
463     }
464     tableInfo.SetTableName(expectTable);
465     device->SetLocalFieldInfo(tableInfo.GetFieldInfos());
466     device->SetTableInfo(tableInfo);
467     return E_OK;
468 }
469 
/**
 * Create a copy of a table schema with the field order reversed.
 * Only the table name and the fields are carried over; other members keep their defaults.
 *
 * @param origin Schema to flip.
 * @return Schema with the same name and the fields of origin in reverse order.
 */
DistributedDB::TableSchema RDBDataGenerator::FlipTableSchema(const DistributedDB::TableSchema &origin)
{
    DistributedDB::TableSchema res;
    res.name = origin.name;
    // Copy through reverse iterators: one allocation and O(n) copies, instead of shifting every
    // already-inserted field on each insert at the front.
    res.fields.assign(origin.fields.rbegin(), origin.fields.rend());
    return res;
}
479 }