1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "relational_sync_data_inserter.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "data_transformer.h"
19 #include "db_common.h"
20 #include "sqlite_relational_utils.h"
21 #include "sqlite_utils.h"
22
23 namespace DistributedDB {
ResetStatements(bool isNeedFinalize)24 int SaveSyncDataStmt::ResetStatements(bool isNeedFinalize)
25 {
26 int errCode = E_OK;
27 if (insertDataStmt != nullptr) {
28 SQLiteUtils::ResetStatement(insertDataStmt, isNeedFinalize, errCode);
29 }
30 if (updateDataStmt != nullptr) {
31 SQLiteUtils::ResetStatement(updateDataStmt, isNeedFinalize, errCode);
32 }
33 if (saveLogStmt != nullptr) {
34 SQLiteUtils::ResetStatement(saveLogStmt, isNeedFinalize, errCode);
35 }
36 if (queryStmt != nullptr) {
37 SQLiteUtils::ResetStatement(queryStmt, isNeedFinalize, errCode);
38 }
39 if (rmDataStmt != nullptr) {
40 SQLiteUtils::ResetStatement(rmDataStmt, isNeedFinalize, errCode);
41 }
42 if (rmLogStmt != nullptr) {
43 SQLiteUtils::ResetStatement(rmLogStmt, isNeedFinalize, errCode);
44 }
45 if (queryByFieldStmt != nullptr) {
46 SQLiteUtils::ResetStatement(queryByFieldStmt, isNeedFinalize, errCode);
47 }
48 return errCode;
49 }
50
CreateInserter(const std::string & deviceName,const QueryObject & query,const SchemaInfo & schemaInfo,const std::vector<FieldInfo> & remoteFields,const StoreInfo & info)51 RelationalSyncDataInserter RelationalSyncDataInserter::CreateInserter(const std::string &deviceName,
52 const QueryObject &query, const SchemaInfo &schemaInfo, const std::vector<FieldInfo> &remoteFields,
53 const StoreInfo &info)
54 {
55 RelationalSyncDataInserter inserter;
56 inserter.SetHashDevId(DBCommon::TransferStringToHex(DBCommon::TransferHashString(deviceName)));
57 inserter.SetRemoteFields(remoteFields);
58 inserter.SetQuery(query);
59 TableInfo localTable = schemaInfo.localSchema.GetTable(query.GetTableName());
60 localTable.SetTrackerTable(schemaInfo.trackerSchema.GetTrackerTable(localTable.GetTableName()));
61 localTable.SetDistributedTable(schemaInfo.localSchema.GetDistributedTable(query.GetTableName()));
62 inserter.SetLocalTable(localTable);
63 inserter.SetTableMode(schemaInfo.localSchema.GetTableMode());
64 if (schemaInfo.localSchema.GetTableMode() == DistributedTableMode::COLLABORATION) {
65 inserter.SetInsertTableName(localTable.GetTableName());
66 } else {
67 inserter.SetInsertTableName(DBCommon::GetDistributedTableName(deviceName, localTable.GetTableName(), info));
68 }
69 return inserter;
70 }
71
SetHashDevId(const std::string & hashDevId)72 void RelationalSyncDataInserter::SetHashDevId(const std::string &hashDevId)
73 {
74 hashDevId_ = hashDevId;
75 }
76
SetRemoteFields(std::vector<FieldInfo> remoteFields)77 void RelationalSyncDataInserter::SetRemoteFields(std::vector<FieldInfo> remoteFields)
78 {
79 remoteFields_ = std::move(remoteFields);
80 }
81
SetEntries(std::vector<DataItem> entries)82 void RelationalSyncDataInserter::SetEntries(std::vector<DataItem> entries)
83 {
84 entries_ = std::move(entries);
85 }
86
SetLocalTable(TableInfo localTable)87 void RelationalSyncDataInserter::SetLocalTable(TableInfo localTable)
88 {
89 localTable_ = std::move(localTable);
90 }
91
GetLocalTable() const92 const TableInfo &RelationalSyncDataInserter::GetLocalTable() const
93 {
94 return localTable_;
95 }
96
SetQuery(QueryObject query)97 void RelationalSyncDataInserter::SetQuery(QueryObject query)
98 {
99 query_ = std::move(query);
100 }
101
SetInsertTableName(std::string tableName)102 void RelationalSyncDataInserter::SetInsertTableName(std::string tableName)
103 {
104 insertTableName_ = std::move(tableName);
105 }
106
SetTableMode(DistributedTableMode mode)107 void RelationalSyncDataInserter::SetTableMode(DistributedTableMode mode)
108 {
109 mode_ = mode;
110 }
111
GetInsertStatement(sqlite3 * db,sqlite3_stmt * & stmt)112 int RelationalSyncDataInserter::GetInsertStatement(sqlite3 *db, sqlite3_stmt *&stmt)
113 {
114 if (stmt != nullptr) {
115 return -E_INVALID_ARGS;
116 }
117
118 const auto &localTableFields = localTable_.GetFields();
119 std::string colName;
120 std::string dataFormat;
121 for (const auto &it : remoteFields_) {
122 if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
123 continue; // skip fields which is orphaned in remote
124 }
125 colName += "'" + it.GetFieldName() + "',";
126 dataFormat += "?,";
127 }
128 colName.pop_back();
129 dataFormat.pop_back();
130 std::string sql = "INSERT ";
131 if (mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
132 sql += "OR REPLACE ";
133 }
134 sql += "INTO '" + insertTableName_ + "'" +
135 "(" + colName + ") VALUES(" + dataFormat + ");";
136 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
137 if (errCode != E_OK) {
138 LOGE("Get insert data statement fail! errCode:%d", errCode);
139 }
140 return errCode;
141 }
142
GetDbValueByRowId(sqlite3 * db,const std::vector<std::string> & fieldList,const int64_t rowid,std::vector<Type> & values)143 int RelationalSyncDataInserter::GetDbValueByRowId(sqlite3 *db, const std::vector<std::string> &fieldList,
144 const int64_t rowid, std::vector<Type> &values)
145 {
146 if (fieldList.empty()) {
147 LOGW("[RelationalSyncDataInserter][GetDbValueByRowId] fieldList is empty");
148 return E_OK;
149 }
150 sqlite3_stmt *getValueStmt = nullptr;
151 std::string sql = "SELECT ";
152 for (const auto &col : fieldList) {
153 sql += "data.'" + col + "',";
154 }
155 sql.pop_back();
156 sql += " FROM '" + localTable_.GetTableName() + "' as data WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
157 " = ?;";
158 int errCode = SQLiteUtils::GetStatement(db, sql, getValueStmt);
159 if (errCode != E_OK) {
160 LOGE("[RelationalSyncDataInserter][GetDbValueByRowId] failed to prepare statmement");
161 return errCode;
162 }
163
164 SQLiteUtils::BindInt64ToStatement(getValueStmt, 1, rowid);
165 errCode = SQLiteUtils::StepWithRetry(getValueStmt, false);
166 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
167 VBucket bucket;
168 errCode = SQLiteRelationalUtils::GetSelectVBucket(getValueStmt, bucket);
169 if (errCode != E_OK) {
170 LOGE("[RelationalSyncDataInserter][GetDbValueByRowId] failed to convert sql result to values");
171 int ret = E_OK;
172 SQLiteUtils::ResetStatement(getValueStmt, true, ret);
173 return errCode;
174 }
175 for (auto value : bucket) {
176 values.push_back(value.second);
177 }
178 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
179 LOGW("[RelationalSyncDataInserter][GetDbValueByRowId] found no data in db");
180 errCode = E_OK;
181 }
182 int ret = E_OK;
183 SQLiteUtils::ResetStatement(getValueStmt, true, ret);
184 return errCode;
185 }
186
GetObserverDataByRowId(sqlite3 * db,int64_t rowid,ChangeType type)187 int RelationalSyncDataInserter::GetObserverDataByRowId(sqlite3 *db, int64_t rowid, ChangeType type)
188 {
189 std::vector<Type> primaryValues;
190 std::vector<std::string> primaryKeys;
191
192 if (localTable_.IsMultiPkTable() || localTable_.IsNoPkTable()) {
193 primaryKeys.push_back(DBConstant::ROWID);
194 primaryValues.push_back(rowid);
195 }
196 std::vector<std::string> pkList;
197 for (const auto &primaryKey : localTable_.GetPrimaryKey()) {
198 if (primaryKey.second != DBConstant::ROWID) {
199 pkList.push_back(primaryKey.second);
200 }
201 }
202
203 data_.field = primaryKeys;
204 if (pkList.empty()) {
205 data_.primaryData[type].push_back(primaryValues);
206 return E_OK;
207 }
208
209 std::vector<Type> dbValues;
210 int errCode = GetDbValueByRowId(db, pkList, rowid, dbValues);
211 if (errCode != E_OK) {
212 LOGE("[RelationalSyncDataInserter][GetObserverDataByRowId] failed to get db values");
213 data_.primaryData[type].push_back(primaryValues);
214 return errCode;
215 }
216 data_.field.insert(data_.field.end(), pkList.begin(), pkList.end());
217 primaryValues.insert(primaryValues.end(), dbValues.begin(), dbValues.end());
218 data_.primaryData[type].push_back(primaryValues);
219 return errCode;
220 }
221
SaveData(bool isUpdate,const DataItem & dataItem,SaveSyncDataStmt & saveSyncDataStmt,std::map<std::string,Type> & saveVals)222 int RelationalSyncDataInserter::SaveData(bool isUpdate, const DataItem &dataItem,
223 SaveSyncDataStmt &saveSyncDataStmt, std::map<std::string, Type> &saveVals)
224 {
225 sqlite3_stmt *&stmt = isUpdate ? saveSyncDataStmt.updateDataStmt : saveSyncDataStmt.insertDataStmt;
226 std::set<std::string> filterSet;
227 if (isUpdate) {
228 for (const auto &primaryKey : localTable_.GetIdentifyKey()) {
229 filterSet.insert(primaryKey);
230 }
231 auto distributedPk = localTable_.GetSyncDistributedPk();
232 if (!distributedPk.empty()) {
233 filterSet.insert(distributedPk.begin(), distributedPk.end());
234 }
235 }
236 if (stmt == nullptr) {
237 LOGW("skip save data %s", DBCommon::StringMiddleMasking(DBCommon::VectorToHexString(dataItem.hashKey)).c_str());
238 return E_OK;
239 }
240
241 int errCode = BindSaveDataStatement(isUpdate, dataItem, filterSet, stmt, saveVals);
242 if (errCode != E_OK) {
243 LOGE("Bind data failed, errCode=%d.", errCode);
244 int ret = E_OK;
245 SQLiteUtils::ResetStatement(stmt, false, ret);
246 return errCode;
247 }
248
249 errCode = SQLiteUtils::StepWithRetry(stmt, false);
250 int ret = E_OK;
251 SQLiteUtils::ResetStatement(stmt, false, true, ret);
252 return errCode;
253 }
254
BindSaveDataStatement(bool isExist,const DataItem & dataItem,const std::set<std::string> & filterSet,sqlite3_stmt * stmt,std::map<std::string,Type> & saveVals)255 int RelationalSyncDataInserter::BindSaveDataStatement(bool isExist, const DataItem &dataItem,
256 const std::set<std::string> &filterSet, sqlite3_stmt *stmt, std::map<std::string, Type> &saveVals)
257 {
258 OptRowDataWithLog data;
259 // deserialize by remote field info
260 int errCode = DataTransformer::DeSerializeDataItem(dataItem, data, remoteFields_);
261 if (errCode != E_OK) {
262 LOGE("DeSerialize dataItem failed! errCode = [%d]", errCode);
263 return errCode;
264 }
265
266 size_t dataIdx = 0;
267 int bindIdx = 1;
268 const auto &localTableFields = localTable_.GetFields();
269 for (const auto &it : remoteFields_) {
270 if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
271 LOGD("field %s[%zu] not found in local schema.", DBCommon::StringMiddleMasking(it.GetFieldName()).c_str(),
272 it.GetFieldName().size());
273 dataIdx++;
274 continue; // skip fields which is orphaned in remote
275 }
276 if (dataIdx >= data.optionalData.size()) {
277 LOGD("field over size. cnt:%d, data size:%d", dataIdx, data.optionalData.size());
278 break; // cnt should less than optionalData size.
279 }
280 Type saveVal;
281 CloudStorageUtils::SaveChangedDataByType(data.optionalData[dataIdx], saveVal);
282 saveVals[it.GetFieldName()] = saveVal;
283 if (filterSet.find(it.GetFieldName()) != filterSet.end()) {
284 dataIdx++;
285 continue; // skip fields when update
286 }
287 errCode = SQLiteUtils::BindDataValueByType(stmt, data.optionalData[dataIdx], bindIdx++);
288 if (errCode != E_OK) {
289 LOGE("Bind data failed, errCode:%d, cid:%zu.", errCode, dataIdx);
290 return errCode;
291 }
292 dataIdx++;
293 }
294 return isExist ? BindHashKeyAndDev(dataItem, stmt, bindIdx) : E_OK;
295 }
296
GetDeleteLogStmt(sqlite3 * db,sqlite3_stmt * & stmt)297 int RelationalSyncDataInserter::GetDeleteLogStmt(sqlite3 *db, sqlite3_stmt *&stmt)
298 {
299 std::string sql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + localTable_.GetTableName() +
300 "_log ";
301 if (mode_ == DistributedTableMode::COLLABORATION) {
302 sql += "WHERE hash_key=?";
303 } else {
304 sql += "WHERE hash_key=? AND device=?";
305 }
306
307 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
308 if (errCode != E_OK) {
309 LOGE("[DeleteSyncLog] Get statement fail!");
310 }
311 return errCode;
312 }
313
GetDeleteSyncDataStmt(sqlite3 * db,sqlite3_stmt * & stmt)314 int RelationalSyncDataInserter::GetDeleteSyncDataStmt(sqlite3 *db, sqlite3_stmt *&stmt)
315 {
316 std::string sql = "DELETE FROM '" + insertTableName_ + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
317 " IN (SELECT data_key FROM " + DBConstant::RELATIONAL_PREFIX + localTable_.GetTableName() + "_log ";
318 if (mode_ == DistributedTableMode::COLLABORATION) {
319 sql += "WHERE hash_key=?);";
320 } else {
321 sql += "WHERE hash_key=? AND device=? AND flag&0x01=0);";
322 }
323 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
324 if (errCode != E_OK) {
325 LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
326 }
327 return errCode;
328 }
329
GetSaveLogStatement(sqlite3 * db,sqlite3_stmt * & logStmt,sqlite3_stmt * & queryStmt)330 int RelationalSyncDataInserter::GetSaveLogStatement(sqlite3 *db, sqlite3_stmt *&logStmt, sqlite3_stmt *&queryStmt)
331 {
332 const std::string tableName = DBConstant::RELATIONAL_PREFIX + query_.GetTableName() + "_log";
333 std::string dataFormat = "?, '" + hashDevId_ + "', ?, ?, ?, ?, ?";
334 TrackerTable trackerTable = localTable_.GetTrackerTable();
335 if (trackerTable.GetExtendNames().empty()) {
336 dataFormat += ", ?";
337 } else {
338 dataFormat += ", (select json_object(";
339 for (const auto &extendColName : trackerTable.GetExtendNames()) {
340 dataFormat += "'" + extendColName + "'," + extendColName + ",";
341 }
342 dataFormat.pop_back();
343 dataFormat += ") from ";
344 dataFormat += localTable_.GetTableName() + " where _rowid_ = ?)";
345 }
346 std::string columnList = "data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, extend_field";
347 std::string sql = "INSERT OR REPLACE INTO " + tableName +
348 " (" + columnList + ", cursor) VALUES (" + dataFormat + "," +
349 CloudStorageUtils::GetSelectIncCursorSql(query_.GetTableName()) +");";
350 int errCode = SQLiteUtils::GetStatement(db, sql, logStmt);
351 if (errCode != E_OK) {
352 LOGE("[info statement] Get log statement fail! errCode:%d", errCode);
353 return errCode;
354 }
355 std::string selectSql = "select " + columnList + " from " + tableName;
356 if (mode_ == DistributedTableMode::COLLABORATION) {
357 selectSql += " where hash_key = ?;";
358 } else {
359 selectSql += " where hash_key = ? and device = ?;";
360 }
361 errCode = SQLiteUtils::GetStatement(db, selectSql, queryStmt);
362 if (errCode != E_OK) {
363 int ret = E_OK;
364 SQLiteUtils::ResetStatement(logStmt, true, ret);
365 LOGE("[info statement] Get query statement fail! errCode:%d", errCode);
366 }
367 return errCode;
368 }
369
PrepareStatement(sqlite3 * db,SaveSyncDataStmt & stmt)370 int RelationalSyncDataInserter::PrepareStatement(sqlite3 *db, SaveSyncDataStmt &stmt)
371 {
372 int errCode = GetSaveLogStatement(db, stmt.saveLogStmt, stmt.queryStmt);
373 if (errCode != E_OK) {
374 LOGE("Get save log statement failed. err=%d", errCode);
375 return errCode;
376 }
377 errCode = GetInsertStatement(db, stmt.insertDataStmt);
378 if (errCode != E_OK) {
379 LOGE("Get insert statement failed. err=%d", errCode);
380 return errCode;
381 }
382 errCode = GetUpdateStatement(db, stmt.updateDataStmt);
383 if (errCode != E_OK) {
384 LOGE("Get update statement failed. err=%d", errCode);
385 return errCode;
386 }
387 errCode = GetQueryLogByFieldStmt(db, stmt.queryByFieldStmt);
388 if (errCode != E_OK) {
389 LOGE("Get query by field statement failed. err=%d", errCode);
390 }
391 return errCode;
392 }
393
Iterate(const std::function<int (DataItem &)> & saveSyncDataItem)394 int RelationalSyncDataInserter::Iterate(const std::function<int (DataItem &)> &saveSyncDataItem)
395 {
396 int errCode = E_OK;
397 for (auto &it : entries_) {
398 it.dev = hashDevId_;
399 int ret = saveSyncDataItem(it);
400 errCode = errCode == E_OK ? ret : errCode;
401 }
402 return errCode;
403 }
404
GetUpdateStatement(sqlite3 * db,sqlite3_stmt * & stmt)405 int RelationalSyncDataInserter::GetUpdateStatement(sqlite3 *db, sqlite3_stmt *&stmt)
406 {
407 if (stmt != nullptr) {
408 return -E_INVALID_ARGS;
409 }
410
411 std::set<std::string> identifyKeySet;
412 for (const auto &primaryKey : localTable_.GetIdentifyKey()) {
413 identifyKeySet.insert(primaryKey);
414 }
415 auto distributedPk = localTable_.GetSyncDistributedPk();
416 if (!distributedPk.empty()) {
417 identifyKeySet.insert(distributedPk.begin(), distributedPk.end());
418 }
419 std::string updateValue;
420 const auto &localTableFields = localTable_.GetFields();
421 for (const auto &it : remoteFields_) {
422 if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
423 continue; // skip fields which is orphaned in remote
424 }
425 if (identifyKeySet.find(it.GetFieldName()) == identifyKeySet.end()) {
426 if (updateValue.empty()) {
427 updateValue.append(" SET ");
428 } else {
429 updateValue.append(", ");
430 }
431 updateValue.append("'").append(it.GetFieldName()).append("'=?");
432 }
433 }
434 if (updateValue.empty()) {
435 // only sync pk no need update
436 return E_OK;
437 }
438 std::string sql = "UPDATE '" + insertTableName_ + "'" + updateValue + " WHERE " +
439 std::string(DBConstant::SQLITE_INNER_ROWID) + " IN (SELECT data_key FROM " +
440 DBConstant::RELATIONAL_PREFIX + localTable_.GetTableName() + "_log ";
441 if (mode_ == DistributedTableMode::COLLABORATION) {
442 sql += "WHERE hash_key=?);";
443 } else {
444 sql += "WHERE hash_key=? AND device=? AND flag&0x01=0);";
445 }
446 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
447 if (errCode != E_OK) {
448 LOGE("Get update data statement fail! errCode:%d", errCode);
449 }
450 return errCode;
451 }
452
BindHashKeyAndDev(const DataItem & dataItem,sqlite3_stmt * stmt,int beginIndex)453 int RelationalSyncDataInserter::BindHashKeyAndDev(const DataItem &dataItem, sqlite3_stmt *stmt,
454 int beginIndex)
455 {
456 int errCode = SQLiteUtils::BindBlobToStatement(stmt, beginIndex++, dataItem.hashKey);
457 if (errCode != E_OK) {
458 LOGE("[RelationalSyncDataInserter] bind hash key failed %d", errCode);
459 return errCode;
460 }
461 if (mode_ != DistributedTableMode::COLLABORATION) {
462 errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, dataItem.dev);
463 if (errCode != E_OK) {
464 LOGE("[RelationalSyncDataInserter] bind dev failed %d", errCode);
465 }
466 }
467 return errCode;
468 }
469
SaveSyncLog(sqlite3 * db,const DataItem & dataItem,const DeviceSyncSaveDataInfo & deviceSyncSaveDataInfo,std::map<std::string,Type> & saveVals,SaveSyncDataStmt & saveStmt)470 int RelationalSyncDataInserter::SaveSyncLog(sqlite3 *db, const DataItem &dataItem,
471 const DeviceSyncSaveDataInfo &deviceSyncSaveDataInfo, std::map<std::string, Type> &saveVals,
472 SaveSyncDataStmt &saveStmt)
473 {
474 if (std::get_if<int64_t>(&saveVals[DBConstant::ROWID]) == nullptr) {
475 LOGE("[RelationalSyncDataInserter] Invalid args because of no rowid!");
476 return -E_INVALID_ARGS;
477 }
478 auto updateCursor = CloudStorageUtils::GetCursorIncSql(query_.GetTableName());
479 int errCode = SQLiteUtils::ExecuteRawSQL(db, updateCursor);
480 if (errCode != E_OK) {
481 LOGE("[RelationalSyncDataInserter] update cursor failed %d", errCode);
482 return errCode;
483 }
484 LogInfo logInfoBind;
485 logInfoBind.hashKey = dataItem.hashKey;
486 logInfoBind.device = dataItem.dev;
487 logInfoBind.timestamp = dataItem.timestamp;
488 logInfoBind.flag = dataItem.flag;
489
490 if (!deviceSyncSaveDataInfo.isExist) { // insert
491 logInfoBind.wTimestamp = dataItem.writeTimestamp;
492 logInfoBind.originDev = dataItem.dev;
493 } else { // update
494 logInfoBind.wTimestamp = deviceSyncSaveDataInfo.localLogInfo.wTimestamp;
495 logInfoBind.originDev = deviceSyncSaveDataInfo.localLogInfo.originDev;
496 }
497 auto statement = saveStmt.saveLogStmt;
498 // bind
499 int bindIndex = 0;
500 // 1 means dataKey index
501 SQLiteUtils::BindInt64ToStatement(statement, ++bindIndex, std::get<int64_t>(saveVals[DBConstant::ROWID]));
502 std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
503 SQLiteUtils::BindBlobToStatement(statement, ++bindIndex, originDev); // 2 means ori_dev index
504 SQLiteUtils::BindInt64ToStatement(statement, ++bindIndex, logInfoBind.timestamp); // 3 means timestamp index
505 SQLiteUtils::BindInt64ToStatement(statement, ++bindIndex, logInfoBind.wTimestamp); // 4 means w_timestamp index
506 SQLiteUtils::BindInt64ToStatement(statement, ++bindIndex, logInfoBind.flag); // 5 means flag index
507 SQLiteUtils::BindBlobToStatement(statement, ++bindIndex, logInfoBind.hashKey); // 6 means hashKey index
508 BindExtendFieldOrRowid(statement, saveVals, bindIndex); // bind extend_field or rowid
509 errCode = SQLiteUtils::StepWithRetry(statement, false);
510 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
511 return E_OK;
512 }
513 return errCode;
514 }
515
GetChangedData()516 ChangedData &RelationalSyncDataInserter::GetChangedData()
517 {
518 return data_;
519 }
520
BindExtendFieldOrRowid(sqlite3_stmt * & stmt,std::map<std::string,Type> & saveVals,int bindIndex)521 void RelationalSyncDataInserter::BindExtendFieldOrRowid(sqlite3_stmt *&stmt, std::map<std::string, Type> &saveVals,
522 int bindIndex)
523 {
524 TrackerTable trackerTable = localTable_.GetTrackerTable();
525 if (trackerTable.GetExtendNames().empty()) {
526 SQLiteUtils::BindTextToStatement(stmt, ++bindIndex, "");
527 } else {
528 SQLiteUtils::BindInt64ToStatement(stmt, ++bindIndex, std::get<int64_t>(saveVals[DBConstant::ROWID]));
529 }
530 }
531
GetRemoteFields() const532 std::vector<FieldInfo> RelationalSyncDataInserter::GetRemoteFields() const
533 {
534 return remoteFields_;
535 }
536
GetQueryLogByFieldStmt(sqlite3 * db,sqlite3_stmt * & stmt)537 int RelationalSyncDataInserter::GetQueryLogByFieldStmt(sqlite3 *db, sqlite3_stmt *&stmt)
538 {
539 if (mode_ != DistributedTableMode::COLLABORATION) {
540 stmt = nullptr;
541 return E_OK;
542 }
543 auto syncPk = localTable_.GetSyncDistributedPk();
544 if (syncPk.empty()) {
545 stmt = nullptr;
546 return E_OK;
547 }
548 std::string sql("SELECT ");
549 std::string columnList = "log.data_key, log.device, log.ori_device, log.timestamp, log.wtimestamp, log.flag,"
550 " log.hash_key, log.extend_field";
551 auto table = localTable_.GetTableName();
552 sql.append(columnList).append(" FROM ").append(DBCommon::GetLogTableName(table))
553 .append(" AS log, ").append(table)
554 .append(" AS data WHERE log.data_key = data._rowid_ AND ");
555 for (size_t i = 0; i < syncPk.size(); ++i) {
556 if (i != 0) {
557 sql.append(", ");
558 }
559 sql.append("data.'").append(syncPk[i]).append("' = ?");
560 }
561 auto errCode = SQLiteUtils::GetStatement(db, sql, stmt);
562 if (errCode != E_OK) {
563 LOGE("[RelationalSyncDataInserter] Get query [%s][%zu] log stmt failed[%d]",
564 DBCommon::StringMiddleMasking(table).c_str(), table.size(), errCode);
565 }
566 return errCode;
567 }
568 } // namespace DistributedDB