• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_cloud_kv_executor_utils.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "res_finalizer.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_ver_storage_executor_sql.h"
24 #include "time_helper.h"
25 
26 namespace DistributedDB {
GetCloudData(const CloudSyncConfig & config,const DBParam & param,const CloudUploadRecorder & recorder,SQLiteSingleVerContinueToken & token,CloudSyncData & data)27 int SqliteCloudKvExecutorUtils::GetCloudData(const CloudSyncConfig &config, const DBParam &param,
28     const CloudUploadRecorder &recorder, SQLiteSingleVerContinueToken &token, CloudSyncData &data)
29 {
30     auto [db, isMemory] = param;
31     bool stepNext = false;
32     auto [errCode, stmt] = token.GetCloudQueryStmt(db, data.isCloudForcePushStrategy, stepNext, data.mode);
33     if (errCode != E_OK) {
34         token.ReleaseCloudQueryStmt();
35         return errCode;
36     }
37     UploadDetail detail;
38     auto &[stepNum, totalSize] = detail;
39     do {
40         if (stepNext) {
41             errCode = SQLiteUtils::StepNext(stmt, isMemory);
42             if (errCode != E_OK) {
43                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
44                 break;
45             }
46         }
47         stepNext = true;
48         errCode = GetCloudDataForSync(config, recorder, stmt, data, detail);
49         stepNum++;
50     } while (errCode == E_OK);
51     LOGI("[SqliteCloudKvExecutorUtils] Get cloud sync data, insData:%u, upData:%u, delLog:%u errCode:%d total:%" PRIu32,
52          data.insData.record.size(), data.updData.record.size(), data.delData.extend.size(), errCode, totalSize);
53     if (errCode != -E_UNFINISHED) {
54         token.ReleaseCloudQueryStmt();
55     } else if (isMemory && UpdateBeginTimeForMemoryDB(token, data)) {
56         token.ReleaseCloudQueryStmt();
57     }
58     return errCode;
59 }
60 
GetMaxTimeStamp(const std::vector<VBucket> & dataExtend)61 Timestamp SqliteCloudKvExecutorUtils::GetMaxTimeStamp(const std::vector<VBucket> &dataExtend)
62 {
63     Timestamp maxTimeStamp = 0;
64     VBucket lastRecord = dataExtend.back();
65     auto it = lastRecord.find(CloudDbConstant::MODIFY_FIELD);
66     if (it != lastRecord.end() && maxTimeStamp < static_cast<Timestamp>(std::get<int64_t>(it->second))) {
67         maxTimeStamp = static_cast<Timestamp>(std::get<int64_t>(it->second));
68     }
69     return maxTimeStamp;
70 }
71 
UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken & token,const CloudSyncData & data)72 bool SqliteCloudKvExecutorUtils::UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken &token,
73     const CloudSyncData &data)
74 {
75     Timestamp maxTimeStamp = 0;
76     switch (data.mode) {
77         case DistributedDB::CloudWaterType::DELETE:
78             maxTimeStamp = GetMaxTimeStamp(data.delData.extend);
79             break;
80         case DistributedDB::CloudWaterType::UPDATE:
81             maxTimeStamp = GetMaxTimeStamp(data.updData.extend);
82             break;
83         case DistributedDB::CloudWaterType::INSERT:
84             maxTimeStamp = GetMaxTimeStamp(data.insData.extend);
85             break;
86         case DistributedDB::CloudWaterType::BUTT:
87         default:
88             break;
89     }
90     if (maxTimeStamp > token.GetQueryBeginTime()) {
91         token.SetNextBeginTime("", maxTimeStamp);
92         return true;
93     }
94     LOGW("[SqliteCloudKvExecutorUtils] The start time of the in memory database has not been updated.");
95     return false;
96 }
97 
GetCloudDataForSync(const CloudSyncConfig & config,const CloudUploadRecorder & recorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,UploadDetail & detail)98 int SqliteCloudKvExecutorUtils::GetCloudDataForSync(const CloudSyncConfig &config, const CloudUploadRecorder &recorder,
99     sqlite3_stmt *statement, CloudSyncData &cloudDataResult, UploadDetail &detail)
100 {
101     auto &[stepNum, totalSize] = detail;
102     VBucket log;
103     VBucket extraLog;
104     uint32_t preSize = totalSize;
105     int64_t revisedTime = 0;
106     int64_t invalidTime = 0;
107     GetCloudLog(statement, log, totalSize, revisedTime, invalidTime);
108     GetCloudExtraLog(statement, extraLog);
109     if (revisedTime != 0) {
110         Bytes hashKey = std::get<Bytes>(extraLog[CloudDbConstant::HASH_KEY]);
111         cloudDataResult.revisedData.push_back({hashKey, revisedTime, invalidTime});
112     }
113 
114     VBucket data;
115     int64_t flag = 0;
116     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
117     if (errCode != E_OK) {
118         return errCode;
119     }
120 
121     if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
122         errCode = GetCloudKvData(statement, data, totalSize);
123         if (errCode != E_OK) {
124             return errCode;
125         }
126     }
127 
128     if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, config.maxUploadSize, config.maxUploadCount)) {
129         errCode = CloudStorageUtils::IdentifyCloudType(recorder, cloudDataResult, data, log, extraLog);
130     } else {
131         errCode = -E_UNFINISHED;
132     }
133     if (errCode == -E_IGNORE_DATA) {
134         errCode = E_OK;
135         totalSize = preSize;
136         stepNum--;
137     }
138     return errCode;
139 }
140 
GetCloudLog(sqlite3_stmt * stmt,VBucket & logInfo,uint32_t & totalSize,int64_t & revisedTime,int64_t & invalidTime)141 void SqliteCloudKvExecutorUtils::GetCloudLog(sqlite3_stmt *stmt, VBucket &logInfo,
142     uint32_t &totalSize, int64_t &revisedTime, int64_t &invalidTime)
143 {
144     int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX));
145     uint64_t curTime = 0;
146     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
147         if (modifyTime > static_cast<int64_t>(curTime)) {
148             invalidTime = modifyTime;
149             modifyTime = static_cast<int64_t>(curTime);
150             revisedTime = modifyTime;
151         }
152     } else {
153         LOGW("[SqliteCloudKvExecutorUtils] get raw sys time failed.");
154     }
155     logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
156     logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
157         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CREATE_TIME_INDEX)));
158     totalSize += sizeof(int64_t) + sizeof(int64_t);
159     if (sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX) != nullptr) {
160         std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
161             sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX));
162         if (!cloudGid.empty()) {
163             logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
164             totalSize += cloudGid.size();
165         }
166     }
167     if (revisedTime != 0) {
168         std::string cloudGid;
169         if (logInfo.count(CloudDbConstant::GID_FIELD) != 0) {
170             cloudGid = std::get<std::string>(logInfo[CloudDbConstant::GID_FIELD]);
171         }
172         LOGW("[SqliteCloudKvExecutorUtils] Found invalid mod time: %lld, curTime: %lld, gid: %s",
173             invalidTime, revisedTime, DBCommon::StringMiddleMasking(cloudGid).c_str());
174     }
175     std::string version;
176     SQLiteUtils::GetColumnTextValue(stmt, CLOUD_QUERY_VERSION_INDEX, version);
177     logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
178     totalSize += version.size();
179 }
180 
GetCloudExtraLog(sqlite3_stmt * stmt,VBucket & flags)181 void SqliteCloudKvExecutorUtils::GetCloudExtraLog(sqlite3_stmt *stmt, VBucket &flags)
182 {
183     flags.insert_or_assign(DBConstant::ROWID,
184         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_ROW_ID_INDEX)));
185     flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
186         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX)));
187     flags.insert_or_assign(CloudDbConstant::FLAG,
188         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_FLAG_INDEX)));
189     Bytes hashKey;
190     (void)SQLiteUtils::GetColumnBlobValue(stmt, CLOUD_QUERY_HASH_KEY_INDEX, hashKey);
191     flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
192     flags.insert_or_assign(CloudDbConstant::CLOUD_FLAG,
193         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CLOUD_FLAG_INDEX)));
194 }
195 
GetCloudKvData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)196 int SqliteCloudKvExecutorUtils::GetCloudKvData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
197 {
198     int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
199     if (errCode != E_OK) {
200         return errCode;
201     }
202     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
203     if (errCode != E_OK) {
204         return errCode;
205     }
206     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
207     if (errCode != E_OK) {
208         return errCode;
209     }
210     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, CLOUD_QUERY_ORI_DEV_INDEX, stmt, data,
211         totalSize);
212     if (errCode != E_OK) {
213         return errCode;
214     }
215     data.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME,
216         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_DEV_CREATE_TIME_INDEX)));
217     totalSize += sizeof(int64_t);
218     return E_OK;
219 }
220 
GetCloudKvBlobData(const std::string & keyStr,int index,sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)221 int SqliteCloudKvExecutorUtils::GetCloudKvBlobData(const std::string &keyStr, int index, sqlite3_stmt *stmt,
222     VBucket &data, uint32_t &totalSize)
223 {
224     std::vector<uint8_t> blob;
225     int errCode = SQLiteUtils::GetColumnBlobValue(stmt, index, blob);
226     if (errCode != E_OK) {
227         LOGE("[SqliteCloudKvExecutorUtils] Get %.3s failed %d", keyStr.c_str(), errCode);
228         return errCode;
229     }
230     std::string tmp = std::string(blob.begin(), blob.end());
231     if ((keyStr == CloudDbConstant::CLOUD_KV_FIELD_DEVICE ||
232         keyStr == CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE)) {
233         if (tmp.empty()) {
234             errCode = RuntimeContext::GetInstance()->GetLocalIdentity(tmp);
235             if (errCode != E_OK) {
236                 return errCode;
237             }
238             tmp = DBCommon::TransferHashString(tmp);
239         }
240         tmp = DBBase64Utils::Encode(std::vector<uint8_t>(tmp.begin(), tmp.end()));
241     }
242     totalSize += tmp.size();
243     data.insert_or_assign(keyStr, tmp);
244     return E_OK;
245 }
246 
GetLogInfo(sqlite3 * db,bool isMemory,const VBucket & cloudData,const std::string & userId)247 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfo(sqlite3 *db, bool isMemory,
248     const VBucket &cloudData, const std::string &userId)
249 {
250     std::pair<int, DataInfoWithLog> res;
251     int &errCode = res.first;
252     std::string keyStr;
253     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CLOUD_KV_FIELD_KEY, cloudData, keyStr);
254     if (errCode == -E_NOT_FOUND) {
255         errCode = E_OK;
256     }
257     if (errCode != E_OK) {
258         LOGE("[SqliteCloudKvExecutorUtils] Get key failed %d", errCode);
259         return res;
260     }
261     Bytes key;
262     DBCommon::StringToVector(keyStr, key);
263     Bytes hashKey;
264     DBCommon::CalcValueHash(key, hashKey);
265     sqlite3_stmt *stmt = nullptr;
266     std::tie(errCode, stmt) = GetLogInfoStmt(db, cloudData, !hashKey.empty(), userId.empty());
267     if (errCode != E_OK) {
268         LOGE("[SqliteCloudKvExecutorUtils] Get stmt failed %d", errCode);
269         return res;
270     }
271     std::string gid;
272     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, cloudData, gid);
273     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
274         LOGE("[SqliteCloudKvExecutorUtils] Get gid failed %d", errCode);
275         return res;
276     }
277     return GetLogInfoInner(stmt, isMemory, gid, hashKey, userId);
278 }
279 
GetLogInfoStmt(sqlite3 * db,const VBucket & cloudData,bool existKey,bool emptyUserId)280 std::pair<int, sqlite3_stmt*> SqliteCloudKvExecutorUtils::GetLogInfoStmt(sqlite3 *db, const VBucket &cloudData,
281     bool existKey, bool emptyUserId)
282 {
283     std::pair<int, sqlite3_stmt*> res;
284     auto &[errCode, stmt] = res;
285     std::string querySql = QUERY_CLOUD_SYNC_DATA_LOG;
286     if (!emptyUserId) {
287         querySql = QUERY_CLOUD_SYNC_DATA_LOG_WITH_USERID;
288     }
289     std::string sql = querySql;
290     sql += " WHERE cloud_gid = ?";
291     if (existKey) {
292         sql += " UNION ";
293         sql += querySql;
294         sql += " WHERE sync_data.hash_key = ?";
295     }
296     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
297     return res;
298 }
299 
GetLogInfoInner(sqlite3_stmt * stmt,bool isMemory,const std::string & gid,const Bytes & key,const std::string & userId)300 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfoInner(sqlite3_stmt *stmt, bool isMemory,
301     const std::string &gid, const Bytes &key, const std::string &userId)
302 {
303     ResFinalizer finalizer([stmt]() {
304         sqlite3_stmt *statement = stmt;
305         int ret = E_OK;
306         SQLiteUtils::ResetStatement(statement, true, ret);
307         if (ret != E_OK) {
308             LOGW("[SqliteCloudKvExecutorUtils] Reset stmt failed %d when get log", ret);
309         }
310     });
311     std::pair<int, DataInfoWithLog> res;
312     auto &[errCode, logInfo] = res;
313     int index = 1;
314     if (!userId.empty()) {
315         errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
316         if (errCode != E_OK) {
317             LOGE("[SqliteCloudKvExecutorUtils] Bind 1st userId failed %d", errCode);
318             return res;
319         }
320     }
321     errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
322     if (errCode != E_OK) {
323         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d", errCode);
324         return res;
325     }
326     if (!key.empty()) {
327         if (!userId.empty()) {
328             errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
329             if (errCode != E_OK) {
330                 LOGE("[SqliteCloudKvExecutorUtils] Bind 2nd userId failed %d", errCode);
331                 return res;
332             }
333         }
334         errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, key);
335         if (errCode != E_OK) {
336             LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
337             return res;
338         }
339     }
340     errCode = SQLiteUtils::StepNext(stmt, isMemory);
341     if (errCode == -E_FINISHED) {
342         errCode = -E_NOT_FOUND;
343         // not found is ok, just return error
344         return res;
345     }
346     if (errCode != E_OK) {
347         LOGE("[SqliteCloudKvExecutorUtils] Get log failed %d", errCode);
348         return res;
349     }
350     logInfo = FillLogInfoWithStmt(stmt);
351     return res;
352 }
353 
FillLogInfoWithStmt(sqlite3_stmt * stmt)354 DataInfoWithLog SqliteCloudKvExecutorUtils::FillLogInfoWithStmt(sqlite3_stmt *stmt)
355 {
356     DataInfoWithLog dataInfoWithLog;
357     int index = 0;
358     dataInfoWithLog.logInfo.dataKey = sqlite3_column_int64(stmt, index++);
359     dataInfoWithLog.logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
360     std::vector<uint8_t> device;
361     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, device);
362     DBCommon::VectorToString(device, dataInfoWithLog.logInfo.device);
363     std::vector<uint8_t> oriDev;
364     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, oriDev);
365     DBCommon::VectorToString(oriDev, dataInfoWithLog.logInfo.originDev);
366     dataInfoWithLog.logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
367     dataInfoWithLog.logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
368     std::string gid;
369     (void)SQLiteUtils::GetColumnTextValue(stmt, index++, gid);
370     dataInfoWithLog.logInfo.cloudGid = gid;
371     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, dataInfoWithLog.logInfo.hashKey);
372     Bytes key;
373     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, key);
374     std::string keyStr(key.begin(), key.end());
375     dataInfoWithLog.primaryKeys.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_KEY, keyStr);
376     (void)SQLiteUtils::GetColumnTextValue(stmt, index++, dataInfoWithLog.logInfo.version);
377     dataInfoWithLog.logInfo.cloud_flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
378     return dataInfoWithLog;
379 }
380 
PutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData)381 int SqliteCloudKvExecutorUtils::PutCloudData(sqlite3 *db, bool isMemory, DownloadData &downloadData)
382 {
383     if (downloadData.data.size() != downloadData.opType.size()) {
384         LOGE("[SqliteCloudKvExecutorUtils] data size %zu != flag size %zu.", downloadData.data.size(),
385             downloadData.opType.size());
386         return -E_CLOUD_ERROR;
387     }
388     std::map<int, int> statisticMap = {};
389     int errCode = ExecutePutCloudData(db, isMemory, downloadData, statisticMap);
390     LOGI("[SqliteCloudKvExecutorUtils] save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d,"
391         " only update gid cnt = %d, set LCC flag zero cnt = %d, set LCC flag one cnt = %d,"
392         " update timestamp cnt = %d, clear gid count = %d, not handle cnt = %d",
393         errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
394         statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
395         statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
396         statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
397         statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
398         statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
399     return errCode;
400 }
401 
ExecutePutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData,std::map<int,int> & statisticMap)402 int SqliteCloudKvExecutorUtils::ExecutePutCloudData(sqlite3 *db, bool isMemory,
403     DownloadData &downloadData, std::map<int, int> &statisticMap)
404 {
405     int index = 0;
406     int errCode = E_OK;
407     for (OpType op : downloadData.opType) {
408         switch (op) {
409             case OpType::INSERT: // fallthrough
410             case OpType::UPDATE: // fallthrough
411             case OpType::DELETE: // fallthrough
412                 errCode = OperateCloudData(db, isMemory, index, op, downloadData);
413                 break;
414             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
415             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:  // fallthrough
416             case OpType::UPDATE_TIMESTAMP:               // fallthrough
417                 errCode = OnlyUpdateSyncData(db, isMemory, index, op, downloadData);
418                 if (errCode != E_OK) {
419                     break;
420                 }
421                 [[fallthrough]];
422             case OpType::ONLY_UPDATE_GID:                // fallthrough
423             case OpType::NOT_HANDLE:                     // fallthrough
424             case OpType::CLEAR_GID:                      // fallthrough
425                 errCode = OnlyUpdateLogTable(db, isMemory, index, op, downloadData);
426                 break;
427             default:
428                 errCode = -E_CLOUD_ERROR;
429                 break;
430         }
431         if (errCode != E_OK) {
432             LOGE("put cloud sync data fail:%d op:%d", errCode, static_cast<int>(op));
433             return errCode;
434         }
435         statisticMap[static_cast<int>(op)]++;
436         index++;
437     }
438     return errCode;
439 }
440 
OperateCloudData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)441 int SqliteCloudKvExecutorUtils::OperateCloudData(sqlite3 *db, bool isMemory, int index, OpType opType,
442     DownloadData &downloadData)
443 {
444     sqlite3_stmt *logStmt = nullptr;
445     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(opType), logStmt);
446     if (errCode != E_OK) {
447         LOGE("[SqliteCloudKvExecutorUtils] Get insert log statement failed %d", errCode);
448         return errCode;
449     }
450     sqlite3_stmt *dataStmt = nullptr;
451     errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
452     if (errCode != E_OK) {
453         int ret = E_OK;
454         SQLiteUtils::ResetStatement(logStmt, true, ret);
455         LOGE("[SqliteCloudKvExecutorUtils] Get insert data statement failed %d reset %d", errCode, ret);
456         return errCode;
457     }
458     ResFinalizer finalizerData([logStmt, dataStmt, opType]() {
459         sqlite3_stmt *statement = logStmt;
460         int ret = E_OK;
461         SQLiteUtils::ResetStatement(statement, true, ret);
462         if (ret != E_OK) {
463             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d opType %d", ret, static_cast<int>(opType));
464         }
465         statement = dataStmt;
466         SQLiteUtils::ResetStatement(statement, true, ret);
467         if (ret != E_OK) {
468             LOGW("[SqliteCloudKvExecutorUtils] Reset data stmt failed %d opType %d", ret, static_cast<int>(opType));
469         }
470     });
471     errCode = BindStmt(logStmt, dataStmt, index, opType, downloadData);
472     if (errCode != E_OK) {
473         LOGE("[SqliteCloudKvExecutorUtils] BindStmt data stmt failed %d opType %d", errCode, static_cast<int>(opType));
474         return errCode;
475     }
476     errCode = StepStmt(logStmt, dataStmt, isMemory);
477     if (errCode != E_OK) {
478         LOGE("[SqliteCloudKvExecutorUtils] StepStmt data stmt failed %d opType %d", errCode, static_cast<int>(opType));
479         return errCode;
480     }
481     if (opType == OpType::INSERT || opType == OpType::UPDATE || opType == OpType::DELETE) {
482         return OperateOtherUserLog(db, isMemory, index, downloadData);
483     }
484     return errCode;
485 }
486 
OperateOtherUserLog(sqlite3 * db,bool isMemory,int index,DownloadData & downloadData)487 int SqliteCloudKvExecutorUtils::OperateOtherUserLog(sqlite3 *db, bool isMemory, int index, DownloadData &downloadData)
488 {
489     auto [errCode, dataItem] = GetDataItem(index, downloadData);
490     if (errCode != E_OK) {
491         return errCode;
492     }
493     sqlite3_stmt *logStmt = nullptr;
494     std::string sql = "UPDATE naturalbase_kv_aux_sync_data_log SET cloud_flag  = cloud_flag & ~0x2000 "
495                         "WHERE userid != ? AND hash_key = ?";
496     errCode = SQLiteUtils::GetStatement(db, sql, logStmt);
497     if (errCode != E_OK) {
498         LOGE("[SqliteCloudKvExecutorUtils] Get operate other log statement failed %d", errCode);
499         return errCode;
500     }
501     ResFinalizer finalizerData([logStmt]() {
502         sqlite3_stmt *statement = logStmt;
503         int ret = E_OK;
504         SQLiteUtils::ResetStatement(statement, true, ret);
505         if (ret != E_OK) {
506             LOGW("[SqliteCloudKvExecutorUtils] Reset operate other log stmt failed %d ", ret);
507         }
508     });
509     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, downloadData.user);
510     if (errCode != E_OK) {
511         LOGE("[SqliteCloudKvExecutorUtils] Bind operate other log user failed %d when insert", errCode);
512         return errCode;
513     }
514     errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
515     if (errCode != E_OK) {
516         LOGE("[SqliteCloudKvExecutorUtils] Bind operate other log hashKey failed %d when insert", errCode);
517         return errCode;
518     }
519     errCode = SQLiteUtils::StepNext(logStmt, isMemory);
520     if (errCode == -E_FINISHED) {
521         return E_OK;
522     }
523     return errCode;
524 }
525 
GetOperateDataSql(OpType opType)526 std::string SqliteCloudKvExecutorUtils::GetOperateDataSql(OpType opType)
527 {
528     switch (opType) {
529         case OpType::INSERT:
530             return INSERT_SYNC_SQL;
531         case OpType::UPDATE: // fallthrough
532         case OpType::DELETE:
533             return UPDATE_SYNC_SQL;
534         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
535             return SET_SYNC_DATA_NO_FORCE_PUSH;
536         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
537             return SET_SYNC_DATA_FORCE_PUSH;
538         case OpType::UPDATE_TIMESTAMP:
539             return UPDATE_TIMESTAMP;
540         default:
541             return "";
542     }
543 }
544 
GetOperateLogSql(OpType opType)545 std::string SqliteCloudKvExecutorUtils::GetOperateLogSql(OpType opType)
546 {
547     switch (opType) {
548         case OpType::INSERT: // fallthrough
549         case OpType::UPDATE:
550             return INSERT_CLOUD_SYNC_DATA_LOG;
551         case OpType::DELETE:
552             return UPDATE_CLOUD_SYNC_DATA_LOG;
553         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
554         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:  // fallthrough
555         case OpType::UPDATE_TIMESTAMP:               // fallthrough
556         case OpType::ONLY_UPDATE_GID:                // fallthrough
557         case OpType::NOT_HANDLE:                     // fallthrough
558         case OpType::CLEAR_GID:                      // fallthrough
559             return UPSERT_CLOUD_SYNC_DATA_LOG;
560         default:
561             return "";
562     }
563 }
564 
TransToOpType(const CloudWaterType type)565 OpType SqliteCloudKvExecutorUtils::TransToOpType(const CloudWaterType type)
566 {
567     switch (type) {
568         case CloudWaterType::INSERT:
569             return OpType::INSERT;
570         case CloudWaterType::UPDATE:
571             return OpType::UPDATE;
572         case CloudWaterType::DELETE:
573             return OpType::DELETE;
574         default:
575             return OpType::NOT_HANDLE;
576     }
577 }
578 
BindOnlyUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)579 int SqliteCloudKvExecutorUtils::BindOnlyUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
580     const DataItem &dataItem)
581 {
582     int index = 0;
583     int errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, user);
584     if (errCode != E_OK) {
585         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when only insert log", errCode);
586         return errCode;
587     }
588     errCode = SQLiteUtils::BindBlobToStatement(logStmt, ++index, dataItem.hashKey);
589     if (errCode != E_OK) {
590         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when only insert log", errCode);
591     }
592     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
593     if (errCode != E_OK) {
594         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only insert gid.", errCode);
595         return errCode;
596     }
597     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
598     if (errCode != E_OK) {
599         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only insert log", errCode);
600         return errCode;
601     }
602     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
603     if (errCode != E_OK) {
604         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only update gid.", errCode);
605         return errCode;
606     }
607     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
608     if (errCode != E_OK) {
609         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only update log", errCode);
610         return errCode;
611     }
612     return errCode;
613 }
614 
BindStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)615 int SqliteCloudKvExecutorUtils::BindStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, int index, OpType opType,
616     DownloadData &downloadData)
617 {
618     auto [errCode, dataItem] = GetDataItem(index, downloadData);
619     if (errCode != E_OK) {
620         return errCode;
621     }
622     switch (opType) {
623         case OpType::INSERT:
624             return BindInsertStmt(logStmt, dataStmt, downloadData.user, dataItem);
625         case OpType::UPDATE:
626             return BindUpdateStmt(logStmt, dataStmt, downloadData.user, dataItem);
627         case OpType::DELETE:
628             dataItem.hashKey = downloadData.existDataHashKey[index];
629             dataItem.gid.clear();
630             dataItem.version.clear();
631             return BindDeleteStmt(logStmt, dataStmt, downloadData.user, dataItem);
632         default:
633             return E_OK;
634     }
635 }
636 
BindInsertStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)637 int SqliteCloudKvExecutorUtils::BindInsertStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt,
638     const std::string &user, const DataItem &dataItem)
639 {
640     int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for insert DATA table.
641     if (errCode != E_OK) {
642         return errCode;
643     }
644     return BindDataStmt(dataStmt, dataItem, true);
645 }
646 
BindInsertLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,bool isTagLogin)647 int SqliteCloudKvExecutorUtils::BindInsertLogStmt(sqlite3_stmt *logStmt, const std::string &user,
648     const DataItem &dataItem, bool isTagLogin)
649 {
650     int errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, user);
651     if (errCode != E_OK) {
652         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when insert", errCode);
653         return errCode;
654     }
655     errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
656     if (errCode != E_OK) {
657         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when insert", errCode);
658         return errCode;
659     }
660     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_CLOUD_GID_INDEX, dataItem.gid);
661     if (errCode != E_OK) {
662         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when insert", errCode);
663         return errCode;
664     }
665     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_VERSION_INDEX, dataItem.version);
666     if (errCode != E_OK) {
667         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when insert", errCode);
668         return errCode;
669     }
670     int64_t flag = dataItem.cloud_flag | (isTagLogin ? static_cast<uint64_t>(LogInfoFlag::FLAG_LOGIN_USER) : 0x00);
671     errCode = SQLiteUtils::BindInt64ToStatement(logStmt, BIND_INSERT_CLOUD_FLAG_INDEX, flag);
672     if (errCode != E_OK) {
673         LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when insert", errCode);
674     }
675     return errCode;
676 }
677 
BindUpdateStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)678 int SqliteCloudKvExecutorUtils::BindUpdateStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
679     const DataItem &dataItem)
680 {
681     int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for update DATA table.
682     if (errCode != E_OK) {
683         return errCode;
684     }
685     errCode = BindDataStmt(dataStmt, dataItem, false);
686     if (errCode != E_OK) {
687         return errCode;
688     }
689     return E_OK;
690 }
691 
BindUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)692 int SqliteCloudKvExecutorUtils::BindUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
693     const DataItem &dataItem)
694 {
695     int index = 1;
696     int errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.gid);
697     if (errCode != E_OK) {
698         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when update", errCode);
699         return errCode;
700     }
701     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.version);
702     if (errCode != E_OK) {
703         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when update", errCode);
704         return errCode;
705     }
706     errCode = SQLiteUtils::BindInt64ToStatement(logStmt, index++, dataItem.cloud_flag);
707     if (errCode != E_OK) {
708         LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when update", errCode);
709         return errCode;
710     }
711     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
712     if (errCode != E_OK) {
713         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when update", errCode);
714         return errCode;
715     }
716     errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, dataItem.hashKey);
717     if (errCode != E_OK) {
718         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when update", errCode);
719     }
720     return errCode;
721 }
722 
BindDeleteStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,DataItem & dataItem)723 int SqliteCloudKvExecutorUtils::BindDeleteStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
724     DataItem &dataItem)
725 {
726     dataItem.key = {};
727     dataItem.value = {};
728     dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
729     int errCode = BindUpdateLogStmt(logStmt, user, dataItem); // update LOG table for delete DATA table.
730     if (errCode != E_OK) {
731         return errCode;
732     }
733     errCode = BindDataStmt(dataStmt, dataItem, false);
734     if (errCode != E_OK) {
735         return errCode;
736     }
737     return E_OK;
738 }
739 
BindDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert)740 int SqliteCloudKvExecutorUtils::BindDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert)
741 {
742     int index = 1;
743     int errCode = BindSyncDataStmt(dataStmt, dataItem, isInsert, index);
744     if (errCode != E_OK) {
745         return errCode;
746     }
747     errCode = BindCloudDataStmt(dataStmt, dataItem, index);
748     if (errCode != E_OK) {
749         return errCode;
750     }
751     if (!isInsert) {
752         errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
753         if (errCode != E_OK) {
754             LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
755         }
756     }
757     return errCode;
758 }
759 
BindSyncDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert,int & index)760 int SqliteCloudKvExecutorUtils::BindSyncDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert,
761     int &index)
762 {
763     int errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.key);
764     if (errCode != E_OK) {
765         LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
766         return errCode;
767     }
768     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.value);
769     if (errCode != E_OK) {
770         LOGE("[SqliteCloudKvExecutorUtils] Bind value failed %d", errCode);
771         return errCode;
772     }
773     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.timestamp));
774     if (errCode != E_OK) {
775         LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
776         return errCode;
777     }
778     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.flag));
779     if (errCode != E_OK) {
780         LOGE("[SqliteCloudKvExecutorUtils] Bind flag failed %d", errCode);
781         return errCode;
782     }
783     Bytes bytes;
784     DBCommon::StringToVector(dataItem.dev, bytes);
785     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
786     if (errCode != E_OK) {
787         LOGE("[SqliteCloudKvExecutorUtils] Bind dev failed %d", errCode);
788         return errCode;
789     }
790     DBCommon::StringToVector(dataItem.origDev, bytes);
791     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
792     if (errCode != E_OK) {
793         LOGE("[SqliteCloudKvExecutorUtils] Bind oriDev failed %d", errCode);
794         return errCode;
795     }
796     if (isInsert) {
797         errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
798         if (errCode != E_OK) {
799             LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
800             return errCode;
801         }
802     }
803     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.writeTimestamp));
804     if (errCode != E_OK) {
805         LOGE("[SqliteCloudKvExecutorUtils] Bind wTime failed %d", errCode);
806     }
807     return errCode;
808 }
809 
GetModifyTime(int64_t timestamp)810 int64_t SqliteCloudKvExecutorUtils::GetModifyTime(int64_t timestamp)
811 {
812     uint64_t curTime = 0;
813     int64_t modifyTime = timestamp;
814     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
815         if (curTime < static_cast<uint64_t>(INT64_MAX)) {
816             if (modifyTime > static_cast<int64_t>(curTime)) {
817                 modifyTime = static_cast<int64_t>(curTime);
818             }
819         }
820     }
821     return modifyTime;
822 }
823 
BindCloudDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,int & index)824 int SqliteCloudKvExecutorUtils::BindCloudDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, int &index)
825 {
826     int errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.modifyTime);
827     if (errCode != E_OK) {
828         LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
829         return errCode;
830     }
831     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.createTime);
832     if (errCode != E_OK) {
833         LOGE("[SqliteCloudKvExecutorUtils] Bind createTime failed %d", errCode);
834     }
835     return errCode;
836 }
837 
StepStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,bool isMemory)838 int SqliteCloudKvExecutorUtils::StepStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, bool isMemory)
839 {
840     int errCode = SQLiteUtils::StepNext(logStmt, isMemory);
841     if (errCode != -E_FINISHED) {
842         LOGE("[SqliteCloudKvExecutorUtils] Step insert log stmt failed %d", errCode);
843         return errCode;
844     }
845     errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
846     if (errCode != -E_FINISHED) {
847         LOGE("[SqliteCloudKvExecutorUtils] Step insert data stmt failed %d", errCode);
848         return errCode;
849     }
850     return E_OK;
851 }
852 
FillCloudLog(const FillGidParam & param,OpType opType,const CloudSyncData & data,const std::string & user,CloudUploadRecorder & recorder)853 int SqliteCloudKvExecutorUtils::FillCloudLog(const FillGidParam &param, OpType opType, const CloudSyncData &data,
854     const std::string &user, CloudUploadRecorder &recorder)
855 {
856     if (param.first == nullptr) {
857         LOGE("[SqliteCloudKvExecutorUtils] Fill log got nullptr db");
858         return -E_INVALID_ARGS;
859     }
860     if (data.isCloudVersionRecord) {
861         int errCode = FillCloudVersionRecord(param.first, opType, data);
862         if (errCode != E_OK) {
863             return errCode;
864         }
865     }
866     switch (opType) {
867         case OpType::INSERT:
868             return FillCloudGid(param, data.insData, user, CloudWaterType::INSERT, recorder);
869         case OpType::UPDATE:
870             return FillCloudGid(param, data.updData, user, CloudWaterType::UPDATE, recorder);
871         case OpType::DELETE:
872             return FillCloudGid(param, data.delData, user, CloudWaterType::DELETE, recorder);
873         default:
874             return E_OK;
875     }
876 }
877 
OnlyUpdateLogTable(sqlite3 * db,bool isMemory,int index,OpType op,DownloadData & downloadData)878 int SqliteCloudKvExecutorUtils::OnlyUpdateLogTable(sqlite3 *db, bool isMemory, int index, OpType op,
879     DownloadData &downloadData)
880 {
881     if (downloadData.existDataHashKey[index].empty()) {
882         return E_OK;
883     }
884     sqlite3_stmt *logStmt = nullptr;
885     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(op), logStmt);
886     if (errCode != E_OK) {
887         LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
888         return errCode;
889     }
890     ResFinalizer finalizerData([logStmt]() {
891         sqlite3_stmt *statement = logStmt;
892         int ret = E_OK;
893         SQLiteUtils::ResetStatement(statement, true, ret);
894         if (ret != E_OK) {
895             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when only update log", ret);
896         }
897     });
898     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
899     if (res.first != E_OK) {
900         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", res.first);
901         return res.first;
902     }
903     bool clearCloudInfo = (op == OpType::CLEAR_GID);
904     if (res.second.hashKey.empty() || DBCommon::IsRecordDelete(downloadData.data[index])) {
905         res.second.hashKey = downloadData.existDataHashKey[index];
906         clearCloudInfo = true;
907     }
908     if (clearCloudInfo) {
909         res.second.gid.clear();
910         res.second.version.clear();
911     }
912     errCode = BindOnlyUpdateLogStmt(logStmt, downloadData.user, res.second);
913     if (errCode != E_OK) {
914         return errCode;
915     }
916     errCode = SQLiteUtils::StepNext(logStmt, isMemory);
917     if (errCode == -E_FINISHED) {
918         errCode = E_OK;
919     }
920     return errCode;
921 }
922 
FillCloudGid(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,const CloudWaterType & type,CloudUploadRecorder & recorder)923 int SqliteCloudKvExecutorUtils::FillCloudGid(const FillGidParam &param, const CloudSyncBatch &data,
924     const std::string &user, const CloudWaterType &type, CloudUploadRecorder &recorder)
925 {
926     auto [db, ignoreEmptyGid] = param;
927     sqlite3_stmt *logStmt = nullptr;
928     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(TransToOpType(type)), logStmt);
929     ResFinalizer finalizerData([logStmt]() {
930         sqlite3_stmt *statement = logStmt;
931         int ret = E_OK;
932         SQLiteUtils::ResetStatement(statement, true, ret);
933         if (ret != E_OK) {
934             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when fill log", ret);
935         }
936     });
937     for (size_t i = 0; i < data.hashKey.size(); ++i) {
938         if (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i])) {
939             continue;
940         }
941         DataItem dataItem;
942         errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data.extend[i], dataItem.gid);
943         if (dataItem.gid.empty() && ignoreEmptyGid) {
944             continue;
945         }
946         if (errCode != E_OK) {
947             return errCode;
948         }
949         CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::VERSION_FIELD, data.extend[i], dataItem.version);
950         dataItem.hashKey = data.hashKey[i];
951         dataItem.dataDelete = CheckDataDelete(param, data, i);
952         errCode = BindFillGidLogStmt(logStmt, user, dataItem, data.extend[i], type);
953         if (errCode != E_OK) {
954             return errCode;
955         }
956         errCode = SQLiteUtils::StepNext(logStmt, false);
957         if (errCode == -E_FINISHED) {
958             errCode = E_OK;
959         }
960         if (errCode != E_OK) {
961             LOGE("[SqliteCloudKvExecutorUtils] fill back failed %d index %zu", errCode, i);
962             return errCode;
963         }
964         SQLiteUtils::ResetStatement(logStmt, false, errCode);
965         MarkUploadSuccess(param, data, user, i);
966         // ignored version record
967         if (i >= data.timestamp.size()) {
968             continue;
969         }
970         recorder.RecordUploadRecord(CloudDbConstant::CLOUD_KV_TABLE_NAME, data.hashKey[i], type, data.timestamp[i]);
971     }
972     return E_OK;
973 }
974 
OnlyUpdateSyncData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)975 int SqliteCloudKvExecutorUtils::OnlyUpdateSyncData(sqlite3 *db, bool isMemory, int index, OpType opType,
976     DownloadData &downloadData)
977 {
978     if (opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO && opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE &&
979         opType != OpType::UPDATE_TIMESTAMP) {
980         LOGW("[SqliteCloudKvExecutorUtils] Ignore unknown opType %d", static_cast<int>(opType));
981         return E_OK;
982     }
983     sqlite3_stmt *dataStmt = nullptr;
984     int errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
985     if (errCode != E_OK) {
986         LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
987         return errCode;
988     }
989     ResFinalizer finalizerData([dataStmt]() {
990         sqlite3_stmt *statement = dataStmt;
991         int ret = E_OK;
992         SQLiteUtils::ResetStatement(statement, true, ret);
993         if (ret != E_OK) {
994             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when update log", ret);
995         }
996     });
997     errCode = BindUpdateSyncDataStmt(dataStmt, index, opType, downloadData);
998     if (errCode != E_OK) {
999         LOGE("[SqliteCloudKvExecutorUtils] Bind update sync data stmt failed %d", errCode);
1000         return errCode;
1001     }
1002     errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
1003     if (errCode == -E_FINISHED) {
1004         errCode = E_OK;
1005     }
1006     return errCode;
1007 }
1008 
BindUpdateSyncDataStmt(sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)1009 int SqliteCloudKvExecutorUtils::BindUpdateSyncDataStmt(sqlite3_stmt *dataStmt, int index, OpType opType,
1010     DownloadData &downloadData)
1011 {
1012     switch (opType) {
1013         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
1014         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
1015             return SQLiteUtils::BindBlobToStatement(dataStmt, 1, downloadData.existDataHashKey[index]);
1016         case OpType::UPDATE_TIMESTAMP:
1017             return BindUpdateTimestampStmt(dataStmt, index, downloadData);
1018         default:
1019             return E_OK;
1020     }
1021 }
1022 
BindUpdateTimestampStmt(sqlite3_stmt * dataStmt,int index,DownloadData & downloadData)1023 int SqliteCloudKvExecutorUtils::BindUpdateTimestampStmt(sqlite3_stmt *dataStmt, int index, DownloadData &downloadData)
1024 {
1025     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
1026     auto &[errCode, dataItem] = res;
1027     if (errCode != E_OK) {
1028         return errCode;
1029     }
1030     int currentBindIndex = 1; // bind sql index start at 1
1031     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.timestamp);
1032     if (errCode != E_OK) {
1033         LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
1034         return errCode;
1035     }
1036     int64_t modifyTime = GetModifyTime(dataItem.modifyTime);
1037     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, modifyTime);
1038     if (errCode != E_OK) {
1039         LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
1040     }
1041     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, currentBindIndex++, dataItem.hashKey);
1042     if (errCode != E_OK) {
1043         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
1044         return errCode;
1045     }
1046     return E_OK;
1047 }
1048 
GetDataItem(int index,DownloadData & downloadData)1049 std::pair<int, DataItem> SqliteCloudKvExecutorUtils::GetDataItem(int index, DownloadData &downloadData)
1050 {
1051     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
1052     auto &[errCode, dataItem] = res;
1053     if (errCode != E_OK) {
1054         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
1055         return res;
1056     }
1057     std::string dev;
1058     errCode = RuntimeContext::GetInstance()->GetLocalIdentity(dev);
1059     if (errCode != E_OK) {
1060         return res;
1061     }
1062     dev = DBCommon::TransferHashString(dev);
1063     auto decodeDevice = DBBase64Utils::Decode(dataItem.dev);
1064     if (!decodeDevice.empty()) {
1065         dataItem.dev = std::string(decodeDevice.begin(), decodeDevice.end());
1066     }
1067     if (dataItem.dev == dev) {
1068         dataItem.dev = "";
1069     }
1070     decodeDevice = DBBase64Utils::Decode(dataItem.origDev);
1071     if (!decodeDevice.empty()) {
1072         dataItem.origDev = std::string(decodeDevice.begin(), decodeDevice.end());
1073     }
1074     if (dataItem.origDev == dev) {
1075         dataItem.origDev = "";
1076     }
1077     int64_t timestamp = GetModifyTime(dataItem.modifyTime);
1078     dataItem.timestamp = static_cast<Timestamp>(timestamp + downloadData.timeOffset);
1079     dataItem.writeTimestamp = static_cast<Timestamp>(timestamp); // writeTimestamp is process conflict time
1080     return res;
1081 }
1082 
CountCloudDataInner(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,const std::string & sql)1083 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudDataInner(sqlite3 *db, bool isMemory,
1084     const Timestamp &timestamp, const std::string &user, const std::string &sql)
1085 {
1086     std::pair<int, int64_t> res;
1087     auto &[errCode, count] = res;
1088     sqlite3_stmt *stmt = nullptr;
1089     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1090     if (errCode != E_OK) {
1091         LOGE("[SqliteCloudKvExecutorUtils] Count data stmt failed %d", errCode);
1092         return res;
1093     }
1094     ResFinalizer finalizer([stmt]() {
1095         sqlite3_stmt *statement = stmt;
1096         int ret = E_OK;
1097         SQLiteUtils::ResetStatement(statement, true, ret);
1098         if (ret != E_OK) {
1099             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when count data", ret);
1100         }
1101     });
1102     errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_USER, user);
1103     if (errCode != E_OK) {
1104         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d", errCode);
1105         return res;
1106     }
1107     errCode = SQLiteUtils::BindInt64ToStatement(stmt, BIND_CLOUD_TIMESTAMP, static_cast<int64_t>(timestamp));
1108     if (errCode != E_OK) {
1109         LOGE("[SqliteCloudKvExecutorUtils] Bind begin time failed %d", errCode);
1110         return res;
1111     }
1112     errCode = SQLiteUtils::StepNext(stmt, isMemory);
1113     if (errCode == -E_FINISHED) {
1114         count = 0;
1115         return res;
1116     }
1117     count = sqlite3_column_int64(stmt, CLOUD_QUERY_COUNT_INDEX);
1118     LOGD("[SqliteCloudKvExecutorUtils] Get total upload count %" PRId64, count);
1119     return res;
1120 }
1121 
CountCloudData(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,bool forcePush)1122 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudData(sqlite3 *db, bool isMemory,
1123     const Timestamp &timestamp, const std::string &user, bool forcePush)
1124 {
1125     std::string sql = SqliteQueryHelper::GetKvCloudQuerySql(true, forcePush);
1126     return CountCloudDataInner(db, isMemory, timestamp, user, sql);
1127 }
1128 
CountAllCloudData(const DBParam & param,const std::vector<Timestamp> & timestampVec,const std::string & user,bool forcePush,QuerySyncObject & querySyncObject)1129 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountAllCloudData(const DBParam &param,
1130     const std::vector<Timestamp> &timestampVec, const std::string &user, bool forcePush,
1131     QuerySyncObject &querySyncObject)
1132 {
1133     std::pair<int, int64_t> res = { E_OK, 0 };
1134     auto &[errCode, count] = res;
1135     if (timestampVec.size() != 3) { // 3 is the number of three mode.
1136         errCode = -E_INVALID_ARGS;
1137         return res;
1138     }
1139     std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1140     SqliteQueryHelper helper = querySyncObject.GetQueryHelper(errCode);
1141     if (errCode != E_OK) {
1142         return res;
1143     }
1144     for (size_t i = 0; i < typeVec.size(); i++) {
1145         sqlite3_stmt *stmt = nullptr;
1146         errCode = helper.GetCountKvCloudDataStatement(param.first, forcePush, typeVec[i], stmt);
1147         if (errCode != E_OK) {
1148             return res;
1149         }
1150         // count no use watermark
1151         auto [err, cnt] = helper.BindCountKvCloudDataStatement(param.first, param.second, 0u, user, stmt);
1152         if (err != E_OK) {
1153             return { err, 0 };
1154         }
1155         count += cnt;
1156     }
1157     return res;
1158 }
1159 
FillCloudVersionRecord(sqlite3 * db,OpType opType,const CloudSyncData & data)1160 int SqliteCloudKvExecutorUtils::FillCloudVersionRecord(sqlite3 *db, OpType opType, const CloudSyncData &data)
1161 {
1162     if (opType != OpType::INSERT && opType != OpType::UPDATE) {
1163         return E_OK;
1164     }
1165     bool isInsert = (opType == OpType::INSERT);
1166     CloudSyncBatch syncBatch = isInsert ? data.insData : data.updData;
1167     if (syncBatch.record.empty()) {
1168         LOGW("[SqliteCloudKvExecutorUtils] Fill empty cloud version record");
1169         return E_OK;
1170     }
1171     syncBatch.record[0].insert(syncBatch.extend[0].begin(), syncBatch.extend[0].end());
1172     auto res = CloudStorageUtils::GetSystemRecordFromCloudData(syncBatch.record[0]); // only record first one
1173     auto &[errCode, dataItem] = res;
1174     sqlite3_stmt *dataStmt = nullptr;
1175     errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
1176     if (errCode != E_OK) {
1177         LOGE("[SqliteCloudKvExecutorUtils] Get insert version record statement failed %d", errCode);
1178         return errCode;
1179     }
1180     ResFinalizer finalizerData([dataStmt]() {
1181         int ret = E_OK;
1182         sqlite3_stmt *statement = dataStmt;
1183         SQLiteUtils::ResetStatement(statement, true, ret);
1184         if (ret != E_OK) {
1185             LOGW("[SqliteCloudKvExecutorUtils] Reset version record stmt failed %d", ret);
1186         }
1187     });
1188     errCode = BindDataStmt(dataStmt, dataItem, isInsert);
1189     if (errCode != E_OK) {
1190         return errCode;
1191     }
1192     errCode = SQLiteUtils::StepNext(dataStmt, false);
1193     if (errCode != -E_FINISHED) {
1194         LOGE("[SqliteCloudKvExecutorUtils] Step insert version record stmt failed %d", errCode);
1195         return errCode;
1196     }
1197     return E_OK;
1198 }
1199 
GetLocalCloudVersion(sqlite3 * db,bool isMemory,const std::string & user)1200 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersion(sqlite3 *db, bool isMemory,
1201     const std::string &user)
1202 {
1203     auto res = GetLocalCloudVersionInner(db, isMemory, user);
1204     if (res.first != E_OK) {
1205         LOGE("[SqliteCloudKvExecutorUtils] Get local cloud version failed %d", res.first);
1206     }
1207     return res;
1208 }
1209 
GetLocalCloudVersionInner(sqlite3 * db,bool isMemory,const std::string & user)1210 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersionInner(sqlite3 *db, bool isMemory,
1211     const std::string &user)
1212 {
1213     std::pair<int, CloudSyncData> res;
1214     auto &[errCode, syncData] = res;
1215     auto sql = SqliteQueryHelper::GetKvCloudRecordSql();
1216     sqlite3_stmt *stmt = nullptr;
1217     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1218     if (errCode != E_OK) {
1219         return res;
1220     }
1221     ResFinalizer finalizerData([stmt]() {
1222         int ret = E_OK;
1223         sqlite3_stmt *statement = stmt;
1224         SQLiteUtils::ResetStatement(statement, true, ret);
1225         if (ret != E_OK) {
1226             LOGW("[SqliteCloudKvExecutorUtils] Reset local version record stmt failed %d", ret);
1227         }
1228     });
1229     std::string hashDev;
1230     errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
1231     if (errCode != E_OK) {
1232         return res;
1233     }
1234     std::string tempDev = DBCommon::TransferHashString(hashDev);
1235     hashDev = DBCommon::TransferStringToHex(tempDev);
1236     std::string key = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY + hashDev;
1237     Key keyVec;
1238     DBCommon::StringToVector(key, keyVec);
1239     errCode = SQLiteUtils::BindBlobToStatement(stmt, BIND_CLOUD_VERSION_RECORD_KEY_INDEX, keyVec);
1240     if (errCode != E_OK) {
1241         return res;
1242     }
1243     errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_VERSION_RECORD_USER_INDEX, user);
1244     if (errCode != E_OK) {
1245         return res;
1246     }
1247     errCode = GetCloudVersionRecord(isMemory, stmt, syncData);
1248     if (errCode == -E_NOT_FOUND) {
1249         InitDefaultCloudVersionRecord(key, tempDev, syncData);
1250         errCode = E_OK;
1251     }
1252     return res;
1253 }
1254 
GetCloudVersionRecord(bool isMemory,sqlite3_stmt * stmt,CloudSyncData & syncData)1255 int SqliteCloudKvExecutorUtils::GetCloudVersionRecord(bool isMemory, sqlite3_stmt *stmt, CloudSyncData &syncData)
1256 {
1257     int errCode = SQLiteUtils::StepNext(stmt, isMemory);
1258     if (errCode == -E_FINISHED) {
1259         return -E_NOT_FOUND;
1260     }
1261     if (errCode != E_OK) {
1262         LOGE("[SqliteCloudKvExecutorUtils] Get local version failed %d", errCode);
1263         return errCode;
1264     }
1265     CloudSyncConfig config;
1266     config.maxUploadSize = CloudDbConstant::MAX_UPLOAD_SIZE;
1267     config.maxUploadCount = 1;
1268     CloudUploadRecorder recorder; // ignore last record
1269     UploadDetail detail;
1270     errCode = GetCloudDataForSync(config, recorder, stmt, syncData, detail);
1271     return errCode;
1272 }
1273 
InitDefaultCloudVersionRecord(const std::string & key,const std::string & dev,CloudSyncData & syncData)1274 void SqliteCloudKvExecutorUtils::InitDefaultCloudVersionRecord(const std::string &key, const std::string &dev,
1275     CloudSyncData &syncData)
1276 {
1277     LOGI("[SqliteCloudKvExecutorUtils] Not found local version record");
1278     VBucket defaultRecord;
1279     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_KEY] = key;
1280     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_VALUE] = std::string("");
1281     auto encodeDev = DBBase64Utils::Encode(dev);
1282     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_DEVICE] = encodeDev;
1283     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE] = encodeDev;
1284     syncData.insData.record.push_back(std::move(defaultRecord));
1285     VBucket defaultExtend;
1286     defaultExtend[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::TransferStringToHex(key);
1287     syncData.insData.extend.push_back(std::move(defaultExtend));
1288     syncData.insData.assets.emplace_back();
1289     Bytes bytesHashKey;
1290     DBCommon::StringToVector(key, bytesHashKey);
1291     syncData.insData.hashKey.push_back(bytesHashKey);
1292 }
1293 
BindVersionStmt(const std::string & device,sqlite3_stmt * dataStmt)1294 int SqliteCloudKvExecutorUtils::BindVersionStmt(const std::string &device, sqlite3_stmt *dataStmt)
1295 {
1296     std::string hashDevice;
1297     int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDevice);
1298     if (errCode != E_OK) {
1299         return errCode;
1300     }
1301     Bytes bytes;
1302     if (device == hashDevice) {
1303         DBCommon::StringToVector("", bytes);
1304     } else {
1305         hashDevice = DBCommon::TransferHashString(device);
1306         DBCommon::StringToVector(hashDevice, bytes);
1307     }
1308     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, BIND_CLOUD_VERSION_DEVICE_INDEX, bytes);
1309     if (errCode != E_OK) {
1310         LOGE("[SqliteCloudKvExecutorUtils] Bind device failed %d", errCode);
1311     }
1312     return errCode;
1313 }
1314 
GetCloudVersionFromCloud(sqlite3 * db,bool isMemory,const std::string & device,std::vector<VBucket> & dataVector)1315 int SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(sqlite3 *db, bool isMemory, const std::string &device,
1316     std::vector<VBucket> &dataVector)
1317 {
1318     sqlite3_stmt *dataStmt = nullptr;
1319     bool isDeviceEmpty = device.empty();
1320     std::string sql = SqliteQueryHelper::GetCloudVersionRecordSql(isDeviceEmpty);
1321     int errCode = SQLiteUtils::GetStatement(db, sql, dataStmt);
1322     if (errCode != E_OK) {
1323         LOGE("[SqliteCloudKvExecutorUtils] Get cloud version record statement failed %d", errCode);
1324         return errCode;
1325     }
1326     ResFinalizer finalizerData([dataStmt]() {
1327         int ret = E_OK;
1328         sqlite3_stmt *statement = dataStmt;
1329         SQLiteUtils::ResetStatement(statement, true, ret);
1330         if (ret != E_OK) {
1331             LOGW("[SqliteCloudKvExecutorUtils] Reset cloud version record stmt failed %d", ret);
1332         }
1333     });
1334     if (!isDeviceEmpty) {
1335         errCode = BindVersionStmt(device, dataStmt);
1336         if (errCode != E_OK) {
1337             return errCode;
1338         }
1339     }
1340     uint32_t totalSize = 0;
1341     do {
1342         errCode = SQLiteUtils::StepWithRetry(dataStmt, isMemory);
1343         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1344             errCode = E_OK;
1345             break;
1346         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1347             LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1348             break;
1349         }
1350         VBucket data;
1351         errCode = GetCloudVersionRecordData(dataStmt, data, totalSize);
1352         dataVector.push_back(data);
1353     } while (errCode == E_OK);
1354     return errCode;
1355 }
1356 
GetCloudVersionRecordData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)1357 int SqliteCloudKvExecutorUtils::GetCloudVersionRecordData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
1358 {
1359     int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
1360     if (errCode != E_OK) {
1361         return errCode;
1362     }
1363     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
1364     if (errCode != E_OK) {
1365         return errCode;
1366     }
1367     return GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
1368 }
1369 
GetWaitCompensatedSyncDataPk(sqlite3 * db,bool isMemory,std::vector<VBucket> & data)1370 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(sqlite3 *db, bool isMemory, std::vector<VBucket> &data)
1371 {
1372     sqlite3_stmt *stmt = nullptr;
1373     int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_KEY_SQL, stmt);
1374     if (errCode != E_OK) {
1375         LOGE("[SqliteCloudKvExecutorUtils] Get compensate key stmt failed %d", errCode);
1376         return errCode;
1377     }
1378     ResFinalizer finalizerData([stmt]() {
1379         int ret = E_OK;
1380         sqlite3_stmt *statement = stmt;
1381         SQLiteUtils::ResetStatement(statement, true, ret);
1382         if (ret != E_OK) {
1383             LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1384         }
1385     });
1386     uint32_t totalSize = 0;
1387     do {
1388         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1389         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1390             errCode = E_OK;
1391             break;
1392         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1393             LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1394             break;
1395         }
1396         VBucket key;
1397         errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, key, totalSize);
1398         if (errCode != E_OK) {
1399             return errCode;
1400         }
1401         data.push_back(key);
1402     } while (errCode == E_OK);
1403     return errCode;
1404 }
1405 
GetWaitCompensatedSyncDataUserId(sqlite3 * db,bool isMemory,std::vector<VBucket> & users)1406 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(sqlite3 *db, bool isMemory,
1407     std::vector<VBucket> &users)
1408 {
1409     sqlite3_stmt *stmt = nullptr;
1410     int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_USERID_SQL, stmt);
1411     if (errCode != E_OK) {
1412         LOGE("[SqliteCloudKvExecutorUtils] Get compensate userid stmt failed %d", errCode);
1413         return errCode;
1414     }
1415     ResFinalizer finalizerData([stmt]() {
1416         int ret = E_OK;
1417         sqlite3_stmt *statement = stmt;
1418         SQLiteUtils::ResetStatement(statement, true, ret);
1419         if (ret != E_OK) {
1420             LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1421         }
1422     });
1423     uint32_t totalSize = 0;
1424     do {
1425         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1426         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1427             errCode = E_OK;
1428             break;
1429         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1430             LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1431             break;
1432         }
1433         VBucket key;
1434         errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_USERID, CLOUD_QUERY_KEY_INDEX, stmt,
1435             key, totalSize);
1436         if (errCode != E_OK) {
1437             return errCode;
1438         }
1439         users.push_back(key);
1440     } while (errCode == E_OK);
1441     return errCode;
1442 }
1443 
GetWaitCompensatedSyncData(sqlite3 * db,bool isMemory,std::vector<VBucket> & data,std::vector<VBucket> & users)1444 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncData(sqlite3 *db, bool isMemory, std::vector<VBucket> &data,
1445     std::vector<VBucket> &users)
1446 {
1447     int errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(db, isMemory, data);
1448     if (errCode != E_OK) {
1449         LOGE("[GetWaitCompensatedSyncData] Get wait compensated sync data failed! errCode=%d", errCode);
1450         return errCode;
1451     }
1452     errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(db, isMemory, users);
1453     if (errCode != E_OK) {
1454         LOGE("[GetWaitCompensatedSyncData] Get wait compensated sync data failed! errCode=%d", errCode);
1455     }
1456     return errCode;
1457 }
1458 
QueryCloudGid(sqlite3 * db,bool isMemory,const std::string & user,const QuerySyncObject & querySyncObject,std::vector<std::string> & cloudGid)1459 int SqliteCloudKvExecutorUtils::QueryCloudGid(sqlite3 *db, bool isMemory, const std::string &user,
1460     const QuerySyncObject &querySyncObject, std::vector<std::string> &cloudGid)
1461 {
1462     int errCode = E_OK;
1463     QuerySyncObject query = querySyncObject;
1464     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1465     if (errCode != E_OK) {
1466         return errCode;
1467     }
1468     sqlite3_stmt *stmt = nullptr;
1469     errCode = helper.GetAndBindGidKvCloudQueryStatement(user, db, stmt);
1470     if (errCode != E_OK) {
1471         return errCode;
1472     }
1473     do {
1474         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1475         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1476             errCode = E_OK;
1477             break;
1478         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1479             LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1480             break;
1481         }
1482         std::string gid;
1483         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1484         cloudGid.push_back(gid);
1485     } while (errCode == E_OK);
1486     int ret = E_OK;
1487     SQLiteUtils::ResetStatement(stmt, true, ret);
1488     return errCode == E_OK ? ret : errCode;
1489 }
1490 
BindFillGidLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,const VBucket & uploadExtend,const CloudWaterType & type)1491 int SqliteCloudKvExecutorUtils::BindFillGidLogStmt(sqlite3_stmt *logStmt, const std::string &user,
1492     const DataItem &dataItem, const VBucket &uploadExtend, const CloudWaterType &type)
1493 {
1494     DataItem wItem = dataItem;
1495     if (DBCommon::IsNeedCompensatedForUpload(uploadExtend, type) && !wItem.dataDelete) {
1496         wItem.cloud_flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1497     }
1498     if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1499         (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1500         wItem.gid = {};
1501         wItem.version = {};
1502     }
1503     int errCode = E_OK;
1504     if (type == CloudWaterType::DELETE) {
1505         if (DBCommon::IsCloudRecordNotFound(uploadExtend)) {
1506             errCode = BindUpdateLogStmt(logStmt, user, wItem);
1507         }
1508     } else {
1509         errCode = BindInsertLogStmt(logStmt, user, wItem, false);
1510     }
1511     if (errCode != E_OK) {
1512         LOGE("[SqliteCloudKvExecutorUtils] fill cloud gid failed. %d", errCode);
1513     }
1514     return errCode;
1515 }
1516 
MarkUploadSuccess(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1517 void SqliteCloudKvExecutorUtils::MarkUploadSuccess(const FillGidParam &param, const CloudSyncBatch &data,
1518     const std::string &user, size_t dataIndex)
1519 {
1520     if (data.extend.size() <= dataIndex || data.hashKey.size() <= dataIndex || data.timestamp.size() <= dataIndex) {
1521         LOGW("[SqliteCloudKvExecutorUtils] invalid index %zu when mark upload success", dataIndex);
1522         return;
1523     }
1524     if (!DBCommon::IsRecordSuccess(data.extend[dataIndex])) {
1525         return;
1526     }
1527     if (CheckDataChanged(param, data, dataIndex)) {
1528         LOGW("[SqliteCloudKvExecutorUtils] %zu data changed when mark upload success", dataIndex);
1529         return;
1530     }
1531     MarkUploadSuccessInner(param, data, user, dataIndex);
1532 }
1533 
CheckDataChanged(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1534 bool SqliteCloudKvExecutorUtils::CheckDataChanged(const FillGidParam &param,
1535     const CloudSyncBatch &data, size_t dataIndex)
1536 {
1537     sqlite3_stmt *checkStmt = nullptr;
1538     int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_CHANGED, checkStmt);
1539     if (errCode != E_OK) {
1540         LOGW("[SqliteCloudKvExecutorUtils] check data change get stmt failed %d", errCode);
1541         return true;
1542     }
1543     ResFinalizer finalizerData([checkStmt]() {
1544         sqlite3_stmt *statement = checkStmt;
1545         int ret = E_OK;
1546         SQLiteUtils::ResetStatement(statement, true, ret);
1547         if (ret != E_OK) {
1548             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data changed", ret);
1549         }
1550     });
1551     int index = 1;
1552     if (data.timestamp.size() > 0) {
1553         errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1554         if (errCode != E_OK) {
1555             LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data changed", errCode);
1556             return true;
1557         }
1558     }
1559     if (data.hashKey.size() > 0) {
1560         errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1561         if (errCode != E_OK) {
1562             LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data changed", errCode);
1563             return true;
1564         }
1565     }
1566     errCode = SQLiteUtils::StepNext(checkStmt);
1567     if (errCode != E_OK) {
1568         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data changed", errCode);
1569         return true;
1570     }
1571     return sqlite3_column_int64(checkStmt, 0) == 0; // get index start at 0, get 0 is data changed
1572 }
1573 
CheckDataDelete(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1574 bool SqliteCloudKvExecutorUtils::CheckDataDelete(const FillGidParam &param,
1575     const CloudSyncBatch &data, size_t dataIndex)
1576 {
1577     sqlite3_stmt *checkStmt = nullptr;
1578     int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_DELETE, checkStmt);
1579     if (errCode != E_OK) {
1580         LOGW("[SqliteCloudKvExecutorUtils] check data delete get stmt failed %d", errCode);
1581         return true;
1582     }
1583     ResFinalizer finalizerData([checkStmt]() {
1584         sqlite3_stmt *statement = checkStmt;
1585         int ret = E_OK;
1586         SQLiteUtils::ResetStatement(statement, true, ret);
1587         if (ret != E_OK) {
1588             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data delete", ret);
1589         }
1590     });
1591     int index = 1;
1592     if (data.timestamp.size() > 0) {
1593         errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1594         if (errCode != E_OK) {
1595             LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data delete", errCode);
1596             return true;
1597         }
1598     }
1599     if (data.hashKey.size() > 0) {
1600         errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1601         if (errCode != E_OK) {
1602             LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data delete", errCode);
1603             return true;
1604         }
1605     }
1606     errCode = SQLiteUtils::StepNext(checkStmt);
1607     if (errCode != E_OK) {
1608         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data delete", errCode);
1609         return true;
1610     }
1611     return sqlite3_column_int64(checkStmt, 0) != 0; // get index start at !0, get 0 means data delete
1612 }
1613 
MarkUploadSuccessInner(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1614 void SqliteCloudKvExecutorUtils::MarkUploadSuccessInner(const FillGidParam &param,
1615     const CloudSyncBatch &data, const std::string &user, size_t dataIndex)
1616 {
1617     sqlite3_stmt *logStmt = nullptr;
1618     int errCode = SQLiteUtils::GetStatement(param.first, MARK_UPLOAD_SUCCESS, logStmt);
1619     if (errCode != E_OK) {
1620         LOGW("[SqliteCloudKvExecutorUtils] mark upload success inner get stmt failed %d", errCode);
1621         return;
1622     }
1623     ResFinalizer finalizerData([logStmt]() {
1624         sqlite3_stmt *statement = logStmt;
1625         int ret = E_OK;
1626         SQLiteUtils::ResetStatement(statement, true, ret);
1627         if (ret != E_OK) {
1628             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when mark upload success", ret);
1629         }
1630     });
1631     int index = 1;
1632     errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, data.hashKey[dataIndex]);
1633     if (errCode != E_OK) {
1634         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1635         return;
1636     }
1637     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
1638     if (errCode != E_OK) {
1639         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1640         return;
1641     }
1642     errCode = SQLiteUtils::StepNext(logStmt);
1643     if (errCode != E_OK && errCode != -E_FINISHED) {
1644         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when mark upload success", errCode);
1645         return;
1646     }
1647 }
1648 }
1649