1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 #include <algorithm>
18 #include "data_transformer.h"
19 #include "db_common.h"
20
21 namespace DistributedDB {
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable)22 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable)
23 : SQLiteStorageExecutor(dbHandle, writable, false)
24 {}
25
CreateDistributedTable(const std::string & tableName,TableInfo & table,bool isUpgrade)26 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(const std::string &tableName, TableInfo &table,
27 bool isUpgrade)
28 {
29 if (dbHandle_ == nullptr) {
30 return -E_INVALID_DB;
31 }
32
33 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
34 if (errCode != E_OK) {
35 LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
36 return errCode;
37 }
38
39 if (table.GetCreateTableSql().find("WITHOUT ROWID") != std::string::npos) {
40 LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
41 return -E_NOT_SUPPORT;
42 }
43
44 bool isTableEmpty = false;
45 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isTableEmpty);
46 if (errCode != E_OK) {
47 LOGE("[CreateDistributedTable] Check table [%s] is empty failed. %d", tableName.c_str(), errCode);
48 return errCode;
49 }
50
51 if (!isUpgrade && !isTableEmpty) { // create distributed table should on an empty table
52 LOGE("[CreateDistributedTable] Create distributed table should on an empty table when first create.");
53 return -E_NOT_SUPPORT;
54 }
55
56 // create log table
57 errCode = SQLiteUtils::CreateRelationalLogTable(dbHandle_, tableName);
58 if (errCode != E_OK) {
59 LOGE("[CreateDistributedTable] create log table failed");
60 return errCode;
61 }
62
63 // add trigger
64 errCode = SQLiteUtils::AddRelationalLogTableTrigger(dbHandle_, table);
65 if (errCode != E_OK) {
66 LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
67 return errCode;
68 }
69 return E_OK;
70 }
71
UpgradeDistributedTable(const TableInfo & tableInfo,TableInfo & newTableInfo)72 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const TableInfo &tableInfo,
73 TableInfo &newTableInfo)
74 {
75 if (dbHandle_ == nullptr) {
76 return -E_INVALID_DB;
77 }
78
79 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableInfo.GetTableName(), newTableInfo);
80 if (errCode != E_OK) {
81 LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
82 return errCode;
83 }
84
85 if (newTableInfo.GetCreateTableSql().find("WITHOUT ROWID") != std::string::npos) {
86 LOGE("[UpgradeDistributedTable] Not support create distributed table without rowid.");
87 return -E_NOT_SUPPORT;
88 }
89
90 // new table should has same or compatible upgrade
91 errCode = tableInfo.CompareWithTable(newTableInfo);
92 if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
93 LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
94 return -E_SCHEMA_MISMATCH;
95 }
96
97 errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
98 if (errCode != E_OK) {
99 LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
100 }
101
102 return errCode;
103 }
104
105 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)106 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
107 std::vector<std::string> &deviceTables)
108 {
109 if (device.empty() && tableName.empty()) { // device and table name should not both be empty
110 return -E_INVALID_ARGS;
111 }
112 std::string deviceHash = DBCommon::TransferStringToHex(DBCommon::TransferHashString(device));
113 std::string devicePattern = device.empty() ? "%" : deviceHash;
114 std::string tablePattern = tableName.empty() ? "%" : tableName;
115 std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
116
117 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
118 deviceTableName + "';";
119 sqlite3_stmt *stmt = nullptr;
120 int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
121 if (errCode != E_OK) {
122 SQLiteUtils::ResetStatement(stmt, true, errCode);
123 return errCode;
124 }
125
126 do {
127 errCode = SQLiteUtils::StepWithRetry(stmt, false);
128 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
129 errCode = E_OK;
130 break;
131 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
132 LOGE("Get table name failed. %d", errCode);
133 break;
134 }
135 std::string realTableName;
136 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
137 if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
138 continue;
139 }
140 if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
141 continue;
142 }
143 deviceTables.emplace_back(realTableName);
144 } while (true);
145
146 SQLiteUtils::ResetStatement(stmt, true, errCode);
147 return errCode;
148 }
149
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)150 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
151 {
152 std::vector<FieldInfo> fields;
153 auto itOld = oldTableInfo.GetFields().begin();
154 auto itNew = newTableInfo.GetFields().begin();
155 for (; itNew != newTableInfo.GetFields().end(); itNew++) {
156 if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
157 fields.emplace_back(itNew->second);
158 continue;
159 }
160 itOld++;
161 }
162 return fields;
163 }
164
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)165 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
166 {
167 if (db == nullptr) {
168 return -E_INVALID_ARGS;
169 }
170
171 std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
172 return a.GetColumnId()< b.GetColumnId();
173 });
174 int errCode = E_OK;
175 for (const auto &table : tables) {
176 for (const auto &field : fields) {
177 std::string alterSql = "ALTER TABLE " + table + " ADD " + field.GetFieldName() + " " + field.GetDataType();
178 alterSql += field.IsNotNull() ? " NOT NULL" : "";
179 alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
180 alterSql += ";";
181 errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
182 if (errCode != E_OK) {
183 LOGE("Alter table failed. %d", errCode);
184 break;
185 }
186 }
187 }
188 return errCode;
189 }
190
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)191 std::map<std::string, CompositeFields> GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
192 {
193 std::map<std::string, CompositeFields> indexes;
194 auto itOld = oldTableInfo.GetIndexDefine().begin();
195 auto itNew = newTableInfo.GetIndexDefine().begin();
196 auto itOldEnd = oldTableInfo.GetIndexDefine().end();
197 auto itNewEnd = newTableInfo.GetIndexDefine().end();
198
199 while (itOld != itOldEnd && itNew != itNewEnd) {
200 if (itOld->first == itNew->first) {
201 if (itOld->second != itNew->second) {
202 indexes.insert({itNew->first, itNew->second});
203 }
204 itOld++;
205 itNew++;
206 } else if (itOld->first < itNew->first) {
207 indexes.insert({itOld->first,{}});
208 itOld++;
209 } else if (itOld->first > itNew->first) {
210 indexes.insert({itNew->first, itNew->second});
211 itNew++;
212 }
213 }
214
215 while (itOld != itOldEnd) {
216 indexes.insert({itOld->first,{}});
217 itOld++;
218 }
219
220 while (itNew != itNewEnd) {
221 indexes.insert({itNew->first, itNew->second});
222 itNew++;
223 }
224
225 return indexes;
226 }
227
Upgradeindexes(sqlite3 * db,const std::vector<std::string> & tables,const std::map<std::string,CompositeFields> & indexes)228 int Upgradeindexes(sqlite3 *db, const std::vector<std::string> &tables,
229 const std::map<std::string, CompositeFields> &indexes)
230 {
231 if (db == nullptr) {
232 return -E_INVALID_ARGS;
233 }
234
235 int errCode = E_OK;
236 for (const auto &table : tables) {
237 for (const auto &index : indexes) {
238 if (index.first.empty()) {
239 continue;
240 }
241 std::string realIndexName = table + "_" + index.first;
242 std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
243 errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
244 if (errCode != E_OK) {
245 LOGE("Drop index failed. %d", errCode);
246 return errCode;
247 }
248
249 if (index.second.empty()) { // empty means drop index only
250 continue;
251 }
252
253 auto it = index.second.begin();
254 std::string indexDefine = *it++;
255 while (it != index.second.end()) {
256 indexDefine += ", " + *it++;
257 }
258 std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
259 "(" + indexDefine + ");";
260 errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
261 if (errCode != E_OK) {
262 LOGE("Create index failed. %d", errCode);
263 break;
264 }
265 }
266 }
267 return errCode;
268 }
269 }
270
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)271 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
272 const TableInfo &newTableInfo)
273 {
274 std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
275 std::map<std::string, CompositeFields> upgradeIndexces = GetChangedIndexes(oldTableInfo, newTableInfo);
276 std::vector<std::string> deviceTables;
277 int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
278 if (errCode != E_OK) {
279 LOGE("Get device table name for alter table failed. %d", errCode);
280 return errCode;
281 }
282
283 LOGD("Begin to alter table: upgrade fields[%d], indexces[%d], deviceTable[%d]", upgradeFields.size(),
284 upgradeIndexces.size(), deviceTables.size());
285 errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
286 if (errCode != E_OK) {
287 LOGE("upgrade fields failed. %d", errCode);
288 return errCode;
289 }
290
291 errCode = Upgradeindexes(dbHandle_, deviceTables, upgradeIndexces);
292 if (errCode != E_OK) {
293 LOGE("upgrade indexes failed. %d", errCode);
294 }
295
296 return E_OK;
297 }
298
StartTransaction(TransactType type)299 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
300 {
301 if (dbHandle_ == nullptr) {
302 LOGE("Begin transaction failed, dbHandle is null.");
303 return -E_INVALID_DB;
304 }
305 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
306 if (errCode != E_OK) {
307 LOGE("Begin transaction failed, errCode = %d", errCode);
308 }
309 return errCode;
310 }
311
Commit()312 int SQLiteSingleVerRelationalStorageExecutor::Commit()
313 {
314 if (dbHandle_ == nullptr) {
315 return -E_INVALID_DB;
316 }
317
318 return SQLiteUtils::CommitTransaction(dbHandle_);
319 }
320
Rollback()321 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
322 {
323 if (dbHandle_ == nullptr) {
324 return -E_INVALID_DB;
325 }
326 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
327 if (errCode != E_OK) {
328 LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
329 }
330 return errCode;
331 }
332
SetTableInfo(const TableInfo & tableInfo)333 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
334 {
335 table_ = tableInfo;
336 }
337
GetDataValueByType(sqlite3_stmt * statement,DataValue & value,int cid)338 static int GetDataValueByType(sqlite3_stmt *statement, DataValue &value, int cid)
339 {
340 int errCode = E_OK;
341 int storageType = sqlite3_column_type(statement, cid);
342 switch (storageType) {
343 case SQLITE_INTEGER: {
344 value = static_cast<int64_t>(sqlite3_column_int64(statement, cid));
345 break;
346 }
347 case SQLITE_FLOAT: {
348 value = sqlite3_column_double(statement, cid);
349 break;
350 }
351 case SQLITE_BLOB: {
352 std::vector<uint8_t> blobValue;
353 errCode = SQLiteUtils::GetColumnBlobValue(statement, cid, blobValue);
354 if (errCode != E_OK) {
355 return errCode;
356 }
357 auto blob = new (std::nothrow) Blob;
358 if (blob == nullptr) {
359 return -E_OUT_OF_MEMORY;
360 }
361 blob->WriteBlob(blobValue.data(), static_cast<uint32_t>(blobValue.size()));
362 errCode = value.Set(blob);
363 break;
364 }
365 case SQLITE_NULL: {
366 break;
367 }
368 case SQLITE3_TEXT: {
369 const char *colValue = reinterpret_cast<const char *>(sqlite3_column_text(statement, cid));
370 if (colValue == nullptr) {
371 value.ResetValue();
372 } else {
373 value = std::string(colValue);
374 if (value.GetType() == StorageType::STORAGE_TYPE_NULL) {
375 errCode = -E_OUT_OF_MEMORY;
376 }
377 }
378 break;
379 }
380 default: {
381 break;
382 }
383 }
384 return errCode;
385 }
386
BindDataValueByType(sqlite3_stmt * statement,const std::optional<DataValue> & data,int cid)387 static int BindDataValueByType(sqlite3_stmt *statement, const std::optional<DataValue> &data, int cid)
388 {
389 int errCode = E_OK;
390 StorageType type = data.value().GetType();
391 switch (type) {
392 case StorageType::STORAGE_TYPE_INTEGER: {
393 int64_t intData = 0;
394 (void)data.value().GetInt64(intData);
395 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int64(statement, cid, intData));
396 break;
397 }
398
399 case StorageType::STORAGE_TYPE_REAL: {
400 double doubleData = 0;
401 (void)data.value().GetDouble(doubleData);
402 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_double(statement, cid, doubleData));
403 break;
404 }
405
406 case StorageType::STORAGE_TYPE_TEXT: {
407 std::string strData;
408 (void)data.value().GetText(strData);
409 errCode = SQLiteUtils::BindTextToStatement(statement, cid, strData);
410 break;
411 }
412
413 case StorageType::STORAGE_TYPE_BLOB: {
414 Blob blob;
415 (void)data.value().GetBlob(blob);
416 std::vector<uint8_t> blobData(blob.GetData(), blob.GetData() + blob.GetSize());
417 errCode = SQLiteUtils::BindBlobToStatement(statement, cid, blobData, true);
418 break;
419 }
420
421 case StorageType::STORAGE_TYPE_NULL: {
422 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(statement, cid));
423 break;
424 }
425
426 default:
427 break;
428 }
429 return errCode;
430 }
431
GetLogData(sqlite3_stmt * logStatement,LogInfo & logInfo)432 static int GetLogData(sqlite3_stmt *logStatement, LogInfo &logInfo)
433 {
434 logInfo.dataKey = sqlite3_column_int64(logStatement, 0); // 0 means dataKey index
435
436 std::vector<uint8_t> dev;
437 int errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 1, dev); // 1 means dev index
438 if (errCode != E_OK) {
439 return errCode;
440 }
441 logInfo.device = std::string(dev.begin(), dev.end());
442
443 std::vector<uint8_t> oriDev;
444 errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 2, oriDev); // 2 means ori_dev index
445 if (errCode != E_OK) {
446 return errCode;
447 }
448 logInfo.originDev = std::string(oriDev.begin(), oriDev.end());
449 logInfo.timestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 3)); // 3 means timestamp index
450 logInfo.wTimeStamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 4)); // 4 means w_timestamp index
451 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 5)); // 5 means flag index
452 logInfo.flag &= (~DataItem::LOCAL_FLAG);
453 logInfo.flag &= (~DataItem::UPDATE_FLAG);
454 return SQLiteUtils::GetColumnBlobValue(logStatement, 6, logInfo.hashKey); // 6 means hashKey index
455 }
456
GetDataItemSerialSize(DataItem & item,size_t appendLen)457 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
458 {
459 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
460 // the size would not be very large.
461 static const size_t maxOrigDevLength = 40;
462 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
463 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
464 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
465 return dataSize;
466 }
467
GetKvData(const Key & key,Value & value) const468 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
469 {
470 static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX +
471 "metadata WHERE key=?;";
472 sqlite3_stmt *statement = nullptr;
473 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
474 if (errCode != E_OK) {
475 goto END;
476 }
477
478 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
479 if (errCode != E_OK) {
480 goto END;
481 }
482
483 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
484 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
485 errCode = -E_NOT_FOUND;
486 goto END;
487 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
488 goto END;
489 }
490
491 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
492 END:
493 SQLiteUtils::ResetStatement(statement, true, errCode);
494 return errCode;
495 }
496
PutKvData(const Key & key,const Value & value) const497 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
498 {
499 static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX +
500 "metadata VALUES(?,?);";
501 sqlite3_stmt *statement = nullptr;
502 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
503 if (errCode != E_OK) {
504 goto ERROR;
505 }
506
507 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // 1 means key index
508 if (errCode != E_OK) {
509 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
510 goto ERROR;
511 }
512
513 errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true); // 2 means value index
514 if (errCode != E_OK) {
515 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
516 goto ERROR;
517 }
518 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
519 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
520 errCode = E_OK;
521 }
522 ERROR:
523 SQLiteUtils::ResetStatement(statement, true, errCode);
524 return errCode;
525 }
526
DeleteMetaData(const std::vector<Key> & keys) const527 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
528 {
529 static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
530 "metadata WHERE key=?;";
531 sqlite3_stmt *statement = nullptr;
532 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
533 if (errCode != E_OK) {
534 return errCode;
535 }
536
537 for (const auto &key : keys) {
538 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
539 if (errCode != E_OK) {
540 break;
541 }
542
543 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
544 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
545 break;
546 }
547 errCode = E_OK;
548 SQLiteUtils::ResetStatement(statement, false, errCode);
549 }
550 SQLiteUtils::ResetStatement(statement, true, errCode);
551 return CheckCorruptedStatus(errCode);
552 }
553
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const554 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
555 {
556 static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
557 "metadata WHERE key>=? AND key<=?;";
558 sqlite3_stmt *statement = nullptr;
559 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
560 if (errCode != E_OK) {
561 return errCode;
562 }
563
564 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
565 if (errCode == E_OK) {
566 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
567 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
568 errCode = E_OK;
569 }
570 }
571 SQLiteUtils::ResetStatement(statement, true, errCode);
572 return CheckCorruptedStatus(errCode);
573 }
574
GetAllKeys(sqlite3_stmt * statement,std::vector<Key> & keys)575 static int GetAllKeys(sqlite3_stmt *statement, std::vector<Key> &keys)
576 {
577 if (statement == nullptr) {
578 return -E_INVALID_DB;
579 }
580 int errCode;
581 do {
582 errCode = SQLiteUtils::StepWithRetry(statement, false);
583 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
584 Key key;
585 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, key);
586 if (errCode != E_OK) {
587 break;
588 }
589
590 keys.push_back(std::move(key));
591 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
592 errCode = E_OK;
593 break;
594 } else {
595 LOGE("SQLite step for getting all keys failed:%d", errCode);
596 break;
597 }
598 } while (true);
599 return errCode;
600 }
601
GetAllMetaKeys(std::vector<Key> & keys) const602 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
603 {
604 static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + DBConstant::RELATIONAL_PREFIX + "metadata;";
605 sqlite3_stmt *statement = nullptr;
606 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
607 if (errCode != E_OK) {
608 LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
609 return errCode;
610 }
611 errCode = GetAllKeys(statement, keys);
612 SQLiteUtils::ResetStatement(statement, true, errCode);
613 return errCode;
614 }
615
PrepareForSavingLog(const QueryObject & object,const std::string & deviceName,sqlite3_stmt * & logStmt,sqlite3_stmt * & queryStmt) const616 int SQLiteSingleVerRelationalStorageExecutor::PrepareForSavingLog(const QueryObject &object,
617 const std::string &deviceName, sqlite3_stmt *&logStmt, sqlite3_stmt *&queryStmt) const
618 {
619 std::string devName = DBCommon::TransferHashString(deviceName);
620 const std::string tableName = DBConstant::RELATIONAL_PREFIX + object.GetTableName() + "_log";
621 std::string dataFormat = "?, '" + deviceName + "', ?, ?, ?, ?, ?";
622 std::string columnList = "data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key";
623 std::string sql = "INSERT OR REPLACE INTO " + tableName +
624 " (" + columnList + ") VALUES (" + dataFormat + ");";
625 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, logStmt);
626 if (errCode != E_OK) {
627 LOGE("[info statement] Get log statement fail! errCode:%d", errCode);
628 return errCode;
629 }
630 std::string selectSql = "select " + columnList + " from " + tableName + " where hash_key = ? and device = ?;";
631 errCode = SQLiteUtils::GetStatement(dbHandle_, selectSql, queryStmt);
632 if (errCode != E_OK) {
633 SQLiteUtils::ResetStatement(logStmt, true, errCode);
634 LOGE("[info statement] Get query statement fail! errCode:%d", errCode);
635 }
636 return errCode;
637 }
638
PrepareForSavingData(const QueryObject & object,sqlite3_stmt * & statement) const639 int SQLiteSingleVerRelationalStorageExecutor::PrepareForSavingData(const QueryObject &object,
640 sqlite3_stmt *&statement) const
641 {
642 std::string colName;
643 std::string dataFormat;
644 for (size_t colId = 0; colId < table_.GetFields().size(); ++colId) {
645 colName += table_.GetFieldName(colId) + ",";
646 dataFormat += "?,";
647 }
648 colName.pop_back();
649 dataFormat.pop_back();
650
651 const std::string sql = "INSERT OR REPLACE INTO " + table_.GetTableName() +
652 " (" + colName + ") VALUES (" + dataFormat + ");";
653 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
654 if (errCode != E_OK) {
655 LOGE("[info statement] Get saving data statement fail! errCode:%d", errCode);
656 }
657 return errCode;
658 }
659
SaveSyncLog(sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,TimeStamp & maxTimestamp,int64_t rowid)660 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncLog(sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
661 const DataItem &dataItem, TimeStamp &maxTimestamp, int64_t rowid)
662 {
663 int errCode = SQLiteUtils::BindBlobToStatement(queryStmt, 1, dataItem.hashKey); // 1 means hashkey index.
664 if (errCode != E_OK) {
665 return errCode;
666 }
667 errCode = SQLiteUtils::BindTextToStatement(queryStmt, 2, dataItem.dev); // 2 means device index.
668 if (errCode != E_OK) {
669 return errCode;
670 }
671
672 LogInfo logInfoGet;
673 errCode = SQLiteUtils::StepWithRetry(queryStmt, isMemDb_);
674 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
675 errCode = -E_NOT_FOUND;
676 } else {
677 errCode = GetLogData(queryStmt, logInfoGet);
678 }
679
680 LogInfo logInfoBind;
681 logInfoBind.hashKey = dataItem.hashKey;
682 logInfoBind.device = dataItem.dev;
683 logInfoBind.timestamp = dataItem.timeStamp;
684 logInfoBind.flag = dataItem.flag;
685 logInfoBind.wTimeStamp = maxTimestamp;
686
687 if (errCode == -E_NOT_FOUND) { // insert
688 logInfoBind.originDev = dataItem.dev;
689 } else if (errCode == E_OK) { // update
690 logInfoBind.wTimeStamp = logInfoGet.wTimeStamp;
691 logInfoBind.originDev = logInfoGet.originDev;
692 } else {
693 return errCode;
694 }
695
696 // bind
697 SQLiteUtils::BindInt64ToStatement(statement, 1, rowid); // 1 means dataKey index
698 std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
699 SQLiteUtils::BindBlobToStatement(statement, 2, originDev); // 2 means ori_dev index
700 SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp); // 3 means timestamp index
701 SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimeStamp); // 4 means w_timestamp index
702 SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag); // 5 means flag index
703 SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey); // 6 means hashKey index
704 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
705 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
706 return E_OK;
707 }
708 return errCode;
709 }
710
DeleteSyncDataItem(const DataItem & dataItem,sqlite3_stmt * & stmt)711 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem, sqlite3_stmt *&stmt)
712 {
713 if (stmt == nullptr) {
714 const std::string sql = "DELETE FROM " + table_.GetTableName() + " WHERE rowid IN ("
715 "SELECT data_key FROM " + DBConstant::RELATIONAL_PREFIX + baseTblName_ + "_log "
716 "WHERE hash_key=? AND device=? AND flag&0x01=0);";
717 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
718 if (errCode != E_OK) {
719 LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
720 return errCode;
721 }
722 }
723
724 int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hash_key index
725 if (errCode != E_OK) {
726 SQLiteUtils::ResetStatement(stmt, true, errCode);
727 return errCode;
728 }
729 errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
730 if (errCode != E_OK) {
731 SQLiteUtils::ResetStatement(stmt, true, errCode);
732 return errCode;
733 }
734 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
735 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
736 errCode = E_OK;
737 }
738 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
739 return errCode;
740 }
741
SaveSyncDataItem(const DataItem & dataItem,sqlite3_stmt * & saveDataStmt,sqlite3_stmt * & rmDataStmt,const std::vector<FieldInfo> & fieldInfos,int64_t & rowid)742 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, sqlite3_stmt *&saveDataStmt,
743 sqlite3_stmt *&rmDataStmt, const std::vector<FieldInfo> &fieldInfos, int64_t &rowid)
744 {
745 if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
746 return DeleteSyncDataItem(dataItem, rmDataStmt);
747 }
748
749 OptRowDataWithLog data;
750 int errCode = DataTransformer::DeSerializeDataItem(dataItem, data, fieldInfos);
751 if (errCode != E_OK) {
752 LOGE("[RelationalStorageExecutor] DeSerialize dataItem failed! errCode = [%d]", errCode);
753 return errCode;
754 }
755
756 if (data.optionalData.size() != table_.GetFields().size()) {
757 LOGW("Remote data has different fields with local data. Remote size:%zu, local size:%zu",
758 data.optionalData.size(), table_.GetFields().size());
759 }
760
761 auto putSize = std::min(data.optionalData.size(), table_.GetFields().size());
762 for (size_t cid = 0; cid < putSize; ++cid) {
763 const auto &fieldData = data.optionalData[cid];
764 errCode = BindDataValueByType(saveDataStmt, fieldData, cid + 1);
765 if (errCode != E_OK) {
766 LOGE("Bind data failed, errCode:%d, cid:%d.", errCode, cid + 1);
767 return errCode;
768 }
769 }
770
771 errCode = SQLiteUtils::StepWithRetry(saveDataStmt, isMemDb_);
772 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
773 rowid = SQLiteUtils::GetLastRowId(dbHandle_);
774 errCode = E_OK;
775 }
776 return errCode;
777 }
778
DeleteSyncLog(const DataItem & dataItem,sqlite3_stmt * & stmt)779 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem, sqlite3_stmt *&stmt)
780 {
781 if (stmt == nullptr) {
782 const std::string sql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + baseTblName_ + "_log "
783 "WHERE hash_key=? AND device=?";
784 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
785 if (errCode != E_OK) {
786 LOGE("[DeleteSyncLog] Get statement fail!");
787 return errCode;
788 }
789 }
790
791 int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hashkey index
792 if (errCode != E_OK) {
793 SQLiteUtils::ResetStatement(stmt, true, errCode);
794 return errCode;
795 }
796 errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
797 if (errCode != E_OK) {
798 SQLiteUtils::ResetStatement(stmt, true, errCode);
799 return errCode;
800 }
801 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
802 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
803 errCode = E_OK;
804 }
805 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
806 return errCode;
807 }
808
ProcessMissQueryData(const DataItem & item,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)809 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item, sqlite3_stmt *&rmDataStmt,
810 sqlite3_stmt *&rmLogStmt)
811 {
812 int errCode = DeleteSyncDataItem(item, rmDataStmt);
813 if (errCode != E_OK) {
814 return errCode;
815 }
816 return DeleteSyncLog(item, rmLogStmt);
817 }
818
GetSyncDataPre(const DataItem & dataItem,DataItem & itemGet)819 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataPre(const DataItem &dataItem, DataItem &itemGet)
820 {
821 if (saveStmt_.queryStmt == nullptr) {
822 return -E_INVALID_ARGS;
823 }
824 int errCode = SQLiteUtils::BindBlobToStatement(saveStmt_.queryStmt, 1, dataItem.hashKey); // 1 index for hashkey
825 if (errCode != E_OK) {
826 return errCode;
827 }
828 errCode = SQLiteUtils::BindTextToStatement(saveStmt_.queryStmt, 2, dataItem.dev); // 2 index for devices
829 if (errCode != E_OK) {
830 return errCode;
831 }
832
833 LogInfo logInfoGet;
834 errCode = SQLiteUtils::StepWithRetry(saveStmt_.queryStmt, isMemDb_);
835 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
836 errCode = -E_NOT_FOUND;
837 } else {
838 errCode = GetLogData(saveStmt_.queryStmt, logInfoGet);
839 }
840 itemGet.timeStamp = logInfoGet.timestamp;
841 SQLiteUtils::ResetStatement(saveStmt_.queryStmt, false, errCode);
842 return errCode;
843 }
844
CheckDataConflictDefeated(const DataItem & dataItem,bool & isDefeated)845 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem, bool &isDefeated)
846 {
847 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
848 isDefeated = false; // no need to slove conflict except miss query data
849 return E_OK;
850 }
851
852 DataItem itemGet;
853 int errCode = GetSyncDataPre(dataItem, itemGet);
854 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
855 LOGE("Failed to get raw data. %d", errCode);
856 return errCode;
857 }
858 isDefeated = (dataItem.timeStamp <= itemGet.timeStamp); // defeated if item timestamp is earlier then raw data
859 return E_OK;
860 }
861
SaveSyncDataItem(const std::vector<FieldInfo> & fieldInfos,const std::string & deviceName,DataItem & item,TimeStamp & maxTimestamp)862 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const std::vector<FieldInfo> &fieldInfos,
863 const std::string &deviceName, DataItem &item, TimeStamp &maxTimestamp)
864 {
865 item.dev = deviceName;
866 bool isDefeated = false;
867 int errCode = CheckDataConflictDefeated(item, isDefeated);
868 if (errCode != E_OK) {
869 LOGE("check data conflict failed. %d", errCode);
870 return errCode;
871 }
872
873 if (isDefeated) {
874 LOGD("Data was defeated.");
875 return E_OK;
876 }
877 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
878 return ProcessMissQueryData(item, saveStmt_.rmDataStmt, saveStmt_.rmLogStmt);
879 }
880 int64_t rowid = -1;
881 errCode = SaveSyncDataItem(item, saveStmt_.saveDataStmt, saveStmt_.rmDataStmt, fieldInfos, rowid);
882 if (errCode == E_OK || errCode == -E_NOT_FOUND) {
883 errCode = SaveSyncLog(saveStmt_.saveLogStmt, saveStmt_.queryStmt, item, maxTimestamp, rowid);
884 }
885 return errCode;
886 }
887
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName,TimeStamp & maxTimestamp)888 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(const QueryObject &object,
889 std::vector<DataItem> &dataItems, const std::string &deviceName, TimeStamp &maxTimestamp)
890 {
891 int errCode = PrepareForSavingData(object, saveStmt_.saveDataStmt);
892 if (errCode != E_OK) {
893 return errCode;
894 }
895 errCode = PrepareForSavingLog(object, deviceName, saveStmt_.saveLogStmt, saveStmt_.queryStmt);
896 if (errCode != E_OK) {
897 SQLiteUtils::ResetStatement(saveStmt_.saveDataStmt, true, errCode);
898 return errCode;
899 }
900 std::vector<FieldInfo> fieldInfos;
901 for (const auto &col: table_.GetFields()) {
902 fieldInfos.push_back(col.second);
903 }
904
905 for (auto &item : dataItems) {
906 if (item.neglect) { // Do not save this record if it is neglected
907 continue;
908 }
909 errCode = SaveSyncDataItem(fieldInfos, deviceName, item, maxTimestamp);
910 if (errCode != E_OK) {
911 break;
912 }
913 maxTimestamp = std::max(item.timeStamp, maxTimestamp);
914 // Need not reset rmDataStmt and rmLogStmt here.
915 saveStmt_.ResetStatements(false);
916 }
917 if (errCode == -E_NOT_FOUND) {
918 errCode = E_OK;
919 }
920 saveStmt_.ResetStatements(true);
921 return errCode;
922 }
923
SaveSyncItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName,const TableInfo & table,TimeStamp & timeStamp)924 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(const QueryObject &object, std::vector<DataItem> &dataItems,
925 const std::string &deviceName, const TableInfo &table, TimeStamp &timeStamp)
926 {
927 int errCode = StartTransaction(TransactType::IMMEDIATE);
928 if (errCode != E_OK) {
929 return errCode;
930 }
931 baseTblName_ = object.GetTableName();
932 SetTableInfo(table);
933 const std::string tableName = DBCommon::GetDistributedTableName(deviceName, baseTblName_);
934 table_.SetTableName(tableName);
935 errCode = SaveSyncDataItems(object, dataItems, deviceName, timeStamp);
936 if (errCode == E_OK) {
937 errCode = Commit();
938 } else {
939 (void)Rollback(); // Keep the error code of the first scene
940 }
941 return errCode;
942 }
943
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const944 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
945 bool isGettingDeletedData) const
946 {
947 RowDataWithLog data;
948 int errCode = GetLogData(stmt, data.logInfo);
949 if (errCode != E_OK) {
950 LOGE("relational data value transfer to kv fail");
951 return errCode;
952 }
953
954 if (!isGettingDeletedData) {
955 for (size_t cid = 0; cid < table_.GetFields().size(); ++cid) {
956 DataValue value;
957 errCode = GetDataValueByType(stmt, value, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM);
958 if (errCode != E_OK) {
959 return errCode;
960 }
961 data.rowData.push_back(std::move(value));
962 }
963 }
964
965 errCode = DataTransformer::SerializeDataItem(data,
966 isGettingDeletedData ? std::vector<FieldInfo>() : table_.GetFieldInfos(), dataItem);
967 if (errCode != E_OK) {
968 LOGE("relational data value transfer to kv fail");
969 }
970 return errCode;
971 }
972
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)973 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
974 {
975 int errCode = GetDataItemForSync(fullStmt, item, false);
976 if (errCode != E_OK) {
977 return errCode;
978 }
979 item.value = {};
980 item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
981 return errCode;
982 }
983
984 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,TimeStamp & timestamp)985 int StepNext(bool isMemDB, sqlite3_stmt *stmt, TimeStamp ×tamp)
986 {
987 if (stmt == nullptr) {
988 return -E_INVALID_ARGS;
989 }
990 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
991 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
992 timestamp = INT64_MAX;
993 errCode = E_OK;
994 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
995 timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3)); // 3 means timestamp index
996 errCode = E_OK;
997 }
998 return errCode;
999 }
1000
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1001 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1002 std::vector<DataItem> &dataItems, DataItem &&item)
1003 {
1004 // If one record is over 4M, ignore it.
1005 if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1006 overLongSize++;
1007 } else {
1008 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1009 dataTotalSize += GetDataItemSerialSize(item, appendLength);
1010 if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1011 return -E_UNFINISHED;
1012 } else {
1013 dataItems.push_back(item);
1014 }
1015 }
1016 return E_OK;
1017 }
1018 }
1019
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,TimeStamp & queryTime)1020 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1021 sqlite3_stmt *queryStmt, DataItem &item, TimeStamp &queryTime)
1022 {
1023 if (!isFirstTime) { // For the first time, never step before, can get nothing
1024 int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1025 if (errCode != E_OK) {
1026 return errCode;
1027 }
1028 }
1029 return StepNext(isMemDb_, queryStmt, queryTime);
1030 }
1031
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,TimeStamp & missQueryTime)1032 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1033 TimeStamp &missQueryTime)
1034 {
1035 int errCode = GetMissQueryData(fullStmt, item);
1036 if (errCode != E_OK) {
1037 return errCode;
1038 }
1039 return StepNext(isMemDb_, fullStmt, missQueryTime);
1040 }
1041
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1042 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1043 const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1044 const TableInfo &tableInfo)
1045 {
1046 baseTblName_ = tableInfo.GetTableName();
1047 SetTableInfo(tableInfo);
1048 sqlite3_stmt *queryStmt = nullptr;
1049 sqlite3_stmt *fullStmt = nullptr;
1050 bool isGettingDeletedData = false;
1051 int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1052 if (errCode != E_OK) {
1053 return errCode;
1054 }
1055
1056 TimeStamp queryTime = 0;
1057 TimeStamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1058
1059 bool isFirstTime = true;
1060 size_t dataTotalSize = 0;
1061 size_t overLongSize = 0;
1062 do {
1063 DataItem item;
1064 if (queryTime < missQueryTime) {
1065 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1066 } else if (queryTime == missQueryTime) {
1067 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1068 if (errCode != E_OK) {
1069 break;
1070 }
1071 errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1072 } else {
1073 errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1074 }
1075 if (errCode != E_OK) {
1076 break;
1077 }
1078
1079 if (!isFirstTime) {
1080 errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1081 if (errCode != E_OK) {
1082 break;
1083 }
1084 }
1085
1086 isFirstTime = false;
1087 if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1088 errCode = -E_FINISHED;
1089 break;
1090 }
1091 } while (true);
1092 LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1093 errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1094 SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1095 SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1096 return errCode;
1097 }
1098
CheckDBModeForRelational()1099 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1100 {
1101 std::string journalMode;
1102 int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1103
1104 for (auto &c : journalMode) { // convert to lowercase
1105 c = static_cast<char>(std::tolower(c));
1106 }
1107
1108 if (errCode == E_OK && journalMode != "wal") {
1109 LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1110 return -E_NOT_SUPPORT;
1111 }
1112 return errCode;
1113 }
1114
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1115 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1116 const std::string &tableName)
1117 {
1118 std::vector<std::string> deviceTables;
1119 int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1120 if (errCode != E_OK) {
1121 LOGE("Get device table name for alter table failed. %d", errCode);
1122 return errCode;
1123 }
1124
1125 LOGD("Begin to delete device table: deviceTable[%d]", deviceTables.size());
1126 for (const auto &table : deviceTables) {
1127 std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1128 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1129 if (errCode != E_OK) {
1130 LOGE("Delete device data failed. %d", errCode);
1131 break;
1132 }
1133 }
1134 return errCode;
1135 }
1136
DeleteDistributedLogTable(const std::string & tableName)1137 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1138 {
1139 if (tableName.empty()) {
1140 return -E_INVALID_ARGS;
1141 }
1142 std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1143 std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1144 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1145 if (errCode != E_OK) {
1146 LOGE("Delete distributed log table failed. %d", errCode);
1147 }
1148 return errCode;
1149 }
1150
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1151 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1152 std::vector<std::string> &missingTables)
1153 {
1154 if (tableNames.empty()) {
1155 return E_OK;
1156 }
1157 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1158 sqlite3_stmt *stmt = nullptr;
1159 int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1160 if (errCode != E_OK) {
1161 SQLiteUtils::ResetStatement(stmt, true, errCode);
1162 return errCode;
1163 }
1164 for (const auto &tableName : tableNames) {
1165 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1166 if (errCode != E_OK) {
1167 LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1168 break;
1169 }
1170
1171 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1172 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1173 errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1174 if (errCode != E_OK) {
1175 LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1176 break;
1177 }
1178 errCode = DeleteDistributedLogTable(tableName);
1179 if (errCode != E_OK) {
1180 LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1181 break;
1182 }
1183 missingTables.emplace_back(tableName);
1184 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1185 LOGE("Check distributed table failed. %d", errCode);
1186 break;
1187 }
1188 errCode = E_OK; // Check result ok for distributed table is still exists
1189 SQLiteUtils::ResetStatement(stmt, false, errCode);
1190 }
1191 SQLiteUtils::ResetStatement(stmt, true, errCode);
1192 return CheckCorruptedStatus(errCode);
1193 }
1194
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl)1195 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1196 const TableInfo &baseTbl)
1197 {
1198 if (dbHandle_ == nullptr) {
1199 return -E_INVALID_DB;
1200 }
1201
1202 if (device.empty() || !baseTbl.IsValid()) {
1203 return -E_INVALID_ARGS;
1204 }
1205
1206 std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName());
1207 int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1208 if (errCode != E_OK) {
1209 LOGE("Create device table failed. %d", errCode);
1210 return errCode;
1211 }
1212
1213 errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1214 if (errCode != E_OK) {
1215 LOGE("Copy index to device table failed. %d", errCode);
1216 }
1217 return errCode;
1218 }
1219
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query)1220 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query)
1221 {
1222 if (dbHandle_ == nullptr) {
1223 return -E_INVALID_DB;
1224 }
1225
1226 TableInfo newTable;
1227 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1228 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1229 LOGE("Check new schema failed. %d", errCode);
1230 return errCode;
1231 } else {
1232 errCode = table.CompareWithTable(newTable);
1233 if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1234 LOGE("Check schema failed, schema was changed. %d", errCode);
1235 return -E_DISTRIBUTED_SCHEMA_CHANGED;
1236 } else {
1237 errCode = E_OK;
1238 }
1239 }
1240
1241 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1242 if (errCode != E_OK) {
1243 LOGE("Get query helper for check query failed. %d", errCode);
1244 return errCode;
1245 }
1246
1247 if (!query.IsQueryForRelationalDB()) {
1248 LOGE("Not support for this query type.");
1249 return -E_NOT_SUPPORT;
1250 }
1251
1252 SyncTimeRange defaultTimeRange;
1253 sqlite3_stmt *stmt = nullptr;
1254 errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1255 stmt);
1256 if (errCode != E_OK) {
1257 LOGE("Get query statement for check query failed. %d", errCode);
1258 }
1259
1260 SQLiteUtils::ResetStatement(stmt, true, errCode);
1261 return errCode;
1262 }
1263
ResetStatements(bool isNeedFinalize)1264 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataStmt::ResetStatements(bool isNeedFinalize)
1265 {
1266 int errCode = E_OK;
1267 if (saveDataStmt != nullptr) {
1268 SQLiteUtils::ResetStatement(saveDataStmt, isNeedFinalize, errCode);
1269 }
1270 if (saveLogStmt != nullptr) {
1271 SQLiteUtils::ResetStatement(saveLogStmt, isNeedFinalize, errCode);
1272 }
1273 if (queryStmt != nullptr) {
1274 SQLiteUtils::ResetStatement(queryStmt, isNeedFinalize, errCode);
1275 }
1276 if (rmDataStmt != nullptr) {
1277 SQLiteUtils::ResetStatement(rmDataStmt, isNeedFinalize, errCode);
1278 }
1279 if (rmLogStmt != nullptr) {
1280 SQLiteUtils::ResetStatement(rmLogStmt, isNeedFinalize, errCode);
1281 }
1282 return errCode;
1283 }
1284 } // namespace DistributedDB
1285 #endif
1286