• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_cloud_kv_executor_utils.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "res_finalizer.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_ver_storage_executor_sql.h"
24 #include "time_helper.h"
25 
26 namespace DistributedDB {
GetCloudData(const CloudSyncConfig & config,const DBParam & param,const CloudUploadRecorder & recorder,SQLiteSingleVerContinueToken & token,CloudSyncData & data)27 int SqliteCloudKvExecutorUtils::GetCloudData(const CloudSyncConfig &config, const DBParam &param,
28     const CloudUploadRecorder &recorder, SQLiteSingleVerContinueToken &token, CloudSyncData &data)
29 {
30     auto [db, isMemory] = param;
31     bool stepNext = false;
32     auto [errCode, stmt] = token.GetCloudQueryStmt(db, data.isCloudForcePushStrategy, stepNext, data.mode);
33     if (errCode != E_OK) {
34         token.ReleaseCloudQueryStmt();
35         return errCode;
36     }
37     UploadDetail detail;
38     auto &[stepNum, totalSize] = detail;
39     do {
40         if (stepNext) {
41             errCode = SQLiteUtils::StepNext(stmt, isMemory);
42             if (errCode != E_OK) {
43                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
44                 break;
45             }
46         }
47         stepNext = true;
48         errCode = GetCloudDataForSync(config, recorder, stmt, data, detail);
49         stepNum++;
50     } while (errCode == E_OK);
51     LOGI("[SqliteCloudKvExecutorUtils] Get cloud sync data, insData:%u, upData:%u, delLog:%u errCode:%d total:%" PRIu32,
52          data.insData.record.size(), data.updData.record.size(), data.delData.extend.size(), errCode, totalSize);
53     if (errCode != -E_UNFINISHED) {
54         token.ReleaseCloudQueryStmt();
55     } else if (isMemory && UpdateBeginTimeForMemoryDB(token, data)) {
56         token.ReleaseCloudQueryStmt();
57     }
58     return errCode;
59 }
60 
GetMaxTimeStamp(std::vector<VBucket> & dataExtend)61 Timestamp SqliteCloudKvExecutorUtils::GetMaxTimeStamp(std::vector<VBucket> &dataExtend)
62 {
63     Timestamp maxTimeStamp = 0;
64     VBucket lastRecord = dataExtend.back();
65     auto it = lastRecord.find(CloudDbConstant::MODIFY_FIELD);
66     if (it != lastRecord.end() && maxTimeStamp < static_cast<Timestamp>(std::get<int64_t>(it->second))) {
67         maxTimeStamp = static_cast<Timestamp>(std::get<int64_t>(it->second));
68     }
69     return maxTimeStamp;
70 }
71 
UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken & token,CloudSyncData & data)72 bool SqliteCloudKvExecutorUtils::UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken &token, CloudSyncData &data)
73 {
74     Timestamp maxTimeStamp = 0;
75     switch (data.mode) {
76         case DistributedDB::CloudWaterType::DELETE:
77             maxTimeStamp = GetMaxTimeStamp(data.delData.extend);
78             break;
79         case DistributedDB::CloudWaterType::UPDATE:
80             maxTimeStamp = GetMaxTimeStamp(data.updData.extend);
81             break;
82         case DistributedDB::CloudWaterType::INSERT:
83             maxTimeStamp = GetMaxTimeStamp(data.insData.extend);
84             break;
85         case DistributedDB::CloudWaterType::BUTT:
86         default:
87             break;
88     }
89     if (maxTimeStamp > token.GetQueryBeginTime()) {
90         token.SetNextBeginTime("", maxTimeStamp);
91         return true;
92     }
93     LOGW("[SqliteCloudKvExecutorUtils] The start time of the in memory database has not been updated.");
94     return false;
95 }
96 
GetCloudDataForSync(const CloudSyncConfig & config,const CloudUploadRecorder & recorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,UploadDetail & detail)97 int SqliteCloudKvExecutorUtils::GetCloudDataForSync(const CloudSyncConfig &config, const CloudUploadRecorder &recorder,
98     sqlite3_stmt *statement, CloudSyncData &cloudDataResult, UploadDetail &detail)
99 {
100     auto &[stepNum, totalSize] = detail;
101     VBucket log;
102     VBucket extraLog;
103     uint32_t preSize = totalSize;
104     GetCloudLog(statement, log, totalSize);
105     GetCloudExtraLog(statement, extraLog);
106 
107     VBucket data;
108     int64_t flag = 0;
109     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
110     if (errCode != E_OK) {
111         return errCode;
112     }
113 
114     if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
115         errCode = GetCloudKvData(statement, data, totalSize);
116         if (errCode != E_OK) {
117             return errCode;
118         }
119     }
120 
121     if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, config.maxUploadSize, config.maxUploadCount)) {
122         errCode = CloudStorageUtils::IdentifyCloudType(recorder, cloudDataResult, data, log, extraLog);
123     } else {
124         errCode = -E_UNFINISHED;
125     }
126     if (errCode == -E_IGNORE_DATA) {
127         errCode = E_OK;
128         totalSize = preSize;
129         stepNum--;
130     }
131     return errCode;
132 }
133 
GetCloudLog(sqlite3_stmt * stmt,VBucket & logInfo,uint32_t & totalSize)134 void SqliteCloudKvExecutorUtils::GetCloudLog(sqlite3_stmt *stmt, VBucket &logInfo,
135     uint32_t &totalSize)
136 {
137     int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX));
138     uint64_t curTime = 0;
139     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
140         if (modifyTime > static_cast<int64_t>(curTime)) {
141             modifyTime = static_cast<int64_t>(curTime);
142         }
143     } else {
144         LOGW("[SqliteCloudKvExecutorUtils] get raw sys time failed.");
145     }
146     logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
147     logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
148         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CREATE_TIME_INDEX)));
149     totalSize += sizeof(int64_t) + sizeof(int64_t);
150     if (sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX) != nullptr) {
151         std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
152             sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX));
153         if (!cloudGid.empty()) {
154             logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
155             totalSize += cloudGid.size();
156         }
157     }
158     std::string version;
159     SQLiteUtils::GetColumnTextValue(stmt, CLOUD_QUERY_VERSION_INDEX, version);
160     logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
161     totalSize += version.size();
162 }
163 
GetCloudExtraLog(sqlite3_stmt * stmt,VBucket & flags)164 void SqliteCloudKvExecutorUtils::GetCloudExtraLog(sqlite3_stmt *stmt, VBucket &flags)
165 {
166     flags.insert_or_assign(CloudDbConstant::ROWID,
167         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_ROW_ID_INDEX)));
168     flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
169         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX)));
170     flags.insert_or_assign(CloudDbConstant::FLAG,
171         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_FLAG_INDEX)));
172     Bytes hashKey;
173     (void)SQLiteUtils::GetColumnBlobValue(stmt, CLOUD_QUERY_HASH_KEY_INDEX, hashKey);
174     flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
175     flags.insert_or_assign(CloudDbConstant::CLOUD_FLAG,
176         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CLOUD_FLAG_INDEX)));
177 }
178 
GetCloudKvData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)179 int SqliteCloudKvExecutorUtils::GetCloudKvData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
180 {
181     int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
182     if (errCode != E_OK) {
183         return errCode;
184     }
185     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
186     if (errCode != E_OK) {
187         return errCode;
188     }
189     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
190     if (errCode != E_OK) {
191         return errCode;
192     }
193     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, CLOUD_QUERY_ORI_DEV_INDEX, stmt, data,
194         totalSize);
195     if (errCode != E_OK) {
196         return errCode;
197     }
198     data.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME,
199         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_DEV_CREATE_TIME_INDEX)));
200     totalSize += sizeof(int64_t);
201     return E_OK;
202 }
203 
GetCloudKvBlobData(const std::string & keyStr,int index,sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)204 int SqliteCloudKvExecutorUtils::GetCloudKvBlobData(const std::string &keyStr, int index, sqlite3_stmt *stmt,
205     VBucket &data, uint32_t &totalSize)
206 {
207     std::vector<uint8_t> blob;
208     int errCode = SQLiteUtils::GetColumnBlobValue(stmt, index, blob);
209     if (errCode != E_OK) {
210         LOGE("[SqliteCloudKvExecutorUtils] Get %.3s failed %d", keyStr.c_str(), errCode);
211         return errCode;
212     }
213     std::string tmp = std::string(blob.begin(), blob.end());
214     if ((keyStr == CloudDbConstant::CLOUD_KV_FIELD_DEVICE ||
215         keyStr == CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE)) {
216         if (tmp.empty()) {
217             errCode = RuntimeContext::GetInstance()->GetLocalIdentity(tmp);
218             if (errCode != E_OK) {
219                 return errCode;
220             }
221             tmp = DBCommon::TransferHashString(tmp);
222         }
223         tmp = DBBase64Utils::Encode(std::vector<uint8_t>(tmp.begin(), tmp.end()));
224     }
225     totalSize += tmp.size();
226     data.insert_or_assign(keyStr, tmp);
227     return E_OK;
228 }
229 
GetLogInfo(sqlite3 * db,bool isMemory,const VBucket & cloudData)230 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfo(sqlite3 *db, bool isMemory,
231     const VBucket &cloudData)
232 {
233     std::pair<int, DataInfoWithLog> res;
234     int &errCode = res.first;
235     std::string keyStr;
236     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CLOUD_KV_FIELD_KEY, cloudData, keyStr);
237     if (errCode == -E_NOT_FOUND) {
238         errCode = E_OK;
239     }
240     if (errCode != E_OK) {
241         LOGE("[SqliteCloudKvExecutorUtils] Get key failed %d", errCode);
242         return res;
243     }
244     Bytes key;
245     DBCommon::StringToVector(keyStr, key);
246     Bytes hashKey;
247     DBCommon::CalcValueHash(key, hashKey);
248     sqlite3_stmt *stmt = nullptr;
249     std::tie(errCode, stmt) = GetLogInfoStmt(db, cloudData, !hashKey.empty());
250     if (errCode != E_OK) {
251         LOGE("[SqliteCloudKvExecutorUtils] Get stmt failed %d", errCode);
252         return res;
253     }
254     std::string gid;
255     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, cloudData, gid);
256     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
257         LOGE("[SqliteCloudKvExecutorUtils] Get gid failed %d", errCode);
258         return res;
259     }
260     return GetLogInfoInner(stmt, isMemory, gid, hashKey);
261 }
262 
GetLogInfoStmt(sqlite3 * db,const VBucket & cloudData,bool existKey)263 std::pair<int, sqlite3_stmt*> SqliteCloudKvExecutorUtils::GetLogInfoStmt(sqlite3 *db, const VBucket &cloudData,
264     bool existKey)
265 {
266     std::pair<int, sqlite3_stmt*> res;
267     auto &[errCode, stmt] = res;
268     std::string sql = QUERY_CLOUD_SYNC_DATA_LOG;
269     sql += " WHERE cloud_gid = ?";
270     if (existKey) {
271         sql += " UNION ";
272         sql += QUERY_CLOUD_SYNC_DATA_LOG;
273         sql += " WHERE sync_data.hash_key = ?";
274     }
275     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
276     return res;
277 }
278 
GetLogInfoInner(sqlite3_stmt * stmt,bool isMemory,const std::string & gid,const Bytes & key)279 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfoInner(sqlite3_stmt *stmt, bool isMemory,
280     const std::string &gid, const Bytes &key)
281 {
282     ResFinalizer finalizer([stmt]() {
283         sqlite3_stmt *statement = stmt;
284         int ret = E_OK;
285         SQLiteUtils::ResetStatement(statement, true, ret);
286         if (ret != E_OK) {
287             LOGW("[SqliteCloudKvExecutorUtils] Reset stmt failed %d when get log", ret);
288         }
289     });
290     std::pair<int, DataInfoWithLog> res;
291     auto &[errCode, logInfo] = res;
292     int index = 1;
293     errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
294     if (errCode != E_OK) {
295         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d", errCode);
296         return res;
297     }
298     if (!key.empty()) {
299         errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key);
300         if (errCode != E_OK) {
301             LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
302             return res;
303         }
304     }
305     errCode = SQLiteUtils::StepNext(stmt, isMemory);
306     if (errCode == -E_FINISHED) {
307         errCode = -E_NOT_FOUND;
308         // not found is ok, just return error
309         return res;
310     }
311     if (errCode != E_OK) {
312         LOGE("[SqliteCloudKvExecutorUtils] Get log failed %d", errCode);
313         return res;
314     }
315     logInfo = FillLogInfoWithStmt(stmt);
316     return res;
317 }
318 
FillLogInfoWithStmt(sqlite3_stmt * stmt)319 DataInfoWithLog SqliteCloudKvExecutorUtils::FillLogInfoWithStmt(sqlite3_stmt *stmt)
320 {
321     DataInfoWithLog dataInfoWithLog;
322     int index = 0;
323     dataInfoWithLog.logInfo.dataKey = sqlite3_column_int64(stmt, index++);
324     dataInfoWithLog.logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
325     std::vector<uint8_t> device;
326     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, device);
327     DBCommon::VectorToString(device, dataInfoWithLog.logInfo.device);
328     std::vector<uint8_t> oriDev;
329     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, oriDev);
330     DBCommon::VectorToString(oriDev, dataInfoWithLog.logInfo.originDev);
331     dataInfoWithLog.logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
332     dataInfoWithLog.logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
333     std::string gid;
334     (void)SQLiteUtils::GetColumnTextValue(stmt, index++, gid);
335     dataInfoWithLog.logInfo.cloudGid = gid;
336     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, dataInfoWithLog.logInfo.hashKey);
337     Bytes key;
338     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, key);
339     std::string keyStr(key.begin(), key.end());
340     dataInfoWithLog.primaryKeys.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_KEY, keyStr);
341     (void)SQLiteUtils::GetColumnTextValue(stmt, index++, dataInfoWithLog.logInfo.version);
342     dataInfoWithLog.logInfo.cloud_flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
343     return dataInfoWithLog;
344 }
345 
PutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData)346 int SqliteCloudKvExecutorUtils::PutCloudData(sqlite3 *db, bool isMemory, DownloadData &downloadData)
347 {
348     if (downloadData.data.size() != downloadData.opType.size()) {
349         LOGE("[SqliteCloudKvExecutorUtils] data size %zu != flag size %zu.", downloadData.data.size(),
350             downloadData.opType.size());
351         return -E_CLOUD_ERROR;
352     }
353     std::map<int, int> statisticMap = {};
354     int errCode = ExecutePutCloudData(db, isMemory, downloadData, statisticMap);
355     LOGI("[SqliteCloudKvExecutorUtils] save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d,"
356         " only update gid cnt = %d, set LCC flag zero cnt = %d, set LCC flag one cnt = %d,"
357         " update timestamp cnt = %d, clear gid count = %d, not handle cnt = %d",
358         errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
359         statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
360         statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
361         statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
362         statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
363         statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
364     return errCode;
365 }
366 
ExecutePutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData,std::map<int,int> & statisticMap)367 int SqliteCloudKvExecutorUtils::ExecutePutCloudData(sqlite3 *db, bool isMemory,
368     DownloadData &downloadData, std::map<int, int> &statisticMap)
369 {
370     int index = 0;
371     int errCode = E_OK;
372     for (OpType op : downloadData.opType) {
373         switch (op) {
374             case OpType::INSERT: // fallthrough
375             case OpType::UPDATE: // fallthrough
376             case OpType::DELETE: // fallthrough
377                 errCode = OperateCloudData(db, isMemory, index, op, downloadData);
378                 break;
379             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
380             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:  // fallthrough
381             case OpType::UPDATE_TIMESTAMP:               // fallthrough
382                 errCode = OnlyUpdateSyncData(db, isMemory, index, op, downloadData);
383                 if (errCode != E_OK) {
384                     break;
385                 }
386                 [[fallthrough]];
387             case OpType::ONLY_UPDATE_GID:                // fallthrough
388             case OpType::NOT_HANDLE:                     // fallthrough
389             case OpType::CLEAR_GID:                      // fallthrough
390                 errCode = OnlyUpdateLogTable(db, isMemory, index, op, downloadData);
391                 break;
392             default:
393                 errCode = -E_CLOUD_ERROR;
394                 break;
395         }
396         if (errCode != E_OK) {
397             LOGE("put cloud sync data fail:%d op:%d", errCode, static_cast<int>(op));
398             return errCode;
399         }
400         statisticMap[static_cast<int>(op)]++;
401         index++;
402     }
403     return errCode;
404 }
405 
OperateCloudData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)406 int SqliteCloudKvExecutorUtils::OperateCloudData(sqlite3 *db, bool isMemory, int index, OpType opType,
407     DownloadData &downloadData)
408 {
409     sqlite3_stmt *logStmt = nullptr;
410     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(opType), logStmt);
411     if (errCode != E_OK) {
412         LOGE("[SqliteCloudKvExecutorUtils] Get insert log statement failed %d", errCode);
413         return errCode;
414     }
415     sqlite3_stmt *dataStmt = nullptr;
416     errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
417     if (errCode != E_OK) {
418         int ret = E_OK;
419         SQLiteUtils::ResetStatement(logStmt, true, ret);
420         LOGE("[SqliteCloudKvExecutorUtils] Get insert data statement failed %d reset %d", errCode, ret);
421         return errCode;
422     }
423     ResFinalizer finalizerData([logStmt, dataStmt, opType]() {
424         sqlite3_stmt *statement = logStmt;
425         int ret = E_OK;
426         SQLiteUtils::ResetStatement(statement, true, ret);
427         if (ret != E_OK) {
428             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d opType %d", ret, static_cast<int>(opType));
429         }
430         statement = dataStmt;
431         SQLiteUtils::ResetStatement(statement, true, ret);
432         if (ret != E_OK) {
433             LOGW("[SqliteCloudKvExecutorUtils] Reset data stmt failed %d opType %d", ret, static_cast<int>(opType));
434         }
435     });
436     errCode = BindStmt(logStmt, dataStmt, index, opType, downloadData);
437     if (errCode != E_OK) {
438         return errCode;
439     }
440     return StepStmt(logStmt, dataStmt, isMemory);
441 }
442 
GetOperateDataSql(OpType opType)443 std::string SqliteCloudKvExecutorUtils::GetOperateDataSql(OpType opType)
444 {
445     switch (opType) {
446         case OpType::INSERT:
447             return INSERT_SYNC_SQL;
448         case OpType::UPDATE: // fallthrough
449         case OpType::DELETE:
450             return UPDATE_SYNC_SQL;
451         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
452             return SET_SYNC_DATA_NO_FORCE_PUSH;
453         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
454             return SET_SYNC_DATA_FORCE_PUSH;
455         case OpType::UPDATE_TIMESTAMP:
456             return UPDATE_TIMESTAMP;
457         default:
458             return "";
459     }
460 }
461 
GetOperateLogSql(OpType opType)462 std::string SqliteCloudKvExecutorUtils::GetOperateLogSql(OpType opType)
463 {
464     switch (opType) {
465         case OpType::INSERT: // fallthrough
466         case OpType::UPDATE:
467             return INSERT_CLOUD_SYNC_DATA_LOG;
468         case OpType::DELETE:
469             return UPDATE_CLOUD_SYNC_DATA_LOG;
470         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
471         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:  // fallthrough
472         case OpType::UPDATE_TIMESTAMP:               // fallthrough
473         case OpType::ONLY_UPDATE_GID:                // fallthrough
474         case OpType::NOT_HANDLE:                     // fallthrough
475         case OpType::CLEAR_GID:                      // fallthrough
476             return UPSERT_CLOUD_SYNC_DATA_LOG;
477         default:
478             return "";
479     }
480 }
481 
TransToOpType(const CloudWaterType type)482 OpType SqliteCloudKvExecutorUtils::TransToOpType(const CloudWaterType type)
483 {
484     switch (type) {
485         case CloudWaterType::INSERT:
486             return OpType::INSERT;
487         case CloudWaterType::UPDATE:
488             return OpType::UPDATE;
489         case CloudWaterType::DELETE:
490             return OpType::DELETE;
491         default:
492             return OpType::NOT_HANDLE;
493     }
494 }
495 
BindOnlyUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)496 int SqliteCloudKvExecutorUtils::BindOnlyUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
497     const DataItem &dataItem)
498 {
499     int index = 0;
500     int errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, user);
501     if (errCode != E_OK) {
502         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when only insert log", errCode);
503         return errCode;
504     }
505     errCode = SQLiteUtils::BindBlobToStatement(logStmt, ++index, dataItem.hashKey);
506     if (errCode != E_OK) {
507         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when only insert log", errCode);
508     }
509     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
510     if (errCode != E_OK) {
511         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only insert gid.", errCode);
512         return errCode;
513     }
514     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
515     if (errCode != E_OK) {
516         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only insert log", errCode);
517         return errCode;
518     }
519     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
520     if (errCode != E_OK) {
521         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only update gid.", errCode);
522         return errCode;
523     }
524     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
525     if (errCode != E_OK) {
526         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only update log", errCode);
527         return errCode;
528     }
529     return errCode;
530 }
531 
BindStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)532 int SqliteCloudKvExecutorUtils::BindStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, int index, OpType opType,
533     DownloadData &downloadData)
534 {
535     auto [errCode, dataItem] = GetDataItem(index, downloadData);
536     if (errCode != E_OK) {
537         return errCode;
538     }
539     switch (opType) {
540         case OpType::INSERT:
541             return BindInsertStmt(logStmt, dataStmt, downloadData.user, dataItem);
542         case OpType::UPDATE:
543             return BindUpdateStmt(logStmt, dataStmt, downloadData.user, dataItem);
544         case OpType::DELETE:
545             dataItem.hashKey = downloadData.existDataHashKey[index];
546             dataItem.gid.clear();
547             dataItem.version.clear();
548             return BindDeleteStmt(logStmt, dataStmt, downloadData.user, dataItem);
549         default:
550             return E_OK;
551     }
552 }
553 
BindInsertStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)554 int SqliteCloudKvExecutorUtils::BindInsertStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt,
555     const std::string &user, const DataItem &dataItem)
556 {
557     int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for insert DATA table.
558     if (errCode != E_OK) {
559         return errCode;
560     }
561     return BindDataStmt(dataStmt, dataItem, true);
562 }
563 
BindInsertLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)564 int SqliteCloudKvExecutorUtils::BindInsertLogStmt(sqlite3_stmt *logStmt, const std::string &user,
565     const DataItem &dataItem)
566 {
567     int errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, user);
568     if (errCode != E_OK) {
569         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when insert", errCode);
570         return errCode;
571     }
572     errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
573     if (errCode != E_OK) {
574         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when insert", errCode);
575         return errCode;
576     }
577     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_CLOUD_GID_INDEX, dataItem.gid);
578     if (errCode != E_OK) {
579         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when insert", errCode);
580         return errCode;
581     }
582     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_VERSION_INDEX, dataItem.version);
583     if (errCode != E_OK) {
584         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when insert", errCode);
585         return errCode;
586     }
587     errCode = SQLiteUtils::BindInt64ToStatement(logStmt, BIND_INSERT_CLOUD_FLAG_INDEX, dataItem.cloud_flag);
588     if (errCode != E_OK) {
589         LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when insert", errCode);
590     }
591     return errCode;
592 }
593 
BindUpdateStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)594 int SqliteCloudKvExecutorUtils::BindUpdateStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
595     const DataItem &dataItem)
596 {
597     int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for update DATA table.
598     if (errCode != E_OK) {
599         return errCode;
600     }
601     errCode = BindDataStmt(dataStmt, dataItem, false);
602     if (errCode != E_OK) {
603         return errCode;
604     }
605     return E_OK;
606 }
607 
BindUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)608 int SqliteCloudKvExecutorUtils::BindUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
609     const DataItem &dataItem)
610 {
611     int index = 1;
612     int errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.gid);
613     if (errCode != E_OK) {
614         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when update", errCode);
615         return errCode;
616     }
617     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.version);
618     if (errCode != E_OK) {
619         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when update", errCode);
620         return errCode;
621     }
622     errCode = SQLiteUtils::BindInt64ToStatement(logStmt, index++, dataItem.cloud_flag);
623     if (errCode != E_OK) {
624         LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when update", errCode);
625         return errCode;
626     }
627     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
628     if (errCode != E_OK) {
629         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when update", errCode);
630         return errCode;
631     }
632     errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, dataItem.hashKey);
633     if (errCode != E_OK) {
634         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when update", errCode);
635     }
636     return errCode;
637 }
638 
BindDeleteStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,DataItem & dataItem)639 int SqliteCloudKvExecutorUtils::BindDeleteStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
640     DataItem &dataItem)
641 {
642     dataItem.key = {};
643     dataItem.value = {};
644     dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
645     int errCode = BindUpdateLogStmt(logStmt, user, dataItem); // update LOG table for delete DATA table.
646     if (errCode != E_OK) {
647         return errCode;
648     }
649     errCode = BindDataStmt(dataStmt, dataItem, false);
650     if (errCode != E_OK) {
651         return errCode;
652     }
653     return E_OK;
654 }
655 
BindDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert)656 int SqliteCloudKvExecutorUtils::BindDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert)
657 {
658     int index = 1;
659     int errCode = BindSyncDataStmt(dataStmt, dataItem, isInsert, index);
660     if (errCode != E_OK) {
661         return errCode;
662     }
663     errCode = BindCloudDataStmt(dataStmt, dataItem, index);
664     if (errCode != E_OK) {
665         return errCode;
666     }
667     if (!isInsert) {
668         errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
669         if (errCode != E_OK) {
670             LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
671         }
672     }
673     return errCode;
674 }
675 
BindSyncDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert,int & index)676 int SqliteCloudKvExecutorUtils::BindSyncDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert,
677     int &index)
678 {
679     int errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.key);
680     if (errCode != E_OK) {
681         LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
682         return errCode;
683     }
684     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.value);
685     if (errCode != E_OK) {
686         LOGE("[SqliteCloudKvExecutorUtils] Bind value failed %d", errCode);
687         return errCode;
688     }
689     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.timestamp));
690     if (errCode != E_OK) {
691         LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
692         return errCode;
693     }
694     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.flag));
695     if (errCode != E_OK) {
696         LOGE("[SqliteCloudKvExecutorUtils] Bind flag failed %d", errCode);
697         return errCode;
698     }
699     Bytes bytes;
700     DBCommon::StringToVector(dataItem.dev, bytes);
701     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
702     if (errCode != E_OK) {
703         LOGE("[SqliteCloudKvExecutorUtils] Bind dev failed %d", errCode);
704         return errCode;
705     }
706     DBCommon::StringToVector(dataItem.origDev, bytes);
707     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
708     if (errCode != E_OK) {
709         LOGE("[SqliteCloudKvExecutorUtils] Bind oriDev failed %d", errCode);
710         return errCode;
711     }
712     if (isInsert) {
713         errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
714         if (errCode != E_OK) {
715             LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
716             return errCode;
717         }
718     }
719     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.writeTimestamp));
720     if (errCode != E_OK) {
721         LOGE("[SqliteCloudKvExecutorUtils] Bind wTime failed %d", errCode);
722     }
723     return errCode;
724 }
725 
BindCloudDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,int & index)726 int SqliteCloudKvExecutorUtils::BindCloudDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, int &index)
727 {
728     int errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.modifyTime);
729     if (errCode != E_OK) {
730         LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
731         return errCode;
732     }
733     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.createTime);
734     if (errCode != E_OK) {
735         LOGE("[SqliteCloudKvExecutorUtils] Bind createTime failed %d", errCode);
736     }
737     return errCode;
738 }
739 
StepStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,bool isMemory)740 int SqliteCloudKvExecutorUtils::StepStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, bool isMemory)
741 {
742     int errCode = SQLiteUtils::StepNext(logStmt, isMemory);
743     if (errCode != -E_FINISHED) {
744         LOGE("[SqliteCloudKvExecutorUtils] Step insert log stmt failed %d", errCode);
745         return errCode;
746     }
747     errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
748     if (errCode != -E_FINISHED) {
749         LOGE("[SqliteCloudKvExecutorUtils] Step insert data stmt failed %d", errCode);
750         return errCode;
751     }
752     return E_OK;
753 }
754 
FillCloudLog(const FillGidParam & param,OpType opType,const CloudSyncData & data,const std::string & user,CloudUploadRecorder & recorder)755 int SqliteCloudKvExecutorUtils::FillCloudLog(const FillGidParam &param, OpType opType, const CloudSyncData &data,
756     const std::string &user, CloudUploadRecorder &recorder)
757 {
758     if (param.first == nullptr) {
759         LOGE("[SqliteCloudKvExecutorUtils] Fill log got nullptr db");
760         return -E_INVALID_ARGS;
761     }
762     if (data.isCloudVersionRecord) {
763         int errCode = FillCloudVersionRecord(param.first, opType, data);
764         if (errCode != E_OK) {
765             return errCode;
766         }
767     }
768     switch (opType) {
769         case OpType::INSERT:
770             return FillCloudGid(param, data.insData, user, CloudWaterType::INSERT, recorder);
771         case OpType::UPDATE:
772             return FillCloudGid(param, data.updData, user, CloudWaterType::UPDATE, recorder);
773         case OpType::DELETE:
774             return FillCloudGid(param, data.delData, user, CloudWaterType::DELETE, recorder);
775         default:
776             return E_OK;
777     }
778 }
779 
OnlyUpdateLogTable(sqlite3 * db,bool isMemory,int index,OpType op,DownloadData & downloadData)780 int SqliteCloudKvExecutorUtils::OnlyUpdateLogTable(sqlite3 *db, bool isMemory, int index, OpType op,
781     DownloadData &downloadData)
782 {
783     if (downloadData.existDataHashKey[index].empty()) {
784         return E_OK;
785     }
786     sqlite3_stmt *logStmt = nullptr;
787     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(op), logStmt);
788     if (errCode != E_OK) {
789         LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
790         return errCode;
791     }
792     ResFinalizer finalizerData([logStmt]() {
793         sqlite3_stmt *statement = logStmt;
794         int ret = E_OK;
795         SQLiteUtils::ResetStatement(statement, true, ret);
796         if (ret != E_OK) {
797             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when only update log", ret);
798         }
799     });
800     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
801     if (res.first != E_OK) {
802         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", res.first);
803         return res.first;
804     }
805     bool clearCloudInfo = (op == OpType::CLEAR_GID);
806     if (res.second.hashKey.empty() || DBCommon::IsRecordDelete(downloadData.data[index])) {
807         res.second.hashKey = downloadData.existDataHashKey[index];
808         clearCloudInfo = true;
809     }
810     if (clearCloudInfo) {
811         res.second.gid.clear();
812         res.second.version.clear();
813     }
814     errCode = BindOnlyUpdateLogStmt(logStmt, downloadData.user, res.second);
815     if (errCode != E_OK) {
816         return errCode;
817     }
818     errCode = SQLiteUtils::StepNext(logStmt, isMemory);
819     if (errCode == -E_FINISHED) {
820         errCode = E_OK;
821     }
822     return errCode;
823 }
824 
FillCloudGid(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,const CloudWaterType & type,CloudUploadRecorder & recorder)825 int SqliteCloudKvExecutorUtils::FillCloudGid(const FillGidParam &param, const CloudSyncBatch &data,
826     const std::string &user, const CloudWaterType &type, CloudUploadRecorder &recorder)
827 {
828     auto [db, ignoreEmptyGid] = param;
829     sqlite3_stmt *logStmt = nullptr;
830     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(TransToOpType(type)), logStmt);
831     ResFinalizer finalizerData([logStmt]() {
832         sqlite3_stmt *statement = logStmt;
833         int ret = E_OK;
834         SQLiteUtils::ResetStatement(statement, true, ret);
835         if (ret != E_OK) {
836             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when fill log", ret);
837         }
838     });
839     for (size_t i = 0; i < data.hashKey.size(); ++i) {
840         if (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i])) {
841             continue;
842         }
843         DataItem dataItem;
844         errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data.extend[i], dataItem.gid);
845         if (dataItem.gid.empty() && ignoreEmptyGid) {
846             continue;
847         }
848         if (errCode != E_OK) {
849             return errCode;
850         }
851         CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::VERSION_FIELD, data.extend[i], dataItem.version);
852         dataItem.hashKey = data.hashKey[i];
853         errCode = BindFillGidLogStmt(logStmt, user, dataItem, data.extend[i], type);
854         if (errCode != E_OK) {
855             return errCode;
856         }
857         errCode = SQLiteUtils::StepNext(logStmt, false);
858         if (errCode == -E_FINISHED) {
859             errCode = E_OK;
860         }
861         if (errCode != E_OK) {
862             LOGE("[SqliteCloudKvExecutorUtils] fill back failed %d index %zu", errCode, i);
863             return errCode;
864         }
865         SQLiteUtils::ResetStatement(logStmt, false, errCode);
866         MarkUploadSuccess(param, data, user, i);
867         // ignored version record
868         if (i >= data.timestamp.size()) {
869             continue;
870         }
871         recorder.RecordUploadRecord(CloudDbConstant::CLOUD_KV_TABLE_NAME, data.hashKey[i], type, data.timestamp[i]);
872     }
873     return E_OK;
874 }
875 
OnlyUpdateSyncData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)876 int SqliteCloudKvExecutorUtils::OnlyUpdateSyncData(sqlite3 *db, bool isMemory, int index, OpType opType,
877     DownloadData &downloadData)
878 {
879     if (opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO && opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE &&
880         opType != OpType::UPDATE_TIMESTAMP) {
881         LOGW("[SqliteCloudKvExecutorUtils] Ignore unknown opType %d", static_cast<int>(opType));
882         return E_OK;
883     }
884     sqlite3_stmt *dataStmt = nullptr;
885     int errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
886     if (errCode != E_OK) {
887         LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
888         return errCode;
889     }
890     ResFinalizer finalizerData([dataStmt]() {
891         sqlite3_stmt *statement = dataStmt;
892         int ret = E_OK;
893         SQLiteUtils::ResetStatement(statement, true, ret);
894         if (ret != E_OK) {
895             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when update log", ret);
896         }
897     });
898     errCode = BindUpdateSyncDataStmt(dataStmt, index, opType, downloadData);
899     if (errCode != E_OK) {
900         LOGE("[SqliteCloudKvExecutorUtils] Bind update sync data stmt failed %d", errCode);
901         return errCode;
902     }
903     errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
904     if (errCode == -E_FINISHED) {
905         errCode = E_OK;
906     }
907     return errCode;
908 }
909 
BindUpdateSyncDataStmt(sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)910 int SqliteCloudKvExecutorUtils::BindUpdateSyncDataStmt(sqlite3_stmt *dataStmt, int index, OpType opType,
911     DownloadData &downloadData)
912 {
913     switch (opType) {
914         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
915         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
916             return SQLiteUtils::BindBlobToStatement(dataStmt, 1, downloadData.existDataHashKey[index]);
917         case OpType::UPDATE_TIMESTAMP:
918             return BindUpdateTimestampStmt(dataStmt, index, downloadData);
919         default:
920             return E_OK;
921     }
922 }
923 
BindUpdateTimestampStmt(sqlite3_stmt * dataStmt,int index,DownloadData & downloadData)924 int SqliteCloudKvExecutorUtils::BindUpdateTimestampStmt(sqlite3_stmt *dataStmt, int index, DownloadData &downloadData)
925 {
926     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
927     auto &[errCode, dataItem] = res;
928     if (errCode != E_OK) {
929         return errCode;
930     }
931     int currentBindIndex = 1; // bind sql index start at 1
932     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.timestamp);
933     if (errCode != E_OK) {
934         LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
935         return errCode;
936     }
937     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.modifyTime);
938     if (errCode != E_OK) {
939         LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
940     }
941     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, currentBindIndex++, dataItem.hashKey);
942     if (errCode != E_OK) {
943         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
944         return errCode;
945     }
946     return E_OK;
947 }
948 
GetDataItem(int index,DownloadData & downloadData)949 std::pair<int, DataItem> SqliteCloudKvExecutorUtils::GetDataItem(int index, DownloadData &downloadData)
950 {
951     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
952     auto &[errCode, dataItem] = res;
953     if (errCode != E_OK) {
954         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
955         return res;
956     }
957     std::string dev;
958     errCode = RuntimeContext::GetInstance()->GetLocalIdentity(dev);
959     if (errCode != E_OK) {
960         return res;
961     }
962     dev = DBCommon::TransferHashString(dev);
963     auto decodeDevice = DBBase64Utils::Decode(dataItem.dev);
964     if (!decodeDevice.empty()) {
965         dataItem.dev = std::string(decodeDevice.begin(), decodeDevice.end());
966     }
967     if (dataItem.dev == dev) {
968         dataItem.dev = "";
969     }
970     decodeDevice = DBBase64Utils::Decode(dataItem.origDev);
971     if (!decodeDevice.empty()) {
972         dataItem.origDev = std::string(decodeDevice.begin(), decodeDevice.end());
973     }
974     if (dataItem.origDev == dev) {
975         dataItem.origDev = "";
976     }
977     dataItem.timestamp = static_cast<Timestamp>(static_cast<int64_t>(dataItem.modifyTime) + downloadData.timeOffset);
978     dataItem.writeTimestamp = dataItem.timestamp; // writeTimestamp is process conflict time
979     return res;
980 }
981 
CountCloudDataInner(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,std::string & sql)982 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudDataInner(sqlite3 *db, bool isMemory,
983     const Timestamp &timestamp, const std::string &user, std::string &sql)
984 {
985     std::pair<int, int64_t> res;
986     auto &[errCode, count] = res;
987     sqlite3_stmt *stmt = nullptr;
988     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
989     if (errCode != E_OK) {
990         LOGE("[SqliteCloudKvExecutorUtils] Count data stmt failed %d", errCode);
991         return res;
992     }
993     ResFinalizer finalizer([stmt]() {
994         sqlite3_stmt *statement = stmt;
995         int ret = E_OK;
996         SQLiteUtils::ResetStatement(statement, true, ret);
997         if (ret != E_OK) {
998             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when count data", ret);
999         }
1000     });
1001     errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_USER, user);
1002     if (errCode != E_OK) {
1003         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d", errCode);
1004         return res;
1005     }
1006     errCode = SQLiteUtils::BindInt64ToStatement(stmt, BIND_CLOUD_TIMESTAMP, static_cast<int64_t>(timestamp));
1007     if (errCode != E_OK) {
1008         LOGE("[SqliteCloudKvExecutorUtils] Bind begin time failed %d", errCode);
1009         return res;
1010     }
1011     errCode = SQLiteUtils::StepNext(stmt, isMemory);
1012     if (errCode == -E_FINISHED) {
1013         count = 0;
1014         return res;
1015     }
1016     count = sqlite3_column_int64(stmt, CLOUD_QUERY_COUNT_INDEX);
1017     LOGD("[SqliteCloudKvExecutorUtils] Get total upload count %" PRId64, count);
1018     return res;
1019 }
1020 
CountCloudData(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,bool forcePush)1021 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudData(sqlite3 *db, bool isMemory,
1022     const Timestamp &timestamp, const std::string &user, bool forcePush)
1023 {
1024     std::string sql = SqliteQueryHelper::GetKvCloudQuerySql(true, forcePush);
1025     return CountCloudDataInner(db, isMemory, timestamp, user, sql);
1026 }
1027 
CountAllCloudData(const DBParam & param,const std::vector<Timestamp> & timestampVec,const std::string & user,bool forcePush,QuerySyncObject & querySyncObject)1028 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountAllCloudData(const DBParam &param,
1029     const std::vector<Timestamp> &timestampVec, const std::string &user, bool forcePush,
1030     QuerySyncObject &querySyncObject)
1031 {
1032     std::pair<int, int64_t> res = { E_OK, 0 };
1033     auto &[errCode, count] = res;
1034     if (timestampVec.size() != 3) { // 3 is the number of three mode.
1035         errCode = -E_INVALID_ARGS;
1036         return res;
1037     }
1038     std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1039     SqliteQueryHelper helper = querySyncObject.GetQueryHelper(errCode);
1040     if (errCode != E_OK) {
1041         return res;
1042     }
1043     for (size_t i = 0; i < typeVec.size(); i++) {
1044         sqlite3_stmt *stmt = nullptr;
1045         errCode = helper.GetCountKvCloudDataStatement(param.first, forcePush, typeVec[i], stmt);
1046         if (errCode != E_OK) {
1047             return res;
1048         }
1049         // count no use watermark
1050         auto [err, cnt] = helper.BindCountKvCloudDataStatement(param.first, param.second, 0u, user, stmt);
1051         if (err != E_OK) {
1052             return { err, 0 };
1053         }
1054         count += cnt;
1055     }
1056     return res;
1057 }
1058 
FillCloudVersionRecord(sqlite3 * db,OpType opType,const CloudSyncData & data)1059 int SqliteCloudKvExecutorUtils::FillCloudVersionRecord(sqlite3 *db, OpType opType, const CloudSyncData &data)
1060 {
1061     if (opType != OpType::INSERT && opType != OpType::UPDATE) {
1062         return E_OK;
1063     }
1064     bool isInsert = (opType == OpType::INSERT);
1065     CloudSyncBatch syncBatch = isInsert ? data.insData : data.updData;
1066     if (syncBatch.record.empty()) {
1067         LOGW("[SqliteCloudKvExecutorUtils] Fill empty cloud version record");
1068         return E_OK;
1069     }
1070     syncBatch.record[0].insert(syncBatch.extend[0].begin(), syncBatch.extend[0].end());
1071     auto res = CloudStorageUtils::GetSystemRecordFromCloudData(syncBatch.record[0]); // only record first one
1072     auto &[errCode, dataItem] = res;
1073     sqlite3_stmt *dataStmt = nullptr;
1074     errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
1075     if (errCode != E_OK) {
1076         LOGE("[SqliteCloudKvExecutorUtils] Get insert version record statement failed %d", errCode);
1077         return errCode;
1078     }
1079     ResFinalizer finalizerData([dataStmt]() {
1080         int ret = E_OK;
1081         sqlite3_stmt *statement = dataStmt;
1082         SQLiteUtils::ResetStatement(statement, true, ret);
1083         if (ret != E_OK) {
1084             LOGW("[SqliteCloudKvExecutorUtils] Reset version record stmt failed %d", ret);
1085         }
1086     });
1087     errCode = BindDataStmt(dataStmt, dataItem, isInsert);
1088     if (errCode != E_OK) {
1089         return errCode;
1090     }
1091     errCode = SQLiteUtils::StepNext(dataStmt, false);
1092     if (errCode != -E_FINISHED) {
1093         LOGE("[SqliteCloudKvExecutorUtils] Step insert version record stmt failed %d", errCode);
1094         return errCode;
1095     }
1096     return E_OK;
1097 }
1098 
GetLocalCloudVersion(sqlite3 * db,bool isMemory,const std::string & user)1099 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersion(sqlite3 *db, bool isMemory,
1100     const std::string &user)
1101 {
1102     auto res = GetLocalCloudVersionInner(db, isMemory, user);
1103     if (res.first != E_OK) {
1104         LOGE("[SqliteCloudKvExecutorUtils] Get local cloud version failed %d", res.first);
1105     }
1106     return res;
1107 }
1108 
GetLocalCloudVersionInner(sqlite3 * db,bool isMemory,const std::string & user)1109 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersionInner(sqlite3 *db, bool isMemory,
1110     const std::string &user)
1111 {
1112     std::pair<int, CloudSyncData> res;
1113     auto &[errCode, syncData] = res;
1114     auto sql = SqliteQueryHelper::GetKvCloudRecordSql();
1115     sqlite3_stmt *stmt = nullptr;
1116     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1117     if (errCode != E_OK) {
1118         return res;
1119     }
1120     ResFinalizer finalizerData([stmt]() {
1121         int ret = E_OK;
1122         sqlite3_stmt *statement = stmt;
1123         SQLiteUtils::ResetStatement(statement, true, ret);
1124         if (ret != E_OK) {
1125             LOGW("[SqliteCloudKvExecutorUtils] Reset local version record stmt failed %d", ret);
1126         }
1127     });
1128     std::string hashDev;
1129     errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
1130     if (errCode != E_OK) {
1131         return res;
1132     }
1133     std::string tempDev = DBCommon::TransferHashString(hashDev);
1134     hashDev = DBCommon::TransferStringToHex(tempDev);
1135     std::string key = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY + hashDev;
1136     Key keyVec;
1137     DBCommon::StringToVector(key, keyVec);
1138     errCode = SQLiteUtils::BindBlobToStatement(stmt, BIND_CLOUD_VERSION_RECORD_KEY_INDEX, keyVec);
1139     if (errCode != E_OK) {
1140         return res;
1141     }
1142     errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_VERSION_RECORD_USER_INDEX, user);
1143     if (errCode != E_OK) {
1144         return res;
1145     }
1146     errCode = GetCloudVersionRecord(isMemory, stmt, syncData);
1147     if (errCode == -E_NOT_FOUND) {
1148         InitDefaultCloudVersionRecord(key, tempDev, syncData);
1149         errCode = E_OK;
1150     }
1151     return res;
1152 }
1153 
GetCloudVersionRecord(bool isMemory,sqlite3_stmt * stmt,CloudSyncData & syncData)1154 int SqliteCloudKvExecutorUtils::GetCloudVersionRecord(bool isMemory, sqlite3_stmt *stmt, CloudSyncData &syncData)
1155 {
1156     int errCode = SQLiteUtils::StepNext(stmt, isMemory);
1157     if (errCode == -E_FINISHED) {
1158         return -E_NOT_FOUND;
1159     }
1160     if (errCode != E_OK) {
1161         LOGE("[SqliteCloudKvExecutorUtils] Get local version failed %d", errCode);
1162         return errCode;
1163     }
1164     CloudSyncConfig config;
1165     config.maxUploadSize = CloudDbConstant::MAX_UPLOAD_SIZE;
1166     config.maxUploadCount = 1;
1167     CloudUploadRecorder recorder; // ignore last record
1168     UploadDetail detail;
1169     errCode = GetCloudDataForSync(config, recorder, stmt, syncData, detail);
1170     return errCode;
1171 }
1172 
InitDefaultCloudVersionRecord(const std::string & key,const std::string & dev,CloudSyncData & syncData)1173 void SqliteCloudKvExecutorUtils::InitDefaultCloudVersionRecord(const std::string &key, const std::string &dev,
1174     CloudSyncData &syncData)
1175 {
1176     LOGI("[SqliteCloudKvExecutorUtils] Not found local version record");
1177     VBucket defaultRecord;
1178     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_KEY] = key;
1179     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_VALUE] = std::string("");
1180     auto encodeDev = DBBase64Utils::Encode(dev);
1181     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_DEVICE] = encodeDev;
1182     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE] = encodeDev;
1183     syncData.insData.record.push_back(std::move(defaultRecord));
1184     VBucket defaultExtend;
1185     defaultExtend[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::TransferStringToHex(key);
1186     syncData.insData.extend.push_back(std::move(defaultExtend));
1187     syncData.insData.assets.emplace_back();
1188     Bytes bytesHashKey;
1189     DBCommon::StringToVector(key, bytesHashKey);
1190     syncData.insData.hashKey.push_back(bytesHashKey);
1191 }
1192 
BindVersionStmt(const std::string & device,const std::string & user,sqlite3_stmt * dataStmt)1193 int SqliteCloudKvExecutorUtils::BindVersionStmt(const std::string &device, const std::string &user,
1194     sqlite3_stmt *dataStmt)
1195 {
1196     std::string hashDevice;
1197     int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDevice);
1198     if (errCode != E_OK) {
1199         return errCode;
1200     }
1201     Bytes bytes;
1202     if (device == hashDevice) {
1203         DBCommon::StringToVector("", bytes);
1204     } else {
1205         hashDevice = DBCommon::TransferHashString(device);
1206         DBCommon::StringToVector(hashDevice, bytes);
1207     }
1208     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, BIND_CLOUD_VERSION_DEVICE_INDEX, bytes);
1209     if (errCode != E_OK) {
1210         LOGE("[SqliteCloudKvExecutorUtils] Bind device failed %d", errCode);
1211     }
1212     return errCode;
1213 }
1214 
GetCloudVersionFromCloud(sqlite3 * db,bool isMemory,const std::string & user,const std::string & device,std::vector<VBucket> & dataVector)1215 int SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(sqlite3 *db, bool isMemory, const std::string &user,
1216     const std::string &device, std::vector<VBucket> &dataVector)
1217 {
1218     sqlite3_stmt *dataStmt = nullptr;
1219     bool isDeviceEmpty = device.empty();
1220     std::string sql = SqliteQueryHelper::GetCloudVersionRecordSql(isDeviceEmpty);
1221     int errCode = SQLiteUtils::GetStatement(db, sql, dataStmt);
1222     if (errCode != E_OK) {
1223         LOGE("[SqliteCloudKvExecutorUtils] Get cloud version record statement failed %d", errCode);
1224         return errCode;
1225     }
1226     ResFinalizer finalizerData([dataStmt]() {
1227         int ret = E_OK;
1228         sqlite3_stmt *statement = dataStmt;
1229         SQLiteUtils::ResetStatement(statement, true, ret);
1230         if (ret != E_OK) {
1231             LOGW("[SqliteCloudKvExecutorUtils] Reset cloud version record stmt failed %d", ret);
1232         }
1233     });
1234     if (!isDeviceEmpty) {
1235         errCode = BindVersionStmt(device, user, dataStmt);
1236         if (errCode != E_OK) {
1237             return errCode;
1238         }
1239     }
1240     uint32_t totalSize = 0;
1241     do {
1242         errCode = SQLiteUtils::StepWithRetry(dataStmt, isMemory);
1243         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1244             errCode = E_OK;
1245             break;
1246         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1247             LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1248             break;
1249         }
1250         VBucket data;
1251         errCode = GetCloudVersionRecordData(dataStmt, data, totalSize);
1252         dataVector.push_back(data);
1253     } while (errCode == E_OK);
1254     return errCode;
1255 }
1256 
GetCloudVersionRecordData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)1257 int SqliteCloudKvExecutorUtils::GetCloudVersionRecordData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
1258 {
1259     int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
1260     if (errCode != E_OK) {
1261         return errCode;
1262     }
1263     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
1264     if (errCode != E_OK) {
1265         return errCode;
1266     }
1267     return GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
1268 }
1269 
GetWaitCompensatedSyncDataPk(sqlite3 * db,bool isMemory,std::vector<VBucket> & data)1270 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(sqlite3 *db, bool isMemory, std::vector<VBucket> &data)
1271 {
1272     sqlite3_stmt *stmt = nullptr;
1273     int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_KEY_SQL, stmt);
1274     if (errCode != E_OK) {
1275         LOGE("[SqliteCloudKvExecutorUtils] Get compensate key stmt failed %d", errCode);
1276         return errCode;
1277     }
1278     ResFinalizer finalizerData([stmt]() {
1279         int ret = E_OK;
1280         sqlite3_stmt *statement = stmt;
1281         SQLiteUtils::ResetStatement(statement, true, ret);
1282         if (ret != E_OK) {
1283             LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1284         }
1285     });
1286     uint32_t totalSize = 0;
1287     do {
1288         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1289         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1290             errCode = E_OK;
1291             break;
1292         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1293             LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1294             break;
1295         }
1296         VBucket key;
1297         errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, key, totalSize);
1298         if (errCode != E_OK) {
1299             return errCode;
1300         }
1301         data.push_back(key);
1302     } while (errCode == E_OK);
1303     return errCode;
1304 }
1305 
QueryCloudGid(sqlite3 * db,bool isMemory,const std::string & user,QuerySyncObject & querySyncObject,std::vector<std::string> & cloudGid)1306 int SqliteCloudKvExecutorUtils::QueryCloudGid(sqlite3 *db, bool isMemory, const std::string &user,
1307     QuerySyncObject &querySyncObject, std::vector<std::string> &cloudGid)
1308 {
1309     int errCode = E_OK;
1310     QuerySyncObject query = querySyncObject;
1311     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1312     if (errCode != E_OK) {
1313         return errCode;
1314     }
1315     sqlite3_stmt *stmt = nullptr;
1316     errCode = helper.GetAndBindGidKvCloudQueryStatement(user, db, stmt);
1317     if (errCode != E_OK) {
1318         return errCode;
1319     }
1320     do {
1321         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1322         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1323             errCode = E_OK;
1324             break;
1325         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1326             LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1327             break;
1328         }
1329         std::string gid;
1330         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1331         cloudGid.push_back(gid);
1332     } while (errCode == E_OK);
1333     int ret = E_OK;
1334     SQLiteUtils::ResetStatement(stmt, true, ret);
1335     return errCode == E_OK ? ret : errCode;
1336 }
1337 
BindFillGidLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,const VBucket & uploadExtend,const CloudWaterType & type)1338 int SqliteCloudKvExecutorUtils::BindFillGidLogStmt(sqlite3_stmt *logStmt, const std::string &user,
1339     const DataItem &dataItem, const VBucket &uploadExtend, const CloudWaterType &type)
1340 {
1341     DataItem wItem = dataItem;
1342     if (DBCommon::IsNeedCompensatedForUpload(uploadExtend, type)) {
1343         wItem.cloud_flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1344     }
1345     if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1346         (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1347         wItem.gid = {};
1348         wItem.version = {};
1349     }
1350     int errCode = E_OK;
1351     if (type == CloudWaterType::DELETE) {
1352         if (DBCommon::IsCloudRecordNotFound(uploadExtend)) {
1353             errCode = BindUpdateLogStmt(logStmt, user, wItem);
1354         }
1355     } else {
1356         errCode = BindInsertLogStmt(logStmt, user, wItem);
1357     }
1358     if (errCode != E_OK) {
1359         LOGE("[SqliteCloudKvExecutorUtils] fill cloud gid failed. %d", errCode);
1360     }
1361     return errCode;
1362 }
1363 
MarkUploadSuccess(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1364 void SqliteCloudKvExecutorUtils::MarkUploadSuccess(const FillGidParam &param, const CloudSyncBatch &data,
1365     const std::string &user, size_t dataIndex)
1366 {
1367     if (data.extend.size() <= dataIndex || data.hashKey.size() <= dataIndex || data.timestamp.size() <= dataIndex) {
1368         LOGW("[SqliteCloudKvExecutorUtils] invalid index %zu when mark upload success", dataIndex);
1369         return;
1370     }
1371     if (!DBCommon::IsRecordSuccess(data.extend[dataIndex])) {
1372         return;
1373     }
1374     if (CheckDataChanged(param, data, dataIndex)) {
1375         LOGW("[SqliteCloudKvExecutorUtils] %zu data changed when mark upload success", dataIndex);
1376         return;
1377     }
1378     MarkUploadSuccessInner(param, data, user, dataIndex);
1379 }
1380 
CheckDataChanged(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1381 bool SqliteCloudKvExecutorUtils::CheckDataChanged(const FillGidParam &param,
1382     const CloudSyncBatch &data, size_t dataIndex)
1383 {
1384     sqlite3_stmt *checkStmt = nullptr;
1385     int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_CHANGED, checkStmt);
1386     ResFinalizer finalizerData([checkStmt]() {
1387         sqlite3_stmt *statement = checkStmt;
1388         int ret = E_OK;
1389         SQLiteUtils::ResetStatement(statement, true, ret);
1390         if (ret != E_OK) {
1391             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data changed", ret);
1392         }
1393     });
1394     int index = 1;
1395     errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1396     if (errCode != E_OK) {
1397         LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data changed", errCode);
1398         return true;
1399     }
1400     errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1401     if (errCode != E_OK) {
1402         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data changed", errCode);
1403         return true;
1404     }
1405     errCode = SQLiteUtils::StepNext(checkStmt);
1406     if (errCode != E_OK) {
1407         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data changed", errCode);
1408         return true;
1409     }
1410     return sqlite3_column_int64(checkStmt, 0) == 0; // get index start at 0, get 0 is data changed
1411 }
1412 
MarkUploadSuccessInner(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1413 void SqliteCloudKvExecutorUtils::MarkUploadSuccessInner(const FillGidParam &param,
1414     const CloudSyncBatch &data, const std::string &user, size_t dataIndex)
1415 {
1416     sqlite3_stmt *logStmt = nullptr;
1417     int errCode = SQLiteUtils::GetStatement(param.first, MARK_UPLOAD_SUCCESS, logStmt);
1418     ResFinalizer finalizerData([logStmt]() {
1419         sqlite3_stmt *statement = logStmt;
1420         int ret = E_OK;
1421         SQLiteUtils::ResetStatement(statement, true, ret);
1422         if (ret != E_OK) {
1423             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when mark upload success", ret);
1424         }
1425     });
1426     int index = 1;
1427     errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, data.hashKey[dataIndex]);
1428     if (errCode != E_OK) {
1429         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1430         return;
1431     }
1432     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
1433     if (errCode != E_OK) {
1434         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1435         return;
1436     }
1437     errCode = SQLiteUtils::StepNext(logStmt);
1438     if (errCode != E_OK && errCode != -E_FINISHED) {
1439         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when mark upload success", errCode);
1440         return;
1441     }
1442 }
1443 }