• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud_sync_tag_assets.h"
25 #include "cloud_sync_utils.h"
26 #include "db_errno.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "runtime_context.h"
30 #include "storage_proxy.h"
31 #include "store_types.h"
32 #include "strategy_factory.h"
33 #include "version.h"
34 
35 namespace DistributedDB {
ReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)36 void CloudSyncer::ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
37 {
38     Timestamp cacheWaterMark = GetResumeWaterMark(taskId);
39     waterMark = cacheWaterMark == 0u ? waterMark : cacheWaterMark;
40     RecordWaterMark(taskId, 0u);
41 }
42 
ReloadCloudWaterMarkIfNeed(const std::string & tableName,std::string & cloudWaterMark)43 void CloudSyncer::ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark)
44 {
45     std::lock_guard<std::mutex> autoLock(dataLock_);
46     std::string cacheCloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
47     cloudWaterMark = cacheCloudWaterMark.empty() ? cloudWaterMark : cacheCloudWaterMark;
48 }
49 
ReloadUploadInfoIfNeed(TaskId taskId,const UploadParam & param,InnerProcessInfo & info)50 void CloudSyncer::ReloadUploadInfoIfNeed(TaskId taskId, const UploadParam &param, InnerProcessInfo &info)
51 {
52     info.upLoadInfo.total = static_cast<uint32_t>(param.count);
53     {
54         std::lock_guard<std::mutex> autoLock(dataLock_);
55         if (!cloudTaskInfos_[taskId].resume) {
56             return;
57         }
58     }
59     uint32_t lastSuccessCount = GetLastUploadSuccessCount(info.tableName);
60     if (lastSuccessCount == 0) {
61         return;
62     }
63     info.upLoadInfo.total += lastSuccessCount;
64     info.upLoadInfo.successCount += lastSuccessCount;
65     LOGD("[CloudSyncer] resume upload, last success count %" PRIu32, lastSuccessCount);
66 }
67 
GetLastUploadSuccessCount(const std::string & tableName)68 uint32_t CloudSyncer::GetLastUploadSuccessCount(const std::string &tableName)
69 {
70     std::lock_guard<std::mutex> autoLock(dataLock_);
71     return currentContext_.notifier->GetLastUploadSuccessCount(tableName);
72 }
73 
FillDownloadExtend(TaskId taskId,const std::string & tableName,const std::string & cloudWaterMark,VBucket & extend)74 int CloudSyncer::FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
75     VBucket &extend)
76 {
77     extend = {
78         {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
79     };
80 
81     QuerySyncObject obj = GetQuerySyncObject(tableName);
82     if (obj.IsContainQueryNodes()) {
83         int errCode = GetCloudGid(taskId, tableName, obj);
84         if (errCode != E_OK) {
85             LOGE("[CloudSyncer] Failed to get cloud gid when fill extend, %d.", errCode);
86             return errCode;
87         }
88         Bytes bytes;
89         bytes.resize(obj.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
90         Parcel parcel(bytes.data(), bytes.size());
91         errCode = obj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
92         if (errCode != E_OK) {
93             LOGE("[CloudSyncer] Query serialize failed %d", errCode);
94             return errCode;
95         }
96         extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::QUERY_FIELD);
97         extend[CloudDbConstant::QUERY_FIELD] = bytes;
98     } else {
99         extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::FULL_TABLE);
100     }
101     return E_OK;
102 }
103 
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj)104 int CloudSyncer::GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj)
105 {
106     std::vector<std::string> cloudGid;
107     bool isCloudForcePush = cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
108     int errCode = storageProxy_->GetCloudGid(obj, isCloudForcePush, IsCompensatedTask(taskId), cloudGid);
109     if (errCode != E_OK) {
110         LOGE("[CloudSyncer] Failed to get cloud gid, %d.", errCode);
111     } else if (!cloudGid.empty()) {
112         obj.SetCloudGid(cloudGid);
113     }
114     LOGI("[CloudSyncer] get cloud gid size:%zu", cloudGid.size());
115     return errCode;
116 }
117 
GetQuerySyncObject(const std::string & tableName)118 QuerySyncObject CloudSyncer::GetQuerySyncObject(const std::string &tableName)
119 {
120     std::lock_guard<std::mutex> autoLock(dataLock_);
121     for (const auto &item : cloudTaskInfos_[currentContext_.currentTaskId].queryList) {
122         if (item.GetTableName() == tableName) {
123             return item;
124         }
125     }
126     LOGW("[CloudSyncer] not found query in cache");
127     QuerySyncObject querySyncObject;
128     querySyncObject.SetTableName(tableName);
129     return querySyncObject;
130 }
131 
NotifyUploadFailed(int errCode,InnerProcessInfo & info)132 void CloudSyncer::NotifyUploadFailed(int errCode, InnerProcessInfo &info)
133 {
134     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
135         LOGI("[CloudSyncer] Stop upload due to version conflict, %d", errCode);
136         return;
137     } else {
138         LOGE("[CloudSyncer] Failed to do upload, %d", errCode);
139     }
140     info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
141     info.tableStatus = ProcessStatus::FINISHED;
142     {
143         std::lock_guard<std::mutex> autoLock(dataLock_);
144         currentContext_.notifier->UpdateProcess(info);
145     }
146 }
147 
BatchInsert(Info & insertInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)148 int CloudSyncer::BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
149 {
150     int errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
151         uploadData.insData.extend, insertInfo);
152     innerProcessInfo.upLoadInfo.successCount += insertInfo.successCount;
153     innerProcessInfo.upLoadInfo.insertCount += insertInfo.successCount;
154     innerProcessInfo.upLoadInfo.total -= insertInfo.total - insertInfo.successCount - insertInfo.failCount;
155     if (errCode != E_OK) {
156         LOGE("[CloudSyncer][BatchInsert] BatchInsert with error, ret is %d.", errCode);
157     }
158     if (uploadData.isCloudVersionRecord) {
159         return errCode;
160     }
161     bool isSharedTable = false;
162     int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
163     if (ret != E_OK) {
164         LOGE("[CloudSyncer] DoBatchUpload cannot judge the table is shared table. %d", ret);
165         return ret;
166     }
167     if (!isSharedTable) {
168         ret = CloudSyncUtils::FillAssetIdToAssets(uploadData.insData, errCode, CloudWaterType::INSERT);
169         if (ret != errCode) {
170             LOGW("[CloudSyncer][BatchInsert] FillAssetIdToAssets with error, ret is %d.", ret);
171         }
172     }
173     if (errCode != E_OK) {
174         storageProxy_->FillCloudGidIfSuccess(OpType::INSERT, uploadData);
175         bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(uploadData.insData.extend);
176         if (isSkip) {
177             LOGI("[CloudSyncer][BatchInsert] Try to FillCloudLogAndAsset when assets missing. errCode: %d", errCode);
178             return E_OK;
179         } else {
180             LOGE("[CloudSyncer][BatchInsert] errCode: %d, can not skip assets missing record.", errCode);
181             return errCode;
182         }
183     }
184     // we need to fill back gid after insert data to cloud.
185     int errorCode = storageProxy_->FillCloudLogAndAsset(OpType::INSERT, uploadData);
186     if ((errorCode != E_OK) || (ret != E_OK)) {
187         LOGE("[CloudSyncer] Failed to fill back when doing upload insData, %d.", errorCode);
188         return ret == E_OK ? errorCode : ret;
189     }
190     return E_OK;
191 }
192 
BatchUpdate(Info & updateInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)193 int CloudSyncer::BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
194 {
195     int errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
196         uploadData.updData.extend, updateInfo);
197     innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount;
198     innerProcessInfo.upLoadInfo.updateCount += updateInfo.successCount;
199     innerProcessInfo.upLoadInfo.total -= updateInfo.total - updateInfo.successCount - updateInfo.failCount;
200     if (errCode != E_OK) {
201         LOGE("[CloudSyncer][BatchUpdate] BatchUpdate with error, ret is %d.", errCode);
202     }
203     if (uploadData.isCloudVersionRecord) {
204         return errCode;
205     }
206     bool isSharedTable = false;
207     int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
208     if (ret != E_OK) {
209         LOGE("[CloudSyncer] DoBatchUpload cannot judge the table is shared table. %d", ret);
210         return ret;
211     }
212     if (!isSharedTable) {
213         ret = CloudSyncUtils::FillAssetIdToAssets(uploadData.updData, errCode, CloudWaterType::UPDATE);
214         if (ret != E_OK) {
215             LOGW("[CloudSyncer][BatchUpdate] FillAssetIdToAssets with error, ret is %d.", ret);
216         }
217     }
218     if (errCode != E_OK) {
219         storageProxy_->FillCloudGidIfSuccess(OpType::UPDATE, uploadData);
220         bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(uploadData.updData.extend);
221         if (isSkip) {
222             LOGI("[CloudSyncer][BatchUpdate] Try to FillCloudLogAndAsset when assets missing. errCode: %d", errCode);
223             return E_OK;
224         } else {
225             LOGE("[CloudSyncer][BatchUpdate] errCode: %d, can not skip assets missing record.", errCode);
226             return errCode;
227         }
228     }
229     int errorCode = storageProxy_->FillCloudLogAndAsset(OpType::UPDATE, uploadData);
230     if ((errorCode != E_OK) || (ret != E_OK)) {
231         LOGE("[CloudSyncer] Failed to fill back when doing upload updData, %d.", errorCode);
232         return ret == E_OK ? errorCode : ret;
233     }
234     return E_OK;
235 }
236 
DownloadAssetsOneByOne(const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)237 int CloudSyncer::DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
238     std::map<std::string, Assets> &downloadAssets)
239 {
240     bool isSharedTable = false;
241     int errCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
242     if (errCode != E_OK) {
243         LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errCode);
244         return errCode;
245     }
246     int transactionCode = E_OK;
247     // shared table don't download, so just begin transaction once
248     if (isSharedTable) {
249         transactionCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
250     }
251     if (transactionCode != E_OK) {
252         LOGE("[CloudSyncer] begin transaction before download failed %d", transactionCode);
253         return transactionCode;
254     }
255     errCode = DownloadAssetsOneByOneInner(isSharedTable, info, downloadItem, downloadAssets);
256     if (isSharedTable) {
257         transactionCode = storageProxy_->Commit();
258         if (transactionCode != E_OK) {
259             LOGW("[CloudSyncer] commit transaction after download failed %d", transactionCode);
260         }
261     }
262     return (errCode == E_OK) ? transactionCode : errCode;
263 }
264 
GetDBAssets(bool isSharedTable,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)265 std::pair<int, uint32_t> CloudSyncer::GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
266     const DownloadItem &downloadItem, VBucket &dbAssets)
267 {
268     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
269     auto &errCode = res.first;
270     if (!isSharedTable) {
271         errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
272     }
273     if (errCode != E_OK) {
274         LOGE("[CloudSyncer] begin transaction before download failed %d", errCode);
275         return res;
276     }
277     res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, downloadItem.gid,
278         downloadItem.hashKey, dbAssets);
279     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
280         if (errCode != -E_CLOUD_GID_MISMATCH) {
281             LOGE("[CloudSyncer] get assets from db failed %d", errCode);
282         }
283         if (!isSharedTable) {
284             (void)storageProxy_->Rollback();
285         }
286         return res;
287     }
288     if (!isSharedTable) {
289         errCode = storageProxy_->Commit();
290     }
291     if (errCode != E_OK) {
292         LOGE("[CloudSyncer] commit transaction before download failed %d", errCode);
293     }
294     return res;
295 }
296 
BackFillAssetsAfterDownload(int downloadCode,int deleteCode,std::map<std::string,std::vector<uint32_t>> & tmpFlags,std::map<std::string,Assets> & tmpAssetsToDownload,std::map<std::string,Assets> & tmpAssetsToDelete)297 std::map<std::string, Assets>& CloudSyncer::BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
298     std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
299     std::map<std::string, Assets> &tmpAssetsToDelete)
300 {
301     std::map<std::string, Assets> &downloadAssets = tmpAssetsToDownload;
302     for (auto &[col, assets] : tmpAssetsToDownload) {
303         int i = 0;
304         for (auto &asset : assets) {
305             asset.flag = tmpFlags[col][i++];
306             if (asset.flag == static_cast<uint32_t>(AssetOpType::NO_CHANGE)) {
307                 continue;
308             }
309             if (downloadCode == E_OK) {
310                 asset.status = NORMAL;
311             } else {
312                 asset.status = (asset.status == NORMAL) ? NORMAL : ABNORMAL;
313             }
314         }
315     }
316     for (auto &[col, assets] : tmpAssetsToDelete) {
317         for (auto &asset : assets) {
318             asset.flag = static_cast<uint32_t>(AssetOpType::DELETE);
319             if (deleteCode == E_OK) {
320                 asset.status = NORMAL;
321             } else {
322                 asset.status = ABNORMAL;
323             }
324             downloadAssets[col].push_back(asset);
325         }
326     }
327     return downloadAssets;
328 }
329 
IsNeedSkipDownload(bool isSharedTable,int & errCode,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)330 int CloudSyncer::IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
331     const DownloadItem &downloadItem, VBucket &dbAssets)
332 {
333     auto [tmpCode, status] = GetDBAssets(isSharedTable, info, downloadItem, dbAssets);
334     if (tmpCode == -E_CLOUD_GID_MISMATCH) {
335         LOGW("[CloudSyncer] skip download asset because gid mismatch");
336         errCode = E_OK;
337         return true;
338     }
339     if (CloudStorageUtils::IsDataLocked(status)) {
340         LOGI("[CloudSyncer] skip download asset because data lock:%u", status);
341         errCode = E_OK;
342         return true;
343     }
344     if (tmpCode != E_OK) {
345         errCode = (errCode != E_OK) ? errCode : tmpCode;
346         return true;
347     }
348     return false;
349 }
350 
CheckDownloadOrDeleteCode(int & errCode,int downloadCode,int deleteCode,DownloadItem & downloadItem)351 bool CloudSyncer::CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem)
352 {
353     if (downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT || deleteCode == -E_CLOUD_RECORD_EXIST_CONFLICT) {
354         downloadItem.recordConflict = true;
355         errCode = E_OK;
356         return false;
357     }
358     errCode = (errCode != E_OK) ? errCode : deleteCode;
359     errCode = (errCode != E_OK) ? errCode : downloadCode;
360     if (downloadCode == -E_NOT_SET || deleteCode == -E_NOT_SET) {
361         return false;
362     }
363     return true;
364 }
365 
DownloadAssetsOneByOneInner(bool isSharedTable,const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)366 int CloudSyncer::DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info,
367     DownloadItem &downloadItem, std::map<std::string, Assets> &downloadAssets)
368 {
369     int errCode = E_OK;
370     std::map<std::string, Assets> tmpAssetsToDownload;
371     std::map<std::string, Assets> tmpAssetsToDelete;
372     std::map<std::string, std::vector<uint32_t>> tmpFlags;
373     for (auto &[col, assets] : downloadAssets) {
374         for (auto &asset : assets) {
375             VBucket dbAssets;
376             if (IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
377                 break;
378             }
379             if (!isSharedTable && asset.flag == static_cast<uint32_t>(AssetOpType::DELETE)) {
380                 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
381                 tmpAssetsToDelete[col].push_back(asset);
382             } else if (!isSharedTable && AssetOperationUtils::CalAssetOperation(col, asset, dbAssets,
383                 AssetOperationUtils::CloudSyncAction::START_DOWNLOAD) == AssetOperationUtils::AssetOpType::HANDLE) {
384                 asset.status = asset.flag == static_cast<uint32_t>(AssetOpType::INSERT) ?
385                     static_cast<uint32_t>(AssetStatus::INSERT) : static_cast<uint32_t>(AssetStatus::UPDATE);
386                 tmpAssetsToDownload[col].push_back(asset);
387                 tmpFlags[col].push_back(asset.flag);
388             } else {
389                 LOGD("[CloudSyncer] skip download asset...");
390             }
391         }
392     }
393     auto deleteCode = cloudDB_.RemoveLocalAssets(info.tableName, downloadItem.gid, downloadItem.prefix,
394         tmpAssetsToDelete);
395     auto downloadCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, tmpAssetsToDownload);
396     if (!CheckDownloadOrDeleteCode(errCode, downloadCode, deleteCode, downloadItem)) {
397         return errCode;
398     }
399 
400     // copy asset back
401     downloadAssets = BackFillAssetsAfterDownload(downloadCode, deleteCode, tmpFlags, tmpAssetsToDownload,
402         tmpAssetsToDelete);
403     return errCode;
404 }
405 
CommitDownloadAssets(const DownloadItem & downloadItem,const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)406 int CloudSyncer::CommitDownloadAssets(const DownloadItem &downloadItem, const std::string &tableName,
407     DownloadCommitList &commitList, uint32_t &successCount)
408 {
409     int errCode = storageProxy_->SetLogTriggerStatus(false);
410     if (errCode != E_OK) {
411         return errCode;
412     }
413     for (auto &item : commitList) {
414         std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
415         // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
416         std::map<std::string, Assets> assetsMap = std::get<1>(item);
417         bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
418         VBucket normalAssets;
419         VBucket failedAssets;
420         normalAssets[CloudDbConstant::GID_FIELD] = gid;
421         failedAssets[CloudDbConstant::GID_FIELD] = gid;
422         VBucket &assets = setAllNormal ? normalAssets : failedAssets;
423         for (auto &[key, asset] : assetsMap) {
424             assets[key] = std::move(asset);
425         }
426         if (!downloadItem.recordConflict) {
427             errCode = FillCloudAssets(tableName, normalAssets, failedAssets);
428             if (errCode != E_OK) {
429                 break;
430             }
431         }
432         LogInfo logInfo;
433         logInfo.cloudGid = gid;
434         // download must contain gid, just set the default value here.
435         logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
436         logInfo.hashKey = downloadItem.hashKey;
437         logInfo.timestamp = downloadItem.timestamp;
438         // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
439         if (failedAssets.size() > 1) {
440             logInfo.timestamp = 0u;
441         }
442 
443         errCode = storageProxy_->UpdateRecordFlag(tableName, downloadItem.recordConflict, logInfo);
444         if (errCode != E_OK) {
445             break;
446         }
447         successCount++;
448     }
449     int ret = storageProxy_->SetLogTriggerStatus(true);
450     return errCode == E_OK ? ret : errCode;
451 }
452 
GenerateCompensatedSync(CloudTaskInfo & taskInfo)453 void CloudSyncer::GenerateCompensatedSync(CloudTaskInfo &taskInfo)
454 {
455     std::vector<QuerySyncObject> syncQuery;
456     int errCode = storageProxy_->GetCompensatedSyncQuery(syncQuery);
457     if (errCode != E_OK) {
458         LOGW("[CloudSyncer] Generate compensated sync failed by get query! errCode = %d", errCode);
459         return;
460     }
461     if (syncQuery.empty()) {
462         LOGD("[CloudSyncer] Not need generate compensated sync");
463         return;
464     }
465     for (const auto &query : syncQuery) {
466         taskInfo.table.push_back(query.GetRelationTableName());
467         taskInfo.queryList.push_back(query);
468     }
469     Sync(taskInfo);
470     LOGI("[CloudSyncer] Generate compensated sync finished");
471 }
472 
ChkIgnoredProcess(InnerProcessInfo & info,const CloudSyncData & uploadData,UploadParam & uploadParam)473 void CloudSyncer::ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam)
474 {
475     if (uploadData.ignoredCount == 0) { // LCOV_EXCL_BR_LINE
476         return;
477     }
478     info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
479     if (info.upLoadInfo.successCount + info.upLoadInfo.failCount != info.upLoadInfo.total) { // LCOV_EXCL_BR_LINE
480         return;
481     }
482     if (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) { // LCOV_EXCL_BR_LINE
483         return;
484     }
485     info.tableStatus = ProcessStatus::FINISHED;
486     info.upLoadInfo.batchIndex++;
487     NotifyInBatchUpload(uploadParam, info, true);
488 }
489 
SaveCursorIfNeed(const std::string & tableName)490 int CloudSyncer::SaveCursorIfNeed(const std::string &tableName)
491 {
492     std::string cursor = "";
493     int errCode = storageProxy_->GetCloudWaterMark(tableName, cursor);
494     if (errCode != E_OK) {
495         LOGE("[CloudSyncer] get cloud water mark before download failed %d", errCode);
496         return errCode;
497     }
498     if (!cursor.empty()) {
499         return E_OK;
500     }
501     auto res = cloudDB_.GetEmptyCursor(tableName);
502     if (res.first != E_OK) {
503         LOGE("[CloudSyncer] get empty cursor failed %d", res.first);
504         return res.first;
505     }
506     if (res.second.empty()) {
507         LOGE("[CloudSyncer] get cursor is empty %d", -E_CLOUD_ERROR);
508         return -E_CLOUD_ERROR;
509     }
510     errCode = storageProxy_->SetCloudWaterMark(tableName, res.second);
511     if (errCode != E_OK) {
512         LOGE("[CloudSyncer] set cloud water mark before download failed %d", errCode);
513     }
514     return errCode;
515 }
516 
PrepareAndDownload(const std::string & table,const CloudTaskInfo & taskInfo,bool isFirstDownload)517 int CloudSyncer::PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload)
518 {
519     int errCode = SaveCursorIfNeed(table);
520     if (errCode != E_OK) {
521         return errCode;
522     }
523     bool isShared = false;
524     errCode = storageProxy_->IsSharedTable(table, isShared);
525     if (errCode != E_OK) {
526         LOGE("[CloudSyncer] check shared table failed %d", errCode);
527         return errCode;
528     }
529     // shared table not allow logic delete
530     storageProxy_->SetCloudTaskConfig({ !isShared });
531     errCode = CheckTaskIdValid(taskInfo.taskId);
532     if (errCode != E_OK) {
533         LOGW("[CloudSyncer] task is invalid, abort sync");
534         return errCode;
535     }
536     errCode = DoDownload(taskInfo.taskId, isFirstDownload);
537     if (errCode != E_OK) {
538         LOGE("[CloudSyncer] download failed %d", errCode);
539     }
540     return errCode;
541 }
542 
IsClosed() const543 bool CloudSyncer::IsClosed() const
544 {
545     return closed_ || IsKilled();
546 }
547 
UpdateFlagForSavedRecord(const SyncParam & param)548 int CloudSyncer::UpdateFlagForSavedRecord(const SyncParam &param)
549 {
550     DownloadList downloadList;
551     {
552         std::lock_guard<std::mutex> autoLock(dataLock_);
553         downloadList = currentContext_.assetDownloadList;
554     }
555     std::set<std::string> gidFilters;
556     for (const auto &tuple: downloadList) {
557         gidFilters.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
558     }
559     return storageProxy_->MarkFlagAsConsistent(param.tableName, param.downloadData, gidFilters);
560 }
561 
BatchDelete(Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)562 int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
563 {
564     int errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
565         uploadData.delData.extend, deleteInfo);
566     innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
567     innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount;
568     if (errCode != E_OK) {
569         LOGE("[CloudSyncer] Failed to batch delete, %d", errCode);
570         storageProxy_->FillCloudGidIfSuccess(OpType::DELETE, uploadData);
571         return errCode;
572     }
573     errCode = storageProxy_->FillCloudLogAndAsset(OpType::DELETE, uploadData);
574     if (errCode != E_OK) {
575         LOGE("[CloudSyncer] Failed to fill back when doing upload delData, %d.", errCode);
576     }
577     return errCode;
578 }
579 
IsCompensatedTask(TaskId taskId)580 bool CloudSyncer::IsCompensatedTask(TaskId taskId)
581 {
582     std::lock_guard<std::mutex> autoLock(dataLock_);
583     return cloudTaskInfos_[taskId].compensatedTask;
584 }
585 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)586 int CloudSyncer::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
587 {
588     return cloudDB_.SetCloudDB(cloudDBs);
589 }
590 
CleanAllWaterMark()591 void CloudSyncer::CleanAllWaterMark()
592 {
593     storageProxy_->CleanAllWaterMark();
594 }
595 
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)596 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
597 {
598     downloadItem.gid = std::get<CloudSyncUtils::GID_INDEX>(downloadList[i]);
599     downloadItem.prefix = std::get<CloudSyncUtils::PREFIX_INDEX>(downloadList[i]);
600     downloadItem.strategy = std::get<CloudSyncUtils::STRATEGY_INDEX>(downloadList[i]);
601     downloadItem.assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadList[i]);
602     downloadItem.hashKey = std::get<CloudSyncUtils::HASH_KEY_INDEX>(downloadList[i]);
603     downloadItem.primaryKeyValList = std::get<CloudSyncUtils::PRIMARY_KEY_INDEX>(downloadList[i]);
604     downloadItem.timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(downloadList[i]);
605 }
606 
DoNotifyInNeed(const CloudSyncer::TaskId & taskId,const std::vector<std::string> & needNotifyTables,const bool isFirstDownload)607 void CloudSyncer::DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
608     const bool isFirstDownload)
609 {
610     bool isNeedNotify = false;
611     {
612         std::lock_guard<std::mutex> autoLock(dataLock_);
613         // only when the first download and the task no need upload actually, notify the process, otherwise,
614         // the process will notify in the upload procedure, which can guarantee the notify order of the tables
615         isNeedNotify = isFirstDownload && !currentContext_.isNeedUpload;
616     }
617     if (!isNeedNotify) {
618         return;
619     }
620     for (size_t i = 0; i < needNotifyTables.size(); ++i) {
621         UpdateProcessInfoWithoutUpload(taskId, needNotifyTables[i], i != (needNotifyTables.size() - 1u));
622     }
623 }
624 
GetUploadCountByTable(const CloudSyncer::TaskId & taskId,int64_t & count)625 int CloudSyncer::GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count)
626 {
627     std::string tableName;
628     int ret = GetCurrentTableName(tableName);
629     if (ret != E_OK) {
630         LOGE("[CloudSyncer] Invalid table name for get local water mark: %d", ret);
631         return ret;
632     }
633 
634     ret = storageProxy_->StartTransaction();
635     if (ret != E_OK) {
636         LOGE("[CloudSyncer] start transaction failed before getting upload count.");
637         return ret;
638     }
639 
640     ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
641         IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
642     if (ret != E_OK) {
643         // GetUploadCount will return E_OK when upload count is zero.
644         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
645     }
646     // No need Rollback when GetUploadCount failed
647     storageProxy_->Commit();
648     return ret;
649 }
650 
UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId,const std::string & tableName,bool needNotify)651 void CloudSyncer::UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName,
652     bool needNotify)
653 {
654     LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
655     InnerProcessInfo innerProcessInfo;
656     innerProcessInfo.tableName = tableName;
657     innerProcessInfo.upLoadInfo.total = 0;  // count is zero
658     innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
659     {
660         std::lock_guard<std::mutex> autoLock(dataLock_);
661         if (!needNotify) {
662             currentContext_.notifier->UpdateProcess(innerProcessInfo);
663         } else {
664             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
665         }
666     }
667 }
668 
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)669 int CloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
670 {
671     std::vector<std::string> needNotifyTables;
672     for (size_t i = 0; i < taskInfo.table.size(); ++i) {
673         std::string table;
674         {
675             std::lock_guard<std::mutex> autoLock(dataLock_);
676             if (currentContext_.processRecorder->IsDownloadFinish(currentContext_.currentUserIndex,
677                 taskInfo.table[i])) {
678                 continue;
679             }
680             LOGD("[CloudSyncer] try download table, index: %zu", i);
681             currentContext_.tableName = taskInfo.table[i];
682             table = currentContext_.tableName;
683         }
684         int errCode = PrepareAndDownload(table, taskInfo, isFirstDownload);
685         if (errCode != E_OK) {
686             return errCode;
687         }
688         MarkDownloadFinishIfNeed(table);
689         // needUpload indicate that the syncMode need push
690         if (needUpload) {
691             int64_t count = 0;
692             errCode = GetUploadCountByTable(taskInfo.taskId, count);
693             if (errCode != E_OK) {
694                 LOGE("[CloudSyncer] GetUploadCountByTable failed %d", errCode);
695                 return errCode;
696             }
697             // count > 0 means current table need upload actually
698             if (count > 0) {
699                 {
700                     std::lock_guard<std::mutex> autoLock(dataLock_);
701                     currentContext_.isNeedUpload = true;
702                 }
703                 continue;
704             }
705             needNotifyTables.emplace_back(table);
706         }
707         errCode = SaveCloudWaterMark(taskInfo.table[i], taskInfo.taskId);
708         if (errCode != E_OK) {
709             LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
710             return errCode;
711         }
712     }
713     DoNotifyInNeed(taskInfo.taskId, needNotifyTables, isFirstDownload);
714     return E_OK;
715 }
716 
IsNeedGetLocalWater(TaskId taskId)717 bool CloudSyncer::IsNeedGetLocalWater(TaskId taskId)
718 {
719     return !IsModeForcePush(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId)) &&
720         !IsCompensatedTask(taskId);
721 }
722 
TryToAddSyncTask(CloudTaskInfo && taskInfo)723 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
724 {
725     if (closed_) {
726         LOGW("[CloudSyncer] syncer is closed, should not sync now");
727         return -E_DB_CLOSED;
728     }
729     std::shared_ptr<DataBaseSchema> cloudSchema;
730     int errCode = storageProxy_->GetCloudDbSchema(cloudSchema);
731     if (errCode != E_OK) {
732         LOGE("[CloudSyncer] Get cloud schema failed %d when add task", errCode);
733         return errCode;
734     }
735     std::lock_guard<std::mutex> autoLock(dataLock_);
736     errCode = CheckQueueSizeWithNoLock(taskInfo.priorityTask);
737     if (errCode != E_OK) {
738         return errCode;
739     }
740     errCode = GenerateTaskIdIfNeed(taskInfo);
741     if (errCode != E_OK) {
742         return errCode;
743     }
744     cloudTaskInfos_[lastTaskId_] = std::move(taskInfo);
745     if (cloudTaskInfos_[lastTaskId_].priorityTask) {
746         priorityTaskQueue_.push_back(lastTaskId_);
747         LOGI("[CloudSyncer] Add priority task ok, storeId %.3s, taskId %" PRIu64,
748             cloudTaskInfos_[lastTaskId_].storeId.c_str(), cloudTaskInfos_[lastTaskId_].taskId);
749         return E_OK;
750     }
751     if (!MergeTaskInfo(cloudSchema, lastTaskId_)) {
752         taskQueue_.push_back(lastTaskId_);
753         LOGI("[CloudSyncer] Add task ok, storeId %.3s, taskId %" PRIu64,
754             cloudTaskInfos_[lastTaskId_].storeId.c_str(), cloudTaskInfos_[lastTaskId_].taskId);
755     }
756     return E_OK;
757 }
758 
MergeTaskInfo(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId taskId)759 bool CloudSyncer::MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId)
760 {
761     if (!cloudTaskInfos_[taskId].merge) { // LCOV_EXCL_BR_LINE
762         return false;
763     }
764     bool isMerge = false;
765     bool mergeHappen = false;
766     TaskId checkTaskId = taskId;
767     do {
768         std::tie(isMerge, checkTaskId) = TryMergeTask(cloudSchema, checkTaskId);
769         mergeHappen = mergeHappen || isMerge;
770     } while (isMerge);
771     return mergeHappen;
772 }
773 
TryMergeTask(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId tryTaskId)774 std::pair<bool, TaskId> CloudSyncer::TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId)
775 {
776     std::pair<bool, TaskId> res;
777     auto &[merge, nextTryTask] = res;
778     TaskId beMergeTask = INVALID_TASK_ID;
779     TaskId runningTask = currentContext_.currentTaskId;
780     for (const auto &taskId : taskQueue_) {
781         if (taskId == runningTask || taskId == tryTaskId) { // LCOV_EXCL_BR_LINE
782             continue;
783         }
784         if (!IsTasksCanMerge(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
785             continue;
786         }
787         if (MergeTaskTablesIfConsistent(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
788             beMergeTask = taskId;
789             nextTryTask = tryTaskId;
790             merge = true;
791             break;
792         }
793         if (MergeTaskTablesIfConsistent(tryTaskId, taskId)) { // LCOV_EXCL_BR_LINE
794             beMergeTask = tryTaskId;
795             nextTryTask = taskId;
796             merge = true;
797             break;
798         }
799     }
800     if (!merge) { // LCOV_EXCL_BR_LINE
801         return res;
802     }
803     if (beMergeTask < nextTryTask) { // LCOV_EXCL_BR_LINE
804         std::tie(beMergeTask, nextTryTask) = SwapTwoTaskAndCopyTable(beMergeTask, nextTryTask);
805     }
806     AdjustTableBasedOnSchema(cloudSchema, cloudTaskInfos_[nextTryTask]);
807     auto processNotifier = std::make_shared<ProcessNotifier>(this);
808     processNotifier->Init(cloudTaskInfos_[beMergeTask].table, cloudTaskInfos_[beMergeTask].devices,
809         cloudTaskInfos_[beMergeTask].users);
810     cloudTaskInfos_[beMergeTask].errCode = -E_CLOUD_SYNC_TASK_MERGED;
811     cloudTaskInfos_[beMergeTask].status = ProcessStatus::FINISHED;
812     processNotifier->SetAllTableFinish();
813     processNotifier->NotifyProcess(cloudTaskInfos_[beMergeTask], {}, true);
814     cloudTaskInfos_.erase(beMergeTask);
815     taskQueue_.remove(beMergeTask);
816     LOGW("[CloudSyncer] TaskId %" PRIu64 " has been merged", beMergeTask);
817     return res;
818 }
819 
IsTaskCanMerge(const CloudTaskInfo & taskInfo)820 bool CloudSyncer::IsTaskCanMerge(const CloudTaskInfo &taskInfo)
821 {
822     return !taskInfo.compensatedTask && !taskInfo.priorityTask &&
823         taskInfo.merge && taskInfo.mode == SYNC_MODE_CLOUD_MERGE;
824 }
825 
IsTasksCanMerge(TaskId taskId,TaskId tryMergeTaskId)826 bool CloudSyncer::IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId)
827 {
828     const auto &taskInfo = cloudTaskInfos_[taskId];
829     const auto &tryMergeTaskInfo = cloudTaskInfos_[tryMergeTaskId];
830     return IsTaskCanMerge(taskInfo) && IsTaskCanMerge(tryMergeTaskInfo) &&
831         taskInfo.devices == tryMergeTaskInfo.devices;
832 }
833 
MergeTaskTablesIfConsistent(TaskId sourceId,TaskId targetId)834 bool CloudSyncer::MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId)
835 {
836     const auto &source = cloudTaskInfos_[sourceId];
837     const auto &target = cloudTaskInfos_[targetId];
838     bool isMerge = true;
839     for (const auto &table : source.table) {
840         if (std::find(target.table.begin(), target.table.end(), table) == target.table.end()) { // LCOV_EXCL_BR_LINE
841             isMerge = false;
842             break;
843         }
844     }
845     return isMerge;
846 }
847 
AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> & cloudSchema,CloudTaskInfo & taskInfo)848 void CloudSyncer::AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema,
849     CloudTaskInfo &taskInfo)
850 {
851     std::vector<std::string> tmpTables = taskInfo.table;
852     taskInfo.table.clear();
853     taskInfo.queryList.clear();
854     for (const auto &table : cloudSchema->tables) {
855         if (std::find(tmpTables.begin(), tmpTables.end(), table.name) != tmpTables.end()) { // LCOV_EXCL_BR_LINE
856             taskInfo.table.push_back(table.name);
857             QuerySyncObject querySyncObject;
858             querySyncObject.SetTableName(table.name);
859             taskInfo.queryList.push_back(querySyncObject);
860         }
861     }
862 }
863 
SwapTwoTaskAndCopyTable(TaskId source,TaskId target)864 std::pair<TaskId, TaskId> CloudSyncer::SwapTwoTaskAndCopyTable(TaskId source, TaskId target)
865 {
866     cloudTaskInfos_[source].table = cloudTaskInfos_[target].table;
867     cloudTaskInfos_[source].queryList = cloudTaskInfos_[target].queryList;
868     std::set<std::string> users;
869     users.insert(cloudTaskInfos_[source].users.begin(), cloudTaskInfos_[source].users.end());
870     users.insert(cloudTaskInfos_[target].users.begin(), cloudTaskInfos_[target].users.end());
871     cloudTaskInfos_[source].users = std::vector<std::string>(users.begin(), users.end());
872     return {target, source};
873 }
874 
IsQueryListEmpty(TaskId taskId)875 bool CloudSyncer::IsQueryListEmpty(TaskId taskId)
876 {
877     std::lock_guard<std::mutex> autoLock(dataLock_);
878     return !std::any_of(cloudTaskInfos_[taskId].queryList.begin(), cloudTaskInfos_[taskId].queryList.end(),
879         [](const auto &item) {
880             return item.IsContainQueryNodes();
881     });
882 }
883 
IsNeedLock(const UploadParam & param)884 bool CloudSyncer::IsNeedLock(const UploadParam &param)
885 {
886     return param.lockAction == LockAction::INSERT && param.mode == CloudWaterType::INSERT;
887 }
888 
GetLocalWater(const std::string & tableName,UploadParam & uploadParam)889 std::pair<int, Timestamp> CloudSyncer::GetLocalWater(const std::string &tableName, UploadParam &uploadParam)
890 {
891     std::pair<int, Timestamp> res = { E_OK, 0u };
892     if (IsNeedGetLocalWater(uploadParam.taskId)) {
893         res.first = storageProxy_->GetLocalWaterMarkByMode(tableName, uploadParam.mode, res.second);
894     }
895     uploadParam.localMark = res.second;
896     return res;
897 }
898 
HandleBatchUpload(UploadParam & uploadParam,InnerProcessInfo & info,CloudSyncData & uploadData,ContinueToken & continueStmtToken)899 int CloudSyncer::HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info,
900     CloudSyncData &uploadData, ContinueToken &continueStmtToken)
901 {
902     int ret = E_OK;
903     uint32_t batchIndex = GetCurrentTableUploadBatchIndex();
904     bool isLocked = false;
905     while (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) {
906         ret = PreProcessBatchUpload(uploadParam, info, uploadData);
907         if (ret != E_OK) {
908             break;
909         }
910         info.upLoadInfo.batchIndex = ++batchIndex;
911         if (IsNeedLock(uploadParam) && !isLocked) {
912             ret = LockCloudIfNeed(uploadParam.taskId);
913             if (ret != E_OK) {
914                 break;
915             }
916             isLocked = true;
917         }
918         ret = DoBatchUpload(uploadData, uploadParam, info);
919         if (ret != E_OK) {
920             break;
921         }
922         uploadData = CloudSyncData(uploadData.tableName, uploadParam.mode);
923         if (continueStmtToken == nullptr) {
924             break;
925         }
926         SetUploadDataFlag(uploadParam.taskId, uploadData);
927         RecordWaterMark(uploadParam.taskId, uploadParam.localMark);
928         ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
929         if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
930             LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
931             break;
932         }
933         ChkIgnoredProcess(info, uploadData, uploadParam);
934     }
935     if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
936         NotifyUploadFailed(ret, info);
937     }
938     if (isLocked && IsNeedLock(uploadParam)) {
939         UnlockIfNeed();
940     }
941     return ret;
942 }
943 
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)944 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
945 {
946     InnerProcessInfo info = GetInnerProcessInfo(tableName, uploadParam);
947     static std::vector<CloudWaterType> waterTypes = DBCommon::GetWaterTypeVec();
948     int errCode = E_OK;
949     for (const auto &waterType: waterTypes) {
950         uploadParam.mode = waterType;
951         errCode = DoUploadByMode(tableName, uploadParam, info);
952         if (errCode != E_OK) {
953             break;
954         }
955     }
956     int ret = E_OK;
957     if (info.upLoadInfo.successCount > 0) {
958         ret = UploadVersionRecordIfNeed(uploadParam);
959     }
960     return errCode != E_OK ? errCode : ret;
961 }
962 
UploadVersionRecordIfNeed(const UploadParam & uploadParam)963 int CloudSyncer::UploadVersionRecordIfNeed(const UploadParam &uploadParam)
964 {
965     if (uploadParam.count == 0) {
966         // no record upload
967         return E_OK;
968     }
969     if (!cloudDB_.IsExistCloudVersionCallback()) {
970         return E_OK;
971     }
972     auto [errCode, uploadData] = storageProxy_->GetLocalCloudVersion();
973     if (errCode != E_OK) {
974         return errCode;
975     }
976     bool isInsert = !uploadData.insData.record.empty();
977     CloudSyncBatch &batchData = isInsert ? uploadData.insData : uploadData.updData;
978     if (batchData.record.empty()) {
979         LOGE("[CloudSyncer] Get invalid cloud version record");
980         return -E_INTERNAL_ERROR;
981     }
982     std::string oriVersion;
983     CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, batchData.record[0], oriVersion);
984     std::string newVersion;
985     std::tie(errCode, newVersion) = cloudDB_.GetCloudVersion(oriVersion);
986     if (errCode != E_OK) {
987         LOGE("[CloudSyncer] Get cloud version error %d", errCode);
988         return errCode;
989     }
990     batchData.record[0][CloudDbConstant::CLOUD_KV_FIELD_VALUE] = newVersion;
991     InnerProcessInfo processInfo;
992     Info info;
993     std::vector<VBucket> copyRecord = batchData.record;
994     WaterMark waterMark;
995     CloudSyncUtils::GetWaterMarkAndUpdateTime(batchData.extend, waterMark);
996     errCode = isInsert ? BatchInsert(info, uploadData, processInfo) : BatchUpdate(info, uploadData, processInfo);
997     batchData.record = copyRecord;
998     CloudSyncUtils::ModifyCloudDataTime(batchData.extend[0]);
999     auto ret = storageProxy_->FillCloudLogAndAsset(isInsert ? OpType::INSERT : OpType::UPDATE, uploadData);
1000     return errCode != E_OK ? errCode : ret;
1001 }
1002 
TagUploadAssets(CloudSyncData & uploadData)1003 void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1004 {
1005     if (!IsDataContainAssets()) {
1006         return;
1007     }
1008     std::vector<Field> assetFields;
1009     {
1010         std::lock_guard<std::mutex> autoLock(dataLock_);
1011         assetFields = currentContext_.assetFields[currentContext_.tableName];
1012     }
1013 
1014     for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1015         for (const Field &assetField : assetFields) {
1016             (void)TagAssetsInSingleCol(assetField, true, uploadData.insData.record[i]);
1017         }
1018     }
1019     for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1020         for (const Field &assetField : assetFields) {
1021             (void)TagAssetsInSingleCol(assetField, false, uploadData.updData.record[i]);
1022         }
1023     }
1024 }
1025 
IsLockInDownload()1026 bool CloudSyncer::IsLockInDownload()
1027 {
1028     std::lock_guard<std::mutex> autoLock(dataLock_);
1029     if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1030         return false;
1031     }
1032     auto currentLockAction = static_cast<uint32_t>(cloudTaskInfos_[currentContext_.currentTaskId].lockAction);
1033     return (currentLockAction & static_cast<uint32_t>(LockAction::DOWNLOAD)) != 0;
1034 }
1035 
SetCurrentTaskFailedInMachine(int errCode)1036 CloudSyncEvent CloudSyncer::SetCurrentTaskFailedInMachine(int errCode)
1037 {
1038     std::lock_guard<std::mutex> autoLock(dataLock_);
1039     cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1040     return CloudSyncEvent::ERROR_EVENT;
1041 }
1042 
InitCloudSyncStateMachine()1043 void CloudSyncer::InitCloudSyncStateMachine()
1044 {
1045     CloudSyncStateMachine::Initialize();
1046     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_DOWNLOAD, [this]() {
1047         return SyncMachineDoDownload();
1048     });
1049     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_UPLOAD, [this]() {
1050         return SyncMachineDoUpload();
1051     });
1052     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_FINISHED, [this]() {
1053         return SyncMachineDoFinished();
1054     });
1055     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_REPEAT_CHECK, [this]() {
1056         return SyncMachineDoRepeatCheck();
1057     });
1058 }
1059 
SyncMachineDoRepeatCheck()1060 CloudSyncEvent CloudSyncer::SyncMachineDoRepeatCheck()
1061 {
1062     auto config = storageProxy_->GetCloudSyncConfig();
1063     {
1064         std::lock_guard<std::mutex> autoLock(dataLock_);
1065         if (config.maxRetryConflictTimes < 0) { // unlimited repeat counts
1066             return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1067         }
1068         currentContext_.repeatCount++;
1069         if (currentContext_.repeatCount > config.maxRetryConflictTimes) {
1070             LOGD("[CloudSyncer] Repeat too much times current %d limit %" PRId32, currentContext_.repeatCount,
1071                 config.maxRetryConflictTimes);
1072             SetCurrentTaskFailedWithoutLock(-E_CLOUD_VERSION_CONFLICT);
1073             return CloudSyncEvent::ERROR_EVENT;
1074         }
1075         LOGD("[CloudSyncer] Repeat taskId %" PRIu64 " download current %d", currentContext_.currentTaskId,
1076             currentContext_.repeatCount);
1077     }
1078     return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1079 }
1080 
MarkDownloadFinishIfNeed(const std::string & downloadTable,bool isFinish)1081 void CloudSyncer::MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish)
1082 {
1083     // table exist reference should download every times
1084     if (IsLockInDownload() || storageProxy_->IsTableExistReferenceOrReferenceBy(downloadTable)) {
1085         return;
1086     }
1087     std::lock_guard<std::mutex> autoLock(dataLock_);
1088     currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex, downloadTable, isFinish);
1089 }
1090 
DoUploadByMode(const std::string & tableName,UploadParam & uploadParam,InnerProcessInfo & info)1091 int CloudSyncer::DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info)
1092 {
1093     CloudSyncData uploadData(tableName, uploadParam.mode);
1094     SetUploadDataFlag(uploadParam.taskId, uploadData);
1095     auto [err, localWater] = GetLocalWater(tableName, uploadParam);
1096     if (err != E_OK) {
1097         return err;
1098     }
1099     ContinueToken continueStmtToken = nullptr;
1100     int ret = storageProxy_->GetCloudData(GetQuerySyncObject(tableName), localWater, continueStmtToken, uploadData);
1101     if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1102         LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1103         return ret;
1104     }
1105     uploadParam.count -= uploadData.ignoredCount;
1106     info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
1107     ret = HandleBatchUpload(uploadParam, info, uploadData, continueStmtToken);
1108     if (ret != -E_TASK_PAUSED) {
1109         // reset watermark to zero when task no paused
1110         RecordWaterMark(uploadParam.taskId, 0u);
1111     }
1112     if (continueStmtToken != nullptr) {
1113         storageProxy_->ReleaseContinueToken(continueStmtToken);
1114     }
1115     return ret;
1116 }
1117 
IsTableFinishInUpload(const std::string & table)1118 bool CloudSyncer::IsTableFinishInUpload(const std::string &table)
1119 {
1120     std::lock_guard<std::mutex> autoLock(dataLock_);
1121     return currentContext_.processRecorder->IsUploadFinish(currentContext_.currentUserIndex, table);
1122 }
1123 
MarkUploadFinishIfNeed(const std::string & table)1124 void CloudSyncer::MarkUploadFinishIfNeed(const std::string &table)
1125 {
1126     // table exist reference should upload every times
1127     if (storageProxy_->IsTableExistReference(table)) {
1128         return;
1129     }
1130     std::lock_guard<std::mutex> autoLock(dataLock_);
1131     currentContext_.processRecorder->MarkUploadFinish(currentContext_.currentUserIndex, table, true);
1132 }
1133 
IsNeedUpdateAsset(const VBucket & data)1134 bool CloudSyncer::IsNeedUpdateAsset(const VBucket &data)
1135 {
1136     for (const auto &item : data) {
1137         const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
1138         if (asset != nullptr) {
1139             uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset->status);
1140             if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1141                 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
1142                 return true;
1143             }
1144             continue;
1145         }
1146         const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
1147         if (assets == nullptr) {
1148             continue;
1149         }
1150         for (const auto &oneAsset : *assets) {
1151             uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(oneAsset.status);
1152             if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1153                 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
1154                 return true;
1155             }
1156         }
1157     }
1158     return false;
1159 }
1160 
GetCloudTaskStatus(uint64_t taskId) const1161 SyncProcess CloudSyncer::GetCloudTaskStatus(uint64_t taskId) const
1162 {
1163     std::lock_guard<std::mutex> autoLock(dataLock_);
1164     auto iter = cloudTaskInfos_.find(taskId);
1165     SyncProcess syncProcess;
1166     if (iter == cloudTaskInfos_.end()) {
1167         syncProcess.process = ProcessStatus::FINISHED;
1168         syncProcess.errCode = NOT_FOUND;
1169         LOGE("[CloudSyncer] Not found task %" PRIu64, taskId);
1170         return syncProcess;
1171     }
1172     syncProcess.process = iter->second.status;
1173     syncProcess.errCode = TransferDBErrno(iter->second.errCode);
1174     std::shared_ptr<ProcessNotifier> notifier = nullptr;
1175     if (currentContext_.currentTaskId == taskId) {
1176         notifier = currentContext_.notifier;
1177     }
1178     bool hasNotifier = notifier != nullptr;
1179     if (hasNotifier) {
1180         syncProcess.tableProcess = notifier->GetCurrentTableProcess();
1181     }
1182     LOGI("[CloudSyncer] Found task %" PRIu64 " storeId %.3s status %d has notifier %d", taskId,
1183         iter->second.storeId.c_str(), static_cast<int64_t>(syncProcess.process), static_cast<int>(hasNotifier));
1184     return syncProcess;
1185 }
1186 
GenerateTaskIdIfNeed(CloudTaskInfo & taskInfo)1187 int CloudSyncer::GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo)
1188 {
1189     if (taskInfo.taskId != INVALID_TASK_ID) {
1190         if (cloudTaskInfos_.find(taskInfo.taskId) != cloudTaskInfos_.end()) {
1191             LOGE("[CloudSyncer] Sync with exist taskId %" PRIu64 " storeId %.3s", taskInfo.taskId,
1192                 taskInfo.storeId.c_str());
1193             return -E_INVALID_ARGS;
1194         }
1195         lastTaskId_ = taskInfo.taskId;
1196         LOGI("[CloudSyncer] Sync with taskId %" PRIu64 " storeId %.3s", taskInfo.taskId, taskInfo.storeId.c_str());
1197         return E_OK;
1198     }
1199     lastTaskId_++;
1200     if (lastTaskId_ == UINT64_MAX) {
1201         lastTaskId_ = 1u;
1202     }
1203     taskInfo.taskId = lastTaskId_;
1204     return E_OK;
1205 }
1206 }