1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_cloud_kv_executor_utils.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "res_finalizer.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_ver_storage_executor_sql.h"
24 #include "time_helper.h"
25
26 namespace DistributedDB {
GetCloudData(const CloudSyncConfig & config,const DBParam & param,const CloudUploadRecorder & recorder,SQLiteSingleVerContinueToken & token,CloudSyncData & data)27 int SqliteCloudKvExecutorUtils::GetCloudData(const CloudSyncConfig &config, const DBParam ¶m,
28 const CloudUploadRecorder &recorder, SQLiteSingleVerContinueToken &token, CloudSyncData &data)
29 {
30 auto [db, isMemory] = param;
31 bool stepNext = false;
32 auto [errCode, stmt] = token.GetCloudQueryStmt(db, data.isCloudForcePushStrategy, stepNext, data.mode);
33 if (errCode != E_OK) {
34 token.ReleaseCloudQueryStmt();
35 return errCode;
36 }
37 UploadDetail detail;
38 auto &[stepNum, totalSize] = detail;
39 do {
40 if (stepNext) {
41 errCode = SQLiteUtils::StepNext(stmt, isMemory);
42 if (errCode != E_OK) {
43 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
44 break;
45 }
46 }
47 stepNext = true;
48 errCode = GetCloudDataForSync(config, recorder, stmt, data, detail);
49 stepNum++;
50 } while (errCode == E_OK);
51 LOGI("[SqliteCloudKvExecutorUtils] Get cloud sync data, insData:%u, upData:%u, delLog:%u errCode:%d total:%" PRIu32,
52 data.insData.record.size(), data.updData.record.size(), data.delData.extend.size(), errCode, totalSize);
53 UpdateBeginTime(token, data);
54 token.ReleaseCloudQueryStmt();
55 return errCode;
56 }
57
GetMaxTimeStamp(const std::vector<VBucket> & dataExtend)58 Timestamp SqliteCloudKvExecutorUtils::GetMaxTimeStamp(const std::vector<VBucket> &dataExtend)
59 {
60 Timestamp maxTimeStamp = 0;
61 if (dataExtend.empty()) {
62 LOGE("[SqliteCloudKvExecutorUtils] [GetMaxTimeStamp] data extend is empty.");
63 return maxTimeStamp;
64 }
65 VBucket lastRecord = dataExtend.back();
66 auto it = lastRecord.find(CloudDbConstant::MODIFY_FIELD);
67 if (it != lastRecord.end() && maxTimeStamp < static_cast<Timestamp>(std::get<int64_t>(it->second))) {
68 maxTimeStamp = static_cast<Timestamp>(std::get<int64_t>(it->second));
69 }
70 return maxTimeStamp;
71 }
72
UpdateBeginTime(SQLiteSingleVerContinueToken & token,const CloudSyncData & data)73 void SqliteCloudKvExecutorUtils::UpdateBeginTime(SQLiteSingleVerContinueToken &token,
74 const CloudSyncData &data)
75 {
76 Timestamp maxTimeStamp = 0;
77 switch (data.mode) {
78 case DistributedDB::CloudWaterType::DELETE:
79 maxTimeStamp = GetMaxTimeStamp(data.delData.extend);
80 break;
81 case DistributedDB::CloudWaterType::UPDATE:
82 maxTimeStamp = GetMaxTimeStamp(data.updData.extend);
83 break;
84 case DistributedDB::CloudWaterType::INSERT:
85 maxTimeStamp = GetMaxTimeStamp(data.insData.extend);
86 break;
87 case DistributedDB::CloudWaterType::BUTT:
88 default:
89 break;
90 }
91 if (maxTimeStamp > token.GetQueryBeginTime()) {
92 token.SetNextBeginTime("", maxTimeStamp);
93 return;
94 }
95 LOGW("[SqliteCloudKvExecutorUtils] The start time of the database has not been updated.");
96 }
97
GetCloudDataForSync(const CloudSyncConfig & config,const CloudUploadRecorder & recorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,UploadDetail & detail)98 int SqliteCloudKvExecutorUtils::GetCloudDataForSync(const CloudSyncConfig &config, const CloudUploadRecorder &recorder,
99 sqlite3_stmt *statement, CloudSyncData &cloudDataResult, UploadDetail &detail)
100 {
101 auto &[stepNum, totalSize] = detail;
102 VBucket log;
103 VBucket extraLog;
104 uint32_t preSize = totalSize;
105 int64_t revisedTime = 0;
106 int64_t invalidTime = 0;
107 GetCloudLog(statement, log, totalSize, revisedTime, invalidTime);
108 GetCloudExtraLog(statement, extraLog);
109 if (revisedTime != 0) {
110 Bytes hashKey = std::get<Bytes>(extraLog[CloudDbConstant::HASH_KEY]);
111 cloudDataResult.revisedData.push_back({hashKey, revisedTime, invalidTime});
112 }
113
114 VBucket data;
115 int64_t flag = 0;
116 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
117 if (errCode != E_OK) {
118 return errCode;
119 }
120
121 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
122 errCode = GetCloudKvData(statement, data, totalSize);
123 if (errCode != E_OK) {
124 return errCode;
125 }
126 }
127
128 if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, config.maxUploadSize, config.maxUploadCount)) {
129 errCode = CloudStorageUtils::IdentifyCloudType(recorder, cloudDataResult, data, log, extraLog);
130 } else {
131 errCode = -E_UNFINISHED;
132 }
133 if (errCode == -E_IGNORE_DATA) {
134 errCode = E_OK;
135 totalSize = preSize;
136 stepNum--;
137 }
138 return errCode;
139 }
140
GetCloudLog(sqlite3_stmt * stmt,VBucket & logInfo,uint32_t & totalSize,int64_t & revisedTime,int64_t & invalidTime)141 void SqliteCloudKvExecutorUtils::GetCloudLog(sqlite3_stmt *stmt, VBucket &logInfo,
142 uint32_t &totalSize, int64_t &revisedTime, int64_t &invalidTime)
143 {
144 int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX));
145 uint64_t curTime = 0;
146 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
147 if (modifyTime > static_cast<int64_t>(curTime)) {
148 invalidTime = modifyTime;
149 modifyTime = static_cast<int64_t>(curTime);
150 revisedTime = modifyTime;
151 }
152 } else {
153 LOGW("[SqliteCloudKvExecutorUtils] get raw sys time failed.");
154 }
155 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
156 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
157 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CREATE_TIME_INDEX)));
158 totalSize += sizeof(int64_t) + sizeof(int64_t);
159 if (sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX) != nullptr) {
160 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
161 sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX));
162 if (!cloudGid.empty()) {
163 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
164 totalSize += cloudGid.size();
165 }
166 }
167 if (revisedTime != 0) {
168 std::string cloudGid;
169 if (logInfo.count(CloudDbConstant::GID_FIELD) != 0) {
170 cloudGid = std::get<std::string>(logInfo[CloudDbConstant::GID_FIELD]);
171 }
172 LOGW("[SqliteCloudKvExecutorUtils] Found invalid mod time: %lld, curTime: %lld, gid: %s",
173 invalidTime, revisedTime, DBCommon::StringMiddleMasking(cloudGid).c_str());
174 }
175 std::string version;
176 SQLiteUtils::GetColumnTextValue(stmt, CLOUD_QUERY_VERSION_INDEX, version);
177 logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
178 totalSize += version.size();
179 }
180
GetCloudExtraLog(sqlite3_stmt * stmt,VBucket & flags)181 void SqliteCloudKvExecutorUtils::GetCloudExtraLog(sqlite3_stmt *stmt, VBucket &flags)
182 {
183 flags.insert_or_assign(DBConstant::ROWID,
184 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_ROW_ID_INDEX)));
185 flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
186 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX)));
187 flags.insert_or_assign(CloudDbConstant::FLAG,
188 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_FLAG_INDEX)));
189 Bytes hashKey;
190 (void)SQLiteUtils::GetColumnBlobValue(stmt, CLOUD_QUERY_HASH_KEY_INDEX, hashKey);
191 flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
192 flags.insert_or_assign(CloudDbConstant::CLOUD_FLAG,
193 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CLOUD_FLAG_INDEX)));
194 }
195
GetCloudKvData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)196 int SqliteCloudKvExecutorUtils::GetCloudKvData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
197 {
198 int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
199 if (errCode != E_OK) {
200 LOGE("[SqliteCloudKvExecutorUtils] [GetCloudKvData] Get key failed:%d", errCode);
201 return errCode;
202 }
203 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
204 if (errCode != E_OK) {
205 LOGE("[SqliteCloudKvExecutorUtils] [GetCloudKvData] Get value failed:%d", errCode);
206 return errCode;
207 }
208 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
209 if (errCode != E_OK) {
210 LOGE("[SqliteCloudKvExecutorUtils] [GetCloudKvData] Get device failed:%d", errCode);
211 return errCode;
212 }
213 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, CLOUD_QUERY_ORI_DEV_INDEX, stmt, data,
214 totalSize);
215 if (errCode != E_OK) {
216 LOGE("[SqliteCloudKvExecutorUtils] [GetCloudKvData] Get ori device failed:%d", errCode);
217 return errCode;
218 }
219 data.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME,
220 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_DEV_CREATE_TIME_INDEX)));
221 totalSize += sizeof(int64_t);
222 return E_OK;
223 }
224
GetCloudKvBlobData(const std::string & keyStr,int index,sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)225 int SqliteCloudKvExecutorUtils::GetCloudKvBlobData(const std::string &keyStr, int index, sqlite3_stmt *stmt,
226 VBucket &data, uint32_t &totalSize)
227 {
228 std::vector<uint8_t> blob;
229 int errCode = SQLiteUtils::GetColumnBlobValue(stmt, index, blob);
230 if (errCode != E_OK) {
231 LOGE("[SqliteCloudKvExecutorUtils] Get %.3s failed %d", keyStr.c_str(), errCode);
232 return errCode;
233 }
234 std::string tmp = std::string(blob.begin(), blob.end());
235 if ((keyStr == CloudDbConstant::CLOUD_KV_FIELD_DEVICE ||
236 keyStr == CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE)) {
237 if (tmp.empty()) {
238 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(tmp);
239 if (errCode != E_OK) {
240 LOGE("[SqliteCloudKvExecutorUtils] Get local identity failed %d", errCode);
241 return errCode;
242 }
243 tmp = DBCommon::TransferHashString(tmp);
244 }
245 tmp = DBBase64Utils::Encode(std::vector<uint8_t>(tmp.begin(), tmp.end()));
246 }
247 totalSize += tmp.size();
248 data.insert_or_assign(keyStr, tmp);
249 return E_OK;
250 }
251
GetLogInfo(sqlite3 * db,bool isMemory,const VBucket & cloudData,const std::string & userId)252 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfo(sqlite3 *db, bool isMemory,
253 const VBucket &cloudData, const std::string &userId)
254 {
255 std::pair<int, DataInfoWithLog> res;
256 int &errCode = res.first;
257 std::string keyStr;
258 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CLOUD_KV_FIELD_KEY, cloudData, keyStr);
259 if (errCode == -E_NOT_FOUND) {
260 LOGW("[SqliteCloudKvExecutorUtils] Get key not found.");
261 errCode = E_OK;
262 }
263 if (errCode != E_OK) {
264 LOGE("[SqliteCloudKvExecutorUtils] Get key failed %d", errCode);
265 return res;
266 }
267 Bytes key;
268 DBCommon::StringToVector(keyStr, key);
269 Bytes hashKey;
270 DBCommon::CalcValueHash(key, hashKey);
271 sqlite3_stmt *stmt = nullptr;
272 std::tie(errCode, stmt) = GetLogInfoStmt(db, cloudData, !hashKey.empty(), userId.empty());
273 if (errCode != E_OK) {
274 LOGE("[SqliteCloudKvExecutorUtils] Get stmt failed %d", errCode);
275 return res;
276 }
277 std::string gid;
278 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, cloudData, gid);
279 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
280 LOGE("[SqliteCloudKvExecutorUtils] Get gid failed %d", errCode);
281 return res;
282 }
283 return GetLogInfoInner(stmt, isMemory, gid, hashKey, userId);
284 }
285
GetLogInfoStmt(sqlite3 * db,const VBucket & cloudData,bool existKey,bool emptyUserId)286 std::pair<int, sqlite3_stmt*> SqliteCloudKvExecutorUtils::GetLogInfoStmt(sqlite3 *db, const VBucket &cloudData,
287 bool existKey, bool emptyUserId)
288 {
289 std::pair<int, sqlite3_stmt*> res;
290 auto &[errCode, stmt] = res;
291 std::string querySql = QUERY_CLOUD_SYNC_DATA_LOG;
292 if (!emptyUserId) {
293 querySql = QUERY_CLOUD_SYNC_DATA_LOG_WITH_USERID;
294 }
295 std::string sql = querySql;
296 sql += " WHERE cloud_gid = ?";
297 if (existKey) {
298 sql += " UNION ";
299 sql += querySql;
300 sql += " WHERE sync_data.hash_key = ?";
301 }
302 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
303 return res;
304 }
305
GetLogInfoInner(sqlite3_stmt * stmt,bool isMemory,const std::string & gid,const Bytes & key,const std::string & userId)306 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfoInner(sqlite3_stmt *stmt, bool isMemory,
307 const std::string &gid, const Bytes &key, const std::string &userId)
308 {
309 ResFinalizer finalizer([stmt]() {
310 sqlite3_stmt *statement = stmt;
311 int ret = E_OK;
312 SQLiteUtils::ResetStatement(statement, true, ret);
313 if (ret != E_OK) {
314 LOGW("[SqliteCloudKvExecutorUtils] Reset stmt failed %d when get log", ret);
315 }
316 });
317 std::pair<int, DataInfoWithLog> res;
318 auto &[errCode, logInfo] = res;
319 int index = 1;
320 if (!userId.empty()) {
321 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
322 if (errCode != E_OK) {
323 LOGE("[SqliteCloudKvExecutorUtils] Bind 1st userId failed %d", errCode);
324 return res;
325 }
326 }
327 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
328 if (errCode != E_OK) {
329 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d", errCode);
330 return res;
331 }
332 if (!key.empty()) {
333 if (!userId.empty()) {
334 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
335 if (errCode != E_OK) {
336 LOGE("[SqliteCloudKvExecutorUtils] Bind 2nd userId failed %d", errCode);
337 return res;
338 }
339 }
340 errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, key);
341 if (errCode != E_OK) {
342 LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
343 return res;
344 }
345 }
346 errCode = SQLiteUtils::StepNext(stmt, isMemory);
347 if (errCode == -E_FINISHED) {
348 errCode = -E_NOT_FOUND;
349 // not found is ok, just return error
350 return res;
351 }
352 if (errCode != E_OK) {
353 LOGE("[SqliteCloudKvExecutorUtils] Get log failed %d", errCode);
354 return res;
355 }
356 logInfo = FillLogInfoWithStmt(stmt);
357 return res;
358 }
359
FillLogInfoWithStmt(sqlite3_stmt * stmt)360 DataInfoWithLog SqliteCloudKvExecutorUtils::FillLogInfoWithStmt(sqlite3_stmt *stmt)
361 {
362 DataInfoWithLog dataInfoWithLog;
363 int index = 0;
364 dataInfoWithLog.logInfo.dataKey = sqlite3_column_int64(stmt, index++);
365 dataInfoWithLog.logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
366 std::vector<uint8_t> device;
367 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, device);
368 DBCommon::VectorToString(device, dataInfoWithLog.logInfo.device);
369 std::vector<uint8_t> oriDev;
370 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, oriDev);
371 DBCommon::VectorToString(oriDev, dataInfoWithLog.logInfo.originDev);
372 dataInfoWithLog.logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
373 dataInfoWithLog.logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
374 std::string gid;
375 (void)SQLiteUtils::GetColumnTextValue(stmt, index++, gid);
376 dataInfoWithLog.logInfo.cloudGid = gid;
377 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, dataInfoWithLog.logInfo.hashKey);
378 Bytes key;
379 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, key);
380 std::string keyStr(key.begin(), key.end());
381 dataInfoWithLog.primaryKeys.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_KEY, keyStr);
382 (void)SQLiteUtils::GetColumnTextValue(stmt, index++, dataInfoWithLog.logInfo.version);
383 dataInfoWithLog.logInfo.cloud_flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
384 return dataInfoWithLog;
385 }
386
PutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData)387 int SqliteCloudKvExecutorUtils::PutCloudData(sqlite3 *db, bool isMemory, DownloadData &downloadData)
388 {
389 if (downloadData.data.size() != downloadData.opType.size()) {
390 LOGE("[SqliteCloudKvExecutorUtils] data size %zu != flag size %zu.", downloadData.data.size(),
391 downloadData.opType.size());
392 return -E_CLOUD_ERROR;
393 }
394 std::map<int, int> statisticMap = {};
395 int errCode = ExecutePutCloudData(db, isMemory, downloadData, statisticMap);
396 LOGI("[SqliteCloudKvExecutorUtils] save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d,"
397 " only update gid cnt = %d, set LCC flag zero cnt = %d, set LCC flag one cnt = %d,"
398 " update timestamp cnt = %d, clear gid count = %d, not handle cnt = %d",
399 errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
400 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
401 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
402 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
403 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
404 statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
405 return errCode;
406 }
407
ExecutePutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData,std::map<int,int> & statisticMap)408 int SqliteCloudKvExecutorUtils::ExecutePutCloudData(sqlite3 *db, bool isMemory,
409 DownloadData &downloadData, std::map<int, int> &statisticMap)
410 {
411 int index = 0;
412 int errCode = E_OK;
413 for (OpType op : downloadData.opType) {
414 switch (op) {
415 case OpType::INSERT: // fallthrough
416 case OpType::UPDATE: // fallthrough
417 case OpType::DELETE: // fallthrough
418 errCode = OperateCloudData(db, isMemory, index, op, downloadData);
419 break;
420 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
421 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE: // fallthrough
422 case OpType::UPDATE_TIMESTAMP: // fallthrough
423 errCode = OnlyUpdateSyncData(db, isMemory, index, op, downloadData);
424 if (errCode != E_OK) {
425 break;
426 }
427 [[fallthrough]];
428 case OpType::ONLY_UPDATE_GID: // fallthrough
429 case OpType::NOT_HANDLE: // fallthrough
430 case OpType::CLEAR_GID: // fallthrough
431 errCode = OnlyUpdateLogTable(db, isMemory, index, op, downloadData);
432 break;
433 default:
434 errCode = -E_CLOUD_ERROR;
435 break;
436 }
437 if (errCode != E_OK) {
438 LOGE("put cloud sync data fail:%d op:%d", errCode, static_cast<int>(op));
439 return errCode;
440 }
441 statisticMap[static_cast<int>(op)]++;
442 index++;
443 }
444 return errCode;
445 }
446
OperateCloudData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)447 int SqliteCloudKvExecutorUtils::OperateCloudData(sqlite3 *db, bool isMemory, int index, OpType opType,
448 DownloadData &downloadData)
449 {
450 sqlite3_stmt *logStmt = nullptr;
451 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(opType), logStmt);
452 if (errCode != E_OK) {
453 LOGE("[SqliteCloudKvExecutorUtils] Get insert log statement failed %d", errCode);
454 return errCode;
455 }
456 sqlite3_stmt *dataStmt = nullptr;
457 errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
458 if (errCode != E_OK) {
459 int ret = E_OK;
460 SQLiteUtils::ResetStatement(logStmt, true, ret);
461 LOGE("[SqliteCloudKvExecutorUtils] Get insert data statement failed %d reset %d", errCode, ret);
462 return errCode;
463 }
464 ResFinalizer finalizerData([logStmt, dataStmt, opType]() {
465 sqlite3_stmt *statement = logStmt;
466 int ret = E_OK;
467 SQLiteUtils::ResetStatement(statement, true, ret);
468 if (ret != E_OK) {
469 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d opType %d", ret, static_cast<int>(opType));
470 }
471 statement = dataStmt;
472 SQLiteUtils::ResetStatement(statement, true, ret);
473 if (ret != E_OK) {
474 LOGW("[SqliteCloudKvExecutorUtils] Reset data stmt failed %d opType %d", ret, static_cast<int>(opType));
475 }
476 });
477 errCode = BindStmt(logStmt, dataStmt, index, opType, downloadData);
478 if (errCode != E_OK) {
479 LOGE("[SqliteCloudKvExecutorUtils] BindStmt data stmt failed %d opType %d", errCode, static_cast<int>(opType));
480 return errCode;
481 }
482 errCode = StepStmt(logStmt, dataStmt, isMemory);
483 if (errCode != E_OK) {
484 LOGE("[SqliteCloudKvExecutorUtils] StepStmt data stmt failed %d opType %d", errCode, static_cast<int>(opType));
485 return errCode;
486 }
487 if (opType == OpType::INSERT || opType == OpType::UPDATE || opType == OpType::DELETE) {
488 return OperateOtherUserLog(db, isMemory, index, downloadData);
489 }
490 return errCode;
491 }
492
OperateOtherUserLog(sqlite3 * db,bool isMemory,int index,DownloadData & downloadData)493 int SqliteCloudKvExecutorUtils::OperateOtherUserLog(sqlite3 *db, bool isMemory, int index, DownloadData &downloadData)
494 {
495 auto [errCode, dataItem] = GetDataItem(index, downloadData);
496 if (errCode != E_OK) {
497 return errCode;
498 }
499 sqlite3_stmt *logStmt = nullptr;
500 std::string sql = "UPDATE naturalbase_kv_aux_sync_data_log SET cloud_flag = cloud_flag & ~0x2000 "
501 "WHERE userid != ? AND hash_key = ?";
502 errCode = SQLiteUtils::GetStatement(db, sql, logStmt);
503 if (errCode != E_OK) {
504 LOGE("[SqliteCloudKvExecutorUtils] Get operate other log statement failed %d", errCode);
505 return errCode;
506 }
507 ResFinalizer finalizerData([logStmt]() {
508 sqlite3_stmt *statement = logStmt;
509 int ret = E_OK;
510 SQLiteUtils::ResetStatement(statement, true, ret);
511 if (ret != E_OK) {
512 LOGW("[SqliteCloudKvExecutorUtils] Reset operate other log stmt failed %d ", ret);
513 }
514 });
515 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, downloadData.user);
516 if (errCode != E_OK) {
517 LOGE("[SqliteCloudKvExecutorUtils] Bind operate other log user failed %d when insert", errCode);
518 return errCode;
519 }
520 errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
521 if (errCode != E_OK) {
522 LOGE("[SqliteCloudKvExecutorUtils] Bind operate other log hashKey failed %d when insert", errCode);
523 return errCode;
524 }
525 errCode = SQLiteUtils::StepNext(logStmt, isMemory);
526 if (errCode == -E_FINISHED) {
527 return E_OK;
528 }
529 return errCode;
530 }
531
GetOperateDataSql(OpType opType)532 std::string SqliteCloudKvExecutorUtils::GetOperateDataSql(OpType opType)
533 {
534 switch (opType) {
535 case OpType::INSERT:
536 return INSERT_SYNC_SQL;
537 case OpType::UPDATE: // fallthrough
538 case OpType::DELETE:
539 return UPDATE_SYNC_SQL;
540 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
541 return SET_SYNC_DATA_NO_FORCE_PUSH;
542 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
543 return SET_SYNC_DATA_FORCE_PUSH;
544 case OpType::UPDATE_TIMESTAMP:
545 return UPDATE_TIMESTAMP;
546 default:
547 return "";
548 }
549 }
550
GetOperateLogSql(OpType opType)551 std::string SqliteCloudKvExecutorUtils::GetOperateLogSql(OpType opType)
552 {
553 switch (opType) {
554 case OpType::INSERT: // fallthrough
555 case OpType::UPDATE:
556 return INSERT_CLOUD_SYNC_DATA_LOG;
557 case OpType::DELETE:
558 return UPDATE_CLOUD_SYNC_DATA_LOG;
559 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
560 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE: // fallthrough
561 case OpType::UPDATE_TIMESTAMP: // fallthrough
562 case OpType::ONLY_UPDATE_GID: // fallthrough
563 case OpType::NOT_HANDLE: // fallthrough
564 case OpType::CLEAR_GID: // fallthrough
565 return UPSERT_CLOUD_SYNC_DATA_LOG;
566 default:
567 return "";
568 }
569 }
570
TransToOpType(const CloudWaterType type)571 OpType SqliteCloudKvExecutorUtils::TransToOpType(const CloudWaterType type)
572 {
573 switch (type) {
574 case CloudWaterType::INSERT:
575 return OpType::INSERT;
576 case CloudWaterType::UPDATE:
577 return OpType::UPDATE;
578 case CloudWaterType::DELETE:
579 return OpType::DELETE;
580 default:
581 return OpType::NOT_HANDLE;
582 }
583 }
584
BindOnlyUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)585 int SqliteCloudKvExecutorUtils::BindOnlyUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
586 const DataItem &dataItem)
587 {
588 int index = 0;
589 int errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, user);
590 if (errCode != E_OK) {
591 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when only insert log", errCode);
592 return errCode;
593 }
594 errCode = SQLiteUtils::BindBlobToStatement(logStmt, ++index, dataItem.hashKey);
595 if (errCode != E_OK) {
596 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when only insert log", errCode);
597 }
598 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
599 if (errCode != E_OK) {
600 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only insert gid.", errCode);
601 return errCode;
602 }
603 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
604 if (errCode != E_OK) {
605 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only insert log", errCode);
606 return errCode;
607 }
608 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
609 if (errCode != E_OK) {
610 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only update gid.", errCode);
611 return errCode;
612 }
613 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
614 if (errCode != E_OK) {
615 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only update log", errCode);
616 return errCode;
617 }
618 return errCode;
619 }
620
BindStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)621 int SqliteCloudKvExecutorUtils::BindStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, int index, OpType opType,
622 DownloadData &downloadData)
623 {
624 auto [errCode, dataItem] = GetDataItem(index, downloadData);
625 if (errCode != E_OK) {
626 return errCode;
627 }
628 switch (opType) {
629 case OpType::INSERT:
630 return BindInsertStmt(logStmt, dataStmt, downloadData.user, dataItem);
631 case OpType::UPDATE:
632 return BindUpdateStmt(logStmt, dataStmt, downloadData.user, dataItem);
633 case OpType::DELETE:
634 dataItem.hashKey = downloadData.existDataHashKey[index];
635 dataItem.gid.clear();
636 dataItem.version.clear();
637 return BindDeleteStmt(logStmt, dataStmt, downloadData.user, dataItem);
638 default:
639 return E_OK;
640 }
641 }
642
BindInsertStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)643 int SqliteCloudKvExecutorUtils::BindInsertStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt,
644 const std::string &user, const DataItem &dataItem)
645 {
646 int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for insert DATA table.
647 if (errCode != E_OK) {
648 return errCode;
649 }
650 return BindDataStmt(dataStmt, dataItem, true);
651 }
652
BindInsertLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,bool isTagLogin)653 int SqliteCloudKvExecutorUtils::BindInsertLogStmt(sqlite3_stmt *logStmt, const std::string &user,
654 const DataItem &dataItem, bool isTagLogin)
655 {
656 int errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, user);
657 if (errCode != E_OK) {
658 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when insert", errCode);
659 return errCode;
660 }
661 errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
662 if (errCode != E_OK) {
663 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when insert", errCode);
664 return errCode;
665 }
666 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_CLOUD_GID_INDEX, dataItem.gid);
667 if (errCode != E_OK) {
668 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when insert", errCode);
669 return errCode;
670 }
671 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_VERSION_INDEX, dataItem.version);
672 if (errCode != E_OK) {
673 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when insert", errCode);
674 return errCode;
675 }
676 int64_t flag = dataItem.cloud_flag | (isTagLogin ? static_cast<uint64_t>(LogInfoFlag::FLAG_LOGIN_USER) : 0x00);
677 errCode = SQLiteUtils::BindInt64ToStatement(logStmt, BIND_INSERT_CLOUD_FLAG_INDEX, flag);
678 if (errCode != E_OK) {
679 LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when insert", errCode);
680 }
681 return errCode;
682 }
683
BindUpdateStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)684 int SqliteCloudKvExecutorUtils::BindUpdateStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
685 const DataItem &dataItem)
686 {
687 int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for update DATA table.
688 if (errCode != E_OK) {
689 return errCode;
690 }
691 errCode = BindDataStmt(dataStmt, dataItem, false);
692 if (errCode != E_OK) {
693 return errCode;
694 }
695 return E_OK;
696 }
697
BindUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)698 int SqliteCloudKvExecutorUtils::BindUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
699 const DataItem &dataItem)
700 {
701 int index = 1;
702 int errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.gid);
703 if (errCode != E_OK) {
704 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when update", errCode);
705 return errCode;
706 }
707 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.version);
708 if (errCode != E_OK) {
709 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when update", errCode);
710 return errCode;
711 }
712 errCode = SQLiteUtils::BindInt64ToStatement(logStmt, index++, dataItem.cloud_flag);
713 if (errCode != E_OK) {
714 LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when update", errCode);
715 return errCode;
716 }
717 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
718 if (errCode != E_OK) {
719 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when update", errCode);
720 return errCode;
721 }
722 errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, dataItem.hashKey);
723 if (errCode != E_OK) {
724 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when update", errCode);
725 }
726 return errCode;
727 }
728
BindDeleteStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,DataItem & dataItem)729 int SqliteCloudKvExecutorUtils::BindDeleteStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
730 DataItem &dataItem)
731 {
732 dataItem.key = {};
733 dataItem.value = {};
734 dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
735 int errCode = BindUpdateLogStmt(logStmt, user, dataItem); // update LOG table for delete DATA table.
736 if (errCode != E_OK) {
737 return errCode;
738 }
739 errCode = BindDataStmt(dataStmt, dataItem, false);
740 if (errCode != E_OK) {
741 return errCode;
742 }
743 return E_OK;
744 }
745
BindDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert)746 int SqliteCloudKvExecutorUtils::BindDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert)
747 {
748 int index = 1;
749 int errCode = BindSyncDataStmt(dataStmt, dataItem, isInsert, index);
750 if (errCode != E_OK) {
751 return errCode;
752 }
753 errCode = BindCloudDataStmt(dataStmt, dataItem, index);
754 if (errCode != E_OK) {
755 return errCode;
756 }
757 if (!isInsert) {
758 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
759 if (errCode != E_OK) {
760 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
761 }
762 }
763 return errCode;
764 }
765
BindSyncDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert,int & index)766 int SqliteCloudKvExecutorUtils::BindSyncDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert,
767 int &index)
768 {
769 int errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.key);
770 if (errCode != E_OK) {
771 LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
772 return errCode;
773 }
774 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.value);
775 if (errCode != E_OK) {
776 LOGE("[SqliteCloudKvExecutorUtils] Bind value failed %d", errCode);
777 return errCode;
778 }
779 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.timestamp));
780 if (errCode != E_OK) {
781 LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
782 return errCode;
783 }
784 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.flag));
785 if (errCode != E_OK) {
786 LOGE("[SqliteCloudKvExecutorUtils] Bind flag failed %d", errCode);
787 return errCode;
788 }
789 Bytes bytes;
790 DBCommon::StringToVector(dataItem.dev, bytes);
791 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
792 if (errCode != E_OK) {
793 LOGE("[SqliteCloudKvExecutorUtils] Bind dev failed %d", errCode);
794 return errCode;
795 }
796 DBCommon::StringToVector(dataItem.origDev, bytes);
797 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
798 if (errCode != E_OK) {
799 LOGE("[SqliteCloudKvExecutorUtils] Bind oriDev failed %d", errCode);
800 return errCode;
801 }
802 if (isInsert) {
803 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
804 if (errCode != E_OK) {
805 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
806 return errCode;
807 }
808 }
809 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.writeTimestamp));
810 if (errCode != E_OK) {
811 LOGE("[SqliteCloudKvExecutorUtils] Bind wTime failed %d", errCode);
812 }
813 return errCode;
814 }
815
GetModifyTime(int64_t timestamp)816 int64_t SqliteCloudKvExecutorUtils::GetModifyTime(int64_t timestamp)
817 {
818 uint64_t curTime = 0;
819 int64_t modifyTime = timestamp;
820 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
821 if (curTime < static_cast<uint64_t>(INT64_MAX)) {
822 if (modifyTime > static_cast<int64_t>(curTime)) {
823 modifyTime = static_cast<int64_t>(curTime);
824 }
825 }
826 }
827 return modifyTime;
828 }
829
BindCloudDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,int & index)830 int SqliteCloudKvExecutorUtils::BindCloudDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, int &index)
831 {
832 int errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.modifyTime);
833 if (errCode != E_OK) {
834 LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
835 return errCode;
836 }
837 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.createTime);
838 if (errCode != E_OK) {
839 LOGE("[SqliteCloudKvExecutorUtils] Bind createTime failed %d", errCode);
840 }
841 return errCode;
842 }
843
StepStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,bool isMemory)844 int SqliteCloudKvExecutorUtils::StepStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, bool isMemory)
845 {
846 int errCode = SQLiteUtils::StepNext(logStmt, isMemory);
847 if (errCode != -E_FINISHED) {
848 LOGE("[SqliteCloudKvExecutorUtils] Step insert log stmt failed %d", errCode);
849 return errCode;
850 }
851 errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
852 if (errCode != -E_FINISHED) {
853 LOGE("[SqliteCloudKvExecutorUtils] Step insert data stmt failed %d", errCode);
854 return errCode;
855 }
856 return E_OK;
857 }
858
FillCloudLog(const FillGidParam & param,OpType opType,const CloudSyncData & data,const std::string & user,CloudUploadRecorder & recorder)859 int SqliteCloudKvExecutorUtils::FillCloudLog(const FillGidParam ¶m, OpType opType, const CloudSyncData &data,
860 const std::string &user, CloudUploadRecorder &recorder)
861 {
862 if (param.first == nullptr) {
863 LOGE("[SqliteCloudKvExecutorUtils] Fill log got nullptr db");
864 return -E_INVALID_ARGS;
865 }
866 if (data.isCloudVersionRecord) {
867 int errCode = FillCloudVersionRecord(param.first, opType, data);
868 if (errCode != E_OK) {
869 return errCode;
870 }
871 }
872 switch (opType) {
873 case OpType::INSERT:
874 return FillCloudGid(param, data.insData, user, CloudWaterType::INSERT, recorder);
875 case OpType::UPDATE:
876 return FillCloudGid(param, data.updData, user, CloudWaterType::UPDATE, recorder);
877 case OpType::DELETE:
878 return FillCloudGid(param, data.delData, user, CloudWaterType::DELETE, recorder);
879 default:
880 return E_OK;
881 }
882 }
883
OnlyUpdateLogTable(sqlite3 * db,bool isMemory,int index,OpType op,DownloadData & downloadData)884 int SqliteCloudKvExecutorUtils::OnlyUpdateLogTable(sqlite3 *db, bool isMemory, int index, OpType op,
885 DownloadData &downloadData)
886 {
887 if (downloadData.existDataHashKey[index].empty()) {
888 return E_OK;
889 }
890 sqlite3_stmt *logStmt = nullptr;
891 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(op), logStmt);
892 if (errCode != E_OK) {
893 LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
894 return errCode;
895 }
896 ResFinalizer finalizerData([logStmt]() {
897 sqlite3_stmt *statement = logStmt;
898 int ret = E_OK;
899 SQLiteUtils::ResetStatement(statement, true, ret);
900 if (ret != E_OK) {
901 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when only update log", ret);
902 }
903 });
904 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
905 if (res.first != E_OK) {
906 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", res.first);
907 return res.first;
908 }
909 bool clearCloudInfo = (op == OpType::CLEAR_GID);
910 if (res.second.hashKey.empty() || DBCommon::IsRecordDelete(downloadData.data[index])) {
911 res.second.hashKey = downloadData.existDataHashKey[index];
912 clearCloudInfo = true;
913 }
914 if (clearCloudInfo) {
915 res.second.gid.clear();
916 res.second.version.clear();
917 }
918 errCode = BindOnlyUpdateLogStmt(logStmt, downloadData.user, res.second);
919 if (errCode != E_OK) {
920 return errCode;
921 }
922 errCode = SQLiteUtils::StepNext(logStmt, isMemory);
923 if (errCode == -E_FINISHED) {
924 errCode = E_OK;
925 }
926 return errCode;
927 }
928
FillCloudGid(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,const CloudWaterType & type,CloudUploadRecorder & recorder)929 int SqliteCloudKvExecutorUtils::FillCloudGid(const FillGidParam ¶m, const CloudSyncBatch &data,
930 const std::string &user, const CloudWaterType &type, CloudUploadRecorder &recorder)
931 {
932 auto [db, ignoreEmptyGid] = param;
933 sqlite3_stmt *logStmt = nullptr;
934 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(TransToOpType(type)), logStmt);
935 if (errCode != E_OK) {
936 return errCode;
937 }
938 ResFinalizer finalizerData([logStmt]() {
939 sqlite3_stmt *statement = logStmt;
940 int ret = E_OK;
941 SQLiteUtils::ResetStatement(statement, true, ret);
942 if (ret != E_OK) {
943 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when fill log", ret);
944 }
945 });
946 for (size_t i = 0; i < data.hashKey.size(); ++i) {
947 if (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i])) {
948 continue;
949 }
950 DataItem dataItem;
951 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data.extend[i], dataItem.gid);
952 if (dataItem.gid.empty() && ignoreEmptyGid) {
953 continue;
954 }
955 if (errCode != E_OK) {
956 return errCode;
957 }
958 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::VERSION_FIELD, data.extend[i], dataItem.version);
959 dataItem.hashKey = data.hashKey[i];
960 dataItem.dataDelete = CheckDataDelete(param, data, i);
961 errCode = BindFillGidLogStmt(logStmt, user, dataItem, data.extend[i], type);
962 if (errCode != E_OK) {
963 return errCode;
964 }
965 errCode = SQLiteUtils::StepNext(logStmt, false);
966 if (errCode == -E_FINISHED) {
967 errCode = E_OK;
968 }
969 if (errCode != E_OK) {
970 LOGE("[SqliteCloudKvExecutorUtils] fill back failed %d index %zu", errCode, i);
971 return errCode;
972 }
973 SQLiteUtils::ResetStatement(logStmt, false, errCode);
974 MarkUploadSuccess(param, data, user, i);
975 // ignored version record
976 if (i < data.timestamp.size()) {
977 recorder.RecordUploadRecord(CloudDbConstant::CLOUD_KV_TABLE_NAME, data.hashKey[i], type, data.timestamp[i]);
978 }
979 }
980 return E_OK;
981 }
982
OnlyUpdateSyncData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)983 int SqliteCloudKvExecutorUtils::OnlyUpdateSyncData(sqlite3 *db, bool isMemory, int index, OpType opType,
984 DownloadData &downloadData)
985 {
986 if (opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO && opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE &&
987 opType != OpType::UPDATE_TIMESTAMP) {
988 LOGW("[SqliteCloudKvExecutorUtils] Ignore unknown opType %d", static_cast<int>(opType));
989 return E_OK;
990 }
991 sqlite3_stmt *dataStmt = nullptr;
992 int errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
993 if (errCode != E_OK) {
994 LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
995 return errCode;
996 }
997 ResFinalizer finalizerData([dataStmt]() {
998 sqlite3_stmt *statement = dataStmt;
999 int ret = E_OK;
1000 SQLiteUtils::ResetStatement(statement, true, ret);
1001 if (ret != E_OK) {
1002 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when update log", ret);
1003 }
1004 });
1005 errCode = BindUpdateSyncDataStmt(dataStmt, index, opType, downloadData);
1006 if (errCode != E_OK) {
1007 LOGE("[SqliteCloudKvExecutorUtils] Bind update sync data stmt failed %d", errCode);
1008 return errCode;
1009 }
1010 errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
1011 if (errCode == -E_FINISHED) {
1012 errCode = E_OK;
1013 }
1014 return errCode;
1015 }
1016
BindUpdateSyncDataStmt(sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)1017 int SqliteCloudKvExecutorUtils::BindUpdateSyncDataStmt(sqlite3_stmt *dataStmt, int index, OpType opType,
1018 DownloadData &downloadData)
1019 {
1020 switch (opType) {
1021 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
1022 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
1023 return SQLiteUtils::BindBlobToStatement(dataStmt, 1, downloadData.existDataHashKey[index]);
1024 case OpType::UPDATE_TIMESTAMP:
1025 return BindUpdateTimestampStmt(dataStmt, index, downloadData);
1026 default:
1027 return E_OK;
1028 }
1029 }
1030
BindUpdateTimestampStmt(sqlite3_stmt * dataStmt,int index,DownloadData & downloadData)1031 int SqliteCloudKvExecutorUtils::BindUpdateTimestampStmt(sqlite3_stmt *dataStmt, int index, DownloadData &downloadData)
1032 {
1033 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
1034 auto &[errCode, dataItem] = res;
1035 if (errCode != E_OK) {
1036 return errCode;
1037 }
1038 int currentBindIndex = 1; // bind sql index start at 1
1039 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.timestamp);
1040 if (errCode != E_OK) {
1041 LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
1042 return errCode;
1043 }
1044 int64_t modifyTime = GetModifyTime(dataItem.modifyTime);
1045 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, modifyTime);
1046 if (errCode != E_OK) {
1047 LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
1048 return errCode;
1049 }
1050 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, currentBindIndex++, dataItem.hashKey);
1051 if (errCode != E_OK) {
1052 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
1053 return errCode;
1054 }
1055 return E_OK;
1056 }
1057
GetDataItem(int index,DownloadData & downloadData)1058 std::pair<int, DataItem> SqliteCloudKvExecutorUtils::GetDataItem(int index, DownloadData &downloadData)
1059 {
1060 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
1061 auto &[errCode, dataItem] = res;
1062 if (errCode != E_OK) {
1063 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
1064 return res;
1065 }
1066 std::string dev;
1067 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(dev);
1068 if (errCode != E_OK) {
1069 return res;
1070 }
1071 dev = DBCommon::TransferHashString(dev);
1072 auto decodeDevice = DBBase64Utils::Decode(dataItem.dev);
1073 if (!decodeDevice.empty()) {
1074 dataItem.dev = std::string(decodeDevice.begin(), decodeDevice.end());
1075 }
1076 if (dataItem.dev == dev) {
1077 dataItem.dev = "";
1078 }
1079 decodeDevice = DBBase64Utils::Decode(dataItem.origDev);
1080 if (!decodeDevice.empty()) {
1081 dataItem.origDev = std::string(decodeDevice.begin(), decodeDevice.end());
1082 }
1083 if (dataItem.origDev == dev) {
1084 dataItem.origDev = "";
1085 }
1086 int64_t timestamp = GetModifyTime(dataItem.modifyTime);
1087 dataItem.timestamp = static_cast<Timestamp>(timestamp + downloadData.timeOffset);
1088 dataItem.writeTimestamp = static_cast<Timestamp>(timestamp); // writeTimestamp is process conflict time
1089 return res;
1090 }
1091
CountCloudDataInner(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,const std::string & sql)1092 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudDataInner(sqlite3 *db, bool isMemory,
1093 const Timestamp ×tamp, const std::string &user, const std::string &sql)
1094 {
1095 std::pair<int, int64_t> res;
1096 auto &[errCode, count] = res;
1097 sqlite3_stmt *stmt = nullptr;
1098 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1099 if (errCode != E_OK) {
1100 LOGE("[SqliteCloudKvExecutorUtils] Count data stmt failed %d", errCode);
1101 return res;
1102 }
1103 ResFinalizer finalizer([stmt]() {
1104 sqlite3_stmt *statement = stmt;
1105 int ret = E_OK;
1106 SQLiteUtils::ResetStatement(statement, true, ret);
1107 if (ret != E_OK) {
1108 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when count data", ret);
1109 }
1110 });
1111 errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_USER, user);
1112 if (errCode != E_OK) {
1113 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d", errCode);
1114 return res;
1115 }
1116 errCode = SQLiteUtils::BindInt64ToStatement(stmt, BIND_CLOUD_TIMESTAMP, static_cast<int64_t>(timestamp));
1117 if (errCode != E_OK) {
1118 LOGE("[SqliteCloudKvExecutorUtils] Bind begin time failed %d", errCode);
1119 return res;
1120 }
1121 errCode = SQLiteUtils::StepNext(stmt, isMemory);
1122 if (errCode == -E_FINISHED) {
1123 count = 0;
1124 return res;
1125 }
1126 count = sqlite3_column_int64(stmt, CLOUD_QUERY_COUNT_INDEX);
1127 LOGD("[SqliteCloudKvExecutorUtils] Get total upload count %" PRId64, count);
1128 return res;
1129 }
1130
CountCloudData(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,bool forcePush)1131 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudData(sqlite3 *db, bool isMemory,
1132 const Timestamp ×tamp, const std::string &user, bool forcePush)
1133 {
1134 std::string sql = SqliteQueryHelper::GetKvCloudQuerySql(true, forcePush);
1135 return CountCloudDataInner(db, isMemory, timestamp, user, sql);
1136 }
1137
CountAllCloudData(const DBParam & param,const std::vector<Timestamp> & timestampVec,const std::string & user,bool forcePush,QuerySyncObject & querySyncObject)1138 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountAllCloudData(const DBParam ¶m,
1139 const std::vector<Timestamp> ×tampVec, const std::string &user, bool forcePush,
1140 QuerySyncObject &querySyncObject)
1141 {
1142 std::pair<int, int64_t> res = { E_OK, 0 };
1143 auto &[errCode, count] = res;
1144 if (timestampVec.size() != 3) { // 3 is the number of three mode.
1145 errCode = -E_INVALID_ARGS;
1146 return res;
1147 }
1148 std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1149 SqliteQueryHelper helper = querySyncObject.GetQueryHelper(errCode);
1150 if (errCode != E_OK) {
1151 return res;
1152 }
1153 for (size_t i = 0; i < typeVec.size(); i++) {
1154 sqlite3_stmt *stmt = nullptr;
1155 errCode = helper.GetCountKvCloudDataStatement(param.first, forcePush, typeVec[i], stmt);
1156 if (errCode != E_OK) {
1157 return res;
1158 }
1159 // count no use watermark
1160 auto [err, cnt] = helper.BindCountKvCloudDataStatement(param.first, param.second, 0u, user, stmt);
1161 if (err != E_OK) {
1162 return { err, 0 };
1163 }
1164 count += cnt;
1165 }
1166 return res;
1167 }
1168
FillCloudVersionRecord(sqlite3 * db,OpType opType,const CloudSyncData & data)1169 int SqliteCloudKvExecutorUtils::FillCloudVersionRecord(sqlite3 *db, OpType opType, const CloudSyncData &data)
1170 {
1171 if (opType != OpType::INSERT && opType != OpType::UPDATE) {
1172 return E_OK;
1173 }
1174 bool isInsert = (opType == OpType::INSERT);
1175 CloudSyncBatch syncBatch = isInsert ? data.insData : data.updData;
1176 if (syncBatch.record.empty()) {
1177 LOGW("[SqliteCloudKvExecutorUtils] Fill empty cloud version record");
1178 return E_OK;
1179 }
1180 syncBatch.record[0].insert(syncBatch.extend[0].begin(), syncBatch.extend[0].end());
1181 auto res = CloudStorageUtils::GetSystemRecordFromCloudData(syncBatch.record[0]); // only record first one
1182 auto &[errCode, dataItem] = res;
1183 sqlite3_stmt *dataStmt = nullptr;
1184 errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
1185 if (errCode != E_OK) {
1186 LOGE("[SqliteCloudKvExecutorUtils] Get insert version record statement failed %d", errCode);
1187 return errCode;
1188 }
1189 ResFinalizer finalizerData([dataStmt]() {
1190 int ret = E_OK;
1191 sqlite3_stmt *statement = dataStmt;
1192 SQLiteUtils::ResetStatement(statement, true, ret);
1193 if (ret != E_OK) {
1194 LOGW("[SqliteCloudKvExecutorUtils] Reset version record stmt failed %d", ret);
1195 }
1196 });
1197 errCode = BindDataStmt(dataStmt, dataItem, isInsert);
1198 if (errCode != E_OK) {
1199 return errCode;
1200 }
1201 errCode = SQLiteUtils::StepNext(dataStmt, false);
1202 if (errCode != -E_FINISHED) {
1203 LOGE("[SqliteCloudKvExecutorUtils] Step insert version record stmt failed %d", errCode);
1204 return errCode;
1205 }
1206 return E_OK;
1207 }
1208
GetLocalCloudVersion(sqlite3 * db,bool isMemory,const std::string & user)1209 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersion(sqlite3 *db, bool isMemory,
1210 const std::string &user)
1211 {
1212 auto res = GetLocalCloudVersionInner(db, isMemory, user);
1213 if (res.first != E_OK) {
1214 LOGE("[SqliteCloudKvExecutorUtils] Get local cloud version failed %d", res.first);
1215 }
1216 return res;
1217 }
1218
GetLocalCloudVersionInner(sqlite3 * db,bool isMemory,const std::string & user)1219 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersionInner(sqlite3 *db, bool isMemory,
1220 const std::string &user)
1221 {
1222 std::pair<int, CloudSyncData> res;
1223 auto &[errCode, syncData] = res;
1224 auto sql = SqliteQueryHelper::GetKvCloudRecordSql();
1225 sqlite3_stmt *stmt = nullptr;
1226 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1227 if (errCode != E_OK) {
1228 return res;
1229 }
1230 ResFinalizer finalizerData([stmt]() {
1231 int ret = E_OK;
1232 sqlite3_stmt *statement = stmt;
1233 SQLiteUtils::ResetStatement(statement, true, ret);
1234 if (ret != E_OK) {
1235 LOGW("[SqliteCloudKvExecutorUtils] Reset local version record stmt failed %d", ret);
1236 }
1237 });
1238 std::string hashDev;
1239 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
1240 if (errCode != E_OK) {
1241 return res;
1242 }
1243 std::string tempDev = DBCommon::TransferHashString(hashDev);
1244 hashDev = DBCommon::TransferStringToHex(tempDev);
1245 std::string key = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY + hashDev;
1246 Key keyVec;
1247 DBCommon::StringToVector(key, keyVec);
1248 errCode = SQLiteUtils::BindBlobToStatement(stmt, BIND_CLOUD_VERSION_RECORD_KEY_INDEX, keyVec);
1249 if (errCode != E_OK) {
1250 return res;
1251 }
1252 errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_VERSION_RECORD_USER_INDEX, user);
1253 if (errCode != E_OK) {
1254 return res;
1255 }
1256 errCode = GetCloudVersionRecord(isMemory, stmt, syncData);
1257 if (errCode == -E_NOT_FOUND) {
1258 InitDefaultCloudVersionRecord(key, tempDev, syncData);
1259 errCode = E_OK;
1260 }
1261 return res;
1262 }
1263
GetCloudVersionRecord(bool isMemory,sqlite3_stmt * stmt,CloudSyncData & syncData)1264 int SqliteCloudKvExecutorUtils::GetCloudVersionRecord(bool isMemory, sqlite3_stmt *stmt, CloudSyncData &syncData)
1265 {
1266 int errCode = SQLiteUtils::StepNext(stmt, isMemory);
1267 if (errCode == -E_FINISHED) {
1268 return -E_NOT_FOUND;
1269 }
1270 if (errCode != E_OK) {
1271 LOGE("[SqliteCloudKvExecutorUtils] Get local version failed %d", errCode);
1272 return errCode;
1273 }
1274 CloudSyncConfig config;
1275 config.maxUploadSize = CloudDbConstant::MAX_UPLOAD_SIZE;
1276 config.maxUploadCount = 1;
1277 CloudUploadRecorder recorder; // ignore last record
1278 UploadDetail detail;
1279 errCode = GetCloudDataForSync(config, recorder, stmt, syncData, detail);
1280 return errCode;
1281 }
1282
InitDefaultCloudVersionRecord(const std::string & key,const std::string & dev,CloudSyncData & syncData)1283 void SqliteCloudKvExecutorUtils::InitDefaultCloudVersionRecord(const std::string &key, const std::string &dev,
1284 CloudSyncData &syncData)
1285 {
1286 LOGI("[SqliteCloudKvExecutorUtils] Not found local version record");
1287 VBucket defaultRecord;
1288 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_KEY] = key;
1289 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_VALUE] = std::string("");
1290 auto encodeDev = DBBase64Utils::Encode(dev);
1291 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_DEVICE] = encodeDev;
1292 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE] = encodeDev;
1293 syncData.insData.record.push_back(std::move(defaultRecord));
1294 VBucket defaultExtend;
1295 defaultExtend[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::TransferStringToHex(key);
1296 syncData.insData.extend.push_back(std::move(defaultExtend));
1297 syncData.insData.assets.emplace_back();
1298 Bytes bytesHashKey;
1299 DBCommon::StringToVector(key, bytesHashKey);
1300 syncData.insData.hashKey.push_back(bytesHashKey);
1301 }
1302
BindVersionStmt(const std::string & device,sqlite3_stmt * dataStmt)1303 int SqliteCloudKvExecutorUtils::BindVersionStmt(const std::string &device, sqlite3_stmt *dataStmt)
1304 {
1305 std::string hashDevice;
1306 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDevice);
1307 if (errCode != E_OK) {
1308 return errCode;
1309 }
1310 Bytes bytes;
1311 if (device == hashDevice) {
1312 DBCommon::StringToVector("", bytes);
1313 } else {
1314 hashDevice = DBCommon::TransferHashString(device);
1315 DBCommon::StringToVector(hashDevice, bytes);
1316 }
1317 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, BIND_CLOUD_VERSION_DEVICE_INDEX, bytes);
1318 if (errCode != E_OK) {
1319 LOGE("[SqliteCloudKvExecutorUtils] Bind device failed %d", errCode);
1320 }
1321 return errCode;
1322 }
1323
GetCloudVersionFromCloud(sqlite3 * db,bool isMemory,const std::string & device,std::vector<VBucket> & dataVector)1324 int SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(sqlite3 *db, bool isMemory, const std::string &device,
1325 std::vector<VBucket> &dataVector)
1326 {
1327 sqlite3_stmt *dataStmt = nullptr;
1328 bool isDeviceEmpty = device.empty();
1329 std::string sql = SqliteQueryHelper::GetCloudVersionRecordSql(isDeviceEmpty);
1330 int errCode = SQLiteUtils::GetStatement(db, sql, dataStmt);
1331 if (errCode != E_OK) {
1332 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version record statement failed %d", errCode);
1333 return errCode;
1334 }
1335 ResFinalizer finalizerData([dataStmt]() {
1336 int ret = E_OK;
1337 sqlite3_stmt *statement = dataStmt;
1338 SQLiteUtils::ResetStatement(statement, true, ret);
1339 if (ret != E_OK) {
1340 LOGW("[SqliteCloudKvExecutorUtils] Reset cloud version record stmt failed %d", ret);
1341 }
1342 });
1343 if (!isDeviceEmpty) {
1344 errCode = BindVersionStmt(device, dataStmt);
1345 if (errCode != E_OK) {
1346 return errCode;
1347 }
1348 }
1349 uint32_t totalSize = 0;
1350 do {
1351 errCode = SQLiteUtils::StepWithRetry(dataStmt, isMemory);
1352 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1353 errCode = E_OK;
1354 break;
1355 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1356 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1357 break;
1358 }
1359 VBucket data;
1360 errCode = GetCloudVersionRecordData(dataStmt, data, totalSize);
1361 dataVector.push_back(data);
1362 } while (errCode == E_OK);
1363 return errCode;
1364 }
1365
GetCloudVersionRecordData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)1366 int SqliteCloudKvExecutorUtils::GetCloudVersionRecordData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
1367 {
1368 int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
1369 if (errCode != E_OK) {
1370 LOGE("[SqliteCloudKvExecutorUtils] [GetCloudVersionRecordData] Get key failed:%d", errCode);
1371 return errCode;
1372 }
1373 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
1374 if (errCode != E_OK) {
1375 LOGE("[SqliteCloudKvExecutorUtils] [GetCloudVersionRecordData] Get value failed:%d", errCode);
1376 return errCode;
1377 }
1378 return GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
1379 }
1380
GetWaitCompensatedSyncDataPk(sqlite3 * db,bool isMemory,std::vector<VBucket> & data)1381 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(sqlite3 *db, bool isMemory, std::vector<VBucket> &data)
1382 {
1383 sqlite3_stmt *stmt = nullptr;
1384 int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_KEY_SQL, stmt);
1385 if (errCode != E_OK) {
1386 LOGE("[SqliteCloudKvExecutorUtils] Get compensate key stmt failed %d", errCode);
1387 return errCode;
1388 }
1389 ResFinalizer finalizerData([stmt]() {
1390 int ret = E_OK;
1391 sqlite3_stmt *statement = stmt;
1392 SQLiteUtils::ResetStatement(statement, true, ret);
1393 if (ret != E_OK) {
1394 LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1395 }
1396 });
1397 uint32_t totalSize = 0;
1398 do {
1399 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1400 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1401 errCode = E_OK;
1402 break;
1403 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1404 LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1405 break;
1406 }
1407 VBucket key;
1408 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, key, totalSize);
1409 if (errCode != E_OK) {
1410 return errCode;
1411 }
1412 data.push_back(key);
1413 } while (errCode == E_OK);
1414 return errCode;
1415 }
1416
GetWaitCompensatedSyncDataUserId(sqlite3 * db,bool isMemory,std::vector<VBucket> & users)1417 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(sqlite3 *db, bool isMemory,
1418 std::vector<VBucket> &users)
1419 {
1420 sqlite3_stmt *stmt = nullptr;
1421 int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_USERID_SQL, stmt);
1422 if (errCode != E_OK) {
1423 LOGE("[SqliteCloudKvExecutorUtils] Get compensate userid stmt failed %d", errCode);
1424 return errCode;
1425 }
1426 ResFinalizer finalizerData([stmt]() {
1427 int ret = E_OK;
1428 sqlite3_stmt *statement = stmt;
1429 SQLiteUtils::ResetStatement(statement, true, ret);
1430 if (ret != E_OK) {
1431 LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1432 }
1433 });
1434 uint32_t totalSize = 0;
1435 do {
1436 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1437 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1438 errCode = E_OK;
1439 break;
1440 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1441 LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1442 break;
1443 }
1444 VBucket key;
1445 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_USERID, CLOUD_QUERY_KEY_INDEX, stmt,
1446 key, totalSize);
1447 if (errCode != E_OK) {
1448 return errCode;
1449 }
1450 users.push_back(key);
1451 } while (errCode == E_OK);
1452 return errCode;
1453 }
1454
GetWaitCompensatedSyncData(sqlite3 * db,bool isMemory,std::vector<VBucket> & data,std::vector<VBucket> & users)1455 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncData(sqlite3 *db, bool isMemory, std::vector<VBucket> &data,
1456 std::vector<VBucket> &users)
1457 {
1458 int errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(db, isMemory, data);
1459 if (errCode != E_OK) {
1460 LOGE("[GetWaitCompensatedSyncData] Get wait compensated sync data failed! errCode=%d", errCode);
1461 return errCode;
1462 }
1463 errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(db, isMemory, users);
1464 if (errCode != E_OK) {
1465 LOGE("[GetWaitCompensatedSyncData] Get wait compensated sync data failed! errCode=%d", errCode);
1466 }
1467 return errCode;
1468 }
1469
QueryCloudGid(sqlite3 * db,bool isMemory,const std::string & user,const QuerySyncObject & querySyncObject,std::vector<std::string> & cloudGid)1470 int SqliteCloudKvExecutorUtils::QueryCloudGid(sqlite3 *db, bool isMemory, const std::string &user,
1471 const QuerySyncObject &querySyncObject, std::vector<std::string> &cloudGid)
1472 {
1473 int errCode = E_OK;
1474 QuerySyncObject query = querySyncObject;
1475 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1476 if (errCode != E_OK) {
1477 return errCode;
1478 }
1479 sqlite3_stmt *stmt = nullptr;
1480 errCode = helper.GetAndBindGidKvCloudQueryStatement(user, db, stmt);
1481 if (errCode != E_OK) {
1482 return errCode;
1483 }
1484 do {
1485 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1486 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1487 errCode = E_OK;
1488 break;
1489 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1490 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1491 break;
1492 }
1493 std::string gid;
1494 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1495 cloudGid.push_back(gid);
1496 } while (errCode == E_OK);
1497 int ret = E_OK;
1498 SQLiteUtils::ResetStatement(stmt, true, ret);
1499 return errCode == E_OK ? ret : errCode;
1500 }
1501
BindFillGidLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,const VBucket & uploadExtend,const CloudWaterType & type)1502 int SqliteCloudKvExecutorUtils::BindFillGidLogStmt(sqlite3_stmt *logStmt, const std::string &user,
1503 const DataItem &dataItem, const VBucket &uploadExtend, const CloudWaterType &type)
1504 {
1505 DataItem wItem = dataItem;
1506 if (DBCommon::IsNeedCompensatedForUpload(uploadExtend, type) && !wItem.dataDelete) {
1507 wItem.cloud_flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1508 }
1509 if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1510 (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1511 wItem.gid = {};
1512 wItem.version = {};
1513 }
1514 int errCode = E_OK;
1515 if (type == CloudWaterType::DELETE) {
1516 if (DBCommon::IsCloudRecordNotFound(uploadExtend)) {
1517 errCode = BindUpdateLogStmt(logStmt, user, wItem);
1518 }
1519 } else {
1520 errCode = BindInsertLogStmt(logStmt, user, wItem, false);
1521 }
1522 if (errCode != E_OK) {
1523 LOGE("[SqliteCloudKvExecutorUtils] fill cloud gid failed. %d", errCode);
1524 }
1525 return errCode;
1526 }
1527
MarkUploadSuccess(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1528 void SqliteCloudKvExecutorUtils::MarkUploadSuccess(const FillGidParam ¶m, const CloudSyncBatch &data,
1529 const std::string &user, size_t dataIndex)
1530 {
1531 if (data.extend.size() <= dataIndex || data.hashKey.size() <= dataIndex || data.timestamp.size() <= dataIndex) {
1532 LOGW("[SqliteCloudKvExecutorUtils] invalid index %zu when mark upload success", dataIndex);
1533 return;
1534 }
1535 if (!DBCommon::IsRecordSuccess(data.extend[dataIndex])) {
1536 return;
1537 }
1538 if (CheckDataChanged(param, data, dataIndex)) {
1539 LOGW("[SqliteCloudKvExecutorUtils] %zu data changed when mark upload success", dataIndex);
1540 return;
1541 }
1542 MarkUploadSuccessInner(param, data, user, dataIndex);
1543 }
1544
CheckDataChanged(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1545 bool SqliteCloudKvExecutorUtils::CheckDataChanged(const FillGidParam ¶m,
1546 const CloudSyncBatch &data, size_t dataIndex)
1547 {
1548 sqlite3_stmt *checkStmt = nullptr;
1549 int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_CHANGED, checkStmt);
1550 if (errCode != E_OK) {
1551 LOGW("[SqliteCloudKvExecutorUtils] check data change get stmt failed %d", errCode);
1552 return true;
1553 }
1554 ResFinalizer finalizerData([checkStmt]() {
1555 sqlite3_stmt *statement = checkStmt;
1556 int ret = E_OK;
1557 SQLiteUtils::ResetStatement(statement, true, ret);
1558 if (ret != E_OK) {
1559 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data changed", ret);
1560 }
1561 });
1562 int index = 1;
1563 if (data.timestamp.size() > 0) {
1564 errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1565 if (errCode != E_OK) {
1566 LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data changed", errCode);
1567 return true;
1568 }
1569 }
1570 if (data.hashKey.size() > 0) {
1571 errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1572 if (errCode != E_OK) {
1573 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data changed", errCode);
1574 return true;
1575 }
1576 }
1577 errCode = SQLiteUtils::StepNext(checkStmt);
1578 if (errCode != E_OK) {
1579 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data changed", errCode);
1580 return true;
1581 }
1582 return sqlite3_column_int64(checkStmt, 0) == 0; // get index start at 0, get 0 is data changed
1583 }
1584
CheckDataDelete(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1585 bool SqliteCloudKvExecutorUtils::CheckDataDelete(const FillGidParam ¶m,
1586 const CloudSyncBatch &data, size_t dataIndex)
1587 {
1588 sqlite3_stmt *checkStmt = nullptr;
1589 int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_DELETE, checkStmt);
1590 if (errCode != E_OK) {
1591 LOGW("[SqliteCloudKvExecutorUtils] check data delete get stmt failed %d", errCode);
1592 return true;
1593 }
1594 ResFinalizer finalizerData([checkStmt]() {
1595 sqlite3_stmt *statement = checkStmt;
1596 int ret = E_OK;
1597 SQLiteUtils::ResetStatement(statement, true, ret);
1598 if (ret != E_OK) {
1599 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data delete", ret);
1600 }
1601 });
1602 int index = 1;
1603 if (data.timestamp.size() > 0) {
1604 errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1605 if (errCode != E_OK) {
1606 LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data delete", errCode);
1607 return true;
1608 }
1609 }
1610 if (data.hashKey.size() > 0) {
1611 errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1612 if (errCode != E_OK) {
1613 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data delete", errCode);
1614 return true;
1615 }
1616 }
1617 errCode = SQLiteUtils::StepNext(checkStmt);
1618 if (errCode != E_OK) {
1619 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data delete", errCode);
1620 return true;
1621 }
1622 return sqlite3_column_int64(checkStmt, 0) != 0; // get index start at !0, get 0 means data delete
1623 }
1624
MarkUploadSuccessInner(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1625 void SqliteCloudKvExecutorUtils::MarkUploadSuccessInner(const FillGidParam ¶m,
1626 const CloudSyncBatch &data, const std::string &user, size_t dataIndex)
1627 {
1628 sqlite3_stmt *logStmt = nullptr;
1629 int errCode = SQLiteUtils::GetStatement(param.first, MARK_UPLOAD_SUCCESS, logStmt);
1630 if (errCode != E_OK) {
1631 LOGW("[SqliteCloudKvExecutorUtils] mark upload success inner get stmt failed %d", errCode);
1632 return;
1633 }
1634 ResFinalizer finalizerData([logStmt]() {
1635 sqlite3_stmt *statement = logStmt;
1636 int ret = E_OK;
1637 SQLiteUtils::ResetStatement(statement, true, ret);
1638 if (ret != E_OK) {
1639 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when mark upload success", ret);
1640 }
1641 });
1642 int index = 1;
1643 errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, data.hashKey[dataIndex]);
1644 if (errCode != E_OK) {
1645 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1646 return;
1647 }
1648 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
1649 if (errCode != E_OK) {
1650 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1651 return;
1652 }
1653 errCode = SQLiteUtils::StepNext(logStmt);
1654 if (errCode != E_OK && errCode != -E_FINISHED) {
1655 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when mark upload success", errCode);
1656 return;
1657 }
1658 }
1659 }
1660