• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 #include <algorithm>
18 #include "data_transformer.h"
19 #include "db_common.h"
20 
21 namespace DistributedDB {
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable)22 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable)
23     : SQLiteStorageExecutor(dbHandle, writable, false)
24 {}
25 
CreateDistributedTable(const std::string & tableName,TableInfo & table,bool isUpgrade)26 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(const std::string &tableName, TableInfo &table,
27     bool isUpgrade)
28 {
29     if (dbHandle_ == nullptr) {
30         return -E_INVALID_DB;
31     }
32 
33     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
34     if (errCode != E_OK) {
35         LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
36         return errCode;
37     }
38 
39     if (table.GetCreateTableSql().find("WITHOUT ROWID") != std::string::npos) {
40         LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
41         return -E_NOT_SUPPORT;
42     }
43 
44     bool isTableEmpty = false;
45     errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isTableEmpty);
46     if (errCode != E_OK) {
47         LOGE("[CreateDistributedTable] Check table [%s] is empty failed. %d", tableName.c_str(), errCode);
48         return errCode;
49     }
50 
51     if (!isUpgrade && !isTableEmpty) { // create distributed table should on an empty table
52         LOGE("[CreateDistributedTable] Create distributed table should on an empty table when first create.");
53         return -E_NOT_SUPPORT;
54     }
55 
56     // create log table
57     errCode = SQLiteUtils::CreateRelationalLogTable(dbHandle_, tableName);
58     if (errCode != E_OK) {
59         LOGE("[CreateDistributedTable] create log table failed");
60         return errCode;
61     }
62 
63     // add trigger
64     errCode = SQLiteUtils::AddRelationalLogTableTrigger(dbHandle_, table);
65     if (errCode != E_OK) {
66         LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
67         return errCode;
68     }
69     return E_OK;
70 }
71 
UpgradeDistributedTable(const TableInfo & tableInfo,TableInfo & newTableInfo)72 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const TableInfo &tableInfo,
73     TableInfo &newTableInfo)
74 {
75     if (dbHandle_ == nullptr) {
76         return -E_INVALID_DB;
77     }
78 
79     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableInfo.GetTableName(), newTableInfo);
80     if (errCode != E_OK) {
81         LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
82         return errCode;
83     }
84 
85     if (newTableInfo.GetCreateTableSql().find("WITHOUT ROWID") != std::string::npos) {
86         LOGE("[UpgradeDistributedTable] Not support create distributed table without rowid.");
87         return -E_NOT_SUPPORT;
88     }
89 
90     // new table should has same or compatible upgrade
91     errCode = tableInfo.CompareWithTable(newTableInfo);
92     if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
93         LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
94         return -E_SCHEMA_MISMATCH;
95     }
96 
97     errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
98     if (errCode != E_OK) {
99         LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
100     }
101 
102     return errCode;
103 }
104 
105 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)106 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
107     std::vector<std::string> &deviceTables)
108 {
109     if (device.empty() && tableName.empty()) { // device and table name should not both be empty
110         return -E_INVALID_ARGS;
111     }
112     std::string deviceHash = DBCommon::TransferStringToHex(DBCommon::TransferHashString(device));
113     std::string devicePattern = device.empty() ? "%" : deviceHash;
114     std::string tablePattern = tableName.empty() ? "%" : tableName;
115     std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
116 
117     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
118         deviceTableName + "';";
119     sqlite3_stmt *stmt = nullptr;
120     int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
121     if (errCode != E_OK) {
122         SQLiteUtils::ResetStatement(stmt, true, errCode);
123         return errCode;
124     }
125 
126     do {
127         errCode = SQLiteUtils::StepWithRetry(stmt, false);
128         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
129             errCode = E_OK;
130             break;
131         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
132             LOGE("Get table name failed. %d", errCode);
133             break;
134         }
135         std::string realTableName;
136         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
137         if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
138             continue;
139         }
140         if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
141             continue;
142         }
143         deviceTables.emplace_back(realTableName);
144     } while (true);
145 
146     SQLiteUtils::ResetStatement(stmt, true, errCode);
147     return errCode;
148 }
149 
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)150 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
151 {
152     std::vector<FieldInfo> fields;
153     auto itOld = oldTableInfo.GetFields().begin();
154     auto itNew = newTableInfo.GetFields().begin();
155     for (; itNew != newTableInfo.GetFields().end(); itNew++) {
156         if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
157             fields.emplace_back(itNew->second);
158             continue;
159         }
160         itOld++;
161     }
162     return fields;
163 }
164 
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)165 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
166 {
167     if (db == nullptr) {
168         return -E_INVALID_ARGS;
169     }
170 
171     std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
172         return a.GetColumnId()< b.GetColumnId();
173     });
174     int errCode = E_OK;
175     for (const auto &table : tables) {
176         for (const auto &field : fields) {
177             std::string alterSql = "ALTER TABLE " + table + " ADD " + field.GetFieldName() + " " + field.GetDataType();
178             alterSql += field.IsNotNull() ? " NOT NULL" : "";
179             alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
180             alterSql += ";";
181             errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
182             if (errCode != E_OK) {
183                 LOGE("Alter table failed. %d", errCode);
184                 break;
185             }
186         }
187     }
188     return errCode;
189 }
190 
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)191 std::map<std::string, CompositeFields> GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
192 {
193     std::map<std::string, CompositeFields> indexes;
194     auto itOld = oldTableInfo.GetIndexDefine().begin();
195     auto itNew = newTableInfo.GetIndexDefine().begin();
196     auto itOldEnd = oldTableInfo.GetIndexDefine().end();
197     auto itNewEnd = newTableInfo.GetIndexDefine().end();
198 
199     while (itOld != itOldEnd && itNew != itNewEnd) {
200         if (itOld->first == itNew->first) {
201             if (itOld->second != itNew->second) {
202                 indexes.insert({itNew->first, itNew->second});
203             }
204             itOld++;
205             itNew++;
206         } else if (itOld->first < itNew->first) {
207             indexes.insert({itOld->first,{}});
208             itOld++;
209         } else if (itOld->first > itNew->first) {
210             indexes.insert({itNew->first, itNew->second});
211             itNew++;
212         }
213     }
214 
215     while (itOld != itOldEnd) {
216         indexes.insert({itOld->first,{}});
217         itOld++;
218     }
219 
220     while (itNew != itNewEnd) {
221         indexes.insert({itNew->first, itNew->second});
222         itNew++;
223     }
224 
225     return indexes;
226 }
227 
Upgradeindexes(sqlite3 * db,const std::vector<std::string> & tables,const std::map<std::string,CompositeFields> & indexes)228 int Upgradeindexes(sqlite3 *db, const std::vector<std::string> &tables,
229     const std::map<std::string, CompositeFields> &indexes)
230 {
231     if (db == nullptr) {
232         return -E_INVALID_ARGS;
233     }
234 
235     int errCode = E_OK;
236     for (const auto &table : tables) {
237         for (const auto &index : indexes) {
238             if (index.first.empty()) {
239                 continue;
240             }
241             std::string realIndexName = table + "_" + index.first;
242             std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
243             errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
244             if (errCode != E_OK) {
245                 LOGE("Drop index failed. %d", errCode);
246                 return errCode;
247             }
248 
249             if (index.second.empty()) { // empty means drop index only
250                 continue;
251             }
252 
253             auto it = index.second.begin();
254             std::string indexDefine = *it++;
255             while (it != index.second.end()) {
256                 indexDefine += ", " + *it++;
257             }
258             std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
259                 "(" + indexDefine + ");";
260             errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
261             if (errCode != E_OK) {
262                 LOGE("Create index failed. %d", errCode);
263                 break;
264             }
265         }
266     }
267     return errCode;
268 }
269 }
270 
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)271 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
272     const TableInfo &newTableInfo)
273 {
274     std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
275     std::map<std::string, CompositeFields> upgradeIndexces = GetChangedIndexes(oldTableInfo, newTableInfo);
276     std::vector<std::string> deviceTables;
277     int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
278     if (errCode != E_OK) {
279         LOGE("Get device table name for alter table failed. %d", errCode);
280         return errCode;
281     }
282 
283     LOGD("Begin to alter table: upgrade fields[%d], indexces[%d], deviceTable[%d]", upgradeFields.size(),
284         upgradeIndexces.size(), deviceTables.size());
285     errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
286     if (errCode != E_OK) {
287         LOGE("upgrade fields failed. %d", errCode);
288         return errCode;
289     }
290 
291     errCode = Upgradeindexes(dbHandle_, deviceTables, upgradeIndexces);
292     if (errCode != E_OK) {
293         LOGE("upgrade indexes failed. %d", errCode);
294     }
295 
296     return E_OK;
297 }
298 
StartTransaction(TransactType type)299 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
300 {
301     if (dbHandle_ == nullptr) {
302         LOGE("Begin transaction failed, dbHandle is null.");
303         return -E_INVALID_DB;
304     }
305     int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
306     if (errCode != E_OK) {
307         LOGE("Begin transaction failed, errCode = %d", errCode);
308     }
309     return errCode;
310 }
311 
Commit()312 int SQLiteSingleVerRelationalStorageExecutor::Commit()
313 {
314     if (dbHandle_ == nullptr) {
315         return -E_INVALID_DB;
316     }
317 
318     return SQLiteUtils::CommitTransaction(dbHandle_);
319 }
320 
Rollback()321 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
322 {
323     if (dbHandle_ == nullptr) {
324         return -E_INVALID_DB;
325     }
326     int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
327     if (errCode != E_OK) {
328         LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
329     }
330     return errCode;
331 }
332 
SetTableInfo(const TableInfo & tableInfo)333 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
334 {
335     table_ = tableInfo;
336 }
337 
GetDataValueByType(sqlite3_stmt * statement,DataValue & value,int cid)338 static int GetDataValueByType(sqlite3_stmt *statement, DataValue &value, int cid)
339 {
340     int errCode = E_OK;
341     int storageType = sqlite3_column_type(statement, cid);
342     switch (storageType) {
343         case SQLITE_INTEGER: {
344             value = static_cast<int64_t>(sqlite3_column_int64(statement, cid));
345             break;
346         }
347         case SQLITE_FLOAT: {
348             value = sqlite3_column_double(statement, cid);
349             break;
350         }
351         case SQLITE_BLOB: {
352             std::vector<uint8_t> blobValue;
353             errCode = SQLiteUtils::GetColumnBlobValue(statement, cid, blobValue);
354             if (errCode != E_OK) {
355                 return errCode;
356             }
357             auto blob = new (std::nothrow) Blob;
358             if (blob == nullptr) {
359                 return -E_OUT_OF_MEMORY;
360             }
361             blob->WriteBlob(blobValue.data(), static_cast<uint32_t>(blobValue.size()));
362             errCode = value.Set(blob);
363             break;
364         }
365         case SQLITE_NULL: {
366             break;
367         }
368         case SQLITE3_TEXT: {
369             const char *colValue = reinterpret_cast<const char *>(sqlite3_column_text(statement, cid));
370             if (colValue == nullptr) {
371                 value.ResetValue();
372             } else {
373                 value = std::string(colValue);
374                 if (value.GetType() == StorageType::STORAGE_TYPE_NULL) {
375                     errCode = -E_OUT_OF_MEMORY;
376                 }
377             }
378             break;
379         }
380         default: {
381             break;
382         }
383     }
384     return errCode;
385 }
386 
BindDataValueByType(sqlite3_stmt * statement,const std::optional<DataValue> & data,int cid)387 static int BindDataValueByType(sqlite3_stmt *statement, const std::optional<DataValue> &data, int cid)
388 {
389     int errCode = E_OK;
390     StorageType type = data.value().GetType();
391     switch (type) {
392         case StorageType::STORAGE_TYPE_INTEGER: {
393             int64_t intData = 0;
394             (void)data.value().GetInt64(intData);
395             errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int64(statement, cid, intData));
396             break;
397         }
398 
399         case StorageType::STORAGE_TYPE_REAL: {
400             double doubleData = 0;
401             (void)data.value().GetDouble(doubleData);
402             errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_double(statement, cid, doubleData));
403             break;
404         }
405 
406         case StorageType::STORAGE_TYPE_TEXT: {
407             std::string strData;
408             (void)data.value().GetText(strData);
409             errCode = SQLiteUtils::BindTextToStatement(statement, cid, strData);
410             break;
411         }
412 
413         case StorageType::STORAGE_TYPE_BLOB: {
414             Blob blob;
415             (void)data.value().GetBlob(blob);
416             std::vector<uint8_t> blobData(blob.GetData(), blob.GetData() + blob.GetSize());
417             errCode = SQLiteUtils::BindBlobToStatement(statement, cid, blobData, true);
418             break;
419         }
420 
421         case StorageType::STORAGE_TYPE_NULL: {
422             errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(statement, cid));
423             break;
424         }
425 
426         default:
427             break;
428     }
429     return errCode;
430 }
431 
GetLogData(sqlite3_stmt * logStatement,LogInfo & logInfo)432 static int GetLogData(sqlite3_stmt *logStatement, LogInfo &logInfo)
433 {
434     logInfo.dataKey = sqlite3_column_int64(logStatement, 0);  // 0 means dataKey index
435 
436     std::vector<uint8_t> dev;
437     int errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 1, dev);  // 1 means dev index
438     if (errCode != E_OK) {
439         return errCode;
440     }
441     logInfo.device = std::string(dev.begin(), dev.end());
442 
443     std::vector<uint8_t> oriDev;
444     errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 2, oriDev);  // 2 means ori_dev index
445     if (errCode != E_OK) {
446         return errCode;
447     }
448     logInfo.originDev = std::string(oriDev.begin(), oriDev.end());
449     logInfo.timestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 3));  // 3 means timestamp index
450     logInfo.wTimeStamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 4));  // 4 means w_timestamp index
451     logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 5));  // 5 means flag index
452     logInfo.flag &= (~DataItem::LOCAL_FLAG);
453     logInfo.flag &= (~DataItem::UPDATE_FLAG);
454     return SQLiteUtils::GetColumnBlobValue(logStatement, 6, logInfo.hashKey);  // 6 means hashKey index
455 }
456 
GetDataItemSerialSize(DataItem & item,size_t appendLen)457 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
458 {
459     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
460     // the size would not be very large.
461     static const size_t maxOrigDevLength = 40;
462     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
463     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
464         Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
465     return dataSize;
466 }
467 
GetKvData(const Key & key,Value & value) const468 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
469 {
470     static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX +
471         "metadata WHERE key=?;";
472     sqlite3_stmt *statement = nullptr;
473     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
474     if (errCode != E_OK) {
475         goto END;
476     }
477 
478     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
479     if (errCode != E_OK) {
480         goto END;
481     }
482 
483     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
484     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
485         errCode = -E_NOT_FOUND;
486         goto END;
487     } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
488         goto END;
489     }
490 
491     errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
492     END:
493     SQLiteUtils::ResetStatement(statement, true, errCode);
494     return errCode;
495 }
496 
PutKvData(const Key & key,const Value & value) const497 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
498 {
499     static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX +
500         "metadata VALUES(?,?);";
501     sqlite3_stmt *statement = nullptr;
502     int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
503     if (errCode != E_OK) {
504         goto ERROR;
505     }
506 
507     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false);  // 1 means key index
508     if (errCode != E_OK) {
509         LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
510         goto ERROR;
511     }
512 
513     errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true);  // 2 means value index
514     if (errCode != E_OK) {
515         LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
516         goto ERROR;
517     }
518     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
519     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
520         errCode = E_OK;
521     }
522 ERROR:
523     SQLiteUtils::ResetStatement(statement, true, errCode);
524     return errCode;
525 }
526 
DeleteMetaData(const std::vector<Key> & keys) const527 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
528 {
529     static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
530         "metadata WHERE key=?;";
531     sqlite3_stmt *statement = nullptr;
532     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
533     if (errCode != E_OK) {
534         return errCode;
535     }
536 
537     for (const auto &key : keys) {
538         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
539         if (errCode != E_OK) {
540             break;
541         }
542 
543         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
544         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
545             break;
546         }
547         errCode = E_OK;
548         SQLiteUtils::ResetStatement(statement, false, errCode);
549     }
550     SQLiteUtils::ResetStatement(statement, true, errCode);
551     return CheckCorruptedStatus(errCode);
552 }
553 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const554 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
555 {
556     static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
557         "metadata WHERE key>=? AND key<=?;";
558     sqlite3_stmt *statement = nullptr;
559     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
560     if (errCode != E_OK) {
561         return errCode;
562     }
563 
564     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
565     if (errCode == E_OK) {
566         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
567         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
568             errCode = E_OK;
569         }
570     }
571     SQLiteUtils::ResetStatement(statement, true, errCode);
572     return CheckCorruptedStatus(errCode);
573 }
574 
GetAllKeys(sqlite3_stmt * statement,std::vector<Key> & keys)575 static int GetAllKeys(sqlite3_stmt *statement, std::vector<Key> &keys)
576 {
577     if (statement == nullptr) {
578         return -E_INVALID_DB;
579     }
580     int errCode;
581     do {
582         errCode = SQLiteUtils::StepWithRetry(statement, false);
583         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
584             Key key;
585             errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, key);
586             if (errCode != E_OK) {
587                 break;
588             }
589 
590             keys.push_back(std::move(key));
591         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
592             errCode = E_OK;
593             break;
594         } else {
595             LOGE("SQLite step for getting all keys failed:%d", errCode);
596             break;
597         }
598     } while (true);
599     return errCode;
600 }
601 
GetAllMetaKeys(std::vector<Key> & keys) const602 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
603 {
604     static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + DBConstant::RELATIONAL_PREFIX + "metadata;";
605     sqlite3_stmt *statement = nullptr;
606     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
607     if (errCode != E_OK) {
608         LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
609         return errCode;
610     }
611     errCode = GetAllKeys(statement, keys);
612     SQLiteUtils::ResetStatement(statement, true, errCode);
613     return errCode;
614 }
615 
PrepareForSavingLog(const QueryObject & object,const std::string & deviceName,sqlite3_stmt * & logStmt,sqlite3_stmt * & queryStmt) const616 int SQLiteSingleVerRelationalStorageExecutor::PrepareForSavingLog(const QueryObject &object,
617     const std::string &deviceName, sqlite3_stmt *&logStmt, sqlite3_stmt *&queryStmt) const
618 {
619     std::string devName = DBCommon::TransferHashString(deviceName);
620     const std::string tableName = DBConstant::RELATIONAL_PREFIX + object.GetTableName() + "_log";
621     std::string dataFormat = "?, '" + deviceName + "', ?, ?, ?, ?, ?";
622     std::string columnList = "data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key";
623     std::string sql = "INSERT OR REPLACE INTO " + tableName +
624         " (" + columnList + ") VALUES (" + dataFormat + ");";
625     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, logStmt);
626     if (errCode != E_OK) {
627         LOGE("[info statement] Get log statement fail! errCode:%d", errCode);
628         return errCode;
629     }
630     std::string selectSql = "select " + columnList + " from " + tableName + " where hash_key = ? and device = ?;";
631     errCode = SQLiteUtils::GetStatement(dbHandle_, selectSql, queryStmt);
632     if (errCode != E_OK) {
633         SQLiteUtils::ResetStatement(logStmt, true, errCode);
634         LOGE("[info statement] Get query statement fail! errCode:%d", errCode);
635     }
636     return errCode;
637 }
638 
PrepareForSavingData(const QueryObject & object,sqlite3_stmt * & statement) const639 int SQLiteSingleVerRelationalStorageExecutor::PrepareForSavingData(const QueryObject &object,
640     sqlite3_stmt *&statement) const
641 {
642     std::string colName;
643     std::string dataFormat;
644     for (size_t colId = 0; colId < table_.GetFields().size(); ++colId) {
645         colName += table_.GetFieldName(colId) + ",";
646         dataFormat += "?,";
647     }
648     colName.pop_back();
649     dataFormat.pop_back();
650 
651     const std::string sql = "INSERT OR REPLACE INTO " + table_.GetTableName() +
652         " (" + colName + ") VALUES (" + dataFormat + ");";
653     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
654     if (errCode != E_OK) {
655         LOGE("[info statement] Get saving data statement fail! errCode:%d", errCode);
656     }
657     return errCode;
658 }
659 
SaveSyncLog(sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,TimeStamp & maxTimestamp,int64_t rowid)660 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncLog(sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
661     const DataItem &dataItem, TimeStamp &maxTimestamp, int64_t rowid)
662 {
663     int errCode = SQLiteUtils::BindBlobToStatement(queryStmt, 1, dataItem.hashKey);  // 1 means hashkey index.
664     if (errCode != E_OK) {
665         return errCode;
666     }
667     errCode = SQLiteUtils::BindTextToStatement(queryStmt, 2, dataItem.dev);  // 2 means device index.
668     if (errCode != E_OK) {
669         return errCode;
670     }
671 
672     LogInfo logInfoGet;
673     errCode = SQLiteUtils::StepWithRetry(queryStmt, isMemDb_);
674     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
675         errCode = -E_NOT_FOUND;
676     } else {
677         errCode = GetLogData(queryStmt, logInfoGet);
678     }
679 
680     LogInfo logInfoBind;
681     logInfoBind.hashKey = dataItem.hashKey;
682     logInfoBind.device = dataItem.dev;
683     logInfoBind.timestamp = dataItem.timeStamp;
684     logInfoBind.flag = dataItem.flag;
685     logInfoBind.wTimeStamp = maxTimestamp;
686 
687     if (errCode == -E_NOT_FOUND) { // insert
688         logInfoBind.originDev = dataItem.dev;
689     } else if (errCode == E_OK) { // update
690         logInfoBind.wTimeStamp = logInfoGet.wTimeStamp;
691         logInfoBind.originDev = logInfoGet.originDev;
692     } else {
693         return errCode;
694     }
695 
696     // bind
697     SQLiteUtils::BindInt64ToStatement(statement, 1, rowid);  // 1 means dataKey index
698     std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
699     SQLiteUtils::BindBlobToStatement(statement, 2, originDev);  // 2 means ori_dev index
700     SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp);  // 3 means timestamp index
701     SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimeStamp);  // 4 means w_timestamp index
702     SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag);  // 5 means flag index
703     SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey);  // 6 means hashKey index
704     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
705     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
706         return E_OK;
707     }
708     return errCode;
709 }
710 
DeleteSyncDataItem(const DataItem & dataItem,sqlite3_stmt * & stmt)711 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem, sqlite3_stmt *&stmt)
712 {
713     if (stmt == nullptr) {
714         const std::string sql = "DELETE FROM " + table_.GetTableName() + " WHERE rowid IN ("
715             "SELECT data_key FROM " + DBConstant::RELATIONAL_PREFIX + baseTblName_ + "_log "
716             "WHERE hash_key=? AND device=? AND flag&0x01=0);";
717         int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
718         if (errCode != E_OK) {
719             LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
720             return errCode;
721         }
722     }
723 
724     int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hash_key index
725     if (errCode != E_OK) {
726         SQLiteUtils::ResetStatement(stmt, true, errCode);
727         return errCode;
728     }
729     errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
730     if (errCode != E_OK) {
731         SQLiteUtils::ResetStatement(stmt, true, errCode);
732         return errCode;
733     }
734     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
735     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
736         errCode = E_OK;
737     }
738     SQLiteUtils::ResetStatement(stmt, false, errCode);  // Finalize outside.
739     return errCode;
740 }
741 
SaveSyncDataItem(const DataItem & dataItem,sqlite3_stmt * & saveDataStmt,sqlite3_stmt * & rmDataStmt,const std::vector<FieldInfo> & fieldInfos,int64_t & rowid)742 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, sqlite3_stmt *&saveDataStmt,
743     sqlite3_stmt *&rmDataStmt, const std::vector<FieldInfo> &fieldInfos, int64_t &rowid)
744 {
745     if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
746         return DeleteSyncDataItem(dataItem, rmDataStmt);
747     }
748 
749     OptRowDataWithLog data;
750     int errCode = DataTransformer::DeSerializeDataItem(dataItem, data, fieldInfos);
751     if (errCode != E_OK) {
752         LOGE("[RelationalStorageExecutor] DeSerialize dataItem failed! errCode = [%d]", errCode);
753         return errCode;
754     }
755 
756     if (data.optionalData.size() != table_.GetFields().size()) {
757         LOGW("Remote data has different fields with local data. Remote size:%zu, local size:%zu",
758             data.optionalData.size(), table_.GetFields().size());
759     }
760 
761     auto putSize = std::min(data.optionalData.size(), table_.GetFields().size());
762     for (size_t cid = 0; cid < putSize; ++cid) {
763         const auto &fieldData = data.optionalData[cid];
764         errCode = BindDataValueByType(saveDataStmt, fieldData, cid + 1);
765         if (errCode != E_OK) {
766             LOGE("Bind data failed, errCode:%d, cid:%d.", errCode, cid + 1);
767             return errCode;
768         }
769     }
770 
771     errCode = SQLiteUtils::StepWithRetry(saveDataStmt, isMemDb_);
772     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
773         rowid = SQLiteUtils::GetLastRowId(dbHandle_);
774         errCode = E_OK;
775     }
776     return errCode;
777 }
778 
DeleteSyncLog(const DataItem & dataItem,sqlite3_stmt * & stmt)779 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem, sqlite3_stmt *&stmt)
780 {
781     if (stmt == nullptr) {
782         const std::string sql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + baseTblName_ + "_log "
783                                 "WHERE hash_key=? AND device=?";
784         int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
785         if (errCode != E_OK) {
786             LOGE("[DeleteSyncLog] Get statement fail!");
787             return errCode;
788         }
789     }
790 
791     int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hashkey index
792     if (errCode != E_OK) {
793         SQLiteUtils::ResetStatement(stmt, true, errCode);
794         return errCode;
795     }
796     errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
797     if (errCode != E_OK) {
798         SQLiteUtils::ResetStatement(stmt, true, errCode);
799         return errCode;
800     }
801     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
802     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
803         errCode = E_OK;
804     }
805     SQLiteUtils::ResetStatement(stmt, false, errCode);  // Finalize outside.
806     return errCode;
807 }
808 
ProcessMissQueryData(const DataItem & item,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)809 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item, sqlite3_stmt *&rmDataStmt,
810     sqlite3_stmt *&rmLogStmt)
811 {
812     int errCode = DeleteSyncDataItem(item, rmDataStmt);
813     if (errCode != E_OK) {
814         return errCode;
815     }
816     return DeleteSyncLog(item, rmLogStmt);
817 }
818 
GetSyncDataPre(const DataItem & dataItem,DataItem & itemGet)819 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataPre(const DataItem &dataItem, DataItem &itemGet)
820 {
821     if (saveStmt_.queryStmt == nullptr) {
822         return -E_INVALID_ARGS;
823     }
824     int errCode = SQLiteUtils::BindBlobToStatement(saveStmt_.queryStmt, 1, dataItem.hashKey); // 1 index for hashkey
825     if (errCode != E_OK) {
826         return errCode;
827     }
828     errCode = SQLiteUtils::BindTextToStatement(saveStmt_.queryStmt, 2, dataItem.dev); // 2 index for devices
829     if (errCode != E_OK) {
830         return errCode;
831     }
832 
833     LogInfo logInfoGet;
834     errCode = SQLiteUtils::StepWithRetry(saveStmt_.queryStmt, isMemDb_);
835     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
836         errCode = -E_NOT_FOUND;
837     } else {
838         errCode = GetLogData(saveStmt_.queryStmt, logInfoGet);
839     }
840     itemGet.timeStamp = logInfoGet.timestamp;
841     SQLiteUtils::ResetStatement(saveStmt_.queryStmt, false, errCode);
842     return errCode;
843 }
844 
CheckDataConflictDefeated(const DataItem & dataItem,bool & isDefeated)845 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem, bool &isDefeated)
846 {
847     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
848         isDefeated = false; // no need to slove conflict except miss query data
849         return E_OK;
850     }
851 
852     DataItem itemGet;
853     int errCode = GetSyncDataPre(dataItem, itemGet);
854     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
855         LOGE("Failed to get raw data. %d", errCode);
856         return errCode;
857     }
858     isDefeated = (dataItem.timeStamp <= itemGet.timeStamp); // defeated if item timestamp is earlier then raw data
859     return E_OK;
860 }
861 
SaveSyncDataItem(const std::vector<FieldInfo> & fieldInfos,const std::string & deviceName,DataItem & item,TimeStamp & maxTimestamp)862 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const std::vector<FieldInfo> &fieldInfos,
863     const std::string &deviceName, DataItem &item, TimeStamp &maxTimestamp)
864 {
865     item.dev = deviceName;
866     bool isDefeated = false;
867     int errCode = CheckDataConflictDefeated(item, isDefeated);
868     if (errCode != E_OK) {
869         LOGE("check data conflict failed. %d", errCode);
870         return errCode;
871     }
872 
873     if (isDefeated) {
874         LOGD("Data was defeated.");
875         return E_OK;
876     }
877     if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
878         return ProcessMissQueryData(item, saveStmt_.rmDataStmt, saveStmt_.rmLogStmt);
879     }
880     int64_t rowid = -1;
881     errCode = SaveSyncDataItem(item, saveStmt_.saveDataStmt, saveStmt_.rmDataStmt, fieldInfos, rowid);
882     if (errCode == E_OK || errCode == -E_NOT_FOUND) {
883         errCode = SaveSyncLog(saveStmt_.saveLogStmt, saveStmt_.queryStmt, item, maxTimestamp, rowid);
884     }
885     return errCode;
886 }
887 
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName,TimeStamp & maxTimestamp)888 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(const QueryObject &object,
889     std::vector<DataItem> &dataItems, const std::string &deviceName, TimeStamp &maxTimestamp)
890 {
891     int errCode = PrepareForSavingData(object, saveStmt_.saveDataStmt);
892     if (errCode != E_OK) {
893         return errCode;
894     }
895     errCode = PrepareForSavingLog(object, deviceName, saveStmt_.saveLogStmt, saveStmt_.queryStmt);
896     if (errCode != E_OK) {
897         SQLiteUtils::ResetStatement(saveStmt_.saveDataStmt, true, errCode);
898         return errCode;
899     }
900     std::vector<FieldInfo> fieldInfos;
901     for (const auto &col: table_.GetFields()) {
902         fieldInfos.push_back(col.second);
903     }
904 
905     for (auto &item : dataItems) {
906         if (item.neglect) { // Do not save this record if it is neglected
907             continue;
908         }
909         errCode = SaveSyncDataItem(fieldInfos, deviceName, item, maxTimestamp);
910         if (errCode != E_OK) {
911             break;
912         }
913         maxTimestamp = std::max(item.timeStamp, maxTimestamp);
914         // Need not reset rmDataStmt and rmLogStmt here.
915         saveStmt_.ResetStatements(false);
916     }
917     if (errCode == -E_NOT_FOUND) {
918         errCode = E_OK;
919     }
920     saveStmt_.ResetStatements(true);
921     return errCode;
922 }
923 
SaveSyncItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName,const TableInfo & table,TimeStamp & timeStamp)924 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(const QueryObject &object, std::vector<DataItem> &dataItems,
925     const std::string &deviceName, const TableInfo &table, TimeStamp &timeStamp)
926 {
927     int errCode = StartTransaction(TransactType::IMMEDIATE);
928     if (errCode != E_OK) {
929         return errCode;
930     }
931     baseTblName_ = object.GetTableName();
932     SetTableInfo(table);
933     const std::string tableName = DBCommon::GetDistributedTableName(deviceName, baseTblName_);
934     table_.SetTableName(tableName);
935     errCode = SaveSyncDataItems(object, dataItems, deviceName, timeStamp);
936     if (errCode == E_OK) {
937         errCode = Commit();
938     } else {
939         (void)Rollback(); // Keep the error code of the first scene
940     }
941     return errCode;
942 }
943 
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const944 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
945     bool isGettingDeletedData) const
946 {
947     RowDataWithLog data;
948     int errCode = GetLogData(stmt, data.logInfo);
949     if (errCode != E_OK) {
950         LOGE("relational data value transfer to kv fail");
951         return errCode;
952     }
953 
954     if (!isGettingDeletedData) {
955         for (size_t cid = 0; cid < table_.GetFields().size(); ++cid) {
956             DataValue value;
957             errCode = GetDataValueByType(stmt, value, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM);
958             if (errCode != E_OK) {
959                 return errCode;
960             }
961             data.rowData.push_back(std::move(value));
962         }
963     }
964 
965     errCode = DataTransformer::SerializeDataItem(data,
966         isGettingDeletedData ? std::vector<FieldInfo>() : table_.GetFieldInfos(), dataItem);
967     if (errCode != E_OK) {
968         LOGE("relational data value transfer to kv fail");
969     }
970     return errCode;
971 }
972 
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)973 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
974 {
975     int errCode = GetDataItemForSync(fullStmt, item, false);
976     if (errCode != E_OK) {
977         return errCode;
978     }
979     item.value = {};
980     item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
981     return errCode;
982 }
983 
984 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,TimeStamp & timestamp)985 int StepNext(bool isMemDB, sqlite3_stmt *stmt, TimeStamp &timestamp)
986 {
987     if (stmt == nullptr) {
988         return -E_INVALID_ARGS;
989     }
990     int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
991     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
992         timestamp = INT64_MAX;
993         errCode = E_OK;
994     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
995         timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3));  // 3 means timestamp index
996         errCode = E_OK;
997     }
998     return errCode;
999 }
1000 
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1001 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1002     std::vector<DataItem> &dataItems, DataItem &&item)
1003 {
1004     // If one record is over 4M, ignore it.
1005     if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1006         overLongSize++;
1007     } else {
1008         // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1009         dataTotalSize += GetDataItemSerialSize(item, appendLength);
1010         if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1011             return -E_UNFINISHED;
1012         } else {
1013             dataItems.push_back(item);
1014         }
1015     }
1016     return E_OK;
1017 }
1018 }
1019 
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,TimeStamp & queryTime)1020 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1021     sqlite3_stmt *queryStmt, DataItem &item, TimeStamp &queryTime)
1022 {
1023     if (!isFirstTime) { // For the first time, never step before, can get nothing
1024         int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1025         if (errCode != E_OK) {
1026             return errCode;
1027         }
1028     }
1029     return StepNext(isMemDb_, queryStmt, queryTime);
1030 }
1031 
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,TimeStamp & missQueryTime)1032 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1033     TimeStamp &missQueryTime)
1034 {
1035     int errCode = GetMissQueryData(fullStmt, item);
1036     if (errCode != E_OK) {
1037         return errCode;
1038     }
1039     return StepNext(isMemDb_, fullStmt, missQueryTime);
1040 }
1041 
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1042 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1043     const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1044     const TableInfo &tableInfo)
1045 {
1046     baseTblName_ = tableInfo.GetTableName();
1047     SetTableInfo(tableInfo);
1048     sqlite3_stmt *queryStmt = nullptr;
1049     sqlite3_stmt *fullStmt = nullptr;
1050     bool isGettingDeletedData = false;
1051     int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1052     if (errCode != E_OK) {
1053         return errCode;
1054     }
1055 
1056     TimeStamp queryTime = 0;
1057     TimeStamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1058 
1059     bool isFirstTime = true;
1060     size_t dataTotalSize = 0;
1061     size_t overLongSize = 0;
1062     do {
1063         DataItem item;
1064         if (queryTime < missQueryTime) {
1065             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1066         } else if (queryTime == missQueryTime) {
1067             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1068             if (errCode != E_OK) {
1069                 break;
1070             }
1071             errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1072         } else {
1073             errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1074         }
1075         if (errCode != E_OK) {
1076             break;
1077         }
1078 
1079         if (!isFirstTime) {
1080             errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1081             if (errCode != E_OK) {
1082                 break;
1083             }
1084         }
1085 
1086         isFirstTime = false;
1087         if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1088             errCode = -E_FINISHED;
1089             break;
1090         }
1091     } while (true);
1092     LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1093         errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1094     SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1095     SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1096     return errCode;
1097 }
1098 
CheckDBModeForRelational()1099 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1100 {
1101     std::string journalMode;
1102     int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1103 
1104     for (auto &c : journalMode) { // convert to lowercase
1105         c = static_cast<char>(std::tolower(c));
1106     }
1107 
1108     if (errCode == E_OK && journalMode != "wal") {
1109         LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1110         return -E_NOT_SUPPORT;
1111     }
1112     return errCode;
1113 }
1114 
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1115 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1116     const std::string &tableName)
1117 {
1118     std::vector<std::string> deviceTables;
1119     int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1120     if (errCode != E_OK) {
1121         LOGE("Get device table name for alter table failed. %d", errCode);
1122         return errCode;
1123     }
1124 
1125     LOGD("Begin to delete device table: deviceTable[%d]", deviceTables.size());
1126     for (const auto &table : deviceTables) {
1127         std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1128         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1129         if (errCode != E_OK) {
1130             LOGE("Delete device data failed. %d", errCode);
1131             break;
1132         }
1133     }
1134     return errCode;
1135 }
1136 
DeleteDistributedLogTable(const std::string & tableName)1137 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1138 {
1139     if (tableName.empty()) {
1140         return -E_INVALID_ARGS;
1141     }
1142     std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1143     std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1144     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1145     if (errCode != E_OK) {
1146         LOGE("Delete distributed log table failed. %d", errCode);
1147     }
1148     return errCode;
1149 }
1150 
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1151 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1152     std::vector<std::string> &missingTables)
1153 {
1154     if (tableNames.empty()) {
1155         return E_OK;
1156     }
1157     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1158     sqlite3_stmt *stmt = nullptr;
1159     int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1160     if (errCode != E_OK) {
1161         SQLiteUtils::ResetStatement(stmt, true, errCode);
1162         return errCode;
1163     }
1164     for (const auto &tableName : tableNames) {
1165         errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1166         if (errCode != E_OK) {
1167             LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1168             break;
1169         }
1170 
1171         errCode = SQLiteUtils::StepWithRetry(stmt, false);
1172         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1173             errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1174             if (errCode != E_OK) {
1175                 LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1176                 break;
1177             }
1178             errCode = DeleteDistributedLogTable(tableName);
1179             if (errCode != E_OK) {
1180                 LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1181                 break;
1182             }
1183             missingTables.emplace_back(tableName);
1184         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1185             LOGE("Check distributed table failed. %d", errCode);
1186             break;
1187         }
1188         errCode = E_OK; // Check result ok for distributed table is still exists
1189         SQLiteUtils::ResetStatement(stmt, false, errCode);
1190     }
1191     SQLiteUtils::ResetStatement(stmt, true, errCode);
1192     return CheckCorruptedStatus(errCode);
1193 }
1194 
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl)1195 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1196     const TableInfo &baseTbl)
1197 {
1198     if (dbHandle_ == nullptr) {
1199         return -E_INVALID_DB;
1200     }
1201 
1202     if (device.empty() || !baseTbl.IsValid()) {
1203         return -E_INVALID_ARGS;
1204     }
1205 
1206     std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName());
1207     int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1208     if (errCode != E_OK) {
1209         LOGE("Create device table failed. %d", errCode);
1210         return errCode;
1211     }
1212 
1213     errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1214     if (errCode != E_OK) {
1215         LOGE("Copy index to device table failed. %d", errCode);
1216     }
1217     return errCode;
1218 }
1219 
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query)1220 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query)
1221 {
1222     if (dbHandle_ == nullptr) {
1223         return -E_INVALID_DB;
1224     }
1225 
1226     TableInfo newTable;
1227     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1228     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1229         LOGE("Check new schema failed. %d", errCode);
1230         return errCode;
1231     } else {
1232         errCode = table.CompareWithTable(newTable);
1233         if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1234             LOGE("Check schema failed, schema was changed. %d", errCode);
1235             return -E_DISTRIBUTED_SCHEMA_CHANGED;
1236         } else {
1237             errCode = E_OK;
1238         }
1239     }
1240 
1241     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1242     if (errCode != E_OK) {
1243         LOGE("Get query helper for check query failed. %d", errCode);
1244         return errCode;
1245     }
1246 
1247     if (!query.IsQueryForRelationalDB()) {
1248         LOGE("Not support for this query type.");
1249         return -E_NOT_SUPPORT;
1250     }
1251 
1252     SyncTimeRange defaultTimeRange;
1253     sqlite3_stmt *stmt = nullptr;
1254     errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1255         stmt);
1256     if (errCode != E_OK) {
1257         LOGE("Get query statement for check query failed. %d", errCode);
1258     }
1259 
1260     SQLiteUtils::ResetStatement(stmt, true, errCode);
1261     return errCode;
1262 }
1263 
ResetStatements(bool isNeedFinalize)1264 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataStmt::ResetStatements(bool isNeedFinalize)
1265 {
1266     int errCode = E_OK;
1267     if (saveDataStmt != nullptr) {
1268         SQLiteUtils::ResetStatement(saveDataStmt, isNeedFinalize, errCode);
1269     }
1270     if (saveLogStmt != nullptr) {
1271         SQLiteUtils::ResetStatement(saveLogStmt, isNeedFinalize, errCode);
1272     }
1273     if (queryStmt != nullptr) {
1274         SQLiteUtils::ResetStatement(queryStmt, isNeedFinalize, errCode);
1275     }
1276     if (rmDataStmt != nullptr) {
1277         SQLiteUtils::ResetStatement(rmDataStmt, isNeedFinalize, errCode);
1278     }
1279     if (rmLogStmt != nullptr) {
1280         SQLiteUtils::ResetStatement(rmLogStmt, isNeedFinalize, errCode);
1281     }
1282     return errCode;
1283 }
1284 } // namespace DistributedDB
1285 #endif
1286