1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_cloud_kv_executor_utils.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "res_finalizer.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_ver_storage_executor_sql.h"
24 #include "time_helper.h"
25
26 namespace DistributedDB {
GetCloudData(const CloudSyncConfig & config,const DBParam & param,const CloudUploadRecorder & recorder,SQLiteSingleVerContinueToken & token,CloudSyncData & data)27 int SqliteCloudKvExecutorUtils::GetCloudData(const CloudSyncConfig &config, const DBParam ¶m,
28 const CloudUploadRecorder &recorder, SQLiteSingleVerContinueToken &token, CloudSyncData &data)
29 {
30 auto [db, isMemory] = param;
31 bool stepNext = false;
32 auto [errCode, stmt] = token.GetCloudQueryStmt(db, data.isCloudForcePushStrategy, stepNext, data.mode);
33 if (errCode != E_OK) {
34 token.ReleaseCloudQueryStmt();
35 return errCode;
36 }
37 UploadDetail detail;
38 auto &[stepNum, totalSize] = detail;
39 do {
40 if (stepNext) {
41 errCode = SQLiteUtils::StepNext(stmt, isMemory);
42 if (errCode != E_OK) {
43 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
44 break;
45 }
46 }
47 stepNext = true;
48 errCode = GetCloudDataForSync(config, recorder, stmt, data, detail);
49 stepNum++;
50 } while (errCode == E_OK);
51 LOGI("[SqliteCloudKvExecutorUtils] Get cloud sync data, insData:%u, upData:%u, delLog:%u errCode:%d total:%" PRIu32,
52 data.insData.record.size(), data.updData.record.size(), data.delData.extend.size(), errCode, totalSize);
53 if (errCode != -E_UNFINISHED) {
54 token.ReleaseCloudQueryStmt();
55 } else if (isMemory && UpdateBeginTimeForMemoryDB(token, data)) {
56 token.ReleaseCloudQueryStmt();
57 }
58 return errCode;
59 }
60
GetMaxTimeStamp(std::vector<VBucket> & dataExtend)61 Timestamp SqliteCloudKvExecutorUtils::GetMaxTimeStamp(std::vector<VBucket> &dataExtend)
62 {
63 Timestamp maxTimeStamp = 0;
64 VBucket lastRecord = dataExtend.back();
65 auto it = lastRecord.find(CloudDbConstant::MODIFY_FIELD);
66 if (it != lastRecord.end() && maxTimeStamp < static_cast<Timestamp>(std::get<int64_t>(it->second))) {
67 maxTimeStamp = static_cast<Timestamp>(std::get<int64_t>(it->second));
68 }
69 return maxTimeStamp;
70 }
71
UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken & token,CloudSyncData & data)72 bool SqliteCloudKvExecutorUtils::UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken &token, CloudSyncData &data)
73 {
74 Timestamp maxTimeStamp = 0;
75 switch (data.mode) {
76 case DistributedDB::CloudWaterType::DELETE:
77 maxTimeStamp = GetMaxTimeStamp(data.delData.extend);
78 break;
79 case DistributedDB::CloudWaterType::UPDATE:
80 maxTimeStamp = GetMaxTimeStamp(data.updData.extend);
81 break;
82 case DistributedDB::CloudWaterType::INSERT:
83 maxTimeStamp = GetMaxTimeStamp(data.insData.extend);
84 break;
85 case DistributedDB::CloudWaterType::BUTT:
86 default:
87 break;
88 }
89 if (maxTimeStamp > token.GetQueryBeginTime()) {
90 token.SetNextBeginTime("", maxTimeStamp);
91 return true;
92 }
93 LOGW("[SqliteCloudKvExecutorUtils] The start time of the in memory database has not been updated.");
94 return false;
95 }
96
GetCloudDataForSync(const CloudSyncConfig & config,const CloudUploadRecorder & recorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,UploadDetail & detail)97 int SqliteCloudKvExecutorUtils::GetCloudDataForSync(const CloudSyncConfig &config, const CloudUploadRecorder &recorder,
98 sqlite3_stmt *statement, CloudSyncData &cloudDataResult, UploadDetail &detail)
99 {
100 auto &[stepNum, totalSize] = detail;
101 VBucket log;
102 VBucket extraLog;
103 uint32_t preSize = totalSize;
104 GetCloudLog(statement, log, totalSize);
105 GetCloudExtraLog(statement, extraLog);
106
107 VBucket data;
108 int64_t flag = 0;
109 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
110 if (errCode != E_OK) {
111 return errCode;
112 }
113
114 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
115 errCode = GetCloudKvData(statement, data, totalSize);
116 if (errCode != E_OK) {
117 return errCode;
118 }
119 }
120
121 if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, config.maxUploadSize, config.maxUploadCount)) {
122 errCode = CloudStorageUtils::IdentifyCloudType(recorder, cloudDataResult, data, log, extraLog);
123 } else {
124 errCode = -E_UNFINISHED;
125 }
126 if (errCode == -E_IGNORE_DATA) {
127 errCode = E_OK;
128 totalSize = preSize;
129 stepNum--;
130 }
131 return errCode;
132 }
133
GetCloudLog(sqlite3_stmt * stmt,VBucket & logInfo,uint32_t & totalSize)134 void SqliteCloudKvExecutorUtils::GetCloudLog(sqlite3_stmt *stmt, VBucket &logInfo,
135 uint32_t &totalSize)
136 {
137 int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX));
138 uint64_t curTime = 0;
139 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
140 if (modifyTime > static_cast<int64_t>(curTime)) {
141 modifyTime = static_cast<int64_t>(curTime);
142 }
143 } else {
144 LOGW("[SqliteCloudKvExecutorUtils] get raw sys time failed.");
145 }
146 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
147 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
148 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CREATE_TIME_INDEX)));
149 totalSize += sizeof(int64_t) + sizeof(int64_t);
150 if (sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX) != nullptr) {
151 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
152 sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX));
153 if (!cloudGid.empty()) {
154 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
155 totalSize += cloudGid.size();
156 }
157 }
158 std::string version;
159 SQLiteUtils::GetColumnTextValue(stmt, CLOUD_QUERY_VERSION_INDEX, version);
160 logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
161 totalSize += version.size();
162 }
163
GetCloudExtraLog(sqlite3_stmt * stmt,VBucket & flags)164 void SqliteCloudKvExecutorUtils::GetCloudExtraLog(sqlite3_stmt *stmt, VBucket &flags)
165 {
166 flags.insert_or_assign(CloudDbConstant::ROWID,
167 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_ROW_ID_INDEX)));
168 flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
169 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX)));
170 flags.insert_or_assign(CloudDbConstant::FLAG,
171 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_FLAG_INDEX)));
172 Bytes hashKey;
173 (void)SQLiteUtils::GetColumnBlobValue(stmt, CLOUD_QUERY_HASH_KEY_INDEX, hashKey);
174 flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
175 flags.insert_or_assign(CloudDbConstant::CLOUD_FLAG,
176 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CLOUD_FLAG_INDEX)));
177 }
178
GetCloudKvData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)179 int SqliteCloudKvExecutorUtils::GetCloudKvData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
180 {
181 int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
182 if (errCode != E_OK) {
183 return errCode;
184 }
185 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
186 if (errCode != E_OK) {
187 return errCode;
188 }
189 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
190 if (errCode != E_OK) {
191 return errCode;
192 }
193 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, CLOUD_QUERY_ORI_DEV_INDEX, stmt, data,
194 totalSize);
195 if (errCode != E_OK) {
196 return errCode;
197 }
198 data.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME,
199 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_DEV_CREATE_TIME_INDEX)));
200 totalSize += sizeof(int64_t);
201 return E_OK;
202 }
203
GetCloudKvBlobData(const std::string & keyStr,int index,sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)204 int SqliteCloudKvExecutorUtils::GetCloudKvBlobData(const std::string &keyStr, int index, sqlite3_stmt *stmt,
205 VBucket &data, uint32_t &totalSize)
206 {
207 std::vector<uint8_t> blob;
208 int errCode = SQLiteUtils::GetColumnBlobValue(stmt, index, blob);
209 if (errCode != E_OK) {
210 LOGE("[SqliteCloudKvExecutorUtils] Get %.3s failed %d", keyStr.c_str(), errCode);
211 return errCode;
212 }
213 std::string tmp = std::string(blob.begin(), blob.end());
214 if ((keyStr == CloudDbConstant::CLOUD_KV_FIELD_DEVICE ||
215 keyStr == CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE)) {
216 if (tmp.empty()) {
217 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(tmp);
218 if (errCode != E_OK) {
219 return errCode;
220 }
221 tmp = DBCommon::TransferHashString(tmp);
222 }
223 tmp = DBBase64Utils::Encode(std::vector<uint8_t>(tmp.begin(), tmp.end()));
224 }
225 totalSize += tmp.size();
226 data.insert_or_assign(keyStr, tmp);
227 return E_OK;
228 }
229
GetLogInfo(sqlite3 * db,bool isMemory,const VBucket & cloudData)230 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfo(sqlite3 *db, bool isMemory,
231 const VBucket &cloudData)
232 {
233 std::pair<int, DataInfoWithLog> res;
234 int &errCode = res.first;
235 std::string keyStr;
236 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CLOUD_KV_FIELD_KEY, cloudData, keyStr);
237 if (errCode == -E_NOT_FOUND) {
238 errCode = E_OK;
239 }
240 if (errCode != E_OK) {
241 LOGE("[SqliteCloudKvExecutorUtils] Get key failed %d", errCode);
242 return res;
243 }
244 Bytes key;
245 DBCommon::StringToVector(keyStr, key);
246 Bytes hashKey;
247 DBCommon::CalcValueHash(key, hashKey);
248 sqlite3_stmt *stmt = nullptr;
249 std::tie(errCode, stmt) = GetLogInfoStmt(db, cloudData, !hashKey.empty());
250 if (errCode != E_OK) {
251 LOGE("[SqliteCloudKvExecutorUtils] Get stmt failed %d", errCode);
252 return res;
253 }
254 std::string gid;
255 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, cloudData, gid);
256 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
257 LOGE("[SqliteCloudKvExecutorUtils] Get gid failed %d", errCode);
258 return res;
259 }
260 return GetLogInfoInner(stmt, isMemory, gid, hashKey);
261 }
262
GetLogInfoStmt(sqlite3 * db,const VBucket & cloudData,bool existKey)263 std::pair<int, sqlite3_stmt*> SqliteCloudKvExecutorUtils::GetLogInfoStmt(sqlite3 *db, const VBucket &cloudData,
264 bool existKey)
265 {
266 std::pair<int, sqlite3_stmt*> res;
267 auto &[errCode, stmt] = res;
268 std::string sql = QUERY_CLOUD_SYNC_DATA_LOG;
269 sql += " WHERE cloud_gid = ?";
270 if (existKey) {
271 sql += " UNION ";
272 sql += QUERY_CLOUD_SYNC_DATA_LOG;
273 sql += " WHERE sync_data.hash_key = ?";
274 }
275 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
276 return res;
277 }
278
GetLogInfoInner(sqlite3_stmt * stmt,bool isMemory,const std::string & gid,const Bytes & key)279 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfoInner(sqlite3_stmt *stmt, bool isMemory,
280 const std::string &gid, const Bytes &key)
281 {
282 ResFinalizer finalizer([stmt]() {
283 sqlite3_stmt *statement = stmt;
284 int ret = E_OK;
285 SQLiteUtils::ResetStatement(statement, true, ret);
286 if (ret != E_OK) {
287 LOGW("[SqliteCloudKvExecutorUtils] Reset stmt failed %d when get log", ret);
288 }
289 });
290 std::pair<int, DataInfoWithLog> res;
291 auto &[errCode, logInfo] = res;
292 int index = 1;
293 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
294 if (errCode != E_OK) {
295 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d", errCode);
296 return res;
297 }
298 if (!key.empty()) {
299 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key);
300 if (errCode != E_OK) {
301 LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
302 return res;
303 }
304 }
305 errCode = SQLiteUtils::StepNext(stmt, isMemory);
306 if (errCode == -E_FINISHED) {
307 errCode = -E_NOT_FOUND;
308 // not found is ok, just return error
309 return res;
310 }
311 if (errCode != E_OK) {
312 LOGE("[SqliteCloudKvExecutorUtils] Get log failed %d", errCode);
313 return res;
314 }
315 logInfo = FillLogInfoWithStmt(stmt);
316 return res;
317 }
318
FillLogInfoWithStmt(sqlite3_stmt * stmt)319 DataInfoWithLog SqliteCloudKvExecutorUtils::FillLogInfoWithStmt(sqlite3_stmt *stmt)
320 {
321 DataInfoWithLog dataInfoWithLog;
322 int index = 0;
323 dataInfoWithLog.logInfo.dataKey = sqlite3_column_int64(stmt, index++);
324 dataInfoWithLog.logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
325 std::vector<uint8_t> device;
326 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, device);
327 DBCommon::VectorToString(device, dataInfoWithLog.logInfo.device);
328 std::vector<uint8_t> oriDev;
329 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, oriDev);
330 DBCommon::VectorToString(oriDev, dataInfoWithLog.logInfo.originDev);
331 dataInfoWithLog.logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
332 dataInfoWithLog.logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
333 std::string gid;
334 (void)SQLiteUtils::GetColumnTextValue(stmt, index++, gid);
335 dataInfoWithLog.logInfo.cloudGid = gid;
336 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, dataInfoWithLog.logInfo.hashKey);
337 Bytes key;
338 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, key);
339 std::string keyStr(key.begin(), key.end());
340 dataInfoWithLog.primaryKeys.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_KEY, keyStr);
341 (void)SQLiteUtils::GetColumnTextValue(stmt, index++, dataInfoWithLog.logInfo.version);
342 dataInfoWithLog.logInfo.cloud_flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
343 return dataInfoWithLog;
344 }
345
PutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData)346 int SqliteCloudKvExecutorUtils::PutCloudData(sqlite3 *db, bool isMemory, DownloadData &downloadData)
347 {
348 if (downloadData.data.size() != downloadData.opType.size()) {
349 LOGE("[SqliteCloudKvExecutorUtils] data size %zu != flag size %zu.", downloadData.data.size(),
350 downloadData.opType.size());
351 return -E_CLOUD_ERROR;
352 }
353 std::map<int, int> statisticMap = {};
354 int errCode = ExecutePutCloudData(db, isMemory, downloadData, statisticMap);
355 LOGI("[SqliteCloudKvExecutorUtils] save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d,"
356 " only update gid cnt = %d, set LCC flag zero cnt = %d, set LCC flag one cnt = %d,"
357 " update timestamp cnt = %d, clear gid count = %d, not handle cnt = %d",
358 errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
359 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
360 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
361 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
362 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
363 statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
364 return errCode;
365 }
366
ExecutePutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData,std::map<int,int> & statisticMap)367 int SqliteCloudKvExecutorUtils::ExecutePutCloudData(sqlite3 *db, bool isMemory,
368 DownloadData &downloadData, std::map<int, int> &statisticMap)
369 {
370 int index = 0;
371 int errCode = E_OK;
372 for (OpType op : downloadData.opType) {
373 switch (op) {
374 case OpType::INSERT: // fallthrough
375 case OpType::UPDATE: // fallthrough
376 case OpType::DELETE: // fallthrough
377 errCode = OperateCloudData(db, isMemory, index, op, downloadData);
378 break;
379 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
380 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE: // fallthrough
381 case OpType::UPDATE_TIMESTAMP: // fallthrough
382 errCode = OnlyUpdateSyncData(db, isMemory, index, op, downloadData);
383 if (errCode != E_OK) {
384 break;
385 }
386 [[fallthrough]];
387 case OpType::ONLY_UPDATE_GID: // fallthrough
388 case OpType::NOT_HANDLE: // fallthrough
389 case OpType::CLEAR_GID: // fallthrough
390 errCode = OnlyUpdateLogTable(db, isMemory, index, op, downloadData);
391 break;
392 default:
393 errCode = -E_CLOUD_ERROR;
394 break;
395 }
396 if (errCode != E_OK) {
397 LOGE("put cloud sync data fail:%d op:%d", errCode, static_cast<int>(op));
398 return errCode;
399 }
400 statisticMap[static_cast<int>(op)]++;
401 index++;
402 }
403 return errCode;
404 }
405
OperateCloudData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)406 int SqliteCloudKvExecutorUtils::OperateCloudData(sqlite3 *db, bool isMemory, int index, OpType opType,
407 DownloadData &downloadData)
408 {
409 sqlite3_stmt *logStmt = nullptr;
410 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(opType), logStmt);
411 if (errCode != E_OK) {
412 LOGE("[SqliteCloudKvExecutorUtils] Get insert log statement failed %d", errCode);
413 return errCode;
414 }
415 sqlite3_stmt *dataStmt = nullptr;
416 errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
417 if (errCode != E_OK) {
418 int ret = E_OK;
419 SQLiteUtils::ResetStatement(logStmt, true, ret);
420 LOGE("[SqliteCloudKvExecutorUtils] Get insert data statement failed %d reset %d", errCode, ret);
421 return errCode;
422 }
423 ResFinalizer finalizerData([logStmt, dataStmt, opType]() {
424 sqlite3_stmt *statement = logStmt;
425 int ret = E_OK;
426 SQLiteUtils::ResetStatement(statement, true, ret);
427 if (ret != E_OK) {
428 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d opType %d", ret, static_cast<int>(opType));
429 }
430 statement = dataStmt;
431 SQLiteUtils::ResetStatement(statement, true, ret);
432 if (ret != E_OK) {
433 LOGW("[SqliteCloudKvExecutorUtils] Reset data stmt failed %d opType %d", ret, static_cast<int>(opType));
434 }
435 });
436 errCode = BindStmt(logStmt, dataStmt, index, opType, downloadData);
437 if (errCode != E_OK) {
438 return errCode;
439 }
440 return StepStmt(logStmt, dataStmt, isMemory);
441 }
442
GetOperateDataSql(OpType opType)443 std::string SqliteCloudKvExecutorUtils::GetOperateDataSql(OpType opType)
444 {
445 switch (opType) {
446 case OpType::INSERT:
447 return INSERT_SYNC_SQL;
448 case OpType::UPDATE: // fallthrough
449 case OpType::DELETE:
450 return UPDATE_SYNC_SQL;
451 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
452 return SET_SYNC_DATA_NO_FORCE_PUSH;
453 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
454 return SET_SYNC_DATA_FORCE_PUSH;
455 case OpType::UPDATE_TIMESTAMP:
456 return UPDATE_TIMESTAMP;
457 default:
458 return "";
459 }
460 }
461
GetOperateLogSql(OpType opType)462 std::string SqliteCloudKvExecutorUtils::GetOperateLogSql(OpType opType)
463 {
464 switch (opType) {
465 case OpType::INSERT: // fallthrough
466 case OpType::UPDATE:
467 return INSERT_CLOUD_SYNC_DATA_LOG;
468 case OpType::DELETE:
469 return UPDATE_CLOUD_SYNC_DATA_LOG;
470 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
471 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE: // fallthrough
472 case OpType::UPDATE_TIMESTAMP: // fallthrough
473 case OpType::ONLY_UPDATE_GID: // fallthrough
474 case OpType::NOT_HANDLE: // fallthrough
475 case OpType::CLEAR_GID: // fallthrough
476 return UPSERT_CLOUD_SYNC_DATA_LOG;
477 default:
478 return "";
479 }
480 }
481
TransToOpType(const CloudWaterType type)482 OpType SqliteCloudKvExecutorUtils::TransToOpType(const CloudWaterType type)
483 {
484 switch (type) {
485 case CloudWaterType::INSERT:
486 return OpType::INSERT;
487 case CloudWaterType::UPDATE:
488 return OpType::UPDATE;
489 case CloudWaterType::DELETE:
490 return OpType::DELETE;
491 default:
492 return OpType::NOT_HANDLE;
493 }
494 }
495
BindOnlyUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)496 int SqliteCloudKvExecutorUtils::BindOnlyUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
497 const DataItem &dataItem)
498 {
499 int index = 0;
500 int errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, user);
501 if (errCode != E_OK) {
502 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when only insert log", errCode);
503 return errCode;
504 }
505 errCode = SQLiteUtils::BindBlobToStatement(logStmt, ++index, dataItem.hashKey);
506 if (errCode != E_OK) {
507 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when only insert log", errCode);
508 }
509 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
510 if (errCode != E_OK) {
511 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only insert gid.", errCode);
512 return errCode;
513 }
514 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
515 if (errCode != E_OK) {
516 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only insert log", errCode);
517 return errCode;
518 }
519 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
520 if (errCode != E_OK) {
521 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only update gid.", errCode);
522 return errCode;
523 }
524 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
525 if (errCode != E_OK) {
526 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only update log", errCode);
527 return errCode;
528 }
529 return errCode;
530 }
531
BindStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)532 int SqliteCloudKvExecutorUtils::BindStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, int index, OpType opType,
533 DownloadData &downloadData)
534 {
535 auto [errCode, dataItem] = GetDataItem(index, downloadData);
536 if (errCode != E_OK) {
537 return errCode;
538 }
539 switch (opType) {
540 case OpType::INSERT:
541 return BindInsertStmt(logStmt, dataStmt, downloadData.user, dataItem);
542 case OpType::UPDATE:
543 return BindUpdateStmt(logStmt, dataStmt, downloadData.user, dataItem);
544 case OpType::DELETE:
545 dataItem.hashKey = downloadData.existDataHashKey[index];
546 dataItem.gid.clear();
547 dataItem.version.clear();
548 return BindDeleteStmt(logStmt, dataStmt, downloadData.user, dataItem);
549 default:
550 return E_OK;
551 }
552 }
553
BindInsertStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)554 int SqliteCloudKvExecutorUtils::BindInsertStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt,
555 const std::string &user, const DataItem &dataItem)
556 {
557 int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for insert DATA table.
558 if (errCode != E_OK) {
559 return errCode;
560 }
561 return BindDataStmt(dataStmt, dataItem, true);
562 }
563
BindInsertLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)564 int SqliteCloudKvExecutorUtils::BindInsertLogStmt(sqlite3_stmt *logStmt, const std::string &user,
565 const DataItem &dataItem)
566 {
567 int errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, user);
568 if (errCode != E_OK) {
569 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when insert", errCode);
570 return errCode;
571 }
572 errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
573 if (errCode != E_OK) {
574 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when insert", errCode);
575 return errCode;
576 }
577 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_CLOUD_GID_INDEX, dataItem.gid);
578 if (errCode != E_OK) {
579 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when insert", errCode);
580 return errCode;
581 }
582 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_VERSION_INDEX, dataItem.version);
583 if (errCode != E_OK) {
584 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when insert", errCode);
585 return errCode;
586 }
587 errCode = SQLiteUtils::BindInt64ToStatement(logStmt, BIND_INSERT_CLOUD_FLAG_INDEX, dataItem.cloud_flag);
588 if (errCode != E_OK) {
589 LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when insert", errCode);
590 }
591 return errCode;
592 }
593
BindUpdateStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)594 int SqliteCloudKvExecutorUtils::BindUpdateStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
595 const DataItem &dataItem)
596 {
597 int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for update DATA table.
598 if (errCode != E_OK) {
599 return errCode;
600 }
601 errCode = BindDataStmt(dataStmt, dataItem, false);
602 if (errCode != E_OK) {
603 return errCode;
604 }
605 return E_OK;
606 }
607
BindUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)608 int SqliteCloudKvExecutorUtils::BindUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
609 const DataItem &dataItem)
610 {
611 int index = 1;
612 int errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.gid);
613 if (errCode != E_OK) {
614 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when update", errCode);
615 return errCode;
616 }
617 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.version);
618 if (errCode != E_OK) {
619 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when update", errCode);
620 return errCode;
621 }
622 errCode = SQLiteUtils::BindInt64ToStatement(logStmt, index++, dataItem.cloud_flag);
623 if (errCode != E_OK) {
624 LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when update", errCode);
625 return errCode;
626 }
627 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
628 if (errCode != E_OK) {
629 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when update", errCode);
630 return errCode;
631 }
632 errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, dataItem.hashKey);
633 if (errCode != E_OK) {
634 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when update", errCode);
635 }
636 return errCode;
637 }
638
BindDeleteStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,DataItem & dataItem)639 int SqliteCloudKvExecutorUtils::BindDeleteStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
640 DataItem &dataItem)
641 {
642 dataItem.key = {};
643 dataItem.value = {};
644 dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
645 int errCode = BindUpdateLogStmt(logStmt, user, dataItem); // update LOG table for delete DATA table.
646 if (errCode != E_OK) {
647 return errCode;
648 }
649 errCode = BindDataStmt(dataStmt, dataItem, false);
650 if (errCode != E_OK) {
651 return errCode;
652 }
653 return E_OK;
654 }
655
BindDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert)656 int SqliteCloudKvExecutorUtils::BindDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert)
657 {
658 int index = 1;
659 int errCode = BindSyncDataStmt(dataStmt, dataItem, isInsert, index);
660 if (errCode != E_OK) {
661 return errCode;
662 }
663 errCode = BindCloudDataStmt(dataStmt, dataItem, index);
664 if (errCode != E_OK) {
665 return errCode;
666 }
667 if (!isInsert) {
668 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
669 if (errCode != E_OK) {
670 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
671 }
672 }
673 return errCode;
674 }
675
BindSyncDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert,int & index)676 int SqliteCloudKvExecutorUtils::BindSyncDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert,
677 int &index)
678 {
679 int errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.key);
680 if (errCode != E_OK) {
681 LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
682 return errCode;
683 }
684 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.value);
685 if (errCode != E_OK) {
686 LOGE("[SqliteCloudKvExecutorUtils] Bind value failed %d", errCode);
687 return errCode;
688 }
689 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.timestamp));
690 if (errCode != E_OK) {
691 LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
692 return errCode;
693 }
694 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.flag));
695 if (errCode != E_OK) {
696 LOGE("[SqliteCloudKvExecutorUtils] Bind flag failed %d", errCode);
697 return errCode;
698 }
699 Bytes bytes;
700 DBCommon::StringToVector(dataItem.dev, bytes);
701 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
702 if (errCode != E_OK) {
703 LOGE("[SqliteCloudKvExecutorUtils] Bind dev failed %d", errCode);
704 return errCode;
705 }
706 DBCommon::StringToVector(dataItem.origDev, bytes);
707 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
708 if (errCode != E_OK) {
709 LOGE("[SqliteCloudKvExecutorUtils] Bind oriDev failed %d", errCode);
710 return errCode;
711 }
712 if (isInsert) {
713 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
714 if (errCode != E_OK) {
715 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
716 return errCode;
717 }
718 }
719 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.writeTimestamp));
720 if (errCode != E_OK) {
721 LOGE("[SqliteCloudKvExecutorUtils] Bind wTime failed %d", errCode);
722 }
723 return errCode;
724 }
725
BindCloudDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,int & index)726 int SqliteCloudKvExecutorUtils::BindCloudDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, int &index)
727 {
728 int errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.modifyTime);
729 if (errCode != E_OK) {
730 LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
731 return errCode;
732 }
733 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.createTime);
734 if (errCode != E_OK) {
735 LOGE("[SqliteCloudKvExecutorUtils] Bind createTime failed %d", errCode);
736 }
737 return errCode;
738 }
739
StepStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,bool isMemory)740 int SqliteCloudKvExecutorUtils::StepStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, bool isMemory)
741 {
742 int errCode = SQLiteUtils::StepNext(logStmt, isMemory);
743 if (errCode != -E_FINISHED) {
744 LOGE("[SqliteCloudKvExecutorUtils] Step insert log stmt failed %d", errCode);
745 return errCode;
746 }
747 errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
748 if (errCode != -E_FINISHED) {
749 LOGE("[SqliteCloudKvExecutorUtils] Step insert data stmt failed %d", errCode);
750 return errCode;
751 }
752 return E_OK;
753 }
754
FillCloudLog(const FillGidParam & param,OpType opType,const CloudSyncData & data,const std::string & user,CloudUploadRecorder & recorder)755 int SqliteCloudKvExecutorUtils::FillCloudLog(const FillGidParam ¶m, OpType opType, const CloudSyncData &data,
756 const std::string &user, CloudUploadRecorder &recorder)
757 {
758 if (param.first == nullptr) {
759 LOGE("[SqliteCloudKvExecutorUtils] Fill log got nullptr db");
760 return -E_INVALID_ARGS;
761 }
762 if (data.isCloudVersionRecord) {
763 int errCode = FillCloudVersionRecord(param.first, opType, data);
764 if (errCode != E_OK) {
765 return errCode;
766 }
767 }
768 switch (opType) {
769 case OpType::INSERT:
770 return FillCloudGid(param, data.insData, user, CloudWaterType::INSERT, recorder);
771 case OpType::UPDATE:
772 return FillCloudGid(param, data.updData, user, CloudWaterType::UPDATE, recorder);
773 case OpType::DELETE:
774 return FillCloudGid(param, data.delData, user, CloudWaterType::DELETE, recorder);
775 default:
776 return E_OK;
777 }
778 }
779
OnlyUpdateLogTable(sqlite3 * db,bool isMemory,int index,OpType op,DownloadData & downloadData)780 int SqliteCloudKvExecutorUtils::OnlyUpdateLogTable(sqlite3 *db, bool isMemory, int index, OpType op,
781 DownloadData &downloadData)
782 {
783 if (downloadData.existDataHashKey[index].empty()) {
784 return E_OK;
785 }
786 sqlite3_stmt *logStmt = nullptr;
787 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(op), logStmt);
788 if (errCode != E_OK) {
789 LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
790 return errCode;
791 }
792 ResFinalizer finalizerData([logStmt]() {
793 sqlite3_stmt *statement = logStmt;
794 int ret = E_OK;
795 SQLiteUtils::ResetStatement(statement, true, ret);
796 if (ret != E_OK) {
797 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when only update log", ret);
798 }
799 });
800 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
801 if (res.first != E_OK) {
802 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", res.first);
803 return res.first;
804 }
805 bool clearCloudInfo = (op == OpType::CLEAR_GID);
806 if (res.second.hashKey.empty() || DBCommon::IsRecordDelete(downloadData.data[index])) {
807 res.second.hashKey = downloadData.existDataHashKey[index];
808 clearCloudInfo = true;
809 }
810 if (clearCloudInfo) {
811 res.second.gid.clear();
812 res.second.version.clear();
813 }
814 errCode = BindOnlyUpdateLogStmt(logStmt, downloadData.user, res.second);
815 if (errCode != E_OK) {
816 return errCode;
817 }
818 errCode = SQLiteUtils::StepNext(logStmt, isMemory);
819 if (errCode == -E_FINISHED) {
820 errCode = E_OK;
821 }
822 return errCode;
823 }
824
FillCloudGid(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,const CloudWaterType & type,CloudUploadRecorder & recorder)825 int SqliteCloudKvExecutorUtils::FillCloudGid(const FillGidParam ¶m, const CloudSyncBatch &data,
826 const std::string &user, const CloudWaterType &type, CloudUploadRecorder &recorder)
827 {
828 auto [db, ignoreEmptyGid] = param;
829 sqlite3_stmt *logStmt = nullptr;
830 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(TransToOpType(type)), logStmt);
831 ResFinalizer finalizerData([logStmt]() {
832 sqlite3_stmt *statement = logStmt;
833 int ret = E_OK;
834 SQLiteUtils::ResetStatement(statement, true, ret);
835 if (ret != E_OK) {
836 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when fill log", ret);
837 }
838 });
839 for (size_t i = 0; i < data.hashKey.size(); ++i) {
840 if (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i])) {
841 continue;
842 }
843 DataItem dataItem;
844 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data.extend[i], dataItem.gid);
845 if (dataItem.gid.empty() && ignoreEmptyGid) {
846 continue;
847 }
848 if (errCode != E_OK) {
849 return errCode;
850 }
851 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::VERSION_FIELD, data.extend[i], dataItem.version);
852 dataItem.hashKey = data.hashKey[i];
853 errCode = BindFillGidLogStmt(logStmt, user, dataItem, data.extend[i], type);
854 if (errCode != E_OK) {
855 return errCode;
856 }
857 errCode = SQLiteUtils::StepNext(logStmt, false);
858 if (errCode == -E_FINISHED) {
859 errCode = E_OK;
860 }
861 if (errCode != E_OK) {
862 LOGE("[SqliteCloudKvExecutorUtils] fill back failed %d index %zu", errCode, i);
863 return errCode;
864 }
865 SQLiteUtils::ResetStatement(logStmt, false, errCode);
866 MarkUploadSuccess(param, data, user, i);
867 // ignored version record
868 if (i >= data.timestamp.size()) {
869 continue;
870 }
871 recorder.RecordUploadRecord(CloudDbConstant::CLOUD_KV_TABLE_NAME, data.hashKey[i], type, data.timestamp[i]);
872 }
873 return E_OK;
874 }
875
OnlyUpdateSyncData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)876 int SqliteCloudKvExecutorUtils::OnlyUpdateSyncData(sqlite3 *db, bool isMemory, int index, OpType opType,
877 DownloadData &downloadData)
878 {
879 if (opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO && opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE &&
880 opType != OpType::UPDATE_TIMESTAMP) {
881 LOGW("[SqliteCloudKvExecutorUtils] Ignore unknown opType %d", static_cast<int>(opType));
882 return E_OK;
883 }
884 sqlite3_stmt *dataStmt = nullptr;
885 int errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
886 if (errCode != E_OK) {
887 LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
888 return errCode;
889 }
890 ResFinalizer finalizerData([dataStmt]() {
891 sqlite3_stmt *statement = dataStmt;
892 int ret = E_OK;
893 SQLiteUtils::ResetStatement(statement, true, ret);
894 if (ret != E_OK) {
895 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when update log", ret);
896 }
897 });
898 errCode = BindUpdateSyncDataStmt(dataStmt, index, opType, downloadData);
899 if (errCode != E_OK) {
900 LOGE("[SqliteCloudKvExecutorUtils] Bind update sync data stmt failed %d", errCode);
901 return errCode;
902 }
903 errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
904 if (errCode == -E_FINISHED) {
905 errCode = E_OK;
906 }
907 return errCode;
908 }
909
BindUpdateSyncDataStmt(sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)910 int SqliteCloudKvExecutorUtils::BindUpdateSyncDataStmt(sqlite3_stmt *dataStmt, int index, OpType opType,
911 DownloadData &downloadData)
912 {
913 switch (opType) {
914 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
915 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
916 return SQLiteUtils::BindBlobToStatement(dataStmt, 1, downloadData.existDataHashKey[index]);
917 case OpType::UPDATE_TIMESTAMP:
918 return BindUpdateTimestampStmt(dataStmt, index, downloadData);
919 default:
920 return E_OK;
921 }
922 }
923
BindUpdateTimestampStmt(sqlite3_stmt * dataStmt,int index,DownloadData & downloadData)924 int SqliteCloudKvExecutorUtils::BindUpdateTimestampStmt(sqlite3_stmt *dataStmt, int index, DownloadData &downloadData)
925 {
926 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
927 auto &[errCode, dataItem] = res;
928 if (errCode != E_OK) {
929 return errCode;
930 }
931 int currentBindIndex = 1; // bind sql index start at 1
932 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.timestamp);
933 if (errCode != E_OK) {
934 LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
935 return errCode;
936 }
937 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.modifyTime);
938 if (errCode != E_OK) {
939 LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
940 }
941 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, currentBindIndex++, dataItem.hashKey);
942 if (errCode != E_OK) {
943 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
944 return errCode;
945 }
946 return E_OK;
947 }
948
GetDataItem(int index,DownloadData & downloadData)949 std::pair<int, DataItem> SqliteCloudKvExecutorUtils::GetDataItem(int index, DownloadData &downloadData)
950 {
951 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
952 auto &[errCode, dataItem] = res;
953 if (errCode != E_OK) {
954 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
955 return res;
956 }
957 std::string dev;
958 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(dev);
959 if (errCode != E_OK) {
960 return res;
961 }
962 dev = DBCommon::TransferHashString(dev);
963 auto decodeDevice = DBBase64Utils::Decode(dataItem.dev);
964 if (!decodeDevice.empty()) {
965 dataItem.dev = std::string(decodeDevice.begin(), decodeDevice.end());
966 }
967 if (dataItem.dev == dev) {
968 dataItem.dev = "";
969 }
970 decodeDevice = DBBase64Utils::Decode(dataItem.origDev);
971 if (!decodeDevice.empty()) {
972 dataItem.origDev = std::string(decodeDevice.begin(), decodeDevice.end());
973 }
974 if (dataItem.origDev == dev) {
975 dataItem.origDev = "";
976 }
977 dataItem.timestamp = static_cast<Timestamp>(static_cast<int64_t>(dataItem.modifyTime) + downloadData.timeOffset);
978 dataItem.writeTimestamp = dataItem.timestamp; // writeTimestamp is process conflict time
979 return res;
980 }
981
CountCloudDataInner(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,std::string & sql)982 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudDataInner(sqlite3 *db, bool isMemory,
983 const Timestamp ×tamp, const std::string &user, std::string &sql)
984 {
985 std::pair<int, int64_t> res;
986 auto &[errCode, count] = res;
987 sqlite3_stmt *stmt = nullptr;
988 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
989 if (errCode != E_OK) {
990 LOGE("[SqliteCloudKvExecutorUtils] Count data stmt failed %d", errCode);
991 return res;
992 }
993 ResFinalizer finalizer([stmt]() {
994 sqlite3_stmt *statement = stmt;
995 int ret = E_OK;
996 SQLiteUtils::ResetStatement(statement, true, ret);
997 if (ret != E_OK) {
998 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when count data", ret);
999 }
1000 });
1001 errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_USER, user);
1002 if (errCode != E_OK) {
1003 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d", errCode);
1004 return res;
1005 }
1006 errCode = SQLiteUtils::BindInt64ToStatement(stmt, BIND_CLOUD_TIMESTAMP, static_cast<int64_t>(timestamp));
1007 if (errCode != E_OK) {
1008 LOGE("[SqliteCloudKvExecutorUtils] Bind begin time failed %d", errCode);
1009 return res;
1010 }
1011 errCode = SQLiteUtils::StepNext(stmt, isMemory);
1012 if (errCode == -E_FINISHED) {
1013 count = 0;
1014 return res;
1015 }
1016 count = sqlite3_column_int64(stmt, CLOUD_QUERY_COUNT_INDEX);
1017 LOGD("[SqliteCloudKvExecutorUtils] Get total upload count %" PRId64, count);
1018 return res;
1019 }
1020
CountCloudData(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,bool forcePush)1021 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudData(sqlite3 *db, bool isMemory,
1022 const Timestamp ×tamp, const std::string &user, bool forcePush)
1023 {
1024 std::string sql = SqliteQueryHelper::GetKvCloudQuerySql(true, forcePush);
1025 return CountCloudDataInner(db, isMemory, timestamp, user, sql);
1026 }
1027
CountAllCloudData(const DBParam & param,const std::vector<Timestamp> & timestampVec,const std::string & user,bool forcePush,QuerySyncObject & querySyncObject)1028 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountAllCloudData(const DBParam ¶m,
1029 const std::vector<Timestamp> ×tampVec, const std::string &user, bool forcePush,
1030 QuerySyncObject &querySyncObject)
1031 {
1032 std::pair<int, int64_t> res = { E_OK, 0 };
1033 auto &[errCode, count] = res;
1034 if (timestampVec.size() != 3) { // 3 is the number of three mode.
1035 errCode = -E_INVALID_ARGS;
1036 return res;
1037 }
1038 std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1039 SqliteQueryHelper helper = querySyncObject.GetQueryHelper(errCode);
1040 if (errCode != E_OK) {
1041 return res;
1042 }
1043 for (size_t i = 0; i < typeVec.size(); i++) {
1044 sqlite3_stmt *stmt = nullptr;
1045 errCode = helper.GetCountKvCloudDataStatement(param.first, forcePush, typeVec[i], stmt);
1046 if (errCode != E_OK) {
1047 return res;
1048 }
1049 // count no use watermark
1050 auto [err, cnt] = helper.BindCountKvCloudDataStatement(param.first, param.second, 0u, user, stmt);
1051 if (err != E_OK) {
1052 return { err, 0 };
1053 }
1054 count += cnt;
1055 }
1056 return res;
1057 }
1058
FillCloudVersionRecord(sqlite3 * db,OpType opType,const CloudSyncData & data)1059 int SqliteCloudKvExecutorUtils::FillCloudVersionRecord(sqlite3 *db, OpType opType, const CloudSyncData &data)
1060 {
1061 if (opType != OpType::INSERT && opType != OpType::UPDATE) {
1062 return E_OK;
1063 }
1064 bool isInsert = (opType == OpType::INSERT);
1065 CloudSyncBatch syncBatch = isInsert ? data.insData : data.updData;
1066 if (syncBatch.record.empty()) {
1067 LOGW("[SqliteCloudKvExecutorUtils] Fill empty cloud version record");
1068 return E_OK;
1069 }
1070 syncBatch.record[0].insert(syncBatch.extend[0].begin(), syncBatch.extend[0].end());
1071 auto res = CloudStorageUtils::GetSystemRecordFromCloudData(syncBatch.record[0]); // only record first one
1072 auto &[errCode, dataItem] = res;
1073 sqlite3_stmt *dataStmt = nullptr;
1074 errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
1075 if (errCode != E_OK) {
1076 LOGE("[SqliteCloudKvExecutorUtils] Get insert version record statement failed %d", errCode);
1077 return errCode;
1078 }
1079 ResFinalizer finalizerData([dataStmt]() {
1080 int ret = E_OK;
1081 sqlite3_stmt *statement = dataStmt;
1082 SQLiteUtils::ResetStatement(statement, true, ret);
1083 if (ret != E_OK) {
1084 LOGW("[SqliteCloudKvExecutorUtils] Reset version record stmt failed %d", ret);
1085 }
1086 });
1087 errCode = BindDataStmt(dataStmt, dataItem, isInsert);
1088 if (errCode != E_OK) {
1089 return errCode;
1090 }
1091 errCode = SQLiteUtils::StepNext(dataStmt, false);
1092 if (errCode != -E_FINISHED) {
1093 LOGE("[SqliteCloudKvExecutorUtils] Step insert version record stmt failed %d", errCode);
1094 return errCode;
1095 }
1096 return E_OK;
1097 }
1098
GetLocalCloudVersion(sqlite3 * db,bool isMemory,const std::string & user)1099 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersion(sqlite3 *db, bool isMemory,
1100 const std::string &user)
1101 {
1102 auto res = GetLocalCloudVersionInner(db, isMemory, user);
1103 if (res.first != E_OK) {
1104 LOGE("[SqliteCloudKvExecutorUtils] Get local cloud version failed %d", res.first);
1105 }
1106 return res;
1107 }
1108
GetLocalCloudVersionInner(sqlite3 * db,bool isMemory,const std::string & user)1109 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersionInner(sqlite3 *db, bool isMemory,
1110 const std::string &user)
1111 {
1112 std::pair<int, CloudSyncData> res;
1113 auto &[errCode, syncData] = res;
1114 auto sql = SqliteQueryHelper::GetKvCloudRecordSql();
1115 sqlite3_stmt *stmt = nullptr;
1116 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1117 if (errCode != E_OK) {
1118 return res;
1119 }
1120 ResFinalizer finalizerData([stmt]() {
1121 int ret = E_OK;
1122 sqlite3_stmt *statement = stmt;
1123 SQLiteUtils::ResetStatement(statement, true, ret);
1124 if (ret != E_OK) {
1125 LOGW("[SqliteCloudKvExecutorUtils] Reset local version record stmt failed %d", ret);
1126 }
1127 });
1128 std::string hashDev;
1129 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
1130 if (errCode != E_OK) {
1131 return res;
1132 }
1133 std::string tempDev = DBCommon::TransferHashString(hashDev);
1134 hashDev = DBCommon::TransferStringToHex(tempDev);
1135 std::string key = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY + hashDev;
1136 Key keyVec;
1137 DBCommon::StringToVector(key, keyVec);
1138 errCode = SQLiteUtils::BindBlobToStatement(stmt, BIND_CLOUD_VERSION_RECORD_KEY_INDEX, keyVec);
1139 if (errCode != E_OK) {
1140 return res;
1141 }
1142 errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_VERSION_RECORD_USER_INDEX, user);
1143 if (errCode != E_OK) {
1144 return res;
1145 }
1146 errCode = GetCloudVersionRecord(isMemory, stmt, syncData);
1147 if (errCode == -E_NOT_FOUND) {
1148 InitDefaultCloudVersionRecord(key, tempDev, syncData);
1149 errCode = E_OK;
1150 }
1151 return res;
1152 }
1153
GetCloudVersionRecord(bool isMemory,sqlite3_stmt * stmt,CloudSyncData & syncData)1154 int SqliteCloudKvExecutorUtils::GetCloudVersionRecord(bool isMemory, sqlite3_stmt *stmt, CloudSyncData &syncData)
1155 {
1156 int errCode = SQLiteUtils::StepNext(stmt, isMemory);
1157 if (errCode == -E_FINISHED) {
1158 return -E_NOT_FOUND;
1159 }
1160 if (errCode != E_OK) {
1161 LOGE("[SqliteCloudKvExecutorUtils] Get local version failed %d", errCode);
1162 return errCode;
1163 }
1164 CloudSyncConfig config;
1165 config.maxUploadSize = CloudDbConstant::MAX_UPLOAD_SIZE;
1166 config.maxUploadCount = 1;
1167 CloudUploadRecorder recorder; // ignore last record
1168 UploadDetail detail;
1169 errCode = GetCloudDataForSync(config, recorder, stmt, syncData, detail);
1170 return errCode;
1171 }
1172
InitDefaultCloudVersionRecord(const std::string & key,const std::string & dev,CloudSyncData & syncData)1173 void SqliteCloudKvExecutorUtils::InitDefaultCloudVersionRecord(const std::string &key, const std::string &dev,
1174 CloudSyncData &syncData)
1175 {
1176 LOGI("[SqliteCloudKvExecutorUtils] Not found local version record");
1177 VBucket defaultRecord;
1178 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_KEY] = key;
1179 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_VALUE] = std::string("");
1180 auto encodeDev = DBBase64Utils::Encode(dev);
1181 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_DEVICE] = encodeDev;
1182 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE] = encodeDev;
1183 syncData.insData.record.push_back(std::move(defaultRecord));
1184 VBucket defaultExtend;
1185 defaultExtend[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::TransferStringToHex(key);
1186 syncData.insData.extend.push_back(std::move(defaultExtend));
1187 syncData.insData.assets.emplace_back();
1188 Bytes bytesHashKey;
1189 DBCommon::StringToVector(key, bytesHashKey);
1190 syncData.insData.hashKey.push_back(bytesHashKey);
1191 }
1192
BindVersionStmt(const std::string & device,const std::string & user,sqlite3_stmt * dataStmt)1193 int SqliteCloudKvExecutorUtils::BindVersionStmt(const std::string &device, const std::string &user,
1194 sqlite3_stmt *dataStmt)
1195 {
1196 std::string hashDevice;
1197 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDevice);
1198 if (errCode != E_OK) {
1199 return errCode;
1200 }
1201 Bytes bytes;
1202 if (device == hashDevice) {
1203 DBCommon::StringToVector("", bytes);
1204 } else {
1205 hashDevice = DBCommon::TransferHashString(device);
1206 DBCommon::StringToVector(hashDevice, bytes);
1207 }
1208 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, BIND_CLOUD_VERSION_DEVICE_INDEX, bytes);
1209 if (errCode != E_OK) {
1210 LOGE("[SqliteCloudKvExecutorUtils] Bind device failed %d", errCode);
1211 }
1212 return errCode;
1213 }
1214
GetCloudVersionFromCloud(sqlite3 * db,bool isMemory,const std::string & user,const std::string & device,std::vector<VBucket> & dataVector)1215 int SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(sqlite3 *db, bool isMemory, const std::string &user,
1216 const std::string &device, std::vector<VBucket> &dataVector)
1217 {
1218 sqlite3_stmt *dataStmt = nullptr;
1219 bool isDeviceEmpty = device.empty();
1220 std::string sql = SqliteQueryHelper::GetCloudVersionRecordSql(isDeviceEmpty);
1221 int errCode = SQLiteUtils::GetStatement(db, sql, dataStmt);
1222 if (errCode != E_OK) {
1223 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version record statement failed %d", errCode);
1224 return errCode;
1225 }
1226 ResFinalizer finalizerData([dataStmt]() {
1227 int ret = E_OK;
1228 sqlite3_stmt *statement = dataStmt;
1229 SQLiteUtils::ResetStatement(statement, true, ret);
1230 if (ret != E_OK) {
1231 LOGW("[SqliteCloudKvExecutorUtils] Reset cloud version record stmt failed %d", ret);
1232 }
1233 });
1234 if (!isDeviceEmpty) {
1235 errCode = BindVersionStmt(device, user, dataStmt);
1236 if (errCode != E_OK) {
1237 return errCode;
1238 }
1239 }
1240 uint32_t totalSize = 0;
1241 do {
1242 errCode = SQLiteUtils::StepWithRetry(dataStmt, isMemory);
1243 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1244 errCode = E_OK;
1245 break;
1246 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1247 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1248 break;
1249 }
1250 VBucket data;
1251 errCode = GetCloudVersionRecordData(dataStmt, data, totalSize);
1252 dataVector.push_back(data);
1253 } while (errCode == E_OK);
1254 return errCode;
1255 }
1256
GetCloudVersionRecordData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)1257 int SqliteCloudKvExecutorUtils::GetCloudVersionRecordData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
1258 {
1259 int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
1260 if (errCode != E_OK) {
1261 return errCode;
1262 }
1263 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
1264 if (errCode != E_OK) {
1265 return errCode;
1266 }
1267 return GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
1268 }
1269
GetWaitCompensatedSyncDataPk(sqlite3 * db,bool isMemory,std::vector<VBucket> & data)1270 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(sqlite3 *db, bool isMemory, std::vector<VBucket> &data)
1271 {
1272 sqlite3_stmt *stmt = nullptr;
1273 int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_KEY_SQL, stmt);
1274 if (errCode != E_OK) {
1275 LOGE("[SqliteCloudKvExecutorUtils] Get compensate key stmt failed %d", errCode);
1276 return errCode;
1277 }
1278 ResFinalizer finalizerData([stmt]() {
1279 int ret = E_OK;
1280 sqlite3_stmt *statement = stmt;
1281 SQLiteUtils::ResetStatement(statement, true, ret);
1282 if (ret != E_OK) {
1283 LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1284 }
1285 });
1286 uint32_t totalSize = 0;
1287 do {
1288 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1289 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1290 errCode = E_OK;
1291 break;
1292 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1293 LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1294 break;
1295 }
1296 VBucket key;
1297 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, key, totalSize);
1298 if (errCode != E_OK) {
1299 return errCode;
1300 }
1301 data.push_back(key);
1302 } while (errCode == E_OK);
1303 return errCode;
1304 }
1305
QueryCloudGid(sqlite3 * db,bool isMemory,const std::string & user,QuerySyncObject & querySyncObject,std::vector<std::string> & cloudGid)1306 int SqliteCloudKvExecutorUtils::QueryCloudGid(sqlite3 *db, bool isMemory, const std::string &user,
1307 QuerySyncObject &querySyncObject, std::vector<std::string> &cloudGid)
1308 {
1309 int errCode = E_OK;
1310 QuerySyncObject query = querySyncObject;
1311 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1312 if (errCode != E_OK) {
1313 return errCode;
1314 }
1315 sqlite3_stmt *stmt = nullptr;
1316 errCode = helper.GetAndBindGidKvCloudQueryStatement(user, db, stmt);
1317 if (errCode != E_OK) {
1318 return errCode;
1319 }
1320 do {
1321 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1322 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1323 errCode = E_OK;
1324 break;
1325 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1326 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1327 break;
1328 }
1329 std::string gid;
1330 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1331 cloudGid.push_back(gid);
1332 } while (errCode == E_OK);
1333 int ret = E_OK;
1334 SQLiteUtils::ResetStatement(stmt, true, ret);
1335 return errCode == E_OK ? ret : errCode;
1336 }
1337
BindFillGidLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,const VBucket & uploadExtend,const CloudWaterType & type)1338 int SqliteCloudKvExecutorUtils::BindFillGidLogStmt(sqlite3_stmt *logStmt, const std::string &user,
1339 const DataItem &dataItem, const VBucket &uploadExtend, const CloudWaterType &type)
1340 {
1341 DataItem wItem = dataItem;
1342 if (DBCommon::IsNeedCompensatedForUpload(uploadExtend, type)) {
1343 wItem.cloud_flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1344 }
1345 if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1346 (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1347 wItem.gid = {};
1348 wItem.version = {};
1349 }
1350 int errCode = E_OK;
1351 if (type == CloudWaterType::DELETE) {
1352 if (DBCommon::IsCloudRecordNotFound(uploadExtend)) {
1353 errCode = BindUpdateLogStmt(logStmt, user, wItem);
1354 }
1355 } else {
1356 errCode = BindInsertLogStmt(logStmt, user, wItem);
1357 }
1358 if (errCode != E_OK) {
1359 LOGE("[SqliteCloudKvExecutorUtils] fill cloud gid failed. %d", errCode);
1360 }
1361 return errCode;
1362 }
1363
MarkUploadSuccess(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1364 void SqliteCloudKvExecutorUtils::MarkUploadSuccess(const FillGidParam ¶m, const CloudSyncBatch &data,
1365 const std::string &user, size_t dataIndex)
1366 {
1367 if (data.extend.size() <= dataIndex || data.hashKey.size() <= dataIndex || data.timestamp.size() <= dataIndex) {
1368 LOGW("[SqliteCloudKvExecutorUtils] invalid index %zu when mark upload success", dataIndex);
1369 return;
1370 }
1371 if (!DBCommon::IsRecordSuccess(data.extend[dataIndex])) {
1372 return;
1373 }
1374 if (CheckDataChanged(param, data, dataIndex)) {
1375 LOGW("[SqliteCloudKvExecutorUtils] %zu data changed when mark upload success", dataIndex);
1376 return;
1377 }
1378 MarkUploadSuccessInner(param, data, user, dataIndex);
1379 }
1380
CheckDataChanged(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1381 bool SqliteCloudKvExecutorUtils::CheckDataChanged(const FillGidParam ¶m,
1382 const CloudSyncBatch &data, size_t dataIndex)
1383 {
1384 sqlite3_stmt *checkStmt = nullptr;
1385 int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_CHANGED, checkStmt);
1386 ResFinalizer finalizerData([checkStmt]() {
1387 sqlite3_stmt *statement = checkStmt;
1388 int ret = E_OK;
1389 SQLiteUtils::ResetStatement(statement, true, ret);
1390 if (ret != E_OK) {
1391 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data changed", ret);
1392 }
1393 });
1394 int index = 1;
1395 errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1396 if (errCode != E_OK) {
1397 LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data changed", errCode);
1398 return true;
1399 }
1400 errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1401 if (errCode != E_OK) {
1402 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data changed", errCode);
1403 return true;
1404 }
1405 errCode = SQLiteUtils::StepNext(checkStmt);
1406 if (errCode != E_OK) {
1407 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data changed", errCode);
1408 return true;
1409 }
1410 return sqlite3_column_int64(checkStmt, 0) == 0; // get index start at 0, get 0 is data changed
1411 }
1412
MarkUploadSuccessInner(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1413 void SqliteCloudKvExecutorUtils::MarkUploadSuccessInner(const FillGidParam ¶m,
1414 const CloudSyncBatch &data, const std::string &user, size_t dataIndex)
1415 {
1416 sqlite3_stmt *logStmt = nullptr;
1417 int errCode = SQLiteUtils::GetStatement(param.first, MARK_UPLOAD_SUCCESS, logStmt);
1418 ResFinalizer finalizerData([logStmt]() {
1419 sqlite3_stmt *statement = logStmt;
1420 int ret = E_OK;
1421 SQLiteUtils::ResetStatement(statement, true, ret);
1422 if (ret != E_OK) {
1423 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when mark upload success", ret);
1424 }
1425 });
1426 int index = 1;
1427 errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, data.hashKey[dataIndex]);
1428 if (errCode != E_OK) {
1429 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1430 return;
1431 }
1432 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
1433 if (errCode != E_OK) {
1434 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1435 return;
1436 }
1437 errCode = SQLiteUtils::StepNext(logStmt);
1438 if (errCode != E_OK && errCode != -E_FINISHED) {
1439 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when mark upload success", errCode);
1440 return;
1441 }
1442 }
1443 }