1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "relational_sync_data_inserter.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "data_transformer.h"
19 #include "db_common.h"
20 #include "sqlite_relational_utils.h"
21 #include "sqlite_utils.h"
22
23 namespace DistributedDB {
ResetStatements(bool isNeedFinalize)24 int SaveSyncDataStmt::ResetStatements(bool isNeedFinalize)
25 {
26 int errCode = E_OK;
27 if (insertDataStmt != nullptr) {
28 SQLiteUtils::ResetStatement(insertDataStmt, isNeedFinalize, errCode);
29 }
30 if (updateDataStmt != nullptr) {
31 SQLiteUtils::ResetStatement(updateDataStmt, isNeedFinalize, errCode);
32 }
33 if (saveLogStmt != nullptr) {
34 SQLiteUtils::ResetStatement(saveLogStmt, isNeedFinalize, errCode);
35 }
36 if (queryStmt != nullptr) {
37 SQLiteUtils::ResetStatement(queryStmt, isNeedFinalize, errCode);
38 }
39 if (rmDataStmt != nullptr) {
40 SQLiteUtils::ResetStatement(rmDataStmt, isNeedFinalize, errCode);
41 }
42 if (rmLogStmt != nullptr) {
43 SQLiteUtils::ResetStatement(rmLogStmt, isNeedFinalize, errCode);
44 }
45 return errCode;
46 }
47
CreateInserter(const std::string & deviceName,const QueryObject & query,const RelationalSchemaObject & localSchema,const std::vector<FieldInfo> & remoteFields,const StoreInfo & info)48 RelationalSyncDataInserter RelationalSyncDataInserter::CreateInserter(const std::string &deviceName,
49 const QueryObject &query, const RelationalSchemaObject &localSchema, const std::vector<FieldInfo> &remoteFields,
50 const StoreInfo &info)
51 {
52 RelationalSyncDataInserter inserter;
53 inserter.SetHashDevId(DBCommon::TransferStringToHex(DBCommon::TransferHashString(deviceName)));
54 inserter.SetRemoteFields(remoteFields);
55 inserter.SetQuery(query);
56 TableInfo localTable = localSchema.GetTable(query.GetTableName());
57 localTable.SetDistributedTable(localSchema.GetDistributedTable(query.GetTableName()));
58 inserter.SetLocalTable(localTable);
59 inserter.SetTableMode(localSchema.GetTableMode());
60 if (localSchema.GetTableMode() == DistributedTableMode::COLLABORATION) {
61 inserter.SetInsertTableName(localTable.GetTableName());
62 } else {
63 inserter.SetInsertTableName(DBCommon::GetDistributedTableName(deviceName, localTable.GetTableName(), info));
64 }
65 return inserter;
66 }
67
SetHashDevId(const std::string & hashDevId)68 void RelationalSyncDataInserter::SetHashDevId(const std::string &hashDevId)
69 {
70 hashDevId_ = hashDevId;
71 }
72
SetRemoteFields(std::vector<FieldInfo> remoteFields)73 void RelationalSyncDataInserter::SetRemoteFields(std::vector<FieldInfo> remoteFields)
74 {
75 remoteFields_ = std::move(remoteFields);
76 }
77
SetEntries(std::vector<DataItem> entries)78 void RelationalSyncDataInserter::SetEntries(std::vector<DataItem> entries)
79 {
80 entries_ = std::move(entries);
81 }
82
SetLocalTable(TableInfo localTable)83 void RelationalSyncDataInserter::SetLocalTable(TableInfo localTable)
84 {
85 localTable_ = std::move(localTable);
86 }
87
GetLocalTable() const88 const TableInfo &RelationalSyncDataInserter::GetLocalTable() const
89 {
90 return localTable_;
91 }
92
SetQuery(QueryObject query)93 void RelationalSyncDataInserter::SetQuery(QueryObject query)
94 {
95 query_ = std::move(query);
96 }
97
SetInsertTableName(std::string tableName)98 void RelationalSyncDataInserter::SetInsertTableName(std::string tableName)
99 {
100 insertTableName_ = std::move(tableName);
101 }
102
SetTableMode(DistributedTableMode mode)103 void RelationalSyncDataInserter::SetTableMode(DistributedTableMode mode)
104 {
105 mode_ = mode;
106 }
107
GetInsertStatement(sqlite3 * db,sqlite3_stmt * & stmt)108 int RelationalSyncDataInserter::GetInsertStatement(sqlite3 *db, sqlite3_stmt *&stmt)
109 {
110 if (stmt != nullptr) {
111 return -E_INVALID_ARGS;
112 }
113
114 const auto &localTableFields = localTable_.GetFields();
115 std::string colName;
116 std::string dataFormat;
117 for (const auto &it : remoteFields_) {
118 if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
119 continue; // skip fields which is orphaned in remote
120 }
121 colName += "'" + it.GetFieldName() + "',";
122 dataFormat += "?,";
123 }
124 colName.pop_back();
125 dataFormat.pop_back();
126 std::string sql = "INSERT ";
127 if (mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
128 sql += "OR REPLACE ";
129 }
130 sql += "INTO '" + insertTableName_ + "'" +
131 "(" + colName + ") VALUES(" + dataFormat + ");";
132 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
133 if (errCode != E_OK) {
134 LOGE("Get insert data statement fail! errCode:%d", errCode);
135 }
136 return errCode;
137 }
138
SaveData(bool isUpdate,const DataItem & dataItem,SaveSyncDataStmt & saveSyncDataStmt,std::map<std::string,Type> & pkVals)139 int RelationalSyncDataInserter::SaveData(bool isUpdate, const DataItem &dataItem,
140 SaveSyncDataStmt &saveSyncDataStmt, std::map<std::string, Type> &pkVals)
141 {
142 sqlite3_stmt *&stmt = isUpdate ? saveSyncDataStmt.updateDataStmt : saveSyncDataStmt.insertDataStmt;
143 std::set<std::string> filterSet;
144 if (isUpdate) {
145 for (const auto &primaryKey : localTable_.GetIdentifyKey()) {
146 filterSet.insert(primaryKey);
147 }
148 auto distributedPk = localTable_.GetSyncDistributedPk();
149 if (!distributedPk.empty()) {
150 filterSet.insert(distributedPk.begin(), distributedPk.end());
151 }
152 }
153 if (stmt == nullptr) {
154 LOGW("skip save data %s", DBCommon::StringMiddleMasking(DBCommon::VectorToHexString(dataItem.hashKey)).c_str());
155 return E_OK;
156 }
157
158 int errCode = BindSaveDataStatement(isUpdate, dataItem, filterSet, stmt, pkVals);
159 if (errCode != E_OK) {
160 LOGE("Bind data failed, errCode=%d.", errCode);
161 int ret = E_OK;
162 SQLiteUtils::ResetStatement(stmt, false, ret);
163 return errCode;
164 }
165
166 errCode = SQLiteUtils::StepWithRetry(stmt, false);
167 int ret = E_OK;
168 SQLiteUtils::ResetStatement(stmt, false, true, ret);
169 return errCode;
170 }
171
BindSaveDataStatement(bool isExist,const DataItem & dataItem,const std::set<std::string> & filterSet,sqlite3_stmt * stmt,std::map<std::string,Type> & pkVals)172 int RelationalSyncDataInserter::BindSaveDataStatement(bool isExist, const DataItem &dataItem,
173 const std::set<std::string> &filterSet, sqlite3_stmt *stmt, std::map<std::string, Type> &pkVals)
174 {
175 OptRowDataWithLog data;
176 // deserialize by remote field info
177 int errCode = DataTransformer::DeSerializeDataItem(dataItem, data, remoteFields_);
178 if (errCode != E_OK) {
179 LOGE("DeSerialize dataItem failed! errCode = [%d]", errCode);
180 return errCode;
181 }
182
183 size_t dataIdx = 0;
184 int bindIdx = 1;
185 const auto &localTableFields = localTable_.GetFields();
186 for (const auto &it : remoteFields_) {
187 if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
188 LOGD("field %s[%zu] not found in local schema.", DBCommon::StringMiddleMasking(it.GetFieldName()).c_str(),
189 it.GetFieldName().size());
190 dataIdx++;
191 continue; // skip fields which is orphaned in remote
192 }
193 if (localTable_.IsPrimaryKey(it.GetFieldName())) {
194 Type pkVal;
195 CloudStorageUtils::SaveChangedDataByType(data.optionalData[dataIdx], pkVal);
196 pkVals[it.GetFieldName()] = pkVal;
197 }
198 if (filterSet.find(it.GetFieldName()) != filterSet.end()) {
199 dataIdx++;
200 continue; // skip fields when update
201 }
202 if (dataIdx >= data.optionalData.size()) {
203 LOGD("field over size. cnt:%d, data size:%d", dataIdx, data.optionalData.size());
204 break; // cnt should less than optionalData size.
205 }
206 errCode = SQLiteUtils::BindDataValueByType(stmt, data.optionalData[dataIdx], bindIdx++);
207 if (errCode != E_OK) {
208 LOGE("Bind data failed, errCode:%d, cid:%zu.", errCode, dataIdx);
209 return errCode;
210 }
211 dataIdx++;
212 }
213 return isExist ? BindHashKeyAndDev(dataItem, stmt, bindIdx) : E_OK;
214 }
215
GetDeleteLogStmt(sqlite3 * db,sqlite3_stmt * & stmt)216 int RelationalSyncDataInserter::GetDeleteLogStmt(sqlite3 *db, sqlite3_stmt *&stmt)
217 {
218 std::string sql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + localTable_.GetTableName() +
219 "_log ";
220 if (mode_ == DistributedTableMode::COLLABORATION) {
221 sql += "WHERE hash_key=?";
222 } else {
223 sql += "WHERE hash_key=? AND device=?";
224 }
225
226 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
227 if (errCode != E_OK) {
228 LOGE("[DeleteSyncLog] Get statement fail!");
229 }
230 return errCode;
231 }
232
GetDeleteSyncDataStmt(sqlite3 * db,sqlite3_stmt * & stmt)233 int RelationalSyncDataInserter::GetDeleteSyncDataStmt(sqlite3 *db, sqlite3_stmt *&stmt)
234 {
235 std::string sql = "DELETE FROM '" + insertTableName_ + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
236 " IN (SELECT data_key FROM " + DBConstant::RELATIONAL_PREFIX + localTable_.GetTableName() + "_log ";
237 if (mode_ == DistributedTableMode::COLLABORATION) {
238 sql += "WHERE hash_key=?);";
239 } else {
240 sql += "WHERE hash_key=? AND device=? AND flag&0x01=0);";
241 }
242 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
243 if (errCode != E_OK) {
244 LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
245 }
246 return errCode;
247 }
248
GetSaveLogStatement(sqlite3 * db,sqlite3_stmt * & logStmt,sqlite3_stmt * & queryStmt)249 int RelationalSyncDataInserter::GetSaveLogStatement(sqlite3 *db, sqlite3_stmt *&logStmt, sqlite3_stmt *&queryStmt)
250 {
251 std::string conflictPk;
252 std::string selCondition;
253 if (mode_ == DistributedTableMode::COLLABORATION) {
254 conflictPk = "ON CONFLICT(hash_key)";
255 selCondition = " WHERE hash_key = ?;";
256 } else {
257 conflictPk = "ON CONFLICT(hash_key, device)";
258 selCondition = " WHERE hash_key = ? AND device = ?;";
259 }
260 const std::string tableName = DBConstant::RELATIONAL_PREFIX + query_.GetTableName() + "_log";
261 std::string dataFormat = "?, '" + hashDevId_ + "', ?, ?, ?, ?, ?";
262 std::string columnList = "data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key";
263 std::string sql = "INSERT INTO " + tableName +
264 " (" + columnList + ", cursor) VALUES (" + dataFormat + "," +
265 CloudStorageUtils::GetSelectIncCursorSql(query_.GetTableName()) +") " + conflictPk +
266 " DO UPDATE SET data_key = excluded.data_key, device = excluded.device,"
267 " ori_device = excluded.ori_device, timestamp = excluded.timestamp, wtimestamp = excluded.wtimestamp,"
268 " flag = excluded.flag, cursor = excluded.cursor;";
269 int errCode = SQLiteUtils::GetStatement(db, sql, logStmt);
270 if (errCode != E_OK) {
271 LOGE("[info statement] Get log statement fail! errCode:%d", errCode);
272 return errCode;
273 }
274 std::string selectSql = "SELECT " + columnList + " FROM " + tableName + selCondition;
275 errCode = SQLiteUtils::GetStatement(db, selectSql, queryStmt);
276 if (errCode != E_OK) {
277 SQLiteUtils::ResetStatement(logStmt, true, errCode);
278 LOGE("[info statement] Get query statement fail! errCode:%d", errCode);
279 }
280 return errCode;
281 }
282
PrepareStatement(sqlite3 * db,SaveSyncDataStmt & stmt)283 int RelationalSyncDataInserter::PrepareStatement(sqlite3 *db, SaveSyncDataStmt &stmt)
284 {
285 int errCode = GetSaveLogStatement(db, stmt.saveLogStmt, stmt.queryStmt);
286 if (errCode != E_OK) {
287 LOGE("Get save log statement failed. err=%d", errCode);
288 return errCode;
289 }
290 errCode = GetInsertStatement(db, stmt.insertDataStmt);
291 if (errCode != E_OK) {
292 LOGE("Get insert statement failed. err=%d", errCode);
293 return errCode;
294 }
295 errCode = GetUpdateStatement(db, stmt.updateDataStmt);
296 if (errCode != E_OK) {
297 LOGE("Get update statement failed. err=%d", errCode);
298 }
299 return errCode;
300 }
301
Iterate(const std::function<int (DataItem &)> & saveSyncDataItem)302 int RelationalSyncDataInserter::Iterate(const std::function<int (DataItem &)> &saveSyncDataItem)
303 {
304 int errCode = E_OK;
305 for (auto &it : entries_) {
306 it.dev = hashDevId_;
307 int ret = saveSyncDataItem(it);
308 errCode = errCode == E_OK ? ret : errCode;
309 }
310 return errCode;
311 }
312
GetUpdateStatement(sqlite3 * db,sqlite3_stmt * & stmt)313 int RelationalSyncDataInserter::GetUpdateStatement(sqlite3 *db, sqlite3_stmt *&stmt)
314 {
315 if (stmt != nullptr) {
316 return -E_INVALID_ARGS;
317 }
318
319 std::set<std::string> identifyKeySet;
320 for (const auto &primaryKey : localTable_.GetIdentifyKey()) {
321 identifyKeySet.insert(primaryKey);
322 }
323 auto distributedPk = localTable_.GetSyncDistributedPk();
324 if (!distributedPk.empty()) {
325 identifyKeySet.insert(distributedPk.begin(), distributedPk.end());
326 }
327 std::string updateValue;
328 const auto &localTableFields = localTable_.GetFields();
329 for (const auto &it : remoteFields_) {
330 if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
331 continue; // skip fields which is orphaned in remote
332 }
333 if (identifyKeySet.find(it.GetFieldName()) == identifyKeySet.end()) {
334 if (updateValue.empty()) {
335 updateValue.append(" SET ");
336 } else {
337 updateValue.append(", ");
338 }
339 updateValue.append("'").append(it.GetFieldName()).append("'=?");
340 }
341 }
342 if (updateValue.empty()) {
343 // only sync pk no need update
344 return E_OK;
345 }
346 std::string sql = "UPDATE '" + insertTableName_ + "'" + updateValue + " WHERE " +
347 std::string(DBConstant::SQLITE_INNER_ROWID) + " IN (SELECT data_key FROM " +
348 DBConstant::RELATIONAL_PREFIX + localTable_.GetTableName() + "_log ";
349 if (mode_ == DistributedTableMode::COLLABORATION) {
350 sql += "WHERE hash_key=?);";
351 } else {
352 sql += "WHERE hash_key=? AND device=? AND flag&0x01=0);";
353 }
354 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
355 if (errCode != E_OK) {
356 LOGE("Get update data statement fail! errCode:%d", errCode);
357 }
358 return errCode;
359 }
360
BindHashKeyAndDev(const DataItem & dataItem,sqlite3_stmt * stmt,int beginIndex)361 int RelationalSyncDataInserter::BindHashKeyAndDev(const DataItem &dataItem, sqlite3_stmt *stmt,
362 int beginIndex)
363 {
364 int errCode = SQLiteUtils::BindBlobToStatement(stmt, beginIndex++, dataItem.hashKey);
365 if (errCode != E_OK) {
366 LOGE("[RelationalSyncDataInserter] bind hash key failed %d", errCode);
367 return errCode;
368 }
369 if (mode_ != DistributedTableMode::COLLABORATION) {
370 errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, dataItem.dev);
371 if (errCode != E_OK) {
372 LOGE("[RelationalSyncDataInserter] bind dev failed %d", errCode);
373 }
374 }
375 return errCode;
376 }
377
SaveSyncLog(sqlite3 * db,sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,int64_t rowid)378 int RelationalSyncDataInserter::SaveSyncLog(sqlite3 *db, sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
379 const DataItem &dataItem, int64_t rowid)
380 {
381 LogInfo logInfoGet;
382 int errCode = SQLiteRelationalUtils::GetLogInfoPre(queryStmt, mode_, dataItem, logInfoGet);
383 LogInfo logInfoBind;
384 logInfoBind.hashKey = dataItem.hashKey;
385 logInfoBind.device = dataItem.dev;
386 logInfoBind.timestamp = dataItem.timestamp;
387 logInfoBind.flag = dataItem.flag;
388
389 if (errCode == -E_NOT_FOUND) { // insert
390 logInfoBind.wTimestamp = dataItem.writeTimestamp;
391 logInfoBind.originDev = dataItem.dev;
392 } else if (errCode == E_OK) { // update
393 logInfoBind.wTimestamp = logInfoGet.wTimestamp;
394 logInfoBind.originDev = logInfoGet.originDev;
395 } else {
396 LOGE("[RelationalSyncDataInserter] get log info failed %d", errCode);
397 return errCode;
398 }
399
400 // bind
401 SQLiteUtils::BindInt64ToStatement(statement, 1, rowid); // 1 means dataKey index
402 std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
403 SQLiteUtils::BindBlobToStatement(statement, 2, originDev); // 2 means ori_dev index
404 SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp); // 3 means timestamp index
405 SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimestamp); // 4 means w_timestamp index
406 SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag); // 5 means flag index
407 SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey); // 6 means hashKey index
408 errCode = SQLiteUtils::StepWithRetry(statement, false);
409 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
410 return E_OK;
411 }
412 return errCode;
413 }
414
GetChangedData()415 ChangedData &RelationalSyncDataInserter::GetChangedData()
416 {
417 return data_;
418 }
419 } // namespace DistributedDB