• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_cloud_kv_executor_utils.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "res_finalizer.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_ver_storage_executor_sql.h"
24 #include "time_helper.h"
25 
26 namespace DistributedDB {
GetCloudData(const CloudSyncConfig & config,const DBParam & param,const CloudUploadRecorder & recorder,SQLiteSingleVerContinueToken & token,CloudSyncData & data)27 int SqliteCloudKvExecutorUtils::GetCloudData(const CloudSyncConfig &config, const DBParam &param,
28     const CloudUploadRecorder &recorder, SQLiteSingleVerContinueToken &token, CloudSyncData &data)
29 {
30     auto [db, isMemory] = param;
31     bool stepNext = false;
32     auto [errCode, stmt] = token.GetCloudQueryStmt(db, data.isCloudForcePushStrategy, stepNext, data.mode);
33     if (errCode != E_OK) {
34         token.ReleaseCloudQueryStmt();
35         return errCode;
36     }
37     UploadDetail detail;
38     auto &[stepNum, totalSize] = detail;
39     do {
40         if (stepNext) {
41             errCode = SQLiteUtils::StepNext(stmt, isMemory);
42             if (errCode != E_OK) {
43                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
44                 break;
45             }
46         }
47         stepNext = true;
48         errCode = GetCloudDataForSync(config, recorder, stmt, data, detail);
49         stepNum++;
50     } while (errCode == E_OK);
51     LOGI("[SqliteCloudKvExecutorUtils] Get cloud sync data, insData:%u, upData:%u, delLog:%u errCode:%d total:%" PRIu32,
52          data.insData.record.size(), data.updData.record.size(), data.delData.extend.size(), errCode, totalSize);
53     UpdateBeginTime(token, data);
54     token.ReleaseCloudQueryStmt();
55     return errCode;
56 }
57 
GetMaxTimeStamp(const std::vector<VBucket> & dataExtend)58 Timestamp SqliteCloudKvExecutorUtils::GetMaxTimeStamp(const std::vector<VBucket> &dataExtend)
59 {
60     Timestamp maxTimeStamp = 0;
61     if (dataExtend.empty()) {
62         LOGE("[SqliteCloudKvExecutorUtils] [GetMaxTimeStamp] data extend is empty.");
63         return maxTimeStamp;
64     }
65     VBucket lastRecord = dataExtend.back();
66     auto it = lastRecord.find(CloudDbConstant::MODIFY_FIELD);
67     if (it != lastRecord.end() && maxTimeStamp < static_cast<Timestamp>(std::get<int64_t>(it->second))) {
68         maxTimeStamp = static_cast<Timestamp>(std::get<int64_t>(it->second));
69     }
70     return maxTimeStamp;
71 }
72 
UpdateBeginTime(SQLiteSingleVerContinueToken & token,const CloudSyncData & data)73 void SqliteCloudKvExecutorUtils::UpdateBeginTime(SQLiteSingleVerContinueToken &token,
74     const CloudSyncData &data)
75 {
76     Timestamp maxTimeStamp = 0;
77     switch (data.mode) {
78         case DistributedDB::CloudWaterType::DELETE:
79             maxTimeStamp = GetMaxTimeStamp(data.delData.extend);
80             break;
81         case DistributedDB::CloudWaterType::UPDATE:
82             maxTimeStamp = GetMaxTimeStamp(data.updData.extend);
83             break;
84         case DistributedDB::CloudWaterType::INSERT:
85             maxTimeStamp = GetMaxTimeStamp(data.insData.extend);
86             break;
87         case DistributedDB::CloudWaterType::BUTT:
88         default:
89             break;
90     }
91     if (maxTimeStamp > token.GetQueryBeginTime()) {
92         token.SetNextBeginTime("", maxTimeStamp);
93         return;
94     }
95     LOGW("[SqliteCloudKvExecutorUtils] The start time of the database has not been updated.");
96 }
97 
GetCloudDataForSync(const CloudSyncConfig & config,const CloudUploadRecorder & recorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,UploadDetail & detail)98 int SqliteCloudKvExecutorUtils::GetCloudDataForSync(const CloudSyncConfig &config, const CloudUploadRecorder &recorder,
99     sqlite3_stmt *statement, CloudSyncData &cloudDataResult, UploadDetail &detail)
100 {
101     auto &[stepNum, totalSize] = detail;
102     VBucket log;
103     VBucket extraLog;
104     uint32_t preSize = totalSize;
105     int64_t revisedTime = 0;
106     int64_t invalidTime = 0;
107     GetCloudLog(statement, log, totalSize, revisedTime, invalidTime);
108     GetCloudExtraLog(statement, extraLog);
109     if (revisedTime != 0) {
110         Bytes hashKey = std::get<Bytes>(extraLog[CloudDbConstant::HASH_KEY]);
111         cloudDataResult.revisedData.push_back({hashKey, revisedTime, invalidTime});
112     }
113 
114     VBucket data;
115     int64_t flag = 0;
116     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
117     if (errCode != E_OK) {
118         return errCode;
119     }
120 
121     if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
122         errCode = GetCloudKvData(statement, data, totalSize);
123         if (errCode != E_OK) {
124             return errCode;
125         }
126     }
127 
128     if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, config.maxUploadSize, config.maxUploadCount)) {
129         errCode = CloudStorageUtils::IdentifyCloudType(recorder, cloudDataResult, data, log, extraLog);
130     } else {
131         errCode = -E_UNFINISHED;
132     }
133     if (errCode == -E_IGNORE_DATA) {
134         errCode = E_OK;
135         totalSize = preSize;
136         stepNum--;
137     }
138     return errCode;
139 }
140 
GetCloudLog(sqlite3_stmt * stmt,VBucket & logInfo,uint32_t & totalSize,int64_t & revisedTime,int64_t & invalidTime)141 void SqliteCloudKvExecutorUtils::GetCloudLog(sqlite3_stmt *stmt, VBucket &logInfo,
142     uint32_t &totalSize, int64_t &revisedTime, int64_t &invalidTime)
143 {
144     int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX));
145     uint64_t curTime = 0;
146     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
147         if (modifyTime > static_cast<int64_t>(curTime)) {
148             invalidTime = modifyTime;
149             modifyTime = static_cast<int64_t>(curTime);
150             revisedTime = modifyTime;
151         }
152     } else {
153         LOGW("[SqliteCloudKvExecutorUtils] get raw sys time failed.");
154     }
155     logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
156     logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
157         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CREATE_TIME_INDEX)));
158     totalSize += sizeof(int64_t) + sizeof(int64_t);
159     if (sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX) != nullptr) {
160         std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
161             sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX));
162         if (!cloudGid.empty()) {
163             logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
164             totalSize += cloudGid.size();
165         }
166     }
167     if (revisedTime != 0) {
168         std::string cloudGid;
169         if (logInfo.count(CloudDbConstant::GID_FIELD) != 0) {
170             cloudGid = std::get<std::string>(logInfo[CloudDbConstant::GID_FIELD]);
171         }
172         LOGW("[SqliteCloudKvExecutorUtils] Found invalid mod time: %lld, curTime: %lld, gid: %s",
173             invalidTime, revisedTime, DBCommon::StringMiddleMasking(cloudGid).c_str());
174     }
175     std::string version;
176     SQLiteUtils::GetColumnTextValue(stmt, CLOUD_QUERY_VERSION_INDEX, version);
177     logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
178     totalSize += version.size();
179 }
180 
GetCloudExtraLog(sqlite3_stmt * stmt,VBucket & flags)181 void SqliteCloudKvExecutorUtils::GetCloudExtraLog(sqlite3_stmt *stmt, VBucket &flags)
182 {
183     flags.insert_or_assign(DBConstant::ROWID,
184         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_ROW_ID_INDEX)));
185     flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
186         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX)));
187     flags.insert_or_assign(CloudDbConstant::FLAG,
188         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_FLAG_INDEX)));
189     Bytes hashKey;
190     (void)SQLiteUtils::GetColumnBlobValue(stmt, CLOUD_QUERY_HASH_KEY_INDEX, hashKey);
191     flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
192     flags.insert_or_assign(CloudDbConstant::CLOUD_FLAG,
193         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CLOUD_FLAG_INDEX)));
194 }
195 
GetCloudKvData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)196 int SqliteCloudKvExecutorUtils::GetCloudKvData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
197 {
198     int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
199     if (errCode != E_OK) {
200         LOGE("[SqliteCloudKvExecutorUtils] [GetCloudKvData] Get key failed:%d", errCode);
201         return errCode;
202     }
203     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
204     if (errCode != E_OK) {
205         LOGE("[SqliteCloudKvExecutorUtils] [GetCloudKvData] Get value failed:%d", errCode);
206         return errCode;
207     }
208     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
209     if (errCode != E_OK) {
210         LOGE("[SqliteCloudKvExecutorUtils] [GetCloudKvData] Get device failed:%d", errCode);
211         return errCode;
212     }
213     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, CLOUD_QUERY_ORI_DEV_INDEX, stmt, data,
214         totalSize);
215     if (errCode != E_OK) {
216         LOGE("[SqliteCloudKvExecutorUtils] [GetCloudKvData] Get ori device failed:%d", errCode);
217         return errCode;
218     }
219     data.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME,
220         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_DEV_CREATE_TIME_INDEX)));
221     totalSize += sizeof(int64_t);
222     return E_OK;
223 }
224 
GetCloudKvBlobData(const std::string & keyStr,int index,sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)225 int SqliteCloudKvExecutorUtils::GetCloudKvBlobData(const std::string &keyStr, int index, sqlite3_stmt *stmt,
226     VBucket &data, uint32_t &totalSize)
227 {
228     std::vector<uint8_t> blob;
229     int errCode = SQLiteUtils::GetColumnBlobValue(stmt, index, blob);
230     if (errCode != E_OK) {
231         LOGE("[SqliteCloudKvExecutorUtils] Get %.3s failed %d", keyStr.c_str(), errCode);
232         return errCode;
233     }
234     std::string tmp = std::string(blob.begin(), blob.end());
235     if ((keyStr == CloudDbConstant::CLOUD_KV_FIELD_DEVICE ||
236         keyStr == CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE)) {
237         if (tmp.empty()) {
238             errCode = RuntimeContext::GetInstance()->GetLocalIdentity(tmp);
239             if (errCode != E_OK) {
240                 LOGE("[SqliteCloudKvExecutorUtils] Get local identity failed %d", errCode);
241                 return errCode;
242             }
243             tmp = DBCommon::TransferHashString(tmp);
244         }
245         tmp = DBBase64Utils::Encode(std::vector<uint8_t>(tmp.begin(), tmp.end()));
246     }
247     totalSize += tmp.size();
248     data.insert_or_assign(keyStr, tmp);
249     return E_OK;
250 }
251 
GetLogInfo(sqlite3 * db,bool isMemory,const VBucket & cloudData,const std::string & userId)252 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfo(sqlite3 *db, bool isMemory,
253     const VBucket &cloudData, const std::string &userId)
254 {
255     std::pair<int, DataInfoWithLog> res;
256     int &errCode = res.first;
257     std::string keyStr;
258     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CLOUD_KV_FIELD_KEY, cloudData, keyStr);
259     if (errCode == -E_NOT_FOUND) {
260         LOGW("[SqliteCloudKvExecutorUtils] Get key not found.");
261         errCode = E_OK;
262     }
263     if (errCode != E_OK) {
264         LOGE("[SqliteCloudKvExecutorUtils] Get key failed %d", errCode);
265         return res;
266     }
267     Bytes key;
268     DBCommon::StringToVector(keyStr, key);
269     Bytes hashKey;
270     DBCommon::CalcValueHash(key, hashKey);
271     sqlite3_stmt *stmt = nullptr;
272     std::tie(errCode, stmt) = GetLogInfoStmt(db, cloudData, !hashKey.empty(), userId.empty());
273     if (errCode != E_OK) {
274         LOGE("[SqliteCloudKvExecutorUtils] Get stmt failed %d", errCode);
275         return res;
276     }
277     std::string gid;
278     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, cloudData, gid);
279     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
280         LOGE("[SqliteCloudKvExecutorUtils] Get gid failed %d", errCode);
281         return res;
282     }
283     return GetLogInfoInner(stmt, isMemory, gid, hashKey, userId);
284 }
285 
GetLogInfoStmt(sqlite3 * db,const VBucket & cloudData,bool existKey,bool emptyUserId)286 std::pair<int, sqlite3_stmt*> SqliteCloudKvExecutorUtils::GetLogInfoStmt(sqlite3 *db, const VBucket &cloudData,
287     bool existKey, bool emptyUserId)
288 {
289     std::pair<int, sqlite3_stmt*> res;
290     auto &[errCode, stmt] = res;
291     std::string querySql = QUERY_CLOUD_SYNC_DATA_LOG;
292     if (!emptyUserId) {
293         querySql = QUERY_CLOUD_SYNC_DATA_LOG_WITH_USERID;
294     }
295     std::string sql = querySql;
296     sql += " WHERE cloud_gid = ?";
297     if (existKey) {
298         sql += " UNION ";
299         sql += querySql;
300         sql += " WHERE sync_data.hash_key = ?";
301     }
302     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
303     return res;
304 }
305 
GetLogInfoInner(sqlite3_stmt * stmt,bool isMemory,const std::string & gid,const Bytes & key,const std::string & userId)306 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfoInner(sqlite3_stmt *stmt, bool isMemory,
307     const std::string &gid, const Bytes &key, const std::string &userId)
308 {
309     ResFinalizer finalizer([stmt]() {
310         sqlite3_stmt *statement = stmt;
311         int ret = E_OK;
312         SQLiteUtils::ResetStatement(statement, true, ret);
313         if (ret != E_OK) {
314             LOGW("[SqliteCloudKvExecutorUtils] Reset stmt failed %d when get log", ret);
315         }
316     });
317     std::pair<int, DataInfoWithLog> res;
318     auto &[errCode, logInfo] = res;
319     int index = 1;
320     if (!userId.empty()) {
321         errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
322         if (errCode != E_OK) {
323             LOGE("[SqliteCloudKvExecutorUtils] Bind 1st userId failed %d", errCode);
324             return res;
325         }
326     }
327     errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
328     if (errCode != E_OK) {
329         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d", errCode);
330         return res;
331     }
332     if (!key.empty()) {
333         if (!userId.empty()) {
334             errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
335             if (errCode != E_OK) {
336                 LOGE("[SqliteCloudKvExecutorUtils] Bind 2nd userId failed %d", errCode);
337                 return res;
338             }
339         }
340         errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, key);
341         if (errCode != E_OK) {
342             LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
343             return res;
344         }
345     }
346     errCode = SQLiteUtils::StepNext(stmt, isMemory);
347     if (errCode == -E_FINISHED) {
348         errCode = -E_NOT_FOUND;
349         // not found is ok, just return error
350         return res;
351     }
352     if (errCode != E_OK) {
353         LOGE("[SqliteCloudKvExecutorUtils] Get log failed %d", errCode);
354         return res;
355     }
356     logInfo = FillLogInfoWithStmt(stmt);
357     return res;
358 }
359 
FillLogInfoWithStmt(sqlite3_stmt * stmt)360 DataInfoWithLog SqliteCloudKvExecutorUtils::FillLogInfoWithStmt(sqlite3_stmt *stmt)
361 {
362     DataInfoWithLog dataInfoWithLog;
363     int index = 0;
364     dataInfoWithLog.logInfo.dataKey = sqlite3_column_int64(stmt, index++);
365     dataInfoWithLog.logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
366     std::vector<uint8_t> device;
367     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, device);
368     DBCommon::VectorToString(device, dataInfoWithLog.logInfo.device);
369     std::vector<uint8_t> oriDev;
370     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, oriDev);
371     DBCommon::VectorToString(oriDev, dataInfoWithLog.logInfo.originDev);
372     dataInfoWithLog.logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
373     dataInfoWithLog.logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
374     std::string gid;
375     (void)SQLiteUtils::GetColumnTextValue(stmt, index++, gid);
376     dataInfoWithLog.logInfo.cloudGid = gid;
377     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, dataInfoWithLog.logInfo.hashKey);
378     Bytes key;
379     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, key);
380     std::string keyStr(key.begin(), key.end());
381     dataInfoWithLog.primaryKeys.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_KEY, keyStr);
382     (void)SQLiteUtils::GetColumnTextValue(stmt, index++, dataInfoWithLog.logInfo.version);
383     dataInfoWithLog.logInfo.cloud_flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
384     return dataInfoWithLog;
385 }
386 
PutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData)387 int SqliteCloudKvExecutorUtils::PutCloudData(sqlite3 *db, bool isMemory, DownloadData &downloadData)
388 {
389     if (downloadData.data.size() != downloadData.opType.size()) {
390         LOGE("[SqliteCloudKvExecutorUtils] data size %zu != flag size %zu.", downloadData.data.size(),
391             downloadData.opType.size());
392         return -E_CLOUD_ERROR;
393     }
394     std::map<int, int> statisticMap = {};
395     int errCode = ExecutePutCloudData(db, isMemory, downloadData, statisticMap);
396     LOGI("[SqliteCloudKvExecutorUtils] save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d,"
397         " only update gid cnt = %d, set LCC flag zero cnt = %d, set LCC flag one cnt = %d,"
398         " update timestamp cnt = %d, clear gid count = %d, not handle cnt = %d",
399         errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
400         statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
401         statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
402         statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
403         statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
404         statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
405     return errCode;
406 }
407 
ExecutePutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData,std::map<int,int> & statisticMap)408 int SqliteCloudKvExecutorUtils::ExecutePutCloudData(sqlite3 *db, bool isMemory,
409     DownloadData &downloadData, std::map<int, int> &statisticMap)
410 {
411     int index = 0;
412     int errCode = E_OK;
413     for (OpType op : downloadData.opType) {
414         switch (op) {
415             case OpType::INSERT: // fallthrough
416             case OpType::UPDATE: // fallthrough
417             case OpType::DELETE: // fallthrough
418                 errCode = OperateCloudData(db, isMemory, index, op, downloadData);
419                 break;
420             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
421             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:  // fallthrough
422             case OpType::UPDATE_TIMESTAMP:               // fallthrough
423                 errCode = OnlyUpdateSyncData(db, isMemory, index, op, downloadData);
424                 if (errCode != E_OK) {
425                     break;
426                 }
427                 [[fallthrough]];
428             case OpType::ONLY_UPDATE_GID:                // fallthrough
429             case OpType::NOT_HANDLE:                     // fallthrough
430             case OpType::CLEAR_GID:                      // fallthrough
431                 errCode = OnlyUpdateLogTable(db, isMemory, index, op, downloadData);
432                 break;
433             default:
434                 errCode = -E_CLOUD_ERROR;
435                 break;
436         }
437         if (errCode != E_OK) {
438             LOGE("put cloud sync data fail:%d op:%d", errCode, static_cast<int>(op));
439             return errCode;
440         }
441         statisticMap[static_cast<int>(op)]++;
442         index++;
443     }
444     return errCode;
445 }
446 
OperateCloudData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)447 int SqliteCloudKvExecutorUtils::OperateCloudData(sqlite3 *db, bool isMemory, int index, OpType opType,
448     DownloadData &downloadData)
449 {
450     sqlite3_stmt *logStmt = nullptr;
451     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(opType), logStmt);
452     if (errCode != E_OK) {
453         LOGE("[SqliteCloudKvExecutorUtils] Get insert log statement failed %d", errCode);
454         return errCode;
455     }
456     sqlite3_stmt *dataStmt = nullptr;
457     errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
458     if (errCode != E_OK) {
459         int ret = E_OK;
460         SQLiteUtils::ResetStatement(logStmt, true, ret);
461         LOGE("[SqliteCloudKvExecutorUtils] Get insert data statement failed %d reset %d", errCode, ret);
462         return errCode;
463     }
464     ResFinalizer finalizerData([logStmt, dataStmt, opType]() {
465         sqlite3_stmt *statement = logStmt;
466         int ret = E_OK;
467         SQLiteUtils::ResetStatement(statement, true, ret);
468         if (ret != E_OK) {
469             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d opType %d", ret, static_cast<int>(opType));
470         }
471         statement = dataStmt;
472         SQLiteUtils::ResetStatement(statement, true, ret);
473         if (ret != E_OK) {
474             LOGW("[SqliteCloudKvExecutorUtils] Reset data stmt failed %d opType %d", ret, static_cast<int>(opType));
475         }
476     });
477     errCode = BindStmt(logStmt, dataStmt, index, opType, downloadData);
478     if (errCode != E_OK) {
479         LOGE("[SqliteCloudKvExecutorUtils] BindStmt data stmt failed %d opType %d", errCode, static_cast<int>(opType));
480         return errCode;
481     }
482     errCode = StepStmt(logStmt, dataStmt, isMemory);
483     if (errCode != E_OK) {
484         LOGE("[SqliteCloudKvExecutorUtils] StepStmt data stmt failed %d opType %d", errCode, static_cast<int>(opType));
485         return errCode;
486     }
487     if (opType == OpType::INSERT || opType == OpType::UPDATE || opType == OpType::DELETE) {
488         return OperateOtherUserLog(db, isMemory, index, downloadData);
489     }
490     return errCode;
491 }
492 
OperateOtherUserLog(sqlite3 * db,bool isMemory,int index,DownloadData & downloadData)493 int SqliteCloudKvExecutorUtils::OperateOtherUserLog(sqlite3 *db, bool isMemory, int index, DownloadData &downloadData)
494 {
495     auto [errCode, dataItem] = GetDataItem(index, downloadData);
496     if (errCode != E_OK) {
497         return errCode;
498     }
499     sqlite3_stmt *logStmt = nullptr;
500     std::string sql = "UPDATE naturalbase_kv_aux_sync_data_log SET cloud_flag  = cloud_flag & ~0x2000 "
501                         "WHERE userid != ? AND hash_key = ?";
502     errCode = SQLiteUtils::GetStatement(db, sql, logStmt);
503     if (errCode != E_OK) {
504         LOGE("[SqliteCloudKvExecutorUtils] Get operate other log statement failed %d", errCode);
505         return errCode;
506     }
507     ResFinalizer finalizerData([logStmt]() {
508         sqlite3_stmt *statement = logStmt;
509         int ret = E_OK;
510         SQLiteUtils::ResetStatement(statement, true, ret);
511         if (ret != E_OK) {
512             LOGW("[SqliteCloudKvExecutorUtils] Reset operate other log stmt failed %d ", ret);
513         }
514     });
515     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, downloadData.user);
516     if (errCode != E_OK) {
517         LOGE("[SqliteCloudKvExecutorUtils] Bind operate other log user failed %d when insert", errCode);
518         return errCode;
519     }
520     errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
521     if (errCode != E_OK) {
522         LOGE("[SqliteCloudKvExecutorUtils] Bind operate other log hashKey failed %d when insert", errCode);
523         return errCode;
524     }
525     errCode = SQLiteUtils::StepNext(logStmt, isMemory);
526     if (errCode == -E_FINISHED) {
527         return E_OK;
528     }
529     return errCode;
530 }
531 
GetOperateDataSql(OpType opType)532 std::string SqliteCloudKvExecutorUtils::GetOperateDataSql(OpType opType)
533 {
534     switch (opType) {
535         case OpType::INSERT:
536             return INSERT_SYNC_SQL;
537         case OpType::UPDATE: // fallthrough
538         case OpType::DELETE:
539             return UPDATE_SYNC_SQL;
540         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
541             return SET_SYNC_DATA_NO_FORCE_PUSH;
542         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
543             return SET_SYNC_DATA_FORCE_PUSH;
544         case OpType::UPDATE_TIMESTAMP:
545             return UPDATE_TIMESTAMP;
546         default:
547             return "";
548     }
549 }
550 
GetOperateLogSql(OpType opType)551 std::string SqliteCloudKvExecutorUtils::GetOperateLogSql(OpType opType)
552 {
553     switch (opType) {
554         case OpType::INSERT: // fallthrough
555         case OpType::UPDATE:
556             return INSERT_CLOUD_SYNC_DATA_LOG;
557         case OpType::DELETE:
558             return UPDATE_CLOUD_SYNC_DATA_LOG;
559         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
560         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:  // fallthrough
561         case OpType::UPDATE_TIMESTAMP:               // fallthrough
562         case OpType::ONLY_UPDATE_GID:                // fallthrough
563         case OpType::NOT_HANDLE:                     // fallthrough
564         case OpType::CLEAR_GID:                      // fallthrough
565             return UPSERT_CLOUD_SYNC_DATA_LOG;
566         default:
567             return "";
568     }
569 }
570 
TransToOpType(const CloudWaterType type)571 OpType SqliteCloudKvExecutorUtils::TransToOpType(const CloudWaterType type)
572 {
573     switch (type) {
574         case CloudWaterType::INSERT:
575             return OpType::INSERT;
576         case CloudWaterType::UPDATE:
577             return OpType::UPDATE;
578         case CloudWaterType::DELETE:
579             return OpType::DELETE;
580         default:
581             return OpType::NOT_HANDLE;
582     }
583 }
584 
BindOnlyUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)585 int SqliteCloudKvExecutorUtils::BindOnlyUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
586     const DataItem &dataItem)
587 {
588     int index = 0;
589     int errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, user);
590     if (errCode != E_OK) {
591         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when only insert log", errCode);
592         return errCode;
593     }
594     errCode = SQLiteUtils::BindBlobToStatement(logStmt, ++index, dataItem.hashKey);
595     if (errCode != E_OK) {
596         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when only insert log", errCode);
597     }
598     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
599     if (errCode != E_OK) {
600         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only insert gid.", errCode);
601         return errCode;
602     }
603     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
604     if (errCode != E_OK) {
605         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only insert log", errCode);
606         return errCode;
607     }
608     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
609     if (errCode != E_OK) {
610         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only update gid.", errCode);
611         return errCode;
612     }
613     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
614     if (errCode != E_OK) {
615         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only update log", errCode);
616         return errCode;
617     }
618     return errCode;
619 }
620 
BindStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)621 int SqliteCloudKvExecutorUtils::BindStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, int index, OpType opType,
622     DownloadData &downloadData)
623 {
624     auto [errCode, dataItem] = GetDataItem(index, downloadData);
625     if (errCode != E_OK) {
626         return errCode;
627     }
628     switch (opType) {
629         case OpType::INSERT:
630             return BindInsertStmt(logStmt, dataStmt, downloadData.user, dataItem);
631         case OpType::UPDATE:
632             return BindUpdateStmt(logStmt, dataStmt, downloadData.user, dataItem);
633         case OpType::DELETE:
634             dataItem.hashKey = downloadData.existDataHashKey[index];
635             dataItem.gid.clear();
636             dataItem.version.clear();
637             return BindDeleteStmt(logStmt, dataStmt, downloadData.user, dataItem);
638         default:
639             return E_OK;
640     }
641 }
642 
BindInsertStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)643 int SqliteCloudKvExecutorUtils::BindInsertStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt,
644     const std::string &user, const DataItem &dataItem)
645 {
646     int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for insert DATA table.
647     if (errCode != E_OK) {
648         return errCode;
649     }
650     return BindDataStmt(dataStmt, dataItem, true);
651 }
652 
BindInsertLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,bool isTagLogin)653 int SqliteCloudKvExecutorUtils::BindInsertLogStmt(sqlite3_stmt *logStmt, const std::string &user,
654     const DataItem &dataItem, bool isTagLogin)
655 {
656     int errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, user);
657     if (errCode != E_OK) {
658         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when insert", errCode);
659         return errCode;
660     }
661     errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
662     if (errCode != E_OK) {
663         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when insert", errCode);
664         return errCode;
665     }
666     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_CLOUD_GID_INDEX, dataItem.gid);
667     if (errCode != E_OK) {
668         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when insert", errCode);
669         return errCode;
670     }
671     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_VERSION_INDEX, dataItem.version);
672     if (errCode != E_OK) {
673         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when insert", errCode);
674         return errCode;
675     }
676     int64_t flag = dataItem.cloud_flag | (isTagLogin ? static_cast<uint64_t>(LogInfoFlag::FLAG_LOGIN_USER) : 0x00);
677     errCode = SQLiteUtils::BindInt64ToStatement(logStmt, BIND_INSERT_CLOUD_FLAG_INDEX, flag);
678     if (errCode != E_OK) {
679         LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when insert", errCode);
680     }
681     return errCode;
682 }
683 
BindUpdateStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)684 int SqliteCloudKvExecutorUtils::BindUpdateStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
685     const DataItem &dataItem)
686 {
687     int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for update DATA table.
688     if (errCode != E_OK) {
689         return errCode;
690     }
691     errCode = BindDataStmt(dataStmt, dataItem, false);
692     if (errCode != E_OK) {
693         return errCode;
694     }
695     return E_OK;
696 }
697 
BindUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)698 int SqliteCloudKvExecutorUtils::BindUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
699     const DataItem &dataItem)
700 {
701     int index = 1;
702     int errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.gid);
703     if (errCode != E_OK) {
704         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when update", errCode);
705         return errCode;
706     }
707     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.version);
708     if (errCode != E_OK) {
709         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when update", errCode);
710         return errCode;
711     }
712     errCode = SQLiteUtils::BindInt64ToStatement(logStmt, index++, dataItem.cloud_flag);
713     if (errCode != E_OK) {
714         LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when update", errCode);
715         return errCode;
716     }
717     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
718     if (errCode != E_OK) {
719         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when update", errCode);
720         return errCode;
721     }
722     errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, dataItem.hashKey);
723     if (errCode != E_OK) {
724         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when update", errCode);
725     }
726     return errCode;
727 }
728 
BindDeleteStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,DataItem & dataItem)729 int SqliteCloudKvExecutorUtils::BindDeleteStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
730     DataItem &dataItem)
731 {
732     dataItem.key = {};
733     dataItem.value = {};
734     dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
735     int errCode = BindUpdateLogStmt(logStmt, user, dataItem); // update LOG table for delete DATA table.
736     if (errCode != E_OK) {
737         return errCode;
738     }
739     errCode = BindDataStmt(dataStmt, dataItem, false);
740     if (errCode != E_OK) {
741         return errCode;
742     }
743     return E_OK;
744 }
745 
BindDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert)746 int SqliteCloudKvExecutorUtils::BindDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert)
747 {
748     int index = 1;
749     int errCode = BindSyncDataStmt(dataStmt, dataItem, isInsert, index);
750     if (errCode != E_OK) {
751         return errCode;
752     }
753     errCode = BindCloudDataStmt(dataStmt, dataItem, index);
754     if (errCode != E_OK) {
755         return errCode;
756     }
757     if (!isInsert) {
758         errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
759         if (errCode != E_OK) {
760             LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
761         }
762     }
763     return errCode;
764 }
765 
BindSyncDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert,int & index)766 int SqliteCloudKvExecutorUtils::BindSyncDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert,
767     int &index)
768 {
769     int errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.key);
770     if (errCode != E_OK) {
771         LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
772         return errCode;
773     }
774     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.value);
775     if (errCode != E_OK) {
776         LOGE("[SqliteCloudKvExecutorUtils] Bind value failed %d", errCode);
777         return errCode;
778     }
779     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.timestamp));
780     if (errCode != E_OK) {
781         LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
782         return errCode;
783     }
784     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.flag));
785     if (errCode != E_OK) {
786         LOGE("[SqliteCloudKvExecutorUtils] Bind flag failed %d", errCode);
787         return errCode;
788     }
789     Bytes bytes;
790     DBCommon::StringToVector(dataItem.dev, bytes);
791     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
792     if (errCode != E_OK) {
793         LOGE("[SqliteCloudKvExecutorUtils] Bind dev failed %d", errCode);
794         return errCode;
795     }
796     DBCommon::StringToVector(dataItem.origDev, bytes);
797     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
798     if (errCode != E_OK) {
799         LOGE("[SqliteCloudKvExecutorUtils] Bind oriDev failed %d", errCode);
800         return errCode;
801     }
802     if (isInsert) {
803         errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
804         if (errCode != E_OK) {
805             LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
806             return errCode;
807         }
808     }
809     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.writeTimestamp));
810     if (errCode != E_OK) {
811         LOGE("[SqliteCloudKvExecutorUtils] Bind wTime failed %d", errCode);
812     }
813     return errCode;
814 }
815 
GetModifyTime(int64_t timestamp)816 int64_t SqliteCloudKvExecutorUtils::GetModifyTime(int64_t timestamp)
817 {
818     uint64_t curTime = 0;
819     int64_t modifyTime = timestamp;
820     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
821         if (curTime < static_cast<uint64_t>(INT64_MAX)) {
822             if (modifyTime > static_cast<int64_t>(curTime)) {
823                 modifyTime = static_cast<int64_t>(curTime);
824             }
825         }
826     }
827     return modifyTime;
828 }
829 
BindCloudDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,int & index)830 int SqliteCloudKvExecutorUtils::BindCloudDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, int &index)
831 {
832     int errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.modifyTime);
833     if (errCode != E_OK) {
834         LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
835         return errCode;
836     }
837     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.createTime);
838     if (errCode != E_OK) {
839         LOGE("[SqliteCloudKvExecutorUtils] Bind createTime failed %d", errCode);
840     }
841     return errCode;
842 }
843 
StepStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,bool isMemory)844 int SqliteCloudKvExecutorUtils::StepStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, bool isMemory)
845 {
846     int errCode = SQLiteUtils::StepNext(logStmt, isMemory);
847     if (errCode != -E_FINISHED) {
848         LOGE("[SqliteCloudKvExecutorUtils] Step insert log stmt failed %d", errCode);
849         return errCode;
850     }
851     errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
852     if (errCode != -E_FINISHED) {
853         LOGE("[SqliteCloudKvExecutorUtils] Step insert data stmt failed %d", errCode);
854         return errCode;
855     }
856     return E_OK;
857 }
858 
FillCloudLog(const FillGidParam & param,OpType opType,const CloudSyncData & data,const std::string & user,CloudUploadRecorder & recorder)859 int SqliteCloudKvExecutorUtils::FillCloudLog(const FillGidParam &param, OpType opType, const CloudSyncData &data,
860     const std::string &user, CloudUploadRecorder &recorder)
861 {
862     if (param.first == nullptr) {
863         LOGE("[SqliteCloudKvExecutorUtils] Fill log got nullptr db");
864         return -E_INVALID_ARGS;
865     }
866     if (data.isCloudVersionRecord) {
867         int errCode = FillCloudVersionRecord(param.first, opType, data);
868         if (errCode != E_OK) {
869             return errCode;
870         }
871     }
872     switch (opType) {
873         case OpType::INSERT:
874             return FillCloudGid(param, data.insData, user, CloudWaterType::INSERT, recorder);
875         case OpType::UPDATE:
876             return FillCloudGid(param, data.updData, user, CloudWaterType::UPDATE, recorder);
877         case OpType::DELETE:
878             return FillCloudGid(param, data.delData, user, CloudWaterType::DELETE, recorder);
879         default:
880             return E_OK;
881     }
882 }
883 
OnlyUpdateLogTable(sqlite3 * db,bool isMemory,int index,OpType op,DownloadData & downloadData)884 int SqliteCloudKvExecutorUtils::OnlyUpdateLogTable(sqlite3 *db, bool isMemory, int index, OpType op,
885     DownloadData &downloadData)
886 {
887     if (downloadData.existDataHashKey[index].empty()) {
888         return E_OK;
889     }
890     sqlite3_stmt *logStmt = nullptr;
891     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(op), logStmt);
892     if (errCode != E_OK) {
893         LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
894         return errCode;
895     }
896     ResFinalizer finalizerData([logStmt]() {
897         sqlite3_stmt *statement = logStmt;
898         int ret = E_OK;
899         SQLiteUtils::ResetStatement(statement, true, ret);
900         if (ret != E_OK) {
901             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when only update log", ret);
902         }
903     });
904     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
905     if (res.first != E_OK) {
906         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", res.first);
907         return res.first;
908     }
909     bool clearCloudInfo = (op == OpType::CLEAR_GID);
910     if (res.second.hashKey.empty() || DBCommon::IsRecordDelete(downloadData.data[index])) {
911         res.second.hashKey = downloadData.existDataHashKey[index];
912         clearCloudInfo = true;
913     }
914     if (clearCloudInfo) {
915         res.second.gid.clear();
916         res.second.version.clear();
917     }
918     errCode = BindOnlyUpdateLogStmt(logStmt, downloadData.user, res.second);
919     if (errCode != E_OK) {
920         return errCode;
921     }
922     errCode = SQLiteUtils::StepNext(logStmt, isMemory);
923     if (errCode == -E_FINISHED) {
924         errCode = E_OK;
925     }
926     return errCode;
927 }
928 
FillCloudGid(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,const CloudWaterType & type,CloudUploadRecorder & recorder)929 int SqliteCloudKvExecutorUtils::FillCloudGid(const FillGidParam &param, const CloudSyncBatch &data,
930     const std::string &user, const CloudWaterType &type, CloudUploadRecorder &recorder)
931 {
932     auto [db, ignoreEmptyGid] = param;
933     sqlite3_stmt *logStmt = nullptr;
934     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(TransToOpType(type)), logStmt);
935     if (errCode != E_OK) {
936         return errCode;
937     }
938     ResFinalizer finalizerData([logStmt]() {
939         sqlite3_stmt *statement = logStmt;
940         int ret = E_OK;
941         SQLiteUtils::ResetStatement(statement, true, ret);
942         if (ret != E_OK) {
943             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when fill log", ret);
944         }
945     });
946     for (size_t i = 0; i < data.hashKey.size(); ++i) {
947         if (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i])) {
948             continue;
949         }
950         DataItem dataItem;
951         errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data.extend[i], dataItem.gid);
952         if (dataItem.gid.empty() && ignoreEmptyGid) {
953             continue;
954         }
955         if (errCode != E_OK) {
956             return errCode;
957         }
958         CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::VERSION_FIELD, data.extend[i], dataItem.version);
959         dataItem.hashKey = data.hashKey[i];
960         dataItem.dataDelete = CheckDataDelete(param, data, i);
961         errCode = BindFillGidLogStmt(logStmt, user, dataItem, data.extend[i], type);
962         if (errCode != E_OK) {
963             return errCode;
964         }
965         errCode = SQLiteUtils::StepNext(logStmt, false);
966         if (errCode == -E_FINISHED) {
967             errCode = E_OK;
968         }
969         if (errCode != E_OK) {
970             LOGE("[SqliteCloudKvExecutorUtils] fill back failed %d index %zu", errCode, i);
971             return errCode;
972         }
973         SQLiteUtils::ResetStatement(logStmt, false, errCode);
974         MarkUploadSuccess(param, data, user, i);
975         // ignored version record
976         if (i < data.timestamp.size()) {
977             recorder.RecordUploadRecord(CloudDbConstant::CLOUD_KV_TABLE_NAME, data.hashKey[i], type, data.timestamp[i]);
978         }
979     }
980     return E_OK;
981 }
982 
OnlyUpdateSyncData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)983 int SqliteCloudKvExecutorUtils::OnlyUpdateSyncData(sqlite3 *db, bool isMemory, int index, OpType opType,
984     DownloadData &downloadData)
985 {
986     if (opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO && opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE &&
987         opType != OpType::UPDATE_TIMESTAMP) {
988         LOGW("[SqliteCloudKvExecutorUtils] Ignore unknown opType %d", static_cast<int>(opType));
989         return E_OK;
990     }
991     sqlite3_stmt *dataStmt = nullptr;
992     int errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
993     if (errCode != E_OK) {
994         LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
995         return errCode;
996     }
997     ResFinalizer finalizerData([dataStmt]() {
998         sqlite3_stmt *statement = dataStmt;
999         int ret = E_OK;
1000         SQLiteUtils::ResetStatement(statement, true, ret);
1001         if (ret != E_OK) {
1002             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when update log", ret);
1003         }
1004     });
1005     errCode = BindUpdateSyncDataStmt(dataStmt, index, opType, downloadData);
1006     if (errCode != E_OK) {
1007         LOGE("[SqliteCloudKvExecutorUtils] Bind update sync data stmt failed %d", errCode);
1008         return errCode;
1009     }
1010     errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
1011     if (errCode == -E_FINISHED) {
1012         errCode = E_OK;
1013     }
1014     return errCode;
1015 }
1016 
BindUpdateSyncDataStmt(sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)1017 int SqliteCloudKvExecutorUtils::BindUpdateSyncDataStmt(sqlite3_stmt *dataStmt, int index, OpType opType,
1018     DownloadData &downloadData)
1019 {
1020     switch (opType) {
1021         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
1022         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
1023             return SQLiteUtils::BindBlobToStatement(dataStmt, 1, downloadData.existDataHashKey[index]);
1024         case OpType::UPDATE_TIMESTAMP:
1025             return BindUpdateTimestampStmt(dataStmt, index, downloadData);
1026         default:
1027             return E_OK;
1028     }
1029 }
1030 
BindUpdateTimestampStmt(sqlite3_stmt * dataStmt,int index,DownloadData & downloadData)1031 int SqliteCloudKvExecutorUtils::BindUpdateTimestampStmt(sqlite3_stmt *dataStmt, int index, DownloadData &downloadData)
1032 {
1033     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
1034     auto &[errCode, dataItem] = res;
1035     if (errCode != E_OK) {
1036         return errCode;
1037     }
1038     int currentBindIndex = 1; // bind sql index start at 1
1039     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.timestamp);
1040     if (errCode != E_OK) {
1041         LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
1042         return errCode;
1043     }
1044     int64_t modifyTime = GetModifyTime(dataItem.modifyTime);
1045     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, modifyTime);
1046     if (errCode != E_OK) {
1047         LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
1048         return errCode;
1049     }
1050     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, currentBindIndex++, dataItem.hashKey);
1051     if (errCode != E_OK) {
1052         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
1053         return errCode;
1054     }
1055     return E_OK;
1056 }
1057 
GetDataItem(int index,DownloadData & downloadData)1058 std::pair<int, DataItem> SqliteCloudKvExecutorUtils::GetDataItem(int index, DownloadData &downloadData)
1059 {
1060     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
1061     auto &[errCode, dataItem] = res;
1062     if (errCode != E_OK) {
1063         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
1064         return res;
1065     }
1066     std::string dev;
1067     errCode = RuntimeContext::GetInstance()->GetLocalIdentity(dev);
1068     if (errCode != E_OK) {
1069         return res;
1070     }
1071     dev = DBCommon::TransferHashString(dev);
1072     auto decodeDevice = DBBase64Utils::Decode(dataItem.dev);
1073     if (!decodeDevice.empty()) {
1074         dataItem.dev = std::string(decodeDevice.begin(), decodeDevice.end());
1075     }
1076     if (dataItem.dev == dev) {
1077         dataItem.dev = "";
1078     }
1079     decodeDevice = DBBase64Utils::Decode(dataItem.origDev);
1080     if (!decodeDevice.empty()) {
1081         dataItem.origDev = std::string(decodeDevice.begin(), decodeDevice.end());
1082     }
1083     if (dataItem.origDev == dev) {
1084         dataItem.origDev = "";
1085     }
1086     int64_t timestamp = GetModifyTime(dataItem.modifyTime);
1087     dataItem.timestamp = static_cast<Timestamp>(timestamp + downloadData.timeOffset);
1088     dataItem.writeTimestamp = static_cast<Timestamp>(timestamp); // writeTimestamp is process conflict time
1089     return res;
1090 }
1091 
CountCloudDataInner(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,const std::string & sql)1092 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudDataInner(sqlite3 *db, bool isMemory,
1093     const Timestamp &timestamp, const std::string &user, const std::string &sql)
1094 {
1095     std::pair<int, int64_t> res;
1096     auto &[errCode, count] = res;
1097     sqlite3_stmt *stmt = nullptr;
1098     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1099     if (errCode != E_OK) {
1100         LOGE("[SqliteCloudKvExecutorUtils] Count data stmt failed %d", errCode);
1101         return res;
1102     }
1103     ResFinalizer finalizer([stmt]() {
1104         sqlite3_stmt *statement = stmt;
1105         int ret = E_OK;
1106         SQLiteUtils::ResetStatement(statement, true, ret);
1107         if (ret != E_OK) {
1108             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when count data", ret);
1109         }
1110     });
1111     errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_USER, user);
1112     if (errCode != E_OK) {
1113         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d", errCode);
1114         return res;
1115     }
1116     errCode = SQLiteUtils::BindInt64ToStatement(stmt, BIND_CLOUD_TIMESTAMP, static_cast<int64_t>(timestamp));
1117     if (errCode != E_OK) {
1118         LOGE("[SqliteCloudKvExecutorUtils] Bind begin time failed %d", errCode);
1119         return res;
1120     }
1121     errCode = SQLiteUtils::StepNext(stmt, isMemory);
1122     if (errCode == -E_FINISHED) {
1123         count = 0;
1124         return res;
1125     }
1126     count = sqlite3_column_int64(stmt, CLOUD_QUERY_COUNT_INDEX);
1127     LOGD("[SqliteCloudKvExecutorUtils] Get total upload count %" PRId64, count);
1128     return res;
1129 }
1130 
CountCloudData(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,bool forcePush)1131 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudData(sqlite3 *db, bool isMemory,
1132     const Timestamp &timestamp, const std::string &user, bool forcePush)
1133 {
1134     std::string sql = SqliteQueryHelper::GetKvCloudQuerySql(true, forcePush);
1135     return CountCloudDataInner(db, isMemory, timestamp, user, sql);
1136 }
1137 
CountAllCloudData(const DBParam & param,const std::vector<Timestamp> & timestampVec,const std::string & user,bool forcePush,QuerySyncObject & querySyncObject)1138 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountAllCloudData(const DBParam &param,
1139     const std::vector<Timestamp> &timestampVec, const std::string &user, bool forcePush,
1140     QuerySyncObject &querySyncObject)
1141 {
1142     std::pair<int, int64_t> res = { E_OK, 0 };
1143     auto &[errCode, count] = res;
1144     if (timestampVec.size() != 3) { // 3 is the number of three mode.
1145         errCode = -E_INVALID_ARGS;
1146         return res;
1147     }
1148     std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1149     SqliteQueryHelper helper = querySyncObject.GetQueryHelper(errCode);
1150     if (errCode != E_OK) {
1151         return res;
1152     }
1153     for (size_t i = 0; i < typeVec.size(); i++) {
1154         sqlite3_stmt *stmt = nullptr;
1155         errCode = helper.GetCountKvCloudDataStatement(param.first, forcePush, typeVec[i], stmt);
1156         if (errCode != E_OK) {
1157             return res;
1158         }
1159         // count no use watermark
1160         auto [err, cnt] = helper.BindCountKvCloudDataStatement(param.first, param.second, 0u, user, stmt);
1161         if (err != E_OK) {
1162             return { err, 0 };
1163         }
1164         count += cnt;
1165     }
1166     return res;
1167 }
1168 
FillCloudVersionRecord(sqlite3 * db,OpType opType,const CloudSyncData & data)1169 int SqliteCloudKvExecutorUtils::FillCloudVersionRecord(sqlite3 *db, OpType opType, const CloudSyncData &data)
1170 {
1171     if (opType != OpType::INSERT && opType != OpType::UPDATE) {
1172         return E_OK;
1173     }
1174     bool isInsert = (opType == OpType::INSERT);
1175     CloudSyncBatch syncBatch = isInsert ? data.insData : data.updData;
1176     if (syncBatch.record.empty()) {
1177         LOGW("[SqliteCloudKvExecutorUtils] Fill empty cloud version record");
1178         return E_OK;
1179     }
1180     syncBatch.record[0].insert(syncBatch.extend[0].begin(), syncBatch.extend[0].end());
1181     auto res = CloudStorageUtils::GetSystemRecordFromCloudData(syncBatch.record[0]); // only record first one
1182     auto &[errCode, dataItem] = res;
1183     sqlite3_stmt *dataStmt = nullptr;
1184     errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
1185     if (errCode != E_OK) {
1186         LOGE("[SqliteCloudKvExecutorUtils] Get insert version record statement failed %d", errCode);
1187         return errCode;
1188     }
1189     ResFinalizer finalizerData([dataStmt]() {
1190         int ret = E_OK;
1191         sqlite3_stmt *statement = dataStmt;
1192         SQLiteUtils::ResetStatement(statement, true, ret);
1193         if (ret != E_OK) {
1194             LOGW("[SqliteCloudKvExecutorUtils] Reset version record stmt failed %d", ret);
1195         }
1196     });
1197     errCode = BindDataStmt(dataStmt, dataItem, isInsert);
1198     if (errCode != E_OK) {
1199         return errCode;
1200     }
1201     errCode = SQLiteUtils::StepNext(dataStmt, false);
1202     if (errCode != -E_FINISHED) {
1203         LOGE("[SqliteCloudKvExecutorUtils] Step insert version record stmt failed %d", errCode);
1204         return errCode;
1205     }
1206     return E_OK;
1207 }
1208 
GetLocalCloudVersion(sqlite3 * db,bool isMemory,const std::string & user)1209 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersion(sqlite3 *db, bool isMemory,
1210     const std::string &user)
1211 {
1212     auto res = GetLocalCloudVersionInner(db, isMemory, user);
1213     if (res.first != E_OK) {
1214         LOGE("[SqliteCloudKvExecutorUtils] Get local cloud version failed %d", res.first);
1215     }
1216     return res;
1217 }
1218 
GetLocalCloudVersionInner(sqlite3 * db,bool isMemory,const std::string & user)1219 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersionInner(sqlite3 *db, bool isMemory,
1220     const std::string &user)
1221 {
1222     std::pair<int, CloudSyncData> res;
1223     auto &[errCode, syncData] = res;
1224     auto sql = SqliteQueryHelper::GetKvCloudRecordSql();
1225     sqlite3_stmt *stmt = nullptr;
1226     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1227     if (errCode != E_OK) {
1228         return res;
1229     }
1230     ResFinalizer finalizerData([stmt]() {
1231         int ret = E_OK;
1232         sqlite3_stmt *statement = stmt;
1233         SQLiteUtils::ResetStatement(statement, true, ret);
1234         if (ret != E_OK) {
1235             LOGW("[SqliteCloudKvExecutorUtils] Reset local version record stmt failed %d", ret);
1236         }
1237     });
1238     std::string hashDev;
1239     errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
1240     if (errCode != E_OK) {
1241         return res;
1242     }
1243     std::string tempDev = DBCommon::TransferHashString(hashDev);
1244     hashDev = DBCommon::TransferStringToHex(tempDev);
1245     std::string key = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY + hashDev;
1246     Key keyVec;
1247     DBCommon::StringToVector(key, keyVec);
1248     errCode = SQLiteUtils::BindBlobToStatement(stmt, BIND_CLOUD_VERSION_RECORD_KEY_INDEX, keyVec);
1249     if (errCode != E_OK) {
1250         return res;
1251     }
1252     errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_VERSION_RECORD_USER_INDEX, user);
1253     if (errCode != E_OK) {
1254         return res;
1255     }
1256     errCode = GetCloudVersionRecord(isMemory, stmt, syncData);
1257     if (errCode == -E_NOT_FOUND) {
1258         InitDefaultCloudVersionRecord(key, tempDev, syncData);
1259         errCode = E_OK;
1260     }
1261     return res;
1262 }
1263 
GetCloudVersionRecord(bool isMemory,sqlite3_stmt * stmt,CloudSyncData & syncData)1264 int SqliteCloudKvExecutorUtils::GetCloudVersionRecord(bool isMemory, sqlite3_stmt *stmt, CloudSyncData &syncData)
1265 {
1266     int errCode = SQLiteUtils::StepNext(stmt, isMemory);
1267     if (errCode == -E_FINISHED) {
1268         return -E_NOT_FOUND;
1269     }
1270     if (errCode != E_OK) {
1271         LOGE("[SqliteCloudKvExecutorUtils] Get local version failed %d", errCode);
1272         return errCode;
1273     }
1274     CloudSyncConfig config;
1275     config.maxUploadSize = CloudDbConstant::MAX_UPLOAD_SIZE;
1276     config.maxUploadCount = 1;
1277     CloudUploadRecorder recorder; // ignore last record
1278     UploadDetail detail;
1279     errCode = GetCloudDataForSync(config, recorder, stmt, syncData, detail);
1280     return errCode;
1281 }
1282 
InitDefaultCloudVersionRecord(const std::string & key,const std::string & dev,CloudSyncData & syncData)1283 void SqliteCloudKvExecutorUtils::InitDefaultCloudVersionRecord(const std::string &key, const std::string &dev,
1284     CloudSyncData &syncData)
1285 {
1286     LOGI("[SqliteCloudKvExecutorUtils] Not found local version record");
1287     VBucket defaultRecord;
1288     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_KEY] = key;
1289     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_VALUE] = std::string("");
1290     auto encodeDev = DBBase64Utils::Encode(dev);
1291     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_DEVICE] = encodeDev;
1292     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE] = encodeDev;
1293     syncData.insData.record.push_back(std::move(defaultRecord));
1294     VBucket defaultExtend;
1295     defaultExtend[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::TransferStringToHex(key);
1296     syncData.insData.extend.push_back(std::move(defaultExtend));
1297     syncData.insData.assets.emplace_back();
1298     Bytes bytesHashKey;
1299     DBCommon::StringToVector(key, bytesHashKey);
1300     syncData.insData.hashKey.push_back(bytesHashKey);
1301 }
1302 
BindVersionStmt(const std::string & device,sqlite3_stmt * dataStmt)1303 int SqliteCloudKvExecutorUtils::BindVersionStmt(const std::string &device, sqlite3_stmt *dataStmt)
1304 {
1305     std::string hashDevice;
1306     int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDevice);
1307     if (errCode != E_OK) {
1308         return errCode;
1309     }
1310     Bytes bytes;
1311     if (device == hashDevice) {
1312         DBCommon::StringToVector("", bytes);
1313     } else {
1314         hashDevice = DBCommon::TransferHashString(device);
1315         DBCommon::StringToVector(hashDevice, bytes);
1316     }
1317     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, BIND_CLOUD_VERSION_DEVICE_INDEX, bytes);
1318     if (errCode != E_OK) {
1319         LOGE("[SqliteCloudKvExecutorUtils] Bind device failed %d", errCode);
1320     }
1321     return errCode;
1322 }
1323 
GetCloudVersionFromCloud(sqlite3 * db,bool isMemory,const std::string & device,std::vector<VBucket> & dataVector)1324 int SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(sqlite3 *db, bool isMemory, const std::string &device,
1325     std::vector<VBucket> &dataVector)
1326 {
1327     sqlite3_stmt *dataStmt = nullptr;
1328     bool isDeviceEmpty = device.empty();
1329     std::string sql = SqliteQueryHelper::GetCloudVersionRecordSql(isDeviceEmpty);
1330     int errCode = SQLiteUtils::GetStatement(db, sql, dataStmt);
1331     if (errCode != E_OK) {
1332         LOGE("[SqliteCloudKvExecutorUtils] Get cloud version record statement failed %d", errCode);
1333         return errCode;
1334     }
1335     ResFinalizer finalizerData([dataStmt]() {
1336         int ret = E_OK;
1337         sqlite3_stmt *statement = dataStmt;
1338         SQLiteUtils::ResetStatement(statement, true, ret);
1339         if (ret != E_OK) {
1340             LOGW("[SqliteCloudKvExecutorUtils] Reset cloud version record stmt failed %d", ret);
1341         }
1342     });
1343     if (!isDeviceEmpty) {
1344         errCode = BindVersionStmt(device, dataStmt);
1345         if (errCode != E_OK) {
1346             return errCode;
1347         }
1348     }
1349     uint32_t totalSize = 0;
1350     do {
1351         errCode = SQLiteUtils::StepWithRetry(dataStmt, isMemory);
1352         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1353             errCode = E_OK;
1354             break;
1355         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1356             LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1357             break;
1358         }
1359         VBucket data;
1360         errCode = GetCloudVersionRecordData(dataStmt, data, totalSize);
1361         dataVector.push_back(data);
1362     } while (errCode == E_OK);
1363     return errCode;
1364 }
1365 
GetCloudVersionRecordData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)1366 int SqliteCloudKvExecutorUtils::GetCloudVersionRecordData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
1367 {
1368     int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
1369     if (errCode != E_OK) {
1370         LOGE("[SqliteCloudKvExecutorUtils] [GetCloudVersionRecordData] Get key failed:%d", errCode);
1371         return errCode;
1372     }
1373     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
1374     if (errCode != E_OK) {
1375         LOGE("[SqliteCloudKvExecutorUtils] [GetCloudVersionRecordData] Get value failed:%d", errCode);
1376         return errCode;
1377     }
1378     return GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
1379 }
1380 
GetWaitCompensatedSyncDataPk(sqlite3 * db,bool isMemory,std::vector<VBucket> & data)1381 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(sqlite3 *db, bool isMemory, std::vector<VBucket> &data)
1382 {
1383     sqlite3_stmt *stmt = nullptr;
1384     int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_KEY_SQL, stmt);
1385     if (errCode != E_OK) {
1386         LOGE("[SqliteCloudKvExecutorUtils] Get compensate key stmt failed %d", errCode);
1387         return errCode;
1388     }
1389     ResFinalizer finalizerData([stmt]() {
1390         int ret = E_OK;
1391         sqlite3_stmt *statement = stmt;
1392         SQLiteUtils::ResetStatement(statement, true, ret);
1393         if (ret != E_OK) {
1394             LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1395         }
1396     });
1397     uint32_t totalSize = 0;
1398     do {
1399         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1400         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1401             errCode = E_OK;
1402             break;
1403         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1404             LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1405             break;
1406         }
1407         VBucket key;
1408         errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, key, totalSize);
1409         if (errCode != E_OK) {
1410             return errCode;
1411         }
1412         data.push_back(key);
1413     } while (errCode == E_OK);
1414     return errCode;
1415 }
1416 
GetWaitCompensatedSyncDataUserId(sqlite3 * db,bool isMemory,std::vector<VBucket> & users)1417 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(sqlite3 *db, bool isMemory,
1418     std::vector<VBucket> &users)
1419 {
1420     sqlite3_stmt *stmt = nullptr;
1421     int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_USERID_SQL, stmt);
1422     if (errCode != E_OK) {
1423         LOGE("[SqliteCloudKvExecutorUtils] Get compensate userid stmt failed %d", errCode);
1424         return errCode;
1425     }
1426     ResFinalizer finalizerData([stmt]() {
1427         int ret = E_OK;
1428         sqlite3_stmt *statement = stmt;
1429         SQLiteUtils::ResetStatement(statement, true, ret);
1430         if (ret != E_OK) {
1431             LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1432         }
1433     });
1434     uint32_t totalSize = 0;
1435     do {
1436         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1437         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1438             errCode = E_OK;
1439             break;
1440         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1441             LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1442             break;
1443         }
1444         VBucket key;
1445         errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_USERID, CLOUD_QUERY_KEY_INDEX, stmt,
1446             key, totalSize);
1447         if (errCode != E_OK) {
1448             return errCode;
1449         }
1450         users.push_back(key);
1451     } while (errCode == E_OK);
1452     return errCode;
1453 }
1454 
GetWaitCompensatedSyncData(sqlite3 * db,bool isMemory,std::vector<VBucket> & data,std::vector<VBucket> & users)1455 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncData(sqlite3 *db, bool isMemory, std::vector<VBucket> &data,
1456     std::vector<VBucket> &users)
1457 {
1458     int errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(db, isMemory, data);
1459     if (errCode != E_OK) {
1460         LOGE("[GetWaitCompensatedSyncData] Get wait compensated sync data failed! errCode=%d", errCode);
1461         return errCode;
1462     }
1463     errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(db, isMemory, users);
1464     if (errCode != E_OK) {
1465         LOGE("[GetWaitCompensatedSyncData] Get wait compensated sync data failed! errCode=%d", errCode);
1466     }
1467     return errCode;
1468 }
1469 
QueryCloudGid(sqlite3 * db,bool isMemory,const std::string & user,const QuerySyncObject & querySyncObject,std::vector<std::string> & cloudGid)1470 int SqliteCloudKvExecutorUtils::QueryCloudGid(sqlite3 *db, bool isMemory, const std::string &user,
1471     const QuerySyncObject &querySyncObject, std::vector<std::string> &cloudGid)
1472 {
1473     int errCode = E_OK;
1474     QuerySyncObject query = querySyncObject;
1475     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1476     if (errCode != E_OK) {
1477         return errCode;
1478     }
1479     sqlite3_stmt *stmt = nullptr;
1480     errCode = helper.GetAndBindGidKvCloudQueryStatement(user, db, stmt);
1481     if (errCode != E_OK) {
1482         return errCode;
1483     }
1484     do {
1485         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1486         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1487             errCode = E_OK;
1488             break;
1489         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1490             LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1491             break;
1492         }
1493         std::string gid;
1494         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1495         cloudGid.push_back(gid);
1496     } while (errCode == E_OK);
1497     int ret = E_OK;
1498     SQLiteUtils::ResetStatement(stmt, true, ret);
1499     return errCode == E_OK ? ret : errCode;
1500 }
1501 
BindFillGidLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,const VBucket & uploadExtend,const CloudWaterType & type)1502 int SqliteCloudKvExecutorUtils::BindFillGidLogStmt(sqlite3_stmt *logStmt, const std::string &user,
1503     const DataItem &dataItem, const VBucket &uploadExtend, const CloudWaterType &type)
1504 {
1505     DataItem wItem = dataItem;
1506     if (DBCommon::IsNeedCompensatedForUpload(uploadExtend, type) && !wItem.dataDelete) {
1507         wItem.cloud_flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1508     }
1509     if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1510         (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1511         wItem.gid = {};
1512         wItem.version = {};
1513     }
1514     int errCode = E_OK;
1515     if (type == CloudWaterType::DELETE) {
1516         if (DBCommon::IsCloudRecordNotFound(uploadExtend)) {
1517             errCode = BindUpdateLogStmt(logStmt, user, wItem);
1518         }
1519     } else {
1520         errCode = BindInsertLogStmt(logStmt, user, wItem, false);
1521     }
1522     if (errCode != E_OK) {
1523         LOGE("[SqliteCloudKvExecutorUtils] fill cloud gid failed. %d", errCode);
1524     }
1525     return errCode;
1526 }
1527 
MarkUploadSuccess(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1528 void SqliteCloudKvExecutorUtils::MarkUploadSuccess(const FillGidParam &param, const CloudSyncBatch &data,
1529     const std::string &user, size_t dataIndex)
1530 {
1531     if (data.extend.size() <= dataIndex || data.hashKey.size() <= dataIndex || data.timestamp.size() <= dataIndex) {
1532         LOGW("[SqliteCloudKvExecutorUtils] invalid index %zu when mark upload success", dataIndex);
1533         return;
1534     }
1535     if (!DBCommon::IsRecordSuccess(data.extend[dataIndex])) {
1536         return;
1537     }
1538     if (CheckDataChanged(param, data, dataIndex)) {
1539         LOGW("[SqliteCloudKvExecutorUtils] %zu data changed when mark upload success", dataIndex);
1540         return;
1541     }
1542     MarkUploadSuccessInner(param, data, user, dataIndex);
1543 }
1544 
CheckDataChanged(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1545 bool SqliteCloudKvExecutorUtils::CheckDataChanged(const FillGidParam &param,
1546     const CloudSyncBatch &data, size_t dataIndex)
1547 {
1548     sqlite3_stmt *checkStmt = nullptr;
1549     int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_CHANGED, checkStmt);
1550     if (errCode != E_OK) {
1551         LOGW("[SqliteCloudKvExecutorUtils] check data change get stmt failed %d", errCode);
1552         return true;
1553     }
1554     ResFinalizer finalizerData([checkStmt]() {
1555         sqlite3_stmt *statement = checkStmt;
1556         int ret = E_OK;
1557         SQLiteUtils::ResetStatement(statement, true, ret);
1558         if (ret != E_OK) {
1559             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data changed", ret);
1560         }
1561     });
1562     int index = 1;
1563     if (data.timestamp.size() > 0) {
1564         errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1565         if (errCode != E_OK) {
1566             LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data changed", errCode);
1567             return true;
1568         }
1569     }
1570     if (data.hashKey.size() > 0) {
1571         errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1572         if (errCode != E_OK) {
1573             LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data changed", errCode);
1574             return true;
1575         }
1576     }
1577     errCode = SQLiteUtils::StepNext(checkStmt);
1578     if (errCode != E_OK) {
1579         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data changed", errCode);
1580         return true;
1581     }
1582     return sqlite3_column_int64(checkStmt, 0) == 0; // get index start at 0, get 0 is data changed
1583 }
1584 
CheckDataDelete(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1585 bool SqliteCloudKvExecutorUtils::CheckDataDelete(const FillGidParam &param,
1586     const CloudSyncBatch &data, size_t dataIndex)
1587 {
1588     sqlite3_stmt *checkStmt = nullptr;
1589     int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_DELETE, checkStmt);
1590     if (errCode != E_OK) {
1591         LOGW("[SqliteCloudKvExecutorUtils] check data delete get stmt failed %d", errCode);
1592         return true;
1593     }
1594     ResFinalizer finalizerData([checkStmt]() {
1595         sqlite3_stmt *statement = checkStmt;
1596         int ret = E_OK;
1597         SQLiteUtils::ResetStatement(statement, true, ret);
1598         if (ret != E_OK) {
1599             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data delete", ret);
1600         }
1601     });
1602     int index = 1;
1603     if (data.timestamp.size() > 0) {
1604         errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1605         if (errCode != E_OK) {
1606             LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data delete", errCode);
1607             return true;
1608         }
1609     }
1610     if (data.hashKey.size() > 0) {
1611         errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1612         if (errCode != E_OK) {
1613             LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data delete", errCode);
1614             return true;
1615         }
1616     }
1617     errCode = SQLiteUtils::StepNext(checkStmt);
1618     if (errCode != E_OK) {
1619         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data delete", errCode);
1620         return true;
1621     }
1622     return sqlite3_column_int64(checkStmt, 0) != 0; // get index start at !0, get 0 means data delete
1623 }
1624 
MarkUploadSuccessInner(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1625 void SqliteCloudKvExecutorUtils::MarkUploadSuccessInner(const FillGidParam &param,
1626     const CloudSyncBatch &data, const std::string &user, size_t dataIndex)
1627 {
1628     sqlite3_stmt *logStmt = nullptr;
1629     int errCode = SQLiteUtils::GetStatement(param.first, MARK_UPLOAD_SUCCESS, logStmt);
1630     if (errCode != E_OK) {
1631         LOGW("[SqliteCloudKvExecutorUtils] mark upload success inner get stmt failed %d", errCode);
1632         return;
1633     }
1634     ResFinalizer finalizerData([logStmt]() {
1635         sqlite3_stmt *statement = logStmt;
1636         int ret = E_OK;
1637         SQLiteUtils::ResetStatement(statement, true, ret);
1638         if (ret != E_OK) {
1639             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when mark upload success", ret);
1640         }
1641     });
1642     int index = 1;
1643     errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, data.hashKey[dataIndex]);
1644     if (errCode != E_OK) {
1645         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1646         return;
1647     }
1648     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
1649     if (errCode != E_OK) {
1650         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1651         return;
1652     }
1653     errCode = SQLiteUtils::StepNext(logStmt);
1654     if (errCode != E_OK && errCode != -E_FINISHED) {
1655         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when mark upload success", errCode);
1656         return;
1657     }
1658 }
1659 }
1660