• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "relational_sync_data_inserter.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "data_transformer.h"
19 #include "db_common.h"
20 #include "sqlite_relational_utils.h"
21 #include "sqlite_utils.h"
22 
23 namespace DistributedDB {
ResetStatements(bool isNeedFinalize)24 int SaveSyncDataStmt::ResetStatements(bool isNeedFinalize)
25 {
26     int errCode = E_OK;
27     if (insertDataStmt != nullptr) {
28         SQLiteUtils::ResetStatement(insertDataStmt, isNeedFinalize, errCode);
29     }
30     if (updateDataStmt != nullptr) {
31         SQLiteUtils::ResetStatement(updateDataStmt, isNeedFinalize, errCode);
32     }
33     if (saveLogStmt != nullptr) {
34         SQLiteUtils::ResetStatement(saveLogStmt, isNeedFinalize, errCode);
35     }
36     if (queryStmt != nullptr) {
37         SQLiteUtils::ResetStatement(queryStmt, isNeedFinalize, errCode);
38     }
39     if (rmDataStmt != nullptr) {
40         SQLiteUtils::ResetStatement(rmDataStmt, isNeedFinalize, errCode);
41     }
42     if (rmLogStmt != nullptr) {
43         SQLiteUtils::ResetStatement(rmLogStmt, isNeedFinalize, errCode);
44     }
45     if (queryByFieldStmt != nullptr) {
46         SQLiteUtils::ResetStatement(queryByFieldStmt, isNeedFinalize, errCode);
47     }
48     return errCode;
49 }
50 
CreateInserter(const std::string & deviceName,const QueryObject & query,const SchemaInfo & schemaInfo,const std::vector<FieldInfo> & remoteFields,const StoreInfo & info)51 RelationalSyncDataInserter RelationalSyncDataInserter::CreateInserter(const std::string &deviceName,
52     const QueryObject &query, const SchemaInfo &schemaInfo, const std::vector<FieldInfo> &remoteFields,
53     const StoreInfo &info)
54 {
55     RelationalSyncDataInserter inserter;
56     inserter.SetHashDevId(DBCommon::TransferStringToHex(DBCommon::TransferHashString(deviceName)));
57     inserter.SetRemoteFields(remoteFields);
58     inserter.SetQuery(query);
59     TableInfo localTable = schemaInfo.localSchema.GetTable(query.GetTableName());
60     localTable.SetTrackerTable(schemaInfo.trackerSchema.GetTrackerTable(localTable.GetTableName()));
61     localTable.SetDistributedTable(schemaInfo.localSchema.GetDistributedTable(query.GetTableName()));
62     inserter.SetLocalTable(localTable);
63     inserter.SetTableMode(schemaInfo.localSchema.GetTableMode());
64     if (schemaInfo.localSchema.GetTableMode() == DistributedTableMode::COLLABORATION) {
65         inserter.SetInsertTableName(localTable.GetTableName());
66     } else {
67         inserter.SetInsertTableName(DBCommon::GetDistributedTableName(deviceName, localTable.GetTableName(), info));
68     }
69     return inserter;
70 }
71 
SetHashDevId(const std::string & hashDevId)72 void RelationalSyncDataInserter::SetHashDevId(const std::string &hashDevId)
73 {
74     hashDevId_ = hashDevId;
75 }
76 
SetRemoteFields(std::vector<FieldInfo> remoteFields)77 void RelationalSyncDataInserter::SetRemoteFields(std::vector<FieldInfo> remoteFields)
78 {
79     remoteFields_ = std::move(remoteFields);
80 }
81 
SetEntries(std::vector<DataItem> entries)82 void RelationalSyncDataInserter::SetEntries(std::vector<DataItem> entries)
83 {
84     entries_ = std::move(entries);
85 }
86 
SetLocalTable(TableInfo localTable)87 void RelationalSyncDataInserter::SetLocalTable(TableInfo localTable)
88 {
89     localTable_ = std::move(localTable);
90 }
91 
GetLocalTable() const92 const TableInfo &RelationalSyncDataInserter::GetLocalTable() const
93 {
94     return localTable_;
95 }
96 
SetQuery(QueryObject query)97 void RelationalSyncDataInserter::SetQuery(QueryObject query)
98 {
99     query_ = std::move(query);
100 }
101 
SetInsertTableName(std::string tableName)102 void RelationalSyncDataInserter::SetInsertTableName(std::string tableName)
103 {
104     insertTableName_ = std::move(tableName);
105 }
106 
SetTableMode(DistributedTableMode mode)107 void RelationalSyncDataInserter::SetTableMode(DistributedTableMode mode)
108 {
109     mode_ = mode;
110 }
111 
GetInsertStatement(sqlite3 * db,sqlite3_stmt * & stmt)112 int RelationalSyncDataInserter::GetInsertStatement(sqlite3 *db, sqlite3_stmt *&stmt)
113 {
114     if (stmt != nullptr) {
115         return -E_INVALID_ARGS;
116     }
117 
118     const auto &localTableFields = localTable_.GetFields();
119     std::string colName;
120     std::string dataFormat;
121     for (const auto &it : remoteFields_) {
122         if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
123             continue; // skip fields which is orphaned in remote
124         }
125         colName += "'" + it.GetFieldName() + "',";
126         dataFormat += "?,";
127     }
128     colName.pop_back();
129     dataFormat.pop_back();
130     std::string sql = "INSERT ";
131     if (mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
132         sql += "OR REPLACE ";
133     }
134     sql += "INTO '" + insertTableName_ + "'" +
135         "(" + colName + ") VALUES(" + dataFormat + ");";
136     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
137     if (errCode != E_OK) {
138         LOGE("Get insert data statement fail! errCode:%d", errCode);
139     }
140     return errCode;
141 }
142 
GetDbValueByRowId(sqlite3 * db,const std::vector<std::string> & fieldList,const int64_t rowid,std::vector<Type> & values)143 int RelationalSyncDataInserter::GetDbValueByRowId(sqlite3 *db, const std::vector<std::string> &fieldList,
144     const int64_t rowid, std::vector<Type> &values)
145 {
146     if (fieldList.empty()) {
147         LOGW("[RelationalSyncDataInserter][GetDbValueByRowId] fieldList is empty");
148         return E_OK;
149     }
150     sqlite3_stmt *getValueStmt = nullptr;
151     std::string sql = "SELECT ";
152     for (const auto &col : fieldList) {
153         sql += "data.'" + col + "',";
154     }
155     sql.pop_back();
156     sql += " FROM '" + localTable_.GetTableName() + "' as data WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
157         " = ?;";
158     int errCode = SQLiteUtils::GetStatement(db, sql, getValueStmt);
159     if (errCode != E_OK) {
160         LOGE("[RelationalSyncDataInserter][GetDbValueByRowId] failed to prepare statmement");
161         return errCode;
162     }
163 
164     SQLiteUtils::BindInt64ToStatement(getValueStmt, 1, rowid);
165     errCode = SQLiteUtils::StepWithRetry(getValueStmt, false);
166     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
167         VBucket bucket;
168         errCode = SQLiteRelationalUtils::GetSelectVBucket(getValueStmt, bucket);
169         if (errCode != E_OK) {
170             LOGE("[RelationalSyncDataInserter][GetDbValueByRowId] failed to convert sql result to values");
171             int ret = E_OK;
172             SQLiteUtils::ResetStatement(getValueStmt, true, ret);
173             return errCode;
174         }
175         for (auto value : bucket) {
176             values.push_back(value.second);
177         }
178     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
179         LOGW("[RelationalSyncDataInserter][GetDbValueByRowId] found no data in db");
180         errCode = E_OK;
181     }
182     int ret = E_OK;
183     SQLiteUtils::ResetStatement(getValueStmt, true, ret);
184     return errCode;
185 }
186 
GetObserverDataByRowId(sqlite3 * db,int64_t rowid,ChangeType type)187 int RelationalSyncDataInserter::GetObserverDataByRowId(sqlite3 *db, int64_t rowid, ChangeType type)
188 {
189     std::vector<Type> primaryValues;
190     std::vector<std::string> primaryKeys;
191 
192     if (localTable_.IsMultiPkTable() || localTable_.IsNoPkTable()) {
193         primaryKeys.push_back(DBConstant::ROWID);
194         primaryValues.push_back(rowid);
195     }
196     std::vector<std::string> pkList;
197     for (const auto &primaryKey : localTable_.GetPrimaryKey()) {
198         if (primaryKey.second != DBConstant::ROWID) {
199             pkList.push_back(primaryKey.second);
200         }
201     }
202 
203     data_.field = primaryKeys;
204     if (pkList.empty()) {
205         data_.primaryData[type].push_back(primaryValues);
206         return E_OK;
207     }
208 
209     std::vector<Type> dbValues;
210     int errCode = GetDbValueByRowId(db, pkList, rowid, dbValues);
211     if (errCode != E_OK) {
212         LOGE("[RelationalSyncDataInserter][GetObserverDataByRowId] failed to get db values");
213         data_.primaryData[type].push_back(primaryValues);
214         return errCode;
215     }
216     data_.field.insert(data_.field.end(), pkList.begin(), pkList.end());
217     primaryValues.insert(primaryValues.end(), dbValues.begin(), dbValues.end());
218     data_.primaryData[type].push_back(primaryValues);
219     return errCode;
220 }
221 
SaveData(bool isUpdate,const DataItem & dataItem,SaveSyncDataStmt & saveSyncDataStmt,std::map<std::string,Type> & saveVals)222 int RelationalSyncDataInserter::SaveData(bool isUpdate, const DataItem &dataItem,
223     SaveSyncDataStmt &saveSyncDataStmt, std::map<std::string, Type> &saveVals)
224 {
225     sqlite3_stmt *&stmt = isUpdate ? saveSyncDataStmt.updateDataStmt : saveSyncDataStmt.insertDataStmt;
226     std::set<std::string> filterSet;
227     if (isUpdate) {
228         for (const auto &primaryKey : localTable_.GetIdentifyKey()) {
229             filterSet.insert(primaryKey);
230         }
231         auto distributedPk = localTable_.GetSyncDistributedPk();
232         if (!distributedPk.empty()) {
233             filterSet.insert(distributedPk.begin(), distributedPk.end());
234         }
235     }
236     if (stmt == nullptr) {
237         LOGW("skip save data %s", DBCommon::StringMiddleMasking(DBCommon::VectorToHexString(dataItem.hashKey)).c_str());
238         return E_OK;
239     }
240 
241     int errCode = BindSaveDataStatement(isUpdate, dataItem, filterSet, stmt, saveVals);
242     if (errCode != E_OK) {
243         LOGE("Bind data failed, errCode=%d.", errCode);
244         int ret = E_OK;
245         SQLiteUtils::ResetStatement(stmt, false, ret);
246         return errCode;
247     }
248 
249     errCode = SQLiteUtils::StepWithRetry(stmt, false);
250     int ret = E_OK;
251     SQLiteUtils::ResetStatement(stmt, false, true, ret);
252     return errCode;
253 }
254 
BindSaveDataStatement(bool isExist,const DataItem & dataItem,const std::set<std::string> & filterSet,sqlite3_stmt * stmt,std::map<std::string,Type> & saveVals)255 int RelationalSyncDataInserter::BindSaveDataStatement(bool isExist, const DataItem &dataItem,
256     const std::set<std::string> &filterSet, sqlite3_stmt *stmt, std::map<std::string, Type> &saveVals)
257 {
258     OptRowDataWithLog data;
259     // deserialize by remote field info
260     int errCode = DataTransformer::DeSerializeDataItem(dataItem, data, remoteFields_);
261     if (errCode != E_OK) {
262         LOGE("DeSerialize dataItem failed! errCode = [%d]", errCode);
263         return errCode;
264     }
265 
266     size_t dataIdx = 0;
267     int bindIdx = 1;
268     const auto &localTableFields = localTable_.GetFields();
269     for (const auto &it : remoteFields_) {
270         if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
271             LOGD("field %s[%zu] not found in local schema.", DBCommon::StringMiddleMasking(it.GetFieldName()).c_str(),
272                 it.GetFieldName().size());
273             dataIdx++;
274             continue; // skip fields which is orphaned in remote
275         }
276         if (dataIdx >= data.optionalData.size()) {
277             LOGD("field over size. cnt:%d, data size:%d", dataIdx, data.optionalData.size());
278             break; // cnt should less than optionalData size.
279         }
280         Type saveVal;
281         CloudStorageUtils::SaveChangedDataByType(data.optionalData[dataIdx], saveVal);
282         saveVals[it.GetFieldName()] = saveVal;
283         if (filterSet.find(it.GetFieldName()) != filterSet.end()) {
284             dataIdx++;
285             continue; // skip fields when update
286         }
287         errCode = SQLiteUtils::BindDataValueByType(stmt, data.optionalData[dataIdx], bindIdx++);
288         if (errCode != E_OK) {
289             LOGE("Bind data failed, errCode:%d, cid:%zu.", errCode, dataIdx);
290             return errCode;
291         }
292         dataIdx++;
293     }
294     return isExist ? BindHashKeyAndDev(dataItem, stmt, bindIdx) : E_OK;
295 }
296 
GetDeleteLogStmt(sqlite3 * db,sqlite3_stmt * & stmt)297 int RelationalSyncDataInserter::GetDeleteLogStmt(sqlite3 *db, sqlite3_stmt *&stmt)
298 {
299     std::string sql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + localTable_.GetTableName() +
300         "_log ";
301     if (mode_ == DistributedTableMode::COLLABORATION) {
302         sql += "WHERE hash_key=?";
303     } else {
304         sql += "WHERE hash_key=? AND device=?";
305     }
306 
307     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
308     if (errCode != E_OK) {
309         LOGE("[DeleteSyncLog] Get statement fail!");
310     }
311     return errCode;
312 }
313 
GetDeleteSyncDataStmt(sqlite3 * db,sqlite3_stmt * & stmt)314 int RelationalSyncDataInserter::GetDeleteSyncDataStmt(sqlite3 *db, sqlite3_stmt *&stmt)
315 {
316     std::string sql = "DELETE FROM '" + insertTableName_ + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
317         " IN (SELECT data_key FROM " + DBConstant::RELATIONAL_PREFIX + localTable_.GetTableName() + "_log ";
318     if (mode_ == DistributedTableMode::COLLABORATION) {
319         sql += "WHERE hash_key=?);";
320     } else {
321         sql += "WHERE hash_key=? AND device=? AND flag&0x01=0);";
322     }
323     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
324     if (errCode != E_OK) {
325         LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
326     }
327     return errCode;
328 }
329 
GetSaveLogStatement(sqlite3 * db,sqlite3_stmt * & logStmt,sqlite3_stmt * & queryStmt)330 int RelationalSyncDataInserter::GetSaveLogStatement(sqlite3 *db, sqlite3_stmt *&logStmt, sqlite3_stmt *&queryStmt)
331 {
332     const std::string tableName = DBConstant::RELATIONAL_PREFIX + query_.GetTableName() + "_log";
333     std::string dataFormat = "?, '" + hashDevId_ + "', ?, ?, ?, ?, ?";
334     TrackerTable trackerTable = localTable_.GetTrackerTable();
335     if (trackerTable.GetExtendNames().empty()) {
336         dataFormat += ", ?";
337     } else {
338         dataFormat += ", (select json_object(";
339         for (const auto &extendColName : trackerTable.GetExtendNames()) {
340             dataFormat += "'" + extendColName + "'," + extendColName + ",";
341         }
342         dataFormat.pop_back();
343         dataFormat += ") from ";
344         dataFormat += localTable_.GetTableName() + " where _rowid_ = ?)";
345     }
346     std::string columnList = "data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, extend_field";
347     std::string sql = "INSERT OR REPLACE INTO " + tableName +
348         " (" + columnList + ", cursor) VALUES (" + dataFormat + "," +
349         CloudStorageUtils::GetSelectIncCursorSql(query_.GetTableName()) +");";
350     int errCode = SQLiteUtils::GetStatement(db, sql, logStmt);
351     if (errCode != E_OK) {
352         LOGE("[info statement] Get log statement fail! errCode:%d", errCode);
353         return errCode;
354     }
355     std::string selectSql = "select " + columnList + " from " + tableName;
356     if (mode_ == DistributedTableMode::COLLABORATION) {
357         selectSql += " where hash_key = ?;";
358     } else {
359         selectSql += " where hash_key = ? and device = ?;";
360     }
361     errCode = SQLiteUtils::GetStatement(db, selectSql, queryStmt);
362     if (errCode != E_OK) {
363         int ret = E_OK;
364         SQLiteUtils::ResetStatement(logStmt, true, ret);
365         LOGE("[info statement] Get query statement fail! errCode:%d", errCode);
366     }
367     return errCode;
368 }
369 
PrepareStatement(sqlite3 * db,SaveSyncDataStmt & stmt)370 int RelationalSyncDataInserter::PrepareStatement(sqlite3 *db, SaveSyncDataStmt &stmt)
371 {
372     int errCode = GetSaveLogStatement(db, stmt.saveLogStmt, stmt.queryStmt);
373     if (errCode != E_OK) {
374         LOGE("Get save log statement failed. err=%d", errCode);
375         return errCode;
376     }
377     errCode = GetInsertStatement(db, stmt.insertDataStmt);
378     if (errCode != E_OK) {
379         LOGE("Get insert statement failed. err=%d", errCode);
380         return errCode;
381     }
382     errCode = GetUpdateStatement(db, stmt.updateDataStmt);
383     if (errCode != E_OK) {
384         LOGE("Get update statement failed. err=%d", errCode);
385         return errCode;
386     }
387     errCode = GetQueryLogByFieldStmt(db, stmt.queryByFieldStmt);
388     if (errCode != E_OK) {
389         LOGE("Get query by field statement failed. err=%d", errCode);
390     }
391     return errCode;
392 }
393 
Iterate(const std::function<int (DataItem &)> & saveSyncDataItem)394 int RelationalSyncDataInserter::Iterate(const std::function<int (DataItem &)> &saveSyncDataItem)
395 {
396     int errCode = E_OK;
397     for (auto &it : entries_) {
398         it.dev = hashDevId_;
399         int ret = saveSyncDataItem(it);
400         errCode = errCode == E_OK ? ret : errCode;
401     }
402     return errCode;
403 }
404 
GetUpdateStatement(sqlite3 * db,sqlite3_stmt * & stmt)405 int RelationalSyncDataInserter::GetUpdateStatement(sqlite3 *db, sqlite3_stmt *&stmt)
406 {
407     if (stmt != nullptr) {
408         return -E_INVALID_ARGS;
409     }
410 
411     std::set<std::string> identifyKeySet;
412     for (const auto &primaryKey : localTable_.GetIdentifyKey()) {
413         identifyKeySet.insert(primaryKey);
414     }
415     auto distributedPk = localTable_.GetSyncDistributedPk();
416     if (!distributedPk.empty()) {
417         identifyKeySet.insert(distributedPk.begin(), distributedPk.end());
418     }
419     std::string updateValue;
420     const auto &localTableFields = localTable_.GetFields();
421     for (const auto &it : remoteFields_) {
422         if (localTableFields.find(it.GetFieldName()) == localTableFields.end()) {
423             continue; // skip fields which is orphaned in remote
424         }
425         if (identifyKeySet.find(it.GetFieldName()) == identifyKeySet.end()) {
426             if (updateValue.empty()) {
427                 updateValue.append(" SET ");
428             } else {
429                 updateValue.append(", ");
430             }
431             updateValue.append("'").append(it.GetFieldName()).append("'=?");
432         }
433     }
434     if (updateValue.empty()) {
435         // only sync pk no need update
436         return E_OK;
437     }
438     std::string sql = "UPDATE '" + insertTableName_ + "'" + updateValue + " WHERE " +
439         std::string(DBConstant::SQLITE_INNER_ROWID) + " IN (SELECT data_key FROM " +
440         DBConstant::RELATIONAL_PREFIX + localTable_.GetTableName() + "_log ";
441     if (mode_ == DistributedTableMode::COLLABORATION) {
442         sql += "WHERE hash_key=?);";
443     } else {
444         sql += "WHERE hash_key=? AND device=? AND flag&0x01=0);";
445     }
446     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
447     if (errCode != E_OK) {
448         LOGE("Get update data statement fail! errCode:%d", errCode);
449     }
450     return errCode;
451 }
452 
BindHashKeyAndDev(const DataItem & dataItem,sqlite3_stmt * stmt,int beginIndex)453 int RelationalSyncDataInserter::BindHashKeyAndDev(const DataItem &dataItem, sqlite3_stmt *stmt,
454     int beginIndex)
455 {
456     int errCode = SQLiteUtils::BindBlobToStatement(stmt, beginIndex++, dataItem.hashKey);
457     if (errCode != E_OK) {
458         LOGE("[RelationalSyncDataInserter] bind hash key failed %d", errCode);
459         return errCode;
460     }
461     if (mode_ != DistributedTableMode::COLLABORATION) {
462         errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, dataItem.dev);
463         if (errCode != E_OK) {
464             LOGE("[RelationalSyncDataInserter] bind dev failed %d", errCode);
465         }
466     }
467     return errCode;
468 }
469 
SaveSyncLog(sqlite3 * db,const DataItem & dataItem,const DeviceSyncSaveDataInfo & deviceSyncSaveDataInfo,std::map<std::string,Type> & saveVals,SaveSyncDataStmt & saveStmt)470 int RelationalSyncDataInserter::SaveSyncLog(sqlite3 *db, const DataItem &dataItem,
471     const DeviceSyncSaveDataInfo &deviceSyncSaveDataInfo, std::map<std::string, Type> &saveVals,
472     SaveSyncDataStmt &saveStmt)
473 {
474     if (std::get_if<int64_t>(&saveVals[DBConstant::ROWID]) == nullptr) {
475         LOGE("[RelationalSyncDataInserter] Invalid args because of no rowid!");
476         return -E_INVALID_ARGS;
477     }
478     auto updateCursor = CloudStorageUtils::GetCursorIncSql(query_.GetTableName());
479     int errCode = SQLiteUtils::ExecuteRawSQL(db, updateCursor);
480     if (errCode != E_OK) {
481         LOGE("[RelationalSyncDataInserter] update cursor failed %d", errCode);
482         return errCode;
483     }
484     LogInfo logInfoBind;
485     logInfoBind.hashKey = dataItem.hashKey;
486     logInfoBind.device = dataItem.dev;
487     logInfoBind.timestamp = dataItem.timestamp;
488     logInfoBind.flag = dataItem.flag;
489 
490     if (!deviceSyncSaveDataInfo.isExist) { // insert
491         logInfoBind.wTimestamp = dataItem.writeTimestamp;
492         logInfoBind.originDev = dataItem.dev;
493     } else { // update
494         logInfoBind.wTimestamp = deviceSyncSaveDataInfo.localLogInfo.wTimestamp;
495         logInfoBind.originDev = deviceSyncSaveDataInfo.localLogInfo.originDev;
496     }
497     auto statement = saveStmt.saveLogStmt;
498     // bind
499     int bindIndex = 0;
500     // 1 means dataKey index
501     SQLiteUtils::BindInt64ToStatement(statement, ++bindIndex, std::get<int64_t>(saveVals[DBConstant::ROWID]));
502     std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
503     SQLiteUtils::BindBlobToStatement(statement, ++bindIndex, originDev); // 2 means ori_dev index
504     SQLiteUtils::BindInt64ToStatement(statement, ++bindIndex, logInfoBind.timestamp); // 3 means timestamp index
505     SQLiteUtils::BindInt64ToStatement(statement, ++bindIndex, logInfoBind.wTimestamp); // 4 means w_timestamp index
506     SQLiteUtils::BindInt64ToStatement(statement, ++bindIndex, logInfoBind.flag); // 5 means flag index
507     SQLiteUtils::BindBlobToStatement(statement, ++bindIndex, logInfoBind.hashKey); // 6 means hashKey index
508     BindExtendFieldOrRowid(statement, saveVals, bindIndex); // bind extend_field or rowid
509     errCode = SQLiteUtils::StepWithRetry(statement, false);
510     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
511         return E_OK;
512     }
513     return errCode;
514 }
515 
GetChangedData()516 ChangedData &RelationalSyncDataInserter::GetChangedData()
517 {
518     return data_;
519 }
520 
BindExtendFieldOrRowid(sqlite3_stmt * & stmt,std::map<std::string,Type> & saveVals,int bindIndex)521 void RelationalSyncDataInserter::BindExtendFieldOrRowid(sqlite3_stmt *&stmt, std::map<std::string, Type> &saveVals,
522     int bindIndex)
523 {
524     TrackerTable trackerTable = localTable_.GetTrackerTable();
525     if (trackerTable.GetExtendNames().empty()) {
526         SQLiteUtils::BindTextToStatement(stmt, ++bindIndex, "");
527     } else {
528         SQLiteUtils::BindInt64ToStatement(stmt, ++bindIndex, std::get<int64_t>(saveVals[DBConstant::ROWID]));
529     }
530 }
531 
GetRemoteFields() const532 std::vector<FieldInfo> RelationalSyncDataInserter::GetRemoteFields() const
533 {
534     return remoteFields_;
535 }
536 
GetQueryLogByFieldStmt(sqlite3 * db,sqlite3_stmt * & stmt)537 int RelationalSyncDataInserter::GetQueryLogByFieldStmt(sqlite3 *db, sqlite3_stmt *&stmt)
538 {
539     if (mode_ != DistributedTableMode::COLLABORATION) {
540         stmt = nullptr;
541         return E_OK;
542     }
543     auto syncPk = localTable_.GetSyncDistributedPk();
544     if (syncPk.empty()) {
545         stmt = nullptr;
546         return E_OK;
547     }
548     std::string sql("SELECT ");
549     std::string columnList = "log.data_key, log.device, log.ori_device, log.timestamp, log.wtimestamp, log.flag,"
550         " log.hash_key, log.extend_field";
551     auto table = localTable_.GetTableName();
552     sql.append(columnList).append(" FROM ").append(DBCommon::GetLogTableName(table))
553         .append(" AS log, ").append(table)
554         .append(" AS data WHERE log.data_key = data._rowid_ AND ");
555     for (size_t i = 0; i < syncPk.size(); ++i) {
556         if (i != 0) {
557             sql.append(", ");
558         }
559         sql.append("data.'").append(syncPk[i]).append("' = ?");
560     }
561     auto errCode = SQLiteUtils::GetStatement(db, sql, stmt);
562     if (errCode != E_OK) {
563         LOGE("[RelationalSyncDataInserter] Get query [%s][%zu] log stmt failed[%d]",
564             DBCommon::StringMiddleMasking(table).c_str(), table.size(), errCode);
565     }
566     return errCode;
567 }
568 } // namespace DistributedDB