• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "relational_sync_data_inserter.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "data_transformer.h"
19 #include "db_common.h"
20 #include "sqlite_relational_utils.h"
21 #include "sqlite_utils.h"
22 
23 namespace DistributedDB {
ResetStatements(bool isNeedFinalize)24 int SaveSyncDataStmt::ResetStatements(bool isNeedFinalize)
25 {
26     int errCode = E_OK;
27     if (insertDataStmt != nullptr) {
28         SQLiteUtils::ResetStatement(insertDataStmt, isNeedFinalize, errCode);
29     }
30     if (updateDataStmt != nullptr) {
31         SQLiteUtils::ResetStatement(updateDataStmt, isNeedFinalize, errCode);
32     }
33     if (saveLogStmt != nullptr) {
34         SQLiteUtils::ResetStatement(saveLogStmt, isNeedFinalize, errCode);
35     }
36     if (queryStmt != nullptr) {
37         SQLiteUtils::ResetStatement(queryStmt, isNeedFinalize, errCode);
38     }
39     if (rmDataStmt != nullptr) {
40         SQLiteUtils::ResetStatement(rmDataStmt, isNeedFinalize, errCode);
41     }
42     if (rmLogStmt != nullptr) {
43         SQLiteUtils::ResetStatement(rmLogStmt, isNeedFinalize, errCode);
44     }
45     return errCode;
46 }
47 
CreateInserter(const std::string & deviceName,const QueryObject & query,const RelationalSchemaObject & localSchema,const std::vector<FieldInfo> & remoteFields,const StoreInfo & info)48 RelationalSyncDataInserter RelationalSyncDataInserter::CreateInserter(const std::string &deviceName,
49     const QueryObject &query, const RelationalSchemaObject &localSchema, const std::vector<FieldInfo> &remoteFields,
50     const StoreInfo &info)
51 {
52     RelationalSyncDataInserter inserter;
53     inserter.SetHashDevId(DBCommon::TransferStringToHex(DBCommon::TransferHashString(deviceName)));
54     inserter.SetRemoteFields(remoteFields);
55     inserter.SetQuery(query);
56     TableInfo localTable = localSchema.GetTable(query.GetTableName());
57     localTable.SetDistributedTable(localSchema.GetDistributedTable(query.GetTableName()));
58     inserter.SetLocalTable(localTable);
59     inserter.SetTableMode(localSchema.GetTableMode());
60     if (localSchema.GetTableMode() == DistributedTableMode::COLLABORATION) {
61         inserter.SetInsertTableName(localTable.GetTableName());
62     } else {
63         inserter.SetInsertTableName(DBCommon::GetDistributedTableName(deviceName, localTable.GetTableName(), info));
64     }
65     return inserter;
66 }
67 
SetHashDevId(const std::string & hashDevId)68 void RelationalSyncDataInserter::SetHashDevId(const std::string &hashDevId)
69 {
70     hashDevId_ = hashDevId;
71 }
72 
SetRemoteFields(std::vector<FieldInfo> remoteFields)73 void RelationalSyncDataInserter::SetRemoteFields(std::vector<FieldInfo> remoteFields)
74 {
75     remoteFields_ = std::move(remoteFields);
76 }
77 
SetEntries(std::vector<DataItem> entries)78 void RelationalSyncDataInserter::SetEntries(std::vector<DataItem> entries)
79 {
80     entries_ = std::move(entries);
81 }
82 
SetLocalTable(TableInfo localTable)83 void RelationalSyncDataInserter::SetLocalTable(TableInfo localTable)
84 {
85     localTable_ = std::move(localTable);
86 }
87 
GetLocalTable() const88 const TableInfo &RelationalSyncDataInserter::GetLocalTable() const
89 {
90     return localTable_;
91 }
92 
SetQuery(QueryObject query)93 void RelationalSyncDataInserter::SetQuery(QueryObject query)
94 {
95     query_ = std::move(query);
96 }
97 
SetInsertTableName(std::string tableName)98 void RelationalSyncDataInserter::SetInsertTableName(std::string tableName)
99 {
100     insertTableName_ = std::move(tableName);
101 }
102 
SetTableMode(DistributedTableMode mode)103 void RelationalSyncDataInserter::SetTableMode(DistributedTableMode mode)
104 {
105     mode_ = mode;
106 }
107 
GetInsertStatement(sqlite3 * db,sqlite3_stmt * & stmt)108 int RelationalSyncDataInserter::GetInsertStatement(sqlite3 *db, sqlite3_stmt *&stmt)
109 {
110     if (stmt != nullptr) {
111         return -E_INVALID_ARGS;
112     }
113 
114     const auto &localTableFields = localTable_.GetFields();
115     std::string colName;
116     std::string dataFormat;
117     for (const auto &it : remoteFields_) {
118         if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
119             continue; // skip fields which is orphaned in remote
120         }
121         colName += "'" + it.GetFieldName() + "',";
122         dataFormat += "?,";
123     }
124     colName.pop_back();
125     dataFormat.pop_back();
126     std::string sql = "INSERT ";
127     if (mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
128         sql += "OR REPLACE ";
129     }
130     sql += "INTO '" + insertTableName_ + "'" +
131         "(" + colName + ") VALUES(" + dataFormat + ");";
132     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
133     if (errCode != E_OK) {
134         LOGE("Get insert data statement fail! errCode:%d", errCode);
135     }
136     return errCode;
137 }
138 
SaveData(bool isUpdate,const DataItem & dataItem,SaveSyncDataStmt & saveSyncDataStmt,std::map<std::string,Type> & pkVals)139 int RelationalSyncDataInserter::SaveData(bool isUpdate, const DataItem &dataItem,
140     SaveSyncDataStmt &saveSyncDataStmt, std::map<std::string, Type> &pkVals)
141 {
142     sqlite3_stmt *&stmt = isUpdate ? saveSyncDataStmt.updateDataStmt : saveSyncDataStmt.insertDataStmt;
143     std::set<std::string> filterSet;
144     if (isUpdate) {
145         for (const auto &primaryKey : localTable_.GetIdentifyKey()) {
146             filterSet.insert(primaryKey);
147         }
148         auto distributedPk = localTable_.GetSyncDistributedPk();
149         if (!distributedPk.empty()) {
150             filterSet.insert(distributedPk.begin(), distributedPk.end());
151         }
152     }
153     if (stmt == nullptr) {
154         LOGW("skip save data %s", DBCommon::StringMiddleMasking(DBCommon::VectorToHexString(dataItem.hashKey)).c_str());
155         return E_OK;
156     }
157 
158     int errCode = BindSaveDataStatement(isUpdate, dataItem, filterSet, stmt, pkVals);
159     if (errCode != E_OK) {
160         LOGE("Bind data failed, errCode=%d.", errCode);
161         int ret = E_OK;
162         SQLiteUtils::ResetStatement(stmt, false, ret);
163         return errCode;
164     }
165 
166     errCode = SQLiteUtils::StepWithRetry(stmt, false);
167     int ret = E_OK;
168     SQLiteUtils::ResetStatement(stmt, false, true, ret);
169     return errCode;
170 }
171 
BindSaveDataStatement(bool isExist,const DataItem & dataItem,const std::set<std::string> & filterSet,sqlite3_stmt * stmt,std::map<std::string,Type> & pkVals)172 int RelationalSyncDataInserter::BindSaveDataStatement(bool isExist, const DataItem &dataItem,
173     const std::set<std::string> &filterSet, sqlite3_stmt *stmt, std::map<std::string, Type> &pkVals)
174 {
175     OptRowDataWithLog data;
176     // deserialize by remote field info
177     int errCode = DataTransformer::DeSerializeDataItem(dataItem, data, remoteFields_);
178     if (errCode != E_OK) {
179         LOGE("DeSerialize dataItem failed! errCode = [%d]", errCode);
180         return errCode;
181     }
182 
183     size_t dataIdx = 0;
184     int bindIdx = 1;
185     const auto &localTableFields = localTable_.GetFields();
186     for (const auto &it : remoteFields_) {
187         if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
188             LOGD("field %s[%zu] not found in local schema.", DBCommon::StringMiddleMasking(it.GetFieldName()).c_str(),
189                 it.GetFieldName().size());
190             dataIdx++;
191             continue; // skip fields which is orphaned in remote
192         }
193         if (localTable_.IsPrimaryKey(it.GetFieldName())) {
194             Type pkVal;
195             CloudStorageUtils::SaveChangedDataByType(data.optionalData[dataIdx], pkVal);
196             pkVals[it.GetFieldName()] = pkVal;
197         }
198         if (filterSet.find(it.GetFieldName()) != filterSet.end()) {
199             dataIdx++;
200             continue; // skip fields when update
201         }
202         if (dataIdx >= data.optionalData.size()) {
203             LOGD("field over size. cnt:%d, data size:%d", dataIdx, data.optionalData.size());
204             break; // cnt should less than optionalData size.
205         }
206         errCode = SQLiteUtils::BindDataValueByType(stmt, data.optionalData[dataIdx], bindIdx++);
207         if (errCode != E_OK) {
208             LOGE("Bind data failed, errCode:%d, cid:%zu.", errCode, dataIdx);
209             return errCode;
210         }
211         dataIdx++;
212     }
213     return isExist ? BindHashKeyAndDev(dataItem, stmt, bindIdx) : E_OK;
214 }
215 
GetDeleteLogStmt(sqlite3 * db,sqlite3_stmt * & stmt)216 int RelationalSyncDataInserter::GetDeleteLogStmt(sqlite3 *db, sqlite3_stmt *&stmt)
217 {
218     std::string sql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + localTable_.GetTableName() +
219         "_log ";
220     if (mode_ == DistributedTableMode::COLLABORATION) {
221         sql += "WHERE hash_key=?";
222     } else {
223         sql += "WHERE hash_key=? AND device=?";
224     }
225 
226     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
227     if (errCode != E_OK) {
228         LOGE("[DeleteSyncLog] Get statement fail!");
229     }
230     return errCode;
231 }
232 
GetDeleteSyncDataStmt(sqlite3 * db,sqlite3_stmt * & stmt)233 int RelationalSyncDataInserter::GetDeleteSyncDataStmt(sqlite3 *db, sqlite3_stmt *&stmt)
234 {
235     std::string sql = "DELETE FROM '" + insertTableName_ + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
236         " IN (SELECT data_key FROM " + DBConstant::RELATIONAL_PREFIX + localTable_.GetTableName() + "_log ";
237     if (mode_ == DistributedTableMode::COLLABORATION) {
238         sql += "WHERE hash_key=?);";
239     } else {
240         sql += "WHERE hash_key=? AND device=? AND flag&0x01=0);";
241     }
242     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
243     if (errCode != E_OK) {
244         LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
245     }
246     return errCode;
247 }
248 
GetSaveLogStatement(sqlite3 * db,sqlite3_stmt * & logStmt,sqlite3_stmt * & queryStmt)249 int RelationalSyncDataInserter::GetSaveLogStatement(sqlite3 *db, sqlite3_stmt *&logStmt, sqlite3_stmt *&queryStmt)
250 {
251     std::string conflictPk;
252     std::string selCondition;
253     if (mode_ == DistributedTableMode::COLLABORATION) {
254         conflictPk = "ON CONFLICT(hash_key)";
255         selCondition = " WHERE hash_key = ?;";
256     } else {
257         conflictPk = "ON CONFLICT(hash_key, device)";
258         selCondition = " WHERE hash_key = ? AND device = ?;";
259     }
260     const std::string tableName = DBConstant::RELATIONAL_PREFIX + query_.GetTableName() + "_log";
261     std::string dataFormat = "?, '" + hashDevId_ + "', ?, ?, ?, ?, ?";
262     std::string columnList = "data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key";
263     std::string sql = "INSERT INTO " + tableName +
264         " (" + columnList + ", cursor) VALUES (" + dataFormat + "," +
265         CloudStorageUtils::GetSelectIncCursorSql(query_.GetTableName()) +") " + conflictPk +
266         " DO UPDATE SET data_key = excluded.data_key, device = excluded.device,"
267         " ori_device = excluded.ori_device, timestamp = excluded.timestamp, wtimestamp = excluded.wtimestamp,"
268         " flag = excluded.flag, cursor = excluded.cursor;";
269     int errCode = SQLiteUtils::GetStatement(db, sql, logStmt);
270     if (errCode != E_OK) {
271         LOGE("[info statement] Get log statement fail! errCode:%d", errCode);
272         return errCode;
273     }
274     std::string selectSql = "SELECT " + columnList + " FROM " + tableName + selCondition;
275     errCode = SQLiteUtils::GetStatement(db, selectSql, queryStmt);
276     if (errCode != E_OK) {
277         SQLiteUtils::ResetStatement(logStmt, true, errCode);
278         LOGE("[info statement] Get query statement fail! errCode:%d", errCode);
279     }
280     return errCode;
281 }
282 
PrepareStatement(sqlite3 * db,SaveSyncDataStmt & stmt)283 int RelationalSyncDataInserter::PrepareStatement(sqlite3 *db, SaveSyncDataStmt &stmt)
284 {
285     int errCode = GetSaveLogStatement(db, stmt.saveLogStmt, stmt.queryStmt);
286     if (errCode != E_OK) {
287         LOGE("Get save log statement failed. err=%d", errCode);
288         return errCode;
289     }
290     errCode = GetInsertStatement(db, stmt.insertDataStmt);
291     if (errCode != E_OK) {
292         LOGE("Get insert statement failed. err=%d", errCode);
293         return errCode;
294     }
295     errCode = GetUpdateStatement(db, stmt.updateDataStmt);
296     if (errCode != E_OK) {
297         LOGE("Get update statement failed. err=%d", errCode);
298     }
299     return errCode;
300 }
301 
Iterate(const std::function<int (DataItem &)> & saveSyncDataItem)302 int RelationalSyncDataInserter::Iterate(const std::function<int (DataItem &)> &saveSyncDataItem)
303 {
304     int errCode = E_OK;
305     for (auto &it : entries_) {
306         it.dev = hashDevId_;
307         int ret = saveSyncDataItem(it);
308         errCode = errCode == E_OK ? ret : errCode;
309     }
310     return errCode;
311 }
312 
GetUpdateStatement(sqlite3 * db,sqlite3_stmt * & stmt)313 int RelationalSyncDataInserter::GetUpdateStatement(sqlite3 *db, sqlite3_stmt *&stmt)
314 {
315     if (stmt != nullptr) {
316         return -E_INVALID_ARGS;
317     }
318 
319     std::set<std::string> identifyKeySet;
320     for (const auto &primaryKey : localTable_.GetIdentifyKey()) {
321         identifyKeySet.insert(primaryKey);
322     }
323     auto distributedPk = localTable_.GetSyncDistributedPk();
324     if (!distributedPk.empty()) {
325         identifyKeySet.insert(distributedPk.begin(), distributedPk.end());
326     }
327     std::string updateValue;
328     const auto &localTableFields = localTable_.GetFields();
329     for (const auto &it : remoteFields_) {
330         if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
331             continue; // skip fields which is orphaned in remote
332         }
333         if (identifyKeySet.find(it.GetFieldName()) == identifyKeySet.end()) {
334             if (updateValue.empty()) {
335                 updateValue.append(" SET ");
336             } else {
337                 updateValue.append(", ");
338             }
339             updateValue.append("'").append(it.GetFieldName()).append("'=?");
340         }
341     }
342     if (updateValue.empty()) {
343         // only sync pk no need update
344         return E_OK;
345     }
346     std::string sql = "UPDATE '" + insertTableName_ + "'" + updateValue + " WHERE " +
347         std::string(DBConstant::SQLITE_INNER_ROWID) + " IN (SELECT data_key FROM " +
348         DBConstant::RELATIONAL_PREFIX + localTable_.GetTableName() + "_log ";
349     if (mode_ == DistributedTableMode::COLLABORATION) {
350         sql += "WHERE hash_key=?);";
351     } else {
352         sql += "WHERE hash_key=? AND device=? AND flag&0x01=0);";
353     }
354     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
355     if (errCode != E_OK) {
356         LOGE("Get update data statement fail! errCode:%d", errCode);
357     }
358     return errCode;
359 }
360 
BindHashKeyAndDev(const DataItem & dataItem,sqlite3_stmt * stmt,int beginIndex)361 int RelationalSyncDataInserter::BindHashKeyAndDev(const DataItem &dataItem, sqlite3_stmt *stmt,
362     int beginIndex)
363 {
364     int errCode = SQLiteUtils::BindBlobToStatement(stmt, beginIndex++, dataItem.hashKey);
365     if (errCode != E_OK) {
366         LOGE("[RelationalSyncDataInserter] bind hash key failed %d", errCode);
367         return errCode;
368     }
369     if (mode_ != DistributedTableMode::COLLABORATION) {
370         errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, dataItem.dev);
371         if (errCode != E_OK) {
372             LOGE("[RelationalSyncDataInserter] bind dev failed %d", errCode);
373         }
374     }
375     return errCode;
376 }
377 
SaveSyncLog(sqlite3 * db,sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,int64_t rowid)378 int RelationalSyncDataInserter::SaveSyncLog(sqlite3 *db, sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
379     const DataItem &dataItem, int64_t rowid)
380 {
381     LogInfo logInfoGet;
382     int errCode = SQLiteRelationalUtils::GetLogInfoPre(queryStmt, mode_, dataItem, logInfoGet);
383     LogInfo logInfoBind;
384     logInfoBind.hashKey = dataItem.hashKey;
385     logInfoBind.device = dataItem.dev;
386     logInfoBind.timestamp = dataItem.timestamp;
387     logInfoBind.flag = dataItem.flag;
388 
389     if (errCode == -E_NOT_FOUND) { // insert
390         logInfoBind.wTimestamp = dataItem.writeTimestamp;
391         logInfoBind.originDev = dataItem.dev;
392     } else if (errCode == E_OK) { // update
393         logInfoBind.wTimestamp = logInfoGet.wTimestamp;
394         logInfoBind.originDev = logInfoGet.originDev;
395     } else {
396         LOGE("[RelationalSyncDataInserter] get log info failed %d", errCode);
397         return errCode;
398     }
399 
400     // bind
401     SQLiteUtils::BindInt64ToStatement(statement, 1, rowid);  // 1 means dataKey index
402     std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
403     SQLiteUtils::BindBlobToStatement(statement, 2, originDev);  // 2 means ori_dev index
404     SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp);  // 3 means timestamp index
405     SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimestamp);  // 4 means w_timestamp index
406     SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag);  // 5 means flag index
407     SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey);  // 6 means hashKey index
408     errCode = SQLiteUtils::StepWithRetry(statement, false);
409     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
410         return E_OK;
411     }
412     return errCode;
413 }
414 
GetChangedData()415 ChangedData &RelationalSyncDataInserter::GetChangedData()
416 {
417     return data_;
418 }
419 } // namespace DistributedDB