1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_cloud_kv_executor_utils.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "res_finalizer.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_ver_storage_executor_sql.h"
24 #include "time_helper.h"
25
26 namespace DistributedDB {
GetCloudData(const CloudSyncConfig & config,const DBParam & param,const CloudUploadRecorder & recorder,SQLiteSingleVerContinueToken & token,CloudSyncData & data)27 int SqliteCloudKvExecutorUtils::GetCloudData(const CloudSyncConfig &config, const DBParam ¶m,
28 const CloudUploadRecorder &recorder, SQLiteSingleVerContinueToken &token, CloudSyncData &data)
29 {
30 auto [db, isMemory] = param;
31 bool stepNext = false;
32 auto [errCode, stmt] = token.GetCloudQueryStmt(db, data.isCloudForcePushStrategy, stepNext, data.mode);
33 if (errCode != E_OK) {
34 token.ReleaseCloudQueryStmt();
35 return errCode;
36 }
37 UploadDetail detail;
38 auto &[stepNum, totalSize] = detail;
39 do {
40 if (stepNext) {
41 errCode = SQLiteUtils::StepNext(stmt, isMemory);
42 if (errCode != E_OK) {
43 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
44 break;
45 }
46 }
47 stepNext = true;
48 errCode = GetCloudDataForSync(config, recorder, stmt, data, detail);
49 stepNum++;
50 } while (errCode == E_OK);
51 LOGI("[SqliteCloudKvExecutorUtils] Get cloud sync data, insData:%u, upData:%u, delLog:%u errCode:%d total:%" PRIu32,
52 data.insData.record.size(), data.updData.record.size(), data.delData.extend.size(), errCode, totalSize);
53 if (errCode != -E_UNFINISHED) {
54 token.ReleaseCloudQueryStmt();
55 } else if (isMemory && UpdateBeginTimeForMemoryDB(token, data)) {
56 token.ReleaseCloudQueryStmt();
57 }
58 return errCode;
59 }
60
GetMaxTimeStamp(const std::vector<VBucket> & dataExtend)61 Timestamp SqliteCloudKvExecutorUtils::GetMaxTimeStamp(const std::vector<VBucket> &dataExtend)
62 {
63 Timestamp maxTimeStamp = 0;
64 VBucket lastRecord = dataExtend.back();
65 auto it = lastRecord.find(CloudDbConstant::MODIFY_FIELD);
66 if (it != lastRecord.end() && maxTimeStamp < static_cast<Timestamp>(std::get<int64_t>(it->second))) {
67 maxTimeStamp = static_cast<Timestamp>(std::get<int64_t>(it->second));
68 }
69 return maxTimeStamp;
70 }
71
UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken & token,const CloudSyncData & data)72 bool SqliteCloudKvExecutorUtils::UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken &token,
73 const CloudSyncData &data)
74 {
75 Timestamp maxTimeStamp = 0;
76 switch (data.mode) {
77 case DistributedDB::CloudWaterType::DELETE:
78 maxTimeStamp = GetMaxTimeStamp(data.delData.extend);
79 break;
80 case DistributedDB::CloudWaterType::UPDATE:
81 maxTimeStamp = GetMaxTimeStamp(data.updData.extend);
82 break;
83 case DistributedDB::CloudWaterType::INSERT:
84 maxTimeStamp = GetMaxTimeStamp(data.insData.extend);
85 break;
86 case DistributedDB::CloudWaterType::BUTT:
87 default:
88 break;
89 }
90 if (maxTimeStamp > token.GetQueryBeginTime()) {
91 token.SetNextBeginTime("", maxTimeStamp);
92 return true;
93 }
94 LOGW("[SqliteCloudKvExecutorUtils] The start time of the in memory database has not been updated.");
95 return false;
96 }
97
GetCloudDataForSync(const CloudSyncConfig & config,const CloudUploadRecorder & recorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,UploadDetail & detail)98 int SqliteCloudKvExecutorUtils::GetCloudDataForSync(const CloudSyncConfig &config, const CloudUploadRecorder &recorder,
99 sqlite3_stmt *statement, CloudSyncData &cloudDataResult, UploadDetail &detail)
100 {
101 auto &[stepNum, totalSize] = detail;
102 VBucket log;
103 VBucket extraLog;
104 uint32_t preSize = totalSize;
105 int64_t revisedTime = 0;
106 int64_t invalidTime = 0;
107 GetCloudLog(statement, log, totalSize, revisedTime, invalidTime);
108 GetCloudExtraLog(statement, extraLog);
109 if (revisedTime != 0) {
110 Bytes hashKey = std::get<Bytes>(extraLog[CloudDbConstant::HASH_KEY]);
111 cloudDataResult.revisedData.push_back({hashKey, revisedTime, invalidTime});
112 }
113
114 VBucket data;
115 int64_t flag = 0;
116 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
117 if (errCode != E_OK) {
118 return errCode;
119 }
120
121 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
122 errCode = GetCloudKvData(statement, data, totalSize);
123 if (errCode != E_OK) {
124 return errCode;
125 }
126 }
127
128 if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, config.maxUploadSize, config.maxUploadCount)) {
129 errCode = CloudStorageUtils::IdentifyCloudType(recorder, cloudDataResult, data, log, extraLog);
130 } else {
131 errCode = -E_UNFINISHED;
132 }
133 if (errCode == -E_IGNORE_DATA) {
134 errCode = E_OK;
135 totalSize = preSize;
136 stepNum--;
137 }
138 return errCode;
139 }
140
GetCloudLog(sqlite3_stmt * stmt,VBucket & logInfo,uint32_t & totalSize,int64_t & revisedTime,int64_t & invalidTime)141 void SqliteCloudKvExecutorUtils::GetCloudLog(sqlite3_stmt *stmt, VBucket &logInfo,
142 uint32_t &totalSize, int64_t &revisedTime, int64_t &invalidTime)
143 {
144 int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX));
145 uint64_t curTime = 0;
146 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
147 if (modifyTime > static_cast<int64_t>(curTime)) {
148 invalidTime = modifyTime;
149 modifyTime = static_cast<int64_t>(curTime);
150 revisedTime = modifyTime;
151 }
152 } else {
153 LOGW("[SqliteCloudKvExecutorUtils] get raw sys time failed.");
154 }
155 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
156 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
157 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CREATE_TIME_INDEX)));
158 totalSize += sizeof(int64_t) + sizeof(int64_t);
159 if (sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX) != nullptr) {
160 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
161 sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX));
162 if (!cloudGid.empty()) {
163 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
164 totalSize += cloudGid.size();
165 }
166 }
167 if (revisedTime != 0) {
168 std::string cloudGid;
169 if (logInfo.count(CloudDbConstant::GID_FIELD) != 0) {
170 cloudGid = std::get<std::string>(logInfo[CloudDbConstant::GID_FIELD]);
171 }
172 LOGW("[SqliteCloudKvExecutorUtils] Found invalid mod time: %lld, curTime: %lld, gid: %s",
173 invalidTime, revisedTime, DBCommon::StringMiddleMasking(cloudGid).c_str());
174 }
175 std::string version;
176 SQLiteUtils::GetColumnTextValue(stmt, CLOUD_QUERY_VERSION_INDEX, version);
177 logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
178 totalSize += version.size();
179 }
180
GetCloudExtraLog(sqlite3_stmt * stmt,VBucket & flags)181 void SqliteCloudKvExecutorUtils::GetCloudExtraLog(sqlite3_stmt *stmt, VBucket &flags)
182 {
183 flags.insert_or_assign(DBConstant::ROWID,
184 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_ROW_ID_INDEX)));
185 flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
186 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX)));
187 flags.insert_or_assign(CloudDbConstant::FLAG,
188 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_FLAG_INDEX)));
189 Bytes hashKey;
190 (void)SQLiteUtils::GetColumnBlobValue(stmt, CLOUD_QUERY_HASH_KEY_INDEX, hashKey);
191 flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
192 flags.insert_or_assign(CloudDbConstant::CLOUD_FLAG,
193 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CLOUD_FLAG_INDEX)));
194 }
195
GetCloudKvData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)196 int SqliteCloudKvExecutorUtils::GetCloudKvData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
197 {
198 int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
199 if (errCode != E_OK) {
200 return errCode;
201 }
202 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
203 if (errCode != E_OK) {
204 return errCode;
205 }
206 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
207 if (errCode != E_OK) {
208 return errCode;
209 }
210 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, CLOUD_QUERY_ORI_DEV_INDEX, stmt, data,
211 totalSize);
212 if (errCode != E_OK) {
213 return errCode;
214 }
215 data.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME,
216 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_DEV_CREATE_TIME_INDEX)));
217 totalSize += sizeof(int64_t);
218 return E_OK;
219 }
220
GetCloudKvBlobData(const std::string & keyStr,int index,sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)221 int SqliteCloudKvExecutorUtils::GetCloudKvBlobData(const std::string &keyStr, int index, sqlite3_stmt *stmt,
222 VBucket &data, uint32_t &totalSize)
223 {
224 std::vector<uint8_t> blob;
225 int errCode = SQLiteUtils::GetColumnBlobValue(stmt, index, blob);
226 if (errCode != E_OK) {
227 LOGE("[SqliteCloudKvExecutorUtils] Get %.3s failed %d", keyStr.c_str(), errCode);
228 return errCode;
229 }
230 std::string tmp = std::string(blob.begin(), blob.end());
231 if ((keyStr == CloudDbConstant::CLOUD_KV_FIELD_DEVICE ||
232 keyStr == CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE)) {
233 if (tmp.empty()) {
234 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(tmp);
235 if (errCode != E_OK) {
236 return errCode;
237 }
238 tmp = DBCommon::TransferHashString(tmp);
239 }
240 tmp = DBBase64Utils::Encode(std::vector<uint8_t>(tmp.begin(), tmp.end()));
241 }
242 totalSize += tmp.size();
243 data.insert_or_assign(keyStr, tmp);
244 return E_OK;
245 }
246
GetLogInfo(sqlite3 * db,bool isMemory,const VBucket & cloudData,const std::string & userId)247 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfo(sqlite3 *db, bool isMemory,
248 const VBucket &cloudData, const std::string &userId)
249 {
250 std::pair<int, DataInfoWithLog> res;
251 int &errCode = res.first;
252 std::string keyStr;
253 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CLOUD_KV_FIELD_KEY, cloudData, keyStr);
254 if (errCode == -E_NOT_FOUND) {
255 errCode = E_OK;
256 }
257 if (errCode != E_OK) {
258 LOGE("[SqliteCloudKvExecutorUtils] Get key failed %d", errCode);
259 return res;
260 }
261 Bytes key;
262 DBCommon::StringToVector(keyStr, key);
263 Bytes hashKey;
264 DBCommon::CalcValueHash(key, hashKey);
265 sqlite3_stmt *stmt = nullptr;
266 std::tie(errCode, stmt) = GetLogInfoStmt(db, cloudData, !hashKey.empty(), userId.empty());
267 if (errCode != E_OK) {
268 LOGE("[SqliteCloudKvExecutorUtils] Get stmt failed %d", errCode);
269 return res;
270 }
271 std::string gid;
272 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, cloudData, gid);
273 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
274 LOGE("[SqliteCloudKvExecutorUtils] Get gid failed %d", errCode);
275 return res;
276 }
277 return GetLogInfoInner(stmt, isMemory, gid, hashKey, userId);
278 }
279
GetLogInfoStmt(sqlite3 * db,const VBucket & cloudData,bool existKey,bool emptyUserId)280 std::pair<int, sqlite3_stmt*> SqliteCloudKvExecutorUtils::GetLogInfoStmt(sqlite3 *db, const VBucket &cloudData,
281 bool existKey, bool emptyUserId)
282 {
283 std::pair<int, sqlite3_stmt*> res;
284 auto &[errCode, stmt] = res;
285 std::string querySql = QUERY_CLOUD_SYNC_DATA_LOG;
286 if (!emptyUserId) {
287 querySql = QUERY_CLOUD_SYNC_DATA_LOG_WITH_USERID;
288 }
289 std::string sql = querySql;
290 sql += " WHERE cloud_gid = ?";
291 if (existKey) {
292 sql += " UNION ";
293 sql += querySql;
294 sql += " WHERE sync_data.hash_key = ?";
295 }
296 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
297 return res;
298 }
299
GetLogInfoInner(sqlite3_stmt * stmt,bool isMemory,const std::string & gid,const Bytes & key,const std::string & userId)300 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfoInner(sqlite3_stmt *stmt, bool isMemory,
301 const std::string &gid, const Bytes &key, const std::string &userId)
302 {
303 ResFinalizer finalizer([stmt]() {
304 sqlite3_stmt *statement = stmt;
305 int ret = E_OK;
306 SQLiteUtils::ResetStatement(statement, true, ret);
307 if (ret != E_OK) {
308 LOGW("[SqliteCloudKvExecutorUtils] Reset stmt failed %d when get log", ret);
309 }
310 });
311 std::pair<int, DataInfoWithLog> res;
312 auto &[errCode, logInfo] = res;
313 int index = 1;
314 if (!userId.empty()) {
315 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
316 if (errCode != E_OK) {
317 LOGE("[SqliteCloudKvExecutorUtils] Bind 1st userId failed %d", errCode);
318 return res;
319 }
320 }
321 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
322 if (errCode != E_OK) {
323 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d", errCode);
324 return res;
325 }
326 if (!key.empty()) {
327 if (!userId.empty()) {
328 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
329 if (errCode != E_OK) {
330 LOGE("[SqliteCloudKvExecutorUtils] Bind 2nd userId failed %d", errCode);
331 return res;
332 }
333 }
334 errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, key);
335 if (errCode != E_OK) {
336 LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
337 return res;
338 }
339 }
340 errCode = SQLiteUtils::StepNext(stmt, isMemory);
341 if (errCode == -E_FINISHED) {
342 errCode = -E_NOT_FOUND;
343 // not found is ok, just return error
344 return res;
345 }
346 if (errCode != E_OK) {
347 LOGE("[SqliteCloudKvExecutorUtils] Get log failed %d", errCode);
348 return res;
349 }
350 logInfo = FillLogInfoWithStmt(stmt);
351 return res;
352 }
353
FillLogInfoWithStmt(sqlite3_stmt * stmt)354 DataInfoWithLog SqliteCloudKvExecutorUtils::FillLogInfoWithStmt(sqlite3_stmt *stmt)
355 {
356 DataInfoWithLog dataInfoWithLog;
357 int index = 0;
358 dataInfoWithLog.logInfo.dataKey = sqlite3_column_int64(stmt, index++);
359 dataInfoWithLog.logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
360 std::vector<uint8_t> device;
361 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, device);
362 DBCommon::VectorToString(device, dataInfoWithLog.logInfo.device);
363 std::vector<uint8_t> oriDev;
364 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, oriDev);
365 DBCommon::VectorToString(oriDev, dataInfoWithLog.logInfo.originDev);
366 dataInfoWithLog.logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
367 dataInfoWithLog.logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
368 std::string gid;
369 (void)SQLiteUtils::GetColumnTextValue(stmt, index++, gid);
370 dataInfoWithLog.logInfo.cloudGid = gid;
371 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, dataInfoWithLog.logInfo.hashKey);
372 Bytes key;
373 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, key);
374 std::string keyStr(key.begin(), key.end());
375 dataInfoWithLog.primaryKeys.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_KEY, keyStr);
376 (void)SQLiteUtils::GetColumnTextValue(stmt, index++, dataInfoWithLog.logInfo.version);
377 dataInfoWithLog.logInfo.cloud_flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
378 return dataInfoWithLog;
379 }
380
PutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData)381 int SqliteCloudKvExecutorUtils::PutCloudData(sqlite3 *db, bool isMemory, DownloadData &downloadData)
382 {
383 if (downloadData.data.size() != downloadData.opType.size()) {
384 LOGE("[SqliteCloudKvExecutorUtils] data size %zu != flag size %zu.", downloadData.data.size(),
385 downloadData.opType.size());
386 return -E_CLOUD_ERROR;
387 }
388 std::map<int, int> statisticMap = {};
389 int errCode = ExecutePutCloudData(db, isMemory, downloadData, statisticMap);
390 LOGI("[SqliteCloudKvExecutorUtils] save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d,"
391 " only update gid cnt = %d, set LCC flag zero cnt = %d, set LCC flag one cnt = %d,"
392 " update timestamp cnt = %d, clear gid count = %d, not handle cnt = %d",
393 errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
394 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
395 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
396 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
397 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
398 statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
399 return errCode;
400 }
401
ExecutePutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData,std::map<int,int> & statisticMap)402 int SqliteCloudKvExecutorUtils::ExecutePutCloudData(sqlite3 *db, bool isMemory,
403 DownloadData &downloadData, std::map<int, int> &statisticMap)
404 {
405 int index = 0;
406 int errCode = E_OK;
407 for (OpType op : downloadData.opType) {
408 switch (op) {
409 case OpType::INSERT: // fallthrough
410 case OpType::UPDATE: // fallthrough
411 case OpType::DELETE: // fallthrough
412 errCode = OperateCloudData(db, isMemory, index, op, downloadData);
413 break;
414 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
415 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE: // fallthrough
416 case OpType::UPDATE_TIMESTAMP: // fallthrough
417 errCode = OnlyUpdateSyncData(db, isMemory, index, op, downloadData);
418 if (errCode != E_OK) {
419 break;
420 }
421 [[fallthrough]];
422 case OpType::ONLY_UPDATE_GID: // fallthrough
423 case OpType::NOT_HANDLE: // fallthrough
424 case OpType::CLEAR_GID: // fallthrough
425 errCode = OnlyUpdateLogTable(db, isMemory, index, op, downloadData);
426 break;
427 default:
428 errCode = -E_CLOUD_ERROR;
429 break;
430 }
431 if (errCode != E_OK) {
432 LOGE("put cloud sync data fail:%d op:%d", errCode, static_cast<int>(op));
433 return errCode;
434 }
435 statisticMap[static_cast<int>(op)]++;
436 index++;
437 }
438 return errCode;
439 }
440
OperateCloudData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)441 int SqliteCloudKvExecutorUtils::OperateCloudData(sqlite3 *db, bool isMemory, int index, OpType opType,
442 DownloadData &downloadData)
443 {
444 sqlite3_stmt *logStmt = nullptr;
445 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(opType), logStmt);
446 if (errCode != E_OK) {
447 LOGE("[SqliteCloudKvExecutorUtils] Get insert log statement failed %d", errCode);
448 return errCode;
449 }
450 sqlite3_stmt *dataStmt = nullptr;
451 errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
452 if (errCode != E_OK) {
453 int ret = E_OK;
454 SQLiteUtils::ResetStatement(logStmt, true, ret);
455 LOGE("[SqliteCloudKvExecutorUtils] Get insert data statement failed %d reset %d", errCode, ret);
456 return errCode;
457 }
458 ResFinalizer finalizerData([logStmt, dataStmt, opType]() {
459 sqlite3_stmt *statement = logStmt;
460 int ret = E_OK;
461 SQLiteUtils::ResetStatement(statement, true, ret);
462 if (ret != E_OK) {
463 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d opType %d", ret, static_cast<int>(opType));
464 }
465 statement = dataStmt;
466 SQLiteUtils::ResetStatement(statement, true, ret);
467 if (ret != E_OK) {
468 LOGW("[SqliteCloudKvExecutorUtils] Reset data stmt failed %d opType %d", ret, static_cast<int>(opType));
469 }
470 });
471 errCode = BindStmt(logStmt, dataStmt, index, opType, downloadData);
472 if (errCode != E_OK) {
473 LOGE("[SqliteCloudKvExecutorUtils] BindStmt data stmt failed %d opType %d", errCode, static_cast<int>(opType));
474 return errCode;
475 }
476 errCode = StepStmt(logStmt, dataStmt, isMemory);
477 if (errCode != E_OK) {
478 LOGE("[SqliteCloudKvExecutorUtils] StepStmt data stmt failed %d opType %d", errCode, static_cast<int>(opType));
479 return errCode;
480 }
481 if (opType == OpType::INSERT || opType == OpType::UPDATE || opType == OpType::DELETE) {
482 return OperateOtherUserLog(db, isMemory, index, downloadData);
483 }
484 return errCode;
485 }
486
OperateOtherUserLog(sqlite3 * db,bool isMemory,int index,DownloadData & downloadData)487 int SqliteCloudKvExecutorUtils::OperateOtherUserLog(sqlite3 *db, bool isMemory, int index, DownloadData &downloadData)
488 {
489 auto [errCode, dataItem] = GetDataItem(index, downloadData);
490 if (errCode != E_OK) {
491 return errCode;
492 }
493 sqlite3_stmt *logStmt = nullptr;
494 std::string sql = "UPDATE naturalbase_kv_aux_sync_data_log SET cloud_flag = cloud_flag & ~0x2000 "
495 "WHERE userid != ? AND hash_key = ?";
496 errCode = SQLiteUtils::GetStatement(db, sql, logStmt);
497 if (errCode != E_OK) {
498 LOGE("[SqliteCloudKvExecutorUtils] Get operate other log statement failed %d", errCode);
499 return errCode;
500 }
501 ResFinalizer finalizerData([logStmt]() {
502 sqlite3_stmt *statement = logStmt;
503 int ret = E_OK;
504 SQLiteUtils::ResetStatement(statement, true, ret);
505 if (ret != E_OK) {
506 LOGW("[SqliteCloudKvExecutorUtils] Reset operate other log stmt failed %d ", ret);
507 }
508 });
509 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, downloadData.user);
510 if (errCode != E_OK) {
511 LOGE("[SqliteCloudKvExecutorUtils] Bind operate other log user failed %d when insert", errCode);
512 return errCode;
513 }
514 errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
515 if (errCode != E_OK) {
516 LOGE("[SqliteCloudKvExecutorUtils] Bind operate other log hashKey failed %d when insert", errCode);
517 return errCode;
518 }
519 errCode = SQLiteUtils::StepNext(logStmt, isMemory);
520 if (errCode == -E_FINISHED) {
521 return E_OK;
522 }
523 return errCode;
524 }
525
GetOperateDataSql(OpType opType)526 std::string SqliteCloudKvExecutorUtils::GetOperateDataSql(OpType opType)
527 {
528 switch (opType) {
529 case OpType::INSERT:
530 return INSERT_SYNC_SQL;
531 case OpType::UPDATE: // fallthrough
532 case OpType::DELETE:
533 return UPDATE_SYNC_SQL;
534 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
535 return SET_SYNC_DATA_NO_FORCE_PUSH;
536 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
537 return SET_SYNC_DATA_FORCE_PUSH;
538 case OpType::UPDATE_TIMESTAMP:
539 return UPDATE_TIMESTAMP;
540 default:
541 return "";
542 }
543 }
544
GetOperateLogSql(OpType opType)545 std::string SqliteCloudKvExecutorUtils::GetOperateLogSql(OpType opType)
546 {
547 switch (opType) {
548 case OpType::INSERT: // fallthrough
549 case OpType::UPDATE:
550 return INSERT_CLOUD_SYNC_DATA_LOG;
551 case OpType::DELETE:
552 return UPDATE_CLOUD_SYNC_DATA_LOG;
553 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
554 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE: // fallthrough
555 case OpType::UPDATE_TIMESTAMP: // fallthrough
556 case OpType::ONLY_UPDATE_GID: // fallthrough
557 case OpType::NOT_HANDLE: // fallthrough
558 case OpType::CLEAR_GID: // fallthrough
559 return UPSERT_CLOUD_SYNC_DATA_LOG;
560 default:
561 return "";
562 }
563 }
564
TransToOpType(const CloudWaterType type)565 OpType SqliteCloudKvExecutorUtils::TransToOpType(const CloudWaterType type)
566 {
567 switch (type) {
568 case CloudWaterType::INSERT:
569 return OpType::INSERT;
570 case CloudWaterType::UPDATE:
571 return OpType::UPDATE;
572 case CloudWaterType::DELETE:
573 return OpType::DELETE;
574 default:
575 return OpType::NOT_HANDLE;
576 }
577 }
578
BindOnlyUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)579 int SqliteCloudKvExecutorUtils::BindOnlyUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
580 const DataItem &dataItem)
581 {
582 int index = 0;
583 int errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, user);
584 if (errCode != E_OK) {
585 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when only insert log", errCode);
586 return errCode;
587 }
588 errCode = SQLiteUtils::BindBlobToStatement(logStmt, ++index, dataItem.hashKey);
589 if (errCode != E_OK) {
590 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when only insert log", errCode);
591 }
592 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
593 if (errCode != E_OK) {
594 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only insert gid.", errCode);
595 return errCode;
596 }
597 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
598 if (errCode != E_OK) {
599 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only insert log", errCode);
600 return errCode;
601 }
602 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
603 if (errCode != E_OK) {
604 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only update gid.", errCode);
605 return errCode;
606 }
607 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
608 if (errCode != E_OK) {
609 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only update log", errCode);
610 return errCode;
611 }
612 return errCode;
613 }
614
BindStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)615 int SqliteCloudKvExecutorUtils::BindStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, int index, OpType opType,
616 DownloadData &downloadData)
617 {
618 auto [errCode, dataItem] = GetDataItem(index, downloadData);
619 if (errCode != E_OK) {
620 return errCode;
621 }
622 switch (opType) {
623 case OpType::INSERT:
624 return BindInsertStmt(logStmt, dataStmt, downloadData.user, dataItem);
625 case OpType::UPDATE:
626 return BindUpdateStmt(logStmt, dataStmt, downloadData.user, dataItem);
627 case OpType::DELETE:
628 dataItem.hashKey = downloadData.existDataHashKey[index];
629 dataItem.gid.clear();
630 dataItem.version.clear();
631 return BindDeleteStmt(logStmt, dataStmt, downloadData.user, dataItem);
632 default:
633 return E_OK;
634 }
635 }
636
BindInsertStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)637 int SqliteCloudKvExecutorUtils::BindInsertStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt,
638 const std::string &user, const DataItem &dataItem)
639 {
640 int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for insert DATA table.
641 if (errCode != E_OK) {
642 return errCode;
643 }
644 return BindDataStmt(dataStmt, dataItem, true);
645 }
646
BindInsertLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,bool isTagLogin)647 int SqliteCloudKvExecutorUtils::BindInsertLogStmt(sqlite3_stmt *logStmt, const std::string &user,
648 const DataItem &dataItem, bool isTagLogin)
649 {
650 int errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, user);
651 if (errCode != E_OK) {
652 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when insert", errCode);
653 return errCode;
654 }
655 errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
656 if (errCode != E_OK) {
657 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when insert", errCode);
658 return errCode;
659 }
660 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_CLOUD_GID_INDEX, dataItem.gid);
661 if (errCode != E_OK) {
662 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when insert", errCode);
663 return errCode;
664 }
665 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_VERSION_INDEX, dataItem.version);
666 if (errCode != E_OK) {
667 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when insert", errCode);
668 return errCode;
669 }
670 int64_t flag = dataItem.cloud_flag | (isTagLogin ? static_cast<uint64_t>(LogInfoFlag::FLAG_LOGIN_USER) : 0x00);
671 errCode = SQLiteUtils::BindInt64ToStatement(logStmt, BIND_INSERT_CLOUD_FLAG_INDEX, flag);
672 if (errCode != E_OK) {
673 LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when insert", errCode);
674 }
675 return errCode;
676 }
677
BindUpdateStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)678 int SqliteCloudKvExecutorUtils::BindUpdateStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
679 const DataItem &dataItem)
680 {
681 int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for update DATA table.
682 if (errCode != E_OK) {
683 return errCode;
684 }
685 errCode = BindDataStmt(dataStmt, dataItem, false);
686 if (errCode != E_OK) {
687 return errCode;
688 }
689 return E_OK;
690 }
691
BindUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)692 int SqliteCloudKvExecutorUtils::BindUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
693 const DataItem &dataItem)
694 {
695 int index = 1;
696 int errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.gid);
697 if (errCode != E_OK) {
698 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when update", errCode);
699 return errCode;
700 }
701 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.version);
702 if (errCode != E_OK) {
703 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when update", errCode);
704 return errCode;
705 }
706 errCode = SQLiteUtils::BindInt64ToStatement(logStmt, index++, dataItem.cloud_flag);
707 if (errCode != E_OK) {
708 LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when update", errCode);
709 return errCode;
710 }
711 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
712 if (errCode != E_OK) {
713 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when update", errCode);
714 return errCode;
715 }
716 errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, dataItem.hashKey);
717 if (errCode != E_OK) {
718 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when update", errCode);
719 }
720 return errCode;
721 }
722
BindDeleteStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,DataItem & dataItem)723 int SqliteCloudKvExecutorUtils::BindDeleteStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
724 DataItem &dataItem)
725 {
726 dataItem.key = {};
727 dataItem.value = {};
728 dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
729 int errCode = BindUpdateLogStmt(logStmt, user, dataItem); // update LOG table for delete DATA table.
730 if (errCode != E_OK) {
731 return errCode;
732 }
733 errCode = BindDataStmt(dataStmt, dataItem, false);
734 if (errCode != E_OK) {
735 return errCode;
736 }
737 return E_OK;
738 }
739
BindDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert)740 int SqliteCloudKvExecutorUtils::BindDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert)
741 {
742 int index = 1;
743 int errCode = BindSyncDataStmt(dataStmt, dataItem, isInsert, index);
744 if (errCode != E_OK) {
745 return errCode;
746 }
747 errCode = BindCloudDataStmt(dataStmt, dataItem, index);
748 if (errCode != E_OK) {
749 return errCode;
750 }
751 if (!isInsert) {
752 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
753 if (errCode != E_OK) {
754 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
755 }
756 }
757 return errCode;
758 }
759
BindSyncDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert,int & index)760 int SqliteCloudKvExecutorUtils::BindSyncDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert,
761 int &index)
762 {
763 int errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.key);
764 if (errCode != E_OK) {
765 LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
766 return errCode;
767 }
768 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.value);
769 if (errCode != E_OK) {
770 LOGE("[SqliteCloudKvExecutorUtils] Bind value failed %d", errCode);
771 return errCode;
772 }
773 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.timestamp));
774 if (errCode != E_OK) {
775 LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
776 return errCode;
777 }
778 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.flag));
779 if (errCode != E_OK) {
780 LOGE("[SqliteCloudKvExecutorUtils] Bind flag failed %d", errCode);
781 return errCode;
782 }
783 Bytes bytes;
784 DBCommon::StringToVector(dataItem.dev, bytes);
785 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
786 if (errCode != E_OK) {
787 LOGE("[SqliteCloudKvExecutorUtils] Bind dev failed %d", errCode);
788 return errCode;
789 }
790 DBCommon::StringToVector(dataItem.origDev, bytes);
791 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
792 if (errCode != E_OK) {
793 LOGE("[SqliteCloudKvExecutorUtils] Bind oriDev failed %d", errCode);
794 return errCode;
795 }
796 if (isInsert) {
797 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
798 if (errCode != E_OK) {
799 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
800 return errCode;
801 }
802 }
803 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.writeTimestamp));
804 if (errCode != E_OK) {
805 LOGE("[SqliteCloudKvExecutorUtils] Bind wTime failed %d", errCode);
806 }
807 return errCode;
808 }
809
GetModifyTime(int64_t timestamp)810 int64_t SqliteCloudKvExecutorUtils::GetModifyTime(int64_t timestamp)
811 {
812 uint64_t curTime = 0;
813 int64_t modifyTime = timestamp;
814 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
815 if (curTime < static_cast<uint64_t>(INT64_MAX)) {
816 if (modifyTime > static_cast<int64_t>(curTime)) {
817 modifyTime = static_cast<int64_t>(curTime);
818 }
819 }
820 }
821 return modifyTime;
822 }
823
BindCloudDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,int & index)824 int SqliteCloudKvExecutorUtils::BindCloudDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, int &index)
825 {
826 int errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.modifyTime);
827 if (errCode != E_OK) {
828 LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
829 return errCode;
830 }
831 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.createTime);
832 if (errCode != E_OK) {
833 LOGE("[SqliteCloudKvExecutorUtils] Bind createTime failed %d", errCode);
834 }
835 return errCode;
836 }
837
StepStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,bool isMemory)838 int SqliteCloudKvExecutorUtils::StepStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, bool isMemory)
839 {
840 int errCode = SQLiteUtils::StepNext(logStmt, isMemory);
841 if (errCode != -E_FINISHED) {
842 LOGE("[SqliteCloudKvExecutorUtils] Step insert log stmt failed %d", errCode);
843 return errCode;
844 }
845 errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
846 if (errCode != -E_FINISHED) {
847 LOGE("[SqliteCloudKvExecutorUtils] Step insert data stmt failed %d", errCode);
848 return errCode;
849 }
850 return E_OK;
851 }
852
FillCloudLog(const FillGidParam & param,OpType opType,const CloudSyncData & data,const std::string & user,CloudUploadRecorder & recorder)853 int SqliteCloudKvExecutorUtils::FillCloudLog(const FillGidParam ¶m, OpType opType, const CloudSyncData &data,
854 const std::string &user, CloudUploadRecorder &recorder)
855 {
856 if (param.first == nullptr) {
857 LOGE("[SqliteCloudKvExecutorUtils] Fill log got nullptr db");
858 return -E_INVALID_ARGS;
859 }
860 if (data.isCloudVersionRecord) {
861 int errCode = FillCloudVersionRecord(param.first, opType, data);
862 if (errCode != E_OK) {
863 return errCode;
864 }
865 }
866 switch (opType) {
867 case OpType::INSERT:
868 return FillCloudGid(param, data.insData, user, CloudWaterType::INSERT, recorder);
869 case OpType::UPDATE:
870 return FillCloudGid(param, data.updData, user, CloudWaterType::UPDATE, recorder);
871 case OpType::DELETE:
872 return FillCloudGid(param, data.delData, user, CloudWaterType::DELETE, recorder);
873 default:
874 return E_OK;
875 }
876 }
877
OnlyUpdateLogTable(sqlite3 * db,bool isMemory,int index,OpType op,DownloadData & downloadData)878 int SqliteCloudKvExecutorUtils::OnlyUpdateLogTable(sqlite3 *db, bool isMemory, int index, OpType op,
879 DownloadData &downloadData)
880 {
881 if (downloadData.existDataHashKey[index].empty()) {
882 return E_OK;
883 }
884 sqlite3_stmt *logStmt = nullptr;
885 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(op), logStmt);
886 if (errCode != E_OK) {
887 LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
888 return errCode;
889 }
890 ResFinalizer finalizerData([logStmt]() {
891 sqlite3_stmt *statement = logStmt;
892 int ret = E_OK;
893 SQLiteUtils::ResetStatement(statement, true, ret);
894 if (ret != E_OK) {
895 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when only update log", ret);
896 }
897 });
898 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
899 if (res.first != E_OK) {
900 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", res.first);
901 return res.first;
902 }
903 bool clearCloudInfo = (op == OpType::CLEAR_GID);
904 if (res.second.hashKey.empty() || DBCommon::IsRecordDelete(downloadData.data[index])) {
905 res.second.hashKey = downloadData.existDataHashKey[index];
906 clearCloudInfo = true;
907 }
908 if (clearCloudInfo) {
909 res.second.gid.clear();
910 res.second.version.clear();
911 }
912 errCode = BindOnlyUpdateLogStmt(logStmt, downloadData.user, res.second);
913 if (errCode != E_OK) {
914 return errCode;
915 }
916 errCode = SQLiteUtils::StepNext(logStmt, isMemory);
917 if (errCode == -E_FINISHED) {
918 errCode = E_OK;
919 }
920 return errCode;
921 }
922
FillCloudGid(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,const CloudWaterType & type,CloudUploadRecorder & recorder)923 int SqliteCloudKvExecutorUtils::FillCloudGid(const FillGidParam ¶m, const CloudSyncBatch &data,
924 const std::string &user, const CloudWaterType &type, CloudUploadRecorder &recorder)
925 {
926 auto [db, ignoreEmptyGid] = param;
927 sqlite3_stmt *logStmt = nullptr;
928 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(TransToOpType(type)), logStmt);
929 ResFinalizer finalizerData([logStmt]() {
930 sqlite3_stmt *statement = logStmt;
931 int ret = E_OK;
932 SQLiteUtils::ResetStatement(statement, true, ret);
933 if (ret != E_OK) {
934 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when fill log", ret);
935 }
936 });
937 for (size_t i = 0; i < data.hashKey.size(); ++i) {
938 if (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i])) {
939 continue;
940 }
941 DataItem dataItem;
942 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data.extend[i], dataItem.gid);
943 if (dataItem.gid.empty() && ignoreEmptyGid) {
944 continue;
945 }
946 if (errCode != E_OK) {
947 return errCode;
948 }
949 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::VERSION_FIELD, data.extend[i], dataItem.version);
950 dataItem.hashKey = data.hashKey[i];
951 dataItem.dataDelete = CheckDataDelete(param, data, i);
952 errCode = BindFillGidLogStmt(logStmt, user, dataItem, data.extend[i], type);
953 if (errCode != E_OK) {
954 return errCode;
955 }
956 errCode = SQLiteUtils::StepNext(logStmt, false);
957 if (errCode == -E_FINISHED) {
958 errCode = E_OK;
959 }
960 if (errCode != E_OK) {
961 LOGE("[SqliteCloudKvExecutorUtils] fill back failed %d index %zu", errCode, i);
962 return errCode;
963 }
964 SQLiteUtils::ResetStatement(logStmt, false, errCode);
965 MarkUploadSuccess(param, data, user, i);
966 // ignored version record
967 if (i >= data.timestamp.size()) {
968 continue;
969 }
970 recorder.RecordUploadRecord(CloudDbConstant::CLOUD_KV_TABLE_NAME, data.hashKey[i], type, data.timestamp[i]);
971 }
972 return E_OK;
973 }
974
OnlyUpdateSyncData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)975 int SqliteCloudKvExecutorUtils::OnlyUpdateSyncData(sqlite3 *db, bool isMemory, int index, OpType opType,
976 DownloadData &downloadData)
977 {
978 if (opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO && opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE &&
979 opType != OpType::UPDATE_TIMESTAMP) {
980 LOGW("[SqliteCloudKvExecutorUtils] Ignore unknown opType %d", static_cast<int>(opType));
981 return E_OK;
982 }
983 sqlite3_stmt *dataStmt = nullptr;
984 int errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
985 if (errCode != E_OK) {
986 LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
987 return errCode;
988 }
989 ResFinalizer finalizerData([dataStmt]() {
990 sqlite3_stmt *statement = dataStmt;
991 int ret = E_OK;
992 SQLiteUtils::ResetStatement(statement, true, ret);
993 if (ret != E_OK) {
994 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when update log", ret);
995 }
996 });
997 errCode = BindUpdateSyncDataStmt(dataStmt, index, opType, downloadData);
998 if (errCode != E_OK) {
999 LOGE("[SqliteCloudKvExecutorUtils] Bind update sync data stmt failed %d", errCode);
1000 return errCode;
1001 }
1002 errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
1003 if (errCode == -E_FINISHED) {
1004 errCode = E_OK;
1005 }
1006 return errCode;
1007 }
1008
BindUpdateSyncDataStmt(sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)1009 int SqliteCloudKvExecutorUtils::BindUpdateSyncDataStmt(sqlite3_stmt *dataStmt, int index, OpType opType,
1010 DownloadData &downloadData)
1011 {
1012 switch (opType) {
1013 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
1014 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
1015 return SQLiteUtils::BindBlobToStatement(dataStmt, 1, downloadData.existDataHashKey[index]);
1016 case OpType::UPDATE_TIMESTAMP:
1017 return BindUpdateTimestampStmt(dataStmt, index, downloadData);
1018 default:
1019 return E_OK;
1020 }
1021 }
1022
BindUpdateTimestampStmt(sqlite3_stmt * dataStmt,int index,DownloadData & downloadData)1023 int SqliteCloudKvExecutorUtils::BindUpdateTimestampStmt(sqlite3_stmt *dataStmt, int index, DownloadData &downloadData)
1024 {
1025 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
1026 auto &[errCode, dataItem] = res;
1027 if (errCode != E_OK) {
1028 return errCode;
1029 }
1030 int currentBindIndex = 1; // bind sql index start at 1
1031 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.timestamp);
1032 if (errCode != E_OK) {
1033 LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
1034 return errCode;
1035 }
1036 int64_t modifyTime = GetModifyTime(dataItem.modifyTime);
1037 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, modifyTime);
1038 if (errCode != E_OK) {
1039 LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
1040 }
1041 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, currentBindIndex++, dataItem.hashKey);
1042 if (errCode != E_OK) {
1043 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
1044 return errCode;
1045 }
1046 return E_OK;
1047 }
1048
GetDataItem(int index,DownloadData & downloadData)1049 std::pair<int, DataItem> SqliteCloudKvExecutorUtils::GetDataItem(int index, DownloadData &downloadData)
1050 {
1051 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
1052 auto &[errCode, dataItem] = res;
1053 if (errCode != E_OK) {
1054 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
1055 return res;
1056 }
1057 std::string dev;
1058 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(dev);
1059 if (errCode != E_OK) {
1060 return res;
1061 }
1062 dev = DBCommon::TransferHashString(dev);
1063 auto decodeDevice = DBBase64Utils::Decode(dataItem.dev);
1064 if (!decodeDevice.empty()) {
1065 dataItem.dev = std::string(decodeDevice.begin(), decodeDevice.end());
1066 }
1067 if (dataItem.dev == dev) {
1068 dataItem.dev = "";
1069 }
1070 decodeDevice = DBBase64Utils::Decode(dataItem.origDev);
1071 if (!decodeDevice.empty()) {
1072 dataItem.origDev = std::string(decodeDevice.begin(), decodeDevice.end());
1073 }
1074 if (dataItem.origDev == dev) {
1075 dataItem.origDev = "";
1076 }
1077 int64_t timestamp = GetModifyTime(dataItem.modifyTime);
1078 dataItem.timestamp = static_cast<Timestamp>(timestamp + downloadData.timeOffset);
1079 dataItem.writeTimestamp = static_cast<Timestamp>(timestamp); // writeTimestamp is process conflict time
1080 return res;
1081 }
1082
CountCloudDataInner(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,const std::string & sql)1083 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudDataInner(sqlite3 *db, bool isMemory,
1084 const Timestamp ×tamp, const std::string &user, const std::string &sql)
1085 {
1086 std::pair<int, int64_t> res;
1087 auto &[errCode, count] = res;
1088 sqlite3_stmt *stmt = nullptr;
1089 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1090 if (errCode != E_OK) {
1091 LOGE("[SqliteCloudKvExecutorUtils] Count data stmt failed %d", errCode);
1092 return res;
1093 }
1094 ResFinalizer finalizer([stmt]() {
1095 sqlite3_stmt *statement = stmt;
1096 int ret = E_OK;
1097 SQLiteUtils::ResetStatement(statement, true, ret);
1098 if (ret != E_OK) {
1099 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when count data", ret);
1100 }
1101 });
1102 errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_USER, user);
1103 if (errCode != E_OK) {
1104 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d", errCode);
1105 return res;
1106 }
1107 errCode = SQLiteUtils::BindInt64ToStatement(stmt, BIND_CLOUD_TIMESTAMP, static_cast<int64_t>(timestamp));
1108 if (errCode != E_OK) {
1109 LOGE("[SqliteCloudKvExecutorUtils] Bind begin time failed %d", errCode);
1110 return res;
1111 }
1112 errCode = SQLiteUtils::StepNext(stmt, isMemory);
1113 if (errCode == -E_FINISHED) {
1114 count = 0;
1115 return res;
1116 }
1117 count = sqlite3_column_int64(stmt, CLOUD_QUERY_COUNT_INDEX);
1118 LOGD("[SqliteCloudKvExecutorUtils] Get total upload count %" PRId64, count);
1119 return res;
1120 }
1121
CountCloudData(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,bool forcePush)1122 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudData(sqlite3 *db, bool isMemory,
1123 const Timestamp ×tamp, const std::string &user, bool forcePush)
1124 {
1125 std::string sql = SqliteQueryHelper::GetKvCloudQuerySql(true, forcePush);
1126 return CountCloudDataInner(db, isMemory, timestamp, user, sql);
1127 }
1128
CountAllCloudData(const DBParam & param,const std::vector<Timestamp> & timestampVec,const std::string & user,bool forcePush,QuerySyncObject & querySyncObject)1129 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountAllCloudData(const DBParam ¶m,
1130 const std::vector<Timestamp> ×tampVec, const std::string &user, bool forcePush,
1131 QuerySyncObject &querySyncObject)
1132 {
1133 std::pair<int, int64_t> res = { E_OK, 0 };
1134 auto &[errCode, count] = res;
1135 if (timestampVec.size() != 3) { // 3 is the number of three mode.
1136 errCode = -E_INVALID_ARGS;
1137 return res;
1138 }
1139 std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1140 SqliteQueryHelper helper = querySyncObject.GetQueryHelper(errCode);
1141 if (errCode != E_OK) {
1142 return res;
1143 }
1144 for (size_t i = 0; i < typeVec.size(); i++) {
1145 sqlite3_stmt *stmt = nullptr;
1146 errCode = helper.GetCountKvCloudDataStatement(param.first, forcePush, typeVec[i], stmt);
1147 if (errCode != E_OK) {
1148 return res;
1149 }
1150 // count no use watermark
1151 auto [err, cnt] = helper.BindCountKvCloudDataStatement(param.first, param.second, 0u, user, stmt);
1152 if (err != E_OK) {
1153 return { err, 0 };
1154 }
1155 count += cnt;
1156 }
1157 return res;
1158 }
1159
FillCloudVersionRecord(sqlite3 * db,OpType opType,const CloudSyncData & data)1160 int SqliteCloudKvExecutorUtils::FillCloudVersionRecord(sqlite3 *db, OpType opType, const CloudSyncData &data)
1161 {
1162 if (opType != OpType::INSERT && opType != OpType::UPDATE) {
1163 return E_OK;
1164 }
1165 bool isInsert = (opType == OpType::INSERT);
1166 CloudSyncBatch syncBatch = isInsert ? data.insData : data.updData;
1167 if (syncBatch.record.empty()) {
1168 LOGW("[SqliteCloudKvExecutorUtils] Fill empty cloud version record");
1169 return E_OK;
1170 }
1171 syncBatch.record[0].insert(syncBatch.extend[0].begin(), syncBatch.extend[0].end());
1172 auto res = CloudStorageUtils::GetSystemRecordFromCloudData(syncBatch.record[0]); // only record first one
1173 auto &[errCode, dataItem] = res;
1174 sqlite3_stmt *dataStmt = nullptr;
1175 errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
1176 if (errCode != E_OK) {
1177 LOGE("[SqliteCloudKvExecutorUtils] Get insert version record statement failed %d", errCode);
1178 return errCode;
1179 }
1180 ResFinalizer finalizerData([dataStmt]() {
1181 int ret = E_OK;
1182 sqlite3_stmt *statement = dataStmt;
1183 SQLiteUtils::ResetStatement(statement, true, ret);
1184 if (ret != E_OK) {
1185 LOGW("[SqliteCloudKvExecutorUtils] Reset version record stmt failed %d", ret);
1186 }
1187 });
1188 errCode = BindDataStmt(dataStmt, dataItem, isInsert);
1189 if (errCode != E_OK) {
1190 return errCode;
1191 }
1192 errCode = SQLiteUtils::StepNext(dataStmt, false);
1193 if (errCode != -E_FINISHED) {
1194 LOGE("[SqliteCloudKvExecutorUtils] Step insert version record stmt failed %d", errCode);
1195 return errCode;
1196 }
1197 return E_OK;
1198 }
1199
GetLocalCloudVersion(sqlite3 * db,bool isMemory,const std::string & user)1200 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersion(sqlite3 *db, bool isMemory,
1201 const std::string &user)
1202 {
1203 auto res = GetLocalCloudVersionInner(db, isMemory, user);
1204 if (res.first != E_OK) {
1205 LOGE("[SqliteCloudKvExecutorUtils] Get local cloud version failed %d", res.first);
1206 }
1207 return res;
1208 }
1209
GetLocalCloudVersionInner(sqlite3 * db,bool isMemory,const std::string & user)1210 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersionInner(sqlite3 *db, bool isMemory,
1211 const std::string &user)
1212 {
1213 std::pair<int, CloudSyncData> res;
1214 auto &[errCode, syncData] = res;
1215 auto sql = SqliteQueryHelper::GetKvCloudRecordSql();
1216 sqlite3_stmt *stmt = nullptr;
1217 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1218 if (errCode != E_OK) {
1219 return res;
1220 }
1221 ResFinalizer finalizerData([stmt]() {
1222 int ret = E_OK;
1223 sqlite3_stmt *statement = stmt;
1224 SQLiteUtils::ResetStatement(statement, true, ret);
1225 if (ret != E_OK) {
1226 LOGW("[SqliteCloudKvExecutorUtils] Reset local version record stmt failed %d", ret);
1227 }
1228 });
1229 std::string hashDev;
1230 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
1231 if (errCode != E_OK) {
1232 return res;
1233 }
1234 std::string tempDev = DBCommon::TransferHashString(hashDev);
1235 hashDev = DBCommon::TransferStringToHex(tempDev);
1236 std::string key = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY + hashDev;
1237 Key keyVec;
1238 DBCommon::StringToVector(key, keyVec);
1239 errCode = SQLiteUtils::BindBlobToStatement(stmt, BIND_CLOUD_VERSION_RECORD_KEY_INDEX, keyVec);
1240 if (errCode != E_OK) {
1241 return res;
1242 }
1243 errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_VERSION_RECORD_USER_INDEX, user);
1244 if (errCode != E_OK) {
1245 return res;
1246 }
1247 errCode = GetCloudVersionRecord(isMemory, stmt, syncData);
1248 if (errCode == -E_NOT_FOUND) {
1249 InitDefaultCloudVersionRecord(key, tempDev, syncData);
1250 errCode = E_OK;
1251 }
1252 return res;
1253 }
1254
GetCloudVersionRecord(bool isMemory,sqlite3_stmt * stmt,CloudSyncData & syncData)1255 int SqliteCloudKvExecutorUtils::GetCloudVersionRecord(bool isMemory, sqlite3_stmt *stmt, CloudSyncData &syncData)
1256 {
1257 int errCode = SQLiteUtils::StepNext(stmt, isMemory);
1258 if (errCode == -E_FINISHED) {
1259 return -E_NOT_FOUND;
1260 }
1261 if (errCode != E_OK) {
1262 LOGE("[SqliteCloudKvExecutorUtils] Get local version failed %d", errCode);
1263 return errCode;
1264 }
1265 CloudSyncConfig config;
1266 config.maxUploadSize = CloudDbConstant::MAX_UPLOAD_SIZE;
1267 config.maxUploadCount = 1;
1268 CloudUploadRecorder recorder; // ignore last record
1269 UploadDetail detail;
1270 errCode = GetCloudDataForSync(config, recorder, stmt, syncData, detail);
1271 return errCode;
1272 }
1273
InitDefaultCloudVersionRecord(const std::string & key,const std::string & dev,CloudSyncData & syncData)1274 void SqliteCloudKvExecutorUtils::InitDefaultCloudVersionRecord(const std::string &key, const std::string &dev,
1275 CloudSyncData &syncData)
1276 {
1277 LOGI("[SqliteCloudKvExecutorUtils] Not found local version record");
1278 VBucket defaultRecord;
1279 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_KEY] = key;
1280 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_VALUE] = std::string("");
1281 auto encodeDev = DBBase64Utils::Encode(dev);
1282 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_DEVICE] = encodeDev;
1283 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE] = encodeDev;
1284 syncData.insData.record.push_back(std::move(defaultRecord));
1285 VBucket defaultExtend;
1286 defaultExtend[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::TransferStringToHex(key);
1287 syncData.insData.extend.push_back(std::move(defaultExtend));
1288 syncData.insData.assets.emplace_back();
1289 Bytes bytesHashKey;
1290 DBCommon::StringToVector(key, bytesHashKey);
1291 syncData.insData.hashKey.push_back(bytesHashKey);
1292 }
1293
BindVersionStmt(const std::string & device,sqlite3_stmt * dataStmt)1294 int SqliteCloudKvExecutorUtils::BindVersionStmt(const std::string &device, sqlite3_stmt *dataStmt)
1295 {
1296 std::string hashDevice;
1297 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDevice);
1298 if (errCode != E_OK) {
1299 return errCode;
1300 }
1301 Bytes bytes;
1302 if (device == hashDevice) {
1303 DBCommon::StringToVector("", bytes);
1304 } else {
1305 hashDevice = DBCommon::TransferHashString(device);
1306 DBCommon::StringToVector(hashDevice, bytes);
1307 }
1308 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, BIND_CLOUD_VERSION_DEVICE_INDEX, bytes);
1309 if (errCode != E_OK) {
1310 LOGE("[SqliteCloudKvExecutorUtils] Bind device failed %d", errCode);
1311 }
1312 return errCode;
1313 }
1314
GetCloudVersionFromCloud(sqlite3 * db,bool isMemory,const std::string & device,std::vector<VBucket> & dataVector)1315 int SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(sqlite3 *db, bool isMemory, const std::string &device,
1316 std::vector<VBucket> &dataVector)
1317 {
1318 sqlite3_stmt *dataStmt = nullptr;
1319 bool isDeviceEmpty = device.empty();
1320 std::string sql = SqliteQueryHelper::GetCloudVersionRecordSql(isDeviceEmpty);
1321 int errCode = SQLiteUtils::GetStatement(db, sql, dataStmt);
1322 if (errCode != E_OK) {
1323 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version record statement failed %d", errCode);
1324 return errCode;
1325 }
1326 ResFinalizer finalizerData([dataStmt]() {
1327 int ret = E_OK;
1328 sqlite3_stmt *statement = dataStmt;
1329 SQLiteUtils::ResetStatement(statement, true, ret);
1330 if (ret != E_OK) {
1331 LOGW("[SqliteCloudKvExecutorUtils] Reset cloud version record stmt failed %d", ret);
1332 }
1333 });
1334 if (!isDeviceEmpty) {
1335 errCode = BindVersionStmt(device, dataStmt);
1336 if (errCode != E_OK) {
1337 return errCode;
1338 }
1339 }
1340 uint32_t totalSize = 0;
1341 do {
1342 errCode = SQLiteUtils::StepWithRetry(dataStmt, isMemory);
1343 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1344 errCode = E_OK;
1345 break;
1346 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1347 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1348 break;
1349 }
1350 VBucket data;
1351 errCode = GetCloudVersionRecordData(dataStmt, data, totalSize);
1352 dataVector.push_back(data);
1353 } while (errCode == E_OK);
1354 return errCode;
1355 }
1356
GetCloudVersionRecordData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)1357 int SqliteCloudKvExecutorUtils::GetCloudVersionRecordData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
1358 {
1359 int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
1360 if (errCode != E_OK) {
1361 return errCode;
1362 }
1363 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
1364 if (errCode != E_OK) {
1365 return errCode;
1366 }
1367 return GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
1368 }
1369
GetWaitCompensatedSyncDataPk(sqlite3 * db,bool isMemory,std::vector<VBucket> & data)1370 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(sqlite3 *db, bool isMemory, std::vector<VBucket> &data)
1371 {
1372 sqlite3_stmt *stmt = nullptr;
1373 int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_KEY_SQL, stmt);
1374 if (errCode != E_OK) {
1375 LOGE("[SqliteCloudKvExecutorUtils] Get compensate key stmt failed %d", errCode);
1376 return errCode;
1377 }
1378 ResFinalizer finalizerData([stmt]() {
1379 int ret = E_OK;
1380 sqlite3_stmt *statement = stmt;
1381 SQLiteUtils::ResetStatement(statement, true, ret);
1382 if (ret != E_OK) {
1383 LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1384 }
1385 });
1386 uint32_t totalSize = 0;
1387 do {
1388 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1389 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1390 errCode = E_OK;
1391 break;
1392 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1393 LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1394 break;
1395 }
1396 VBucket key;
1397 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, key, totalSize);
1398 if (errCode != E_OK) {
1399 return errCode;
1400 }
1401 data.push_back(key);
1402 } while (errCode == E_OK);
1403 return errCode;
1404 }
1405
GetWaitCompensatedSyncDataUserId(sqlite3 * db,bool isMemory,std::vector<VBucket> & users)1406 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(sqlite3 *db, bool isMemory,
1407 std::vector<VBucket> &users)
1408 {
1409 sqlite3_stmt *stmt = nullptr;
1410 int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_USERID_SQL, stmt);
1411 if (errCode != E_OK) {
1412 LOGE("[SqliteCloudKvExecutorUtils] Get compensate userid stmt failed %d", errCode);
1413 return errCode;
1414 }
1415 ResFinalizer finalizerData([stmt]() {
1416 int ret = E_OK;
1417 sqlite3_stmt *statement = stmt;
1418 SQLiteUtils::ResetStatement(statement, true, ret);
1419 if (ret != E_OK) {
1420 LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1421 }
1422 });
1423 uint32_t totalSize = 0;
1424 do {
1425 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1426 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1427 errCode = E_OK;
1428 break;
1429 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1430 LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1431 break;
1432 }
1433 VBucket key;
1434 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_USERID, CLOUD_QUERY_KEY_INDEX, stmt,
1435 key, totalSize);
1436 if (errCode != E_OK) {
1437 return errCode;
1438 }
1439 users.push_back(key);
1440 } while (errCode == E_OK);
1441 return errCode;
1442 }
1443
GetWaitCompensatedSyncData(sqlite3 * db,bool isMemory,std::vector<VBucket> & data,std::vector<VBucket> & users)1444 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncData(sqlite3 *db, bool isMemory, std::vector<VBucket> &data,
1445 std::vector<VBucket> &users)
1446 {
1447 int errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(db, isMemory, data);
1448 if (errCode != E_OK) {
1449 LOGE("[GetWaitCompensatedSyncData] Get wait compensated sync data failed! errCode=%d", errCode);
1450 return errCode;
1451 }
1452 errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(db, isMemory, users);
1453 if (errCode != E_OK) {
1454 LOGE("[GetWaitCompensatedSyncData] Get wait compensated sync data failed! errCode=%d", errCode);
1455 }
1456 return errCode;
1457 }
1458
QueryCloudGid(sqlite3 * db,bool isMemory,const std::string & user,const QuerySyncObject & querySyncObject,std::vector<std::string> & cloudGid)1459 int SqliteCloudKvExecutorUtils::QueryCloudGid(sqlite3 *db, bool isMemory, const std::string &user,
1460 const QuerySyncObject &querySyncObject, std::vector<std::string> &cloudGid)
1461 {
1462 int errCode = E_OK;
1463 QuerySyncObject query = querySyncObject;
1464 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1465 if (errCode != E_OK) {
1466 return errCode;
1467 }
1468 sqlite3_stmt *stmt = nullptr;
1469 errCode = helper.GetAndBindGidKvCloudQueryStatement(user, db, stmt);
1470 if (errCode != E_OK) {
1471 return errCode;
1472 }
1473 do {
1474 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1475 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1476 errCode = E_OK;
1477 break;
1478 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1479 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1480 break;
1481 }
1482 std::string gid;
1483 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1484 cloudGid.push_back(gid);
1485 } while (errCode == E_OK);
1486 int ret = E_OK;
1487 SQLiteUtils::ResetStatement(stmt, true, ret);
1488 return errCode == E_OK ? ret : errCode;
1489 }
1490
BindFillGidLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,const VBucket & uploadExtend,const CloudWaterType & type)1491 int SqliteCloudKvExecutorUtils::BindFillGidLogStmt(sqlite3_stmt *logStmt, const std::string &user,
1492 const DataItem &dataItem, const VBucket &uploadExtend, const CloudWaterType &type)
1493 {
1494 DataItem wItem = dataItem;
1495 if (DBCommon::IsNeedCompensatedForUpload(uploadExtend, type) && !wItem.dataDelete) {
1496 wItem.cloud_flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1497 }
1498 if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1499 (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1500 wItem.gid = {};
1501 wItem.version = {};
1502 }
1503 int errCode = E_OK;
1504 if (type == CloudWaterType::DELETE) {
1505 if (DBCommon::IsCloudRecordNotFound(uploadExtend)) {
1506 errCode = BindUpdateLogStmt(logStmt, user, wItem);
1507 }
1508 } else {
1509 errCode = BindInsertLogStmt(logStmt, user, wItem, false);
1510 }
1511 if (errCode != E_OK) {
1512 LOGE("[SqliteCloudKvExecutorUtils] fill cloud gid failed. %d", errCode);
1513 }
1514 return errCode;
1515 }
1516
MarkUploadSuccess(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1517 void SqliteCloudKvExecutorUtils::MarkUploadSuccess(const FillGidParam ¶m, const CloudSyncBatch &data,
1518 const std::string &user, size_t dataIndex)
1519 {
1520 if (data.extend.size() <= dataIndex || data.hashKey.size() <= dataIndex || data.timestamp.size() <= dataIndex) {
1521 LOGW("[SqliteCloudKvExecutorUtils] invalid index %zu when mark upload success", dataIndex);
1522 return;
1523 }
1524 if (!DBCommon::IsRecordSuccess(data.extend[dataIndex])) {
1525 return;
1526 }
1527 if (CheckDataChanged(param, data, dataIndex)) {
1528 LOGW("[SqliteCloudKvExecutorUtils] %zu data changed when mark upload success", dataIndex);
1529 return;
1530 }
1531 MarkUploadSuccessInner(param, data, user, dataIndex);
1532 }
1533
CheckDataChanged(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1534 bool SqliteCloudKvExecutorUtils::CheckDataChanged(const FillGidParam ¶m,
1535 const CloudSyncBatch &data, size_t dataIndex)
1536 {
1537 sqlite3_stmt *checkStmt = nullptr;
1538 int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_CHANGED, checkStmt);
1539 if (errCode != E_OK) {
1540 LOGW("[SqliteCloudKvExecutorUtils] check data change get stmt failed %d", errCode);
1541 return true;
1542 }
1543 ResFinalizer finalizerData([checkStmt]() {
1544 sqlite3_stmt *statement = checkStmt;
1545 int ret = E_OK;
1546 SQLiteUtils::ResetStatement(statement, true, ret);
1547 if (ret != E_OK) {
1548 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data changed", ret);
1549 }
1550 });
1551 int index = 1;
1552 if (data.timestamp.size() > 0) {
1553 errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1554 if (errCode != E_OK) {
1555 LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data changed", errCode);
1556 return true;
1557 }
1558 }
1559 if (data.hashKey.size() > 0) {
1560 errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1561 if (errCode != E_OK) {
1562 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data changed", errCode);
1563 return true;
1564 }
1565 }
1566 errCode = SQLiteUtils::StepNext(checkStmt);
1567 if (errCode != E_OK) {
1568 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data changed", errCode);
1569 return true;
1570 }
1571 return sqlite3_column_int64(checkStmt, 0) == 0; // get index start at 0, get 0 is data changed
1572 }
1573
CheckDataDelete(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1574 bool SqliteCloudKvExecutorUtils::CheckDataDelete(const FillGidParam ¶m,
1575 const CloudSyncBatch &data, size_t dataIndex)
1576 {
1577 sqlite3_stmt *checkStmt = nullptr;
1578 int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_DELETE, checkStmt);
1579 if (errCode != E_OK) {
1580 LOGW("[SqliteCloudKvExecutorUtils] check data delete get stmt failed %d", errCode);
1581 return true;
1582 }
1583 ResFinalizer finalizerData([checkStmt]() {
1584 sqlite3_stmt *statement = checkStmt;
1585 int ret = E_OK;
1586 SQLiteUtils::ResetStatement(statement, true, ret);
1587 if (ret != E_OK) {
1588 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data delete", ret);
1589 }
1590 });
1591 int index = 1;
1592 if (data.timestamp.size() > 0) {
1593 errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1594 if (errCode != E_OK) {
1595 LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data delete", errCode);
1596 return true;
1597 }
1598 }
1599 if (data.hashKey.size() > 0) {
1600 errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1601 if (errCode != E_OK) {
1602 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data delete", errCode);
1603 return true;
1604 }
1605 }
1606 errCode = SQLiteUtils::StepNext(checkStmt);
1607 if (errCode != E_OK) {
1608 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data delete", errCode);
1609 return true;
1610 }
1611 return sqlite3_column_int64(checkStmt, 0) != 0; // get index start at !0, get 0 means data delete
1612 }
1613
MarkUploadSuccessInner(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1614 void SqliteCloudKvExecutorUtils::MarkUploadSuccessInner(const FillGidParam ¶m,
1615 const CloudSyncBatch &data, const std::string &user, size_t dataIndex)
1616 {
1617 sqlite3_stmt *logStmt = nullptr;
1618 int errCode = SQLiteUtils::GetStatement(param.first, MARK_UPLOAD_SUCCESS, logStmt);
1619 if (errCode != E_OK) {
1620 LOGW("[SqliteCloudKvExecutorUtils] mark upload success inner get stmt failed %d", errCode);
1621 return;
1622 }
1623 ResFinalizer finalizerData([logStmt]() {
1624 sqlite3_stmt *statement = logStmt;
1625 int ret = E_OK;
1626 SQLiteUtils::ResetStatement(statement, true, ret);
1627 if (ret != E_OK) {
1628 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when mark upload success", ret);
1629 }
1630 });
1631 int index = 1;
1632 errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, data.hashKey[dataIndex]);
1633 if (errCode != E_OK) {
1634 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1635 return;
1636 }
1637 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
1638 if (errCode != E_OK) {
1639 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1640 return;
1641 }
1642 errCode = SQLiteUtils::StepNext(logStmt);
1643 if (errCode != E_OK && errCode != -E_FINISHED) {
1644 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when mark upload success", errCode);
1645 return;
1646 }
1647 }
1648 }
1649