• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud_sync_tag_assets.h"
25 #include "cloud_sync_utils.h"
26 #include "db_errno.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "runtime_context.h"
30 #include "store_types.h"
31 #include "strategy_factory.h"
32 #include "version.h"
33 
34 namespace DistributedDB {
ReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)35 void CloudSyncer::ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
36 {
37     Timestamp cacheWaterMark = GetResumeWaterMark(taskId);
38     waterMark = cacheWaterMark == 0u ? waterMark : cacheWaterMark;
39     RecordWaterMark(taskId, 0u);
40 }
41 
ReloadCloudWaterMarkIfNeed(const std::string & tableName,std::string & cloudWaterMark)42 void CloudSyncer::ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark)
43 {
44     std::lock_guard<std::mutex> autoLock(dataLock_);
45     std::string cacheCloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
46     cloudWaterMark = cacheCloudWaterMark.empty() ? cloudWaterMark : cacheCloudWaterMark;
47 }
48 
ReloadUploadInfoIfNeed(const UploadParam & param,InnerProcessInfo & info)49 void CloudSyncer::ReloadUploadInfoIfNeed(const UploadParam &param, InnerProcessInfo &info)
50 {
51     info.upLoadInfo.total = static_cast<uint32_t>(param.count);
52     Info lastUploadInfo;
53     UploadRetryInfo retryInfo;
54     GetLastUploadInfo(info.tableName, lastUploadInfo, retryInfo);
55     info.upLoadInfo.total += lastUploadInfo.successCount;
56     info.upLoadInfo.successCount += lastUploadInfo.successCount;
57     info.upLoadInfo.failCount += lastUploadInfo.failCount;
58     info.upLoadInfo.insertCount += lastUploadInfo.insertCount;
59     info.upLoadInfo.updateCount += lastUploadInfo.updateCount;
60     info.upLoadInfo.deleteCount += lastUploadInfo.deleteCount;
61     info.retryInfo.uploadBatchRetryCount = retryInfo.uploadBatchRetryCount;
62     LOGD("[CloudSyncer] resume upload, last success count %" PRIu32 ", last fail count %" PRIu32,
63         lastUploadInfo.successCount, lastUploadInfo.failCount);
64 }
65 
GetLastUploadInfo(const std::string & tableName,Info & lastUploadInfo,UploadRetryInfo & retryInfo)66 void CloudSyncer::GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo, UploadRetryInfo &retryInfo)
67 {
68     std::lock_guard<std::mutex> autoLock(dataLock_);
69     return currentContext_.notifier->GetLastUploadInfo(tableName, lastUploadInfo, retryInfo);
70 }
71 
GetCloudGidAndFillExtend(TaskId taskId,const std::string & tableName,QuerySyncObject & obj,VBucket & extend)72 int CloudSyncer::GetCloudGidAndFillExtend(TaskId taskId, const std::string &tableName, QuerySyncObject &obj,
73     VBucket &extend)
74 {
75     int errCode = GetCloudGid(taskId, tableName, obj);
76     if (errCode != E_OK) {
77         LOGE("[CloudSyncer] Failed to get cloud gid when fill extend, %d.", errCode);
78         return errCode;
79     }
80     return CloudStorageUtils::FillCloudQueryToExtend(obj, extend);
81 }
82 
FillDownloadExtend(TaskId taskId,const std::string & tableName,const std::string & cloudWaterMark,VBucket & extend)83 int CloudSyncer::FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
84     VBucket &extend)
85 {
86     extend = {
87         {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
88     };
89 
90     QuerySyncObject obj = GetQuerySyncObject(tableName);
91     if (obj.IsContainQueryNodes()) {
92         return GetCloudGidAndFillExtend(taskId, tableName, obj, extend);
93     }
94     if (IsCompensatedTask(taskId)) {
95         int errCode = GetCloudGid(taskId, tableName, obj);
96         if (errCode != E_OK) {
97             return errCode;
98         }
99         if (obj.IsContainQueryNodes()) {
100             return CloudStorageUtils::FillCloudQueryToExtend(obj, extend);
101         }
102     }
103     extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::FULL_TABLE);
104     return E_OK;
105 }
106 
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj)107 int CloudSyncer::GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj)
108 {
109     std::vector<std::string> cloudGid;
110     bool isCloudForcePush = cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
111     int errCode = storageProxy_->GetCloudGid(obj, isCloudForcePush, IsCompensatedTask(taskId), cloudGid);
112     if (errCode != E_OK) {
113         LOGE("[CloudSyncer] Failed to get cloud gid, %d.", errCode);
114     } else if (!cloudGid.empty()) {
115         obj.SetCloudGid(cloudGid);
116     }
117     LOGI("[CloudSyncer] get cloud gid size:%zu", cloudGid.size());
118     return errCode;
119 }
120 
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj,std::vector<std::string> & cloudGid)121 int CloudSyncer::GetCloudGid(
122     TaskId taskId, const std::string &tableName, QuerySyncObject &obj, std::vector<std::string> &cloudGid)
123 {
124     bool isCloudForcePush = cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
125     int errCode = storageProxy_->GetCloudGid(obj, isCloudForcePush, IsCompensatedTask(taskId), cloudGid);
126     if (errCode != E_OK) {
127         LOGE("[CloudSyncer] Failed to get cloud gid, taskid:%" PRIu64 ", table name: %s, length: %zu, %d.",
128             taskId, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), errCode);
129     }
130     return errCode;
131 }
132 
GetQuerySyncObject(const std::string & tableName)133 QuerySyncObject CloudSyncer::GetQuerySyncObject(const std::string &tableName)
134 {
135     std::lock_guard<std::mutex> autoLock(dataLock_);
136     for (const auto &item : cloudTaskInfos_[currentContext_.currentTaskId].queryList) {
137         if (item.GetTableName() == tableName) {
138             return item;
139         }
140     }
141     LOGW("[CloudSyncer] not found query in cache");
142     QuerySyncObject querySyncObject;
143     querySyncObject.SetTableName(tableName);
144     return querySyncObject;
145 }
146 
UpdateProcessWhenUploadFailed(InnerProcessInfo & info)147 void CloudSyncer::UpdateProcessWhenUploadFailed(InnerProcessInfo &info)
148 {
149     info.tableStatus = ProcessStatus::FINISHED;
150     std::lock_guard<std::mutex> autoLock(dataLock_);
151     currentContext_.notifier->UpdateProcess(info);
152     currentContext_.notifier->UpdateUploadRetryInfo(info);
153 }
154 
NotifyUploadFailed(int errCode,InnerProcessInfo & info)155 void CloudSyncer::NotifyUploadFailed(int errCode, InnerProcessInfo &info)
156 {
157     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
158         LOGI("[CloudSyncer] Stop upload due to version conflict, %d", errCode);
159     } else {
160         LOGE("[CloudSyncer] Failed to do upload, %d", errCode);
161         info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
162     }
163     UpdateProcessWhenUploadFailed(info);
164 }
165 
BatchInsert(Info & insertInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)166 int CloudSyncer::BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
167 {
168     return BatchInsertOrUpdate(insertInfo, uploadData, innerProcessInfo, true);
169 }
170 
BatchUpdate(Info & updateInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)171 int CloudSyncer::BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
172 {
173     return BatchInsertOrUpdate(updateInfo, uploadData, innerProcessInfo, false);
174 }
175 
BatchInsertOrUpdate(Info & uploadInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo,bool isInsert)176 int CloudSyncer::BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo,
177     bool isInsert)
178 {
179     uint32_t retryCount = 0;
180     int errCode = E_OK;
181     if (isInsert) {
182         errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
183             uploadData.insData.extend, uploadInfo, retryCount);
184         innerProcessInfo.upLoadInfo.insertCount += uploadInfo.successCount;
185     } else {
186         errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
187             uploadData.updData.extend, uploadInfo, retryCount);
188         innerProcessInfo.upLoadInfo.updateCount += uploadInfo.successCount;
189     }
190     innerProcessInfo.upLoadInfo.successCount += uploadInfo.successCount;
191     innerProcessInfo.upLoadInfo.failCount += uploadInfo.failCount;
192     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
193         ProcessVersionConflictInfo(innerProcessInfo, retryCount);
194     }
195     if (errCode != E_OK) {
196         LOGE("[CloudSyncer][BatchInsertOrUpdate] BatchInsertOrUpdate with error, ret is %d.", errCode);
197     }
198     if (uploadData.isCloudVersionRecord) {
199         return errCode;
200     }
201     return BackFillAfterBatchUpload(uploadData, isInsert, errCode);
202 }
203 
BackFillAfterBatchUpload(CloudSyncData & uploadData,bool isInsert,int batchUploadResult)204 int CloudSyncer::BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInsert, int batchUploadResult)
205 {
206     bool isSharedTable = false;
207     int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
208     if (ret != E_OK) {
209         LOGE("[CloudSyncer] BackFillAfterBatchUpload cannot judge the table is shared table. %d", ret);
210         return ret;
211     }
212     int errCode = batchUploadResult;
213     if (!isSharedTable) {
214         CloudWaterType type = isInsert ? CloudWaterType::INSERT : CloudWaterType::UPDATE;
215         CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData;
216         ret = CloudSyncUtils::FillAssetIdToAssets(data, errCode, type);
217         if (ret != E_OK) {
218             LOGW("[CloudSyncer][BackFillAfterBatchUpload] FillAssetIdToAssets with error, ret is %d.", ret);
219         }
220     }
221     OpType opType = isInsert ? OpType::INSERT : OpType::UPDATE;
222     if (errCode != E_OK) {
223         storageProxy_->FillCloudGidIfSuccess(opType, uploadData);
224         CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData;
225         bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(data.extend);
226         if (isSkip) {
227             LOGI("[CloudSyncer][BackFillAfterBatchUpload] Try to FillCloudLogAndAsset when assets missing: %d",
228                 errCode);
229             return E_OK;
230         } else {
231             LOGE("[CloudSyncer][BackFillAfterBatchUpload] errCode: %d, can not skip assets missing record.", errCode);
232             return errCode;
233         }
234     }
235     int errorCode = storageProxy_->FillCloudLogAndAsset(opType, uploadData);
236     if ((errorCode != E_OK) || (ret != E_OK)) {
237         LOGE("[CloudSyncer] Failed to fill back when doing upload insData or updData, %d.", errorCode);
238         return ret == E_OK ? errorCode : ret;
239     }
240     return E_OK;
241 }
242 
DownloadAssetsOneByOne(const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)243 int CloudSyncer::DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
244     std::map<std::string, Assets> &downloadAssets)
245 {
246     bool isSharedTable = false;
247     int errCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
248     if (errCode != E_OK) {
249         LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errCode);
250         return errCode;
251     }
252     int transactionCode = E_OK;
253     // shared table don't download, so just begin transaction once
254     if (isSharedTable) {
255         transactionCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
256     }
257     if (transactionCode != E_OK) {
258         LOGE("[CloudSyncer] begin transaction before download failed %d", transactionCode);
259         return transactionCode;
260     }
261     errCode = DownloadAssetsOneByOneInner(isSharedTable, info, downloadItem, downloadAssets);
262     if (isSharedTable) {
263         transactionCode = storageProxy_->Commit();
264         if (transactionCode != E_OK) {
265             LOGW("[CloudSyncer] commit transaction after download failed %d", transactionCode);
266         }
267     }
268     return (errCode == E_OK) ? transactionCode : errCode;
269 }
270 
GetDBAssets(bool isSharedTable,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)271 std::pair<int, uint32_t> CloudSyncer::GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
272     const DownloadItem &downloadItem, VBucket &dbAssets)
273 {
274     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
275     auto &errCode = res.first;
276     if (!isSharedTable) {
277         errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
278     }
279     if (errCode != E_OK) {
280         LOGE("[CloudSyncer] begin transaction before download failed %d", errCode);
281         return res;
282     }
283     res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, info.isAsyncDownload, downloadItem.gid,
284         downloadItem.hashKey, dbAssets);
285     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
286         if (errCode != -E_CLOUD_GID_MISMATCH) {
287             LOGE("[CloudSyncer] get assets from db failed %d", errCode);
288         }
289         if (!isSharedTable) {
290             (void)storageProxy_->Rollback();
291         }
292         return res;
293     }
294     if (!isSharedTable) {
295         errCode = storageProxy_->Commit();
296     }
297     if (errCode != E_OK) {
298         LOGE("[CloudSyncer] commit transaction before download failed %d", errCode);
299     }
300     return res;
301 }
302 
BackFillAssetsAfterDownload(int downloadCode,int deleteCode,std::map<std::string,std::vector<uint32_t>> & tmpFlags,std::map<std::string,Assets> & tmpAssetsToDownload,std::map<std::string,Assets> & tmpAssetsToDelete)303 std::map<std::string, Assets>& CloudSyncer::BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
304     std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
305     std::map<std::string, Assets> &tmpAssetsToDelete)
306 {
307     std::map<std::string, Assets> &downloadAssets = tmpAssetsToDownload;
308     for (auto &[col, assets] : tmpAssetsToDownload) {
309         int i = 0;
310         for (auto &asset : assets) {
311             asset.flag = tmpFlags[col][i++];
312             if (asset.flag == static_cast<uint32_t>(AssetOpType::NO_CHANGE)) {
313                 continue;
314             }
315             if (downloadCode == E_OK) {
316                 asset.status = NORMAL;
317             } else {
318                 asset.status = (asset.status == NORMAL) ? NORMAL : ABNORMAL;
319             }
320         }
321     }
322     for (auto &[col, assets] : tmpAssetsToDelete) {
323         for (auto &asset : assets) {
324             asset.flag = static_cast<uint32_t>(AssetOpType::DELETE);
325             if (deleteCode == E_OK) {
326                 asset.status = NORMAL;
327             } else {
328                 asset.status = ABNORMAL;
329             }
330             downloadAssets[col].push_back(asset);
331         }
332     }
333     return downloadAssets;
334 }
335 
IsNeedSkipDownload(bool isSharedTable,int & errCode,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)336 int CloudSyncer::IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
337     const DownloadItem &downloadItem, VBucket &dbAssets)
338 {
339     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK)};
340     auto &ret = res.first;
341     auto &status = res.second;
342     if (info.isAsyncDownload) {
343         res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, info.isAsyncDownload, downloadItem.gid,
344             downloadItem.hashKey, dbAssets);
345     } else {
346         res = GetDBAssets(isSharedTable, info, downloadItem, dbAssets);
347     }
348     if (ret == -E_CLOUD_GID_MISMATCH) {
349         LOGW("[CloudSyncer] skip download asset because gid mismatch");
350         errCode = E_OK;
351         return true;
352     }
353     if (CloudStorageUtils::IsDataLocked(status)) {
354         LOGI("[CloudSyncer] skip download asset because data lock:%u", status);
355         errCode = E_OK;
356         return true;
357     }
358     if (ret != E_OK) {
359         LOGE("[CloudSyncer]%s download get assets from DB failed: %d, return errCode: %d",
360             info.isAsyncDownload ? "Async" : "Sync", ret, errCode);
361         errCode = (errCode != E_OK) ? errCode : ret;
362         return true;
363     }
364     return false;
365 }
366 
CheckDownloadOrDeleteCode(int & errCode,int downloadCode,int deleteCode,DownloadItem & downloadItem)367 bool CloudSyncer::CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem)
368 {
369     if (downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT || deleteCode == -E_CLOUD_RECORD_EXIST_CONFLICT) {
370         downloadItem.recordConflict = true;
371         errCode = E_OK;
372         return false;
373     }
374     errCode = (errCode != E_OK) ? errCode : deleteCode;
375     errCode = (errCode != E_OK) ? errCode : downloadCode;
376     if (downloadCode == -E_NOT_SET || deleteCode == -E_NOT_SET) {
377         return false;
378     }
379     return true;
380 }
381 
GetAssetsToDownload(std::map<std::string,Assets> & downloadAssets,VBucket & dbAssets,bool isSharedTable,std::map<std::string,Assets> & assetsToDownload,std::map<std::string,std::vector<uint32_t>> & tmpFlags)382 void GetAssetsToDownload(std::map<std::string, Assets> &downloadAssets, VBucket &dbAssets, bool isSharedTable,
383     std::map<std::string, Assets> &assetsToDownload, std::map<std::string, std::vector<uint32_t>> &tmpFlags)
384 {
385     if (isSharedTable) {
386         LOGD("[CloudSyncer] skip download for shared table");
387         return;
388     }
389     for (auto &[col, assets] : downloadAssets) {
390         for (auto &asset : assets) {
391             if (asset.flag != static_cast<uint32_t>(AssetOpType::DELETE) &&
392                 AssetOperationUtils::CalAssetOperation(col, asset, dbAssets,
393                 AssetOperationUtils::CloudSyncAction::START_DOWNLOAD) == AssetOperationUtils::AssetOpType::HANDLE) {
394                 asset.status = asset.flag == static_cast<uint32_t>(AssetOpType::INSERT) ?
395                     static_cast<uint32_t>(AssetStatus::INSERT) : static_cast<uint32_t>(AssetStatus::UPDATE);
396                 assetsToDownload[col].push_back(asset);
397                 tmpFlags[col].push_back(asset.flag);
398             } else {
399                 LOGD("[CloudSyncer] skip download asset...");
400             }
401         }
402     }
403 }
404 
GetAssetsToRemove(std::map<std::string,Assets> & downloadAssets,VBucket & dbAssets,bool isSharedTable,std::map<std::string,Assets> & assetsToRemove)405 void GetAssetsToRemove(std::map<std::string, Assets> &downloadAssets, VBucket &dbAssets, bool isSharedTable,
406     std::map<std::string, Assets> &assetsToRemove)
407 {
408     if (isSharedTable) {
409         LOGD("[CloudSyncer] skip remove for shared table");
410         return;
411     }
412     for (auto &[col, assets] : downloadAssets) {
413         for (auto &asset : assets) {
414             if (asset.flag == static_cast<uint32_t>(AssetOpType::DELETE) &&
415                 AssetOperationUtils::CalAssetRemoveOperation(col, asset, dbAssets) ==
416                 AssetOperationUtils::AssetOpType::HANDLE) {
417                 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
418                 assetsToRemove[col].push_back(asset);
419             }
420         }
421     }
422 }
423 
DownloadAssetsOneByOneInner(bool isSharedTable,const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)424 int CloudSyncer::DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info,
425     DownloadItem &downloadItem, std::map<std::string, Assets> &downloadAssets)
426 {
427     int errCode = E_OK;
428     VBucket dbAssets;
429     std::map<std::string, Assets> tmpAssetsToRemove;
430     if (!IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
431         GetAssetsToRemove(downloadAssets, dbAssets, isSharedTable, tmpAssetsToRemove);
432     }
433     auto deleteCode = cloudDB_.RemoveLocalAssets(info.tableName, downloadItem.gid, downloadItem.prefix,
434         tmpAssetsToRemove);
435 
436     std::map<std::string, Assets> tmpAssetsToDownload;
437     std::map<std::string, std::vector<uint32_t>> tmpFlags;
438     if (!IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
439         GetAssetsToDownload(downloadAssets, dbAssets, isSharedTable, tmpAssetsToDownload, tmpFlags);
440     }
441     auto downloadCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, tmpAssetsToDownload);
442     if (downloadCode != SKIP_ASSET && !CheckDownloadOrDeleteCode(errCode, downloadCode, deleteCode, downloadItem)) {
443         return errCode;
444     }
445 
446     // copy asset back
447     downloadAssets = BackFillAssetsAfterDownload(downloadCode, deleteCode, tmpFlags, tmpAssetsToDownload,
448         tmpAssetsToRemove);
449     return errCode;
450 }
451 
SeparateNormalAndFailAssets(const std::map<std::string,Assets> & assetsMap,VBucket & normalAssets,VBucket & failedAssets)452 void CloudSyncer::SeparateNormalAndFailAssets(const std::map<std::string, Assets> &assetsMap, VBucket &normalAssets,
453     VBucket &failedAssets)
454 {
455     for (auto &[key, assets] : assetsMap) {
456         Assets tempFailedAssets;
457         Assets tempNormalAssets;
458         int32_t otherStatusCnt = 0;
459         for (auto &asset : assets) {
460             if (asset.status == AssetStatus::NORMAL) {
461                 tempNormalAssets.push_back(asset);
462             } else if (asset.status == AssetStatus::ABNORMAL) {
463                 tempFailedAssets.push_back(asset);
464             } else {
465                 otherStatusCnt++;
466                 LOGW("[CloudSyncer] download err, statue %d count %d", asset.status, otherStatusCnt);
467             }
468         }
469         LOGI("[CloudSyncer] download asset times, normalCount %d, abnormalCount %d, otherStatusCnt %d",
470             tempNormalAssets.size(), tempFailedAssets.size(), otherStatusCnt);
471         if (tempFailedAssets.size() > 0) {
472             failedAssets[key] = std::move(tempFailedAssets);
473         }
474         if (tempNormalAssets.size() > 0) {
475             normalAssets[key] = std::move(tempNormalAssets);
476         }
477     }
478 }
479 
CommitDownloadAssets(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)480 int CloudSyncer::CommitDownloadAssets(const DownloadItem &downloadItem, InnerProcessInfo &info,
481     DownloadCommitList &commitList, uint32_t &successCount)
482 {
483     int errCode = storageProxy_->SetLogTriggerStatus(false);
484     if (errCode != E_OK) {
485         return errCode;
486     }
487     for (auto &item : commitList) {
488         std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
489         // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
490         std::map<std::string, Assets> assetsMap = std::get<1>(item);
491         bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
492         VBucket normalAssets;
493         VBucket failedAssets;
494         normalAssets[CloudDbConstant::GID_FIELD] = gid;
495         failedAssets[CloudDbConstant::GID_FIELD] = gid;
496         if (setAllNormal) {
497             for (auto &[key, asset] : assetsMap) {
498                 normalAssets[key] = std::move(asset);
499             }
500         } else {
501             SeparateNormalAndFailAssets(assetsMap, normalAssets, failedAssets);
502         }
503         if (!downloadItem.recordConflict) {
504             errCode = FillCloudAssets(info, normalAssets, failedAssets);
505             if (errCode != E_OK) {
506                 break;
507             }
508         }
509         LogInfo logInfo;
510         logInfo.cloudGid = gid;
511         // download must contain gid, just set the default value here.
512         logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
513         logInfo.hashKey = downloadItem.hashKey;
514         logInfo.timestamp = downloadItem.timestamp;
515         // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
516         if (failedAssets.size() > 1) {
517             logInfo.timestamp = 0u;
518         }
519 
520         errCode = storageProxy_->UpdateRecordFlag(
521             info.tableName, info.isAsyncDownload, downloadItem.recordConflict, logInfo);
522         if (errCode != E_OK) {
523             break;
524         }
525         successCount++;
526     }
527     int ret = storageProxy_->SetLogTriggerStatus(true);
528     return errCode == E_OK ? ret : errCode;
529 }
530 
FillCloudAssetsForOneRecord(const std::string & gid,const std::map<std::string,Assets> & assetsMap,InnerProcessInfo & info,bool setAllNormal,bool & isExistAssetDownloadFail)531 int CloudSyncer::FillCloudAssetsForOneRecord(const std::string &gid, const std::map<std::string, Assets> &assetsMap,
532     InnerProcessInfo &info, bool setAllNormal, bool &isExistAssetDownloadFail)
533 {
534     VBucket normalAssets;
535     VBucket failedAssets;
536     normalAssets[CloudDbConstant::GID_FIELD] = gid;
537     failedAssets[CloudDbConstant::GID_FIELD] = gid;
538     if (setAllNormal) {
539         for (auto &[key, asset] : assetsMap) {
540             normalAssets[key] = std::move(asset);
541         }
542     } else {
543         SeparateNormalAndFailAssets(assetsMap, normalAssets, failedAssets);
544     }
545     isExistAssetDownloadFail = failedAssets.size() > 1; // There is initially one element present
546     return FillCloudAssets(info, normalAssets, failedAssets);
547 }
548 
UpdateRecordFlagForOneRecord(const std::string & gid,const DownloadItem & downloadItem,InnerProcessInfo & info,bool isExistAssetDownloadFail)549 int CloudSyncer::UpdateRecordFlagForOneRecord(const std::string &gid, const DownloadItem &downloadItem,
550     InnerProcessInfo &info, bool isExistAssetDownloadFail)
551 {
552     LogInfo logInfo;
553     logInfo.cloudGid = gid;
554     // download must contain gid, just set the default value here.
555     logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
556     logInfo.hashKey = downloadItem.hashKey;
557     logInfo.timestamp = downloadItem.timestamp;
558     // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
559     if (isExistAssetDownloadFail) {
560         logInfo.timestamp = 0u;
561     }
562     return storageProxy_->UpdateRecordFlag(info.tableName, info.isAsyncDownload, downloadItem.recordConflict, logInfo);
563 }
564 
CommitDownloadAssetsForAsyncDownload(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)565 int CloudSyncer::CommitDownloadAssetsForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info,
566     DownloadCommitList &commitList, uint32_t &successCount)
567 {
568     int errCode = storageProxy_->SetLogTriggerStatus(false, true);
569     if (errCode != E_OK) {
570         return errCode;
571     }
572     for (auto &item : commitList) {
573         std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
574         // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
575         std::map<std::string, Assets> assetsMap = std::get<1>(item);
576         bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
577         LockStatus status = LockStatus::BUTT;
578         errCode = storageProxy_->GetLockStatusByGid(info.tableName, gid, status);
579         if (errCode == E_OK && status != LockStatus::UNLOCK) {
580             continue;
581         }
582 
583         bool isExistAssetDownloadFail = false;
584         if (!downloadItem.recordConflict) {
585             errCode = FillCloudAssetsForOneRecord(gid, assetsMap, info, setAllNormal, isExistAssetDownloadFail);
586             if (errCode != E_OK) {
587                 break;
588             }
589         }
590 
591         errCode = UpdateRecordFlagForOneRecord(gid, downloadItem, info, isExistAssetDownloadFail);
592         if (errCode != E_OK) {
593             break;
594         }
595         successCount++;
596     }
597     int ret = storageProxy_->SetLogTriggerStatus(true, true);
598     return errCode == E_OK ? ret : errCode;
599 }
600 
GenerateCompensatedSync(CloudTaskInfo & taskInfo)601 void CloudSyncer::GenerateCompensatedSync(CloudTaskInfo &taskInfo)
602 {
603     std::vector<QuerySyncObject> syncQuery;
604     std::vector<std::string> users;
605     int errCode =
606         CloudSyncUtils::GetQueryAndUsersForCompensatedSync(CanStartAsyncDownload(), storageProxy_, users, syncQuery);
607     if (errCode != E_OK) {
608         LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
609         return;
610     }
611     if (syncQuery.empty()) {
612         LOGD("[CloudSyncer] Not need generate compensated sync");
613         return;
614     }
615     for (const auto &it : syncQuery) {
616         CloudTaskInfo compensatedTaskInfo = taskInfo;
617         compensatedTaskInfo.queryList.push_back(it);
618         Sync(compensatedTaskInfo);
619         taskInfo.callback = nullptr;
620         LOGI("[CloudSyncer] Generate compensated sync finished");
621     }
622 }
623 
ChkIgnoredProcess(InnerProcessInfo & info,const CloudSyncData & uploadData,UploadParam & uploadParam)624 void CloudSyncer::ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam)
625 {
626     if (uploadData.ignoredCount == 0) { // LCOV_EXCL_BR_LINE
627         return;
628     }
629     info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
630     if (info.upLoadInfo.successCount + info.upLoadInfo.failCount != info.upLoadInfo.total) { // LCOV_EXCL_BR_LINE
631         return;
632     }
633     if (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) { // LCOV_EXCL_BR_LINE
634         return;
635     }
636     info.tableStatus = ProcessStatus::FINISHED;
637     info.upLoadInfo.batchIndex++;
638     NotifyInBatchUpload(uploadParam, info, true);
639 }
640 
SaveCursorIfNeed(const std::string & tableName)641 int CloudSyncer::SaveCursorIfNeed(const std::string &tableName)
642 {
643     std::string cursor = "";
644     int errCode = storageProxy_->GetCloudWaterMark(tableName, cursor);
645     if (errCode != E_OK) {
646         LOGE("[CloudSyncer] get cloud water mark before download failed %d", errCode);
647         return errCode;
648     }
649     if (!cursor.empty()) {
650         return E_OK;
651     }
652     auto res = cloudDB_.GetEmptyCursor(tableName);
653     if (res.first != E_OK) {
654         LOGE("[CloudSyncer] get empty cursor failed %d", res.first);
655         return res.first;
656     }
657     if (res.second.empty()) {
658         LOGE("[CloudSyncer] get cursor is empty %d", -E_CLOUD_ERROR);
659         return -E_CLOUD_ERROR;
660     }
661     errCode = storageProxy_->SetCloudWaterMark(tableName, res.second);
662     if (errCode != E_OK) {
663         LOGE("[CloudSyncer] set cloud water mark before download failed %d", errCode);
664     }
665     return errCode;
666 }
667 
PrepareAndDownload(const std::string & table,const CloudTaskInfo & taskInfo,bool isFirstDownload)668 int CloudSyncer::PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload)
669 {
670     std::string hashDev;
671     int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
672     if (errCode != E_OK) {
673         LOGE("[CloudSyncer] Failed to get local identity.");
674         return errCode;
675     }
676     errCode = SaveCursorIfNeed(table);
677     if (errCode != E_OK) {
678         return errCode;
679     }
680     errCode = CheckTaskIdValid(taskInfo.taskId);
681     if (errCode != E_OK) {
682         LOGW("[CloudSyncer] task is invalid, abort sync");
683         return errCode;
684     }
685     errCode = DoDownload(taskInfo.taskId, isFirstDownload);
686     if (errCode != E_OK) {
687         LOGE("[CloudSyncer] download failed %d", errCode);
688     }
689     return errCode;
690 }
691 
IsClosed() const692 bool CloudSyncer::IsClosed() const
693 {
694     return closed_ || IsKilled();
695 }
696 
UpdateFlagForSavedRecord(const SyncParam & param)697 int CloudSyncer::UpdateFlagForSavedRecord(const SyncParam &param)
698 {
699     DownloadList downloadList;
700     {
701         std::lock_guard<std::mutex> autoLock(dataLock_);
702         downloadList = currentContext_.assetDownloadList;
703     }
704     std::set<std::string> downloadGid;
705     std::set<std::string> consistentGid;
706     for (const auto &tuple : downloadList) {
707         if (CloudSyncUtils::IsContainDownloading(tuple)) {
708             downloadGid.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
709         }
710         consistentGid.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
711     }
712     if (IsCurrentAsyncDownloadTask()) {
713         int errCode = storageProxy_->MarkFlagAsAssetAsyncDownload(param.tableName, param.downloadData, downloadGid);
714         if (errCode != E_OK) {
715             LOGE("[CloudSyncer] Failed to mark flag consistent errCode %d", errCode);
716             return errCode;
717         }
718     }
719     return storageProxy_->MarkFlagAsConsistent(param.tableName, param.downloadData, consistentGid);
720 }
721 
BatchDelete(Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)722 int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
723 {
724     uint32_t retryCount = 0;
725     int errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
726         uploadData.delData.extend, deleteInfo, retryCount);
727     innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
728     innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount;
729     innerProcessInfo.upLoadInfo.failCount += deleteInfo.failCount;
730     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
731         ProcessVersionConflictInfo(innerProcessInfo, retryCount);
732     }
733     if (errCode != E_OK) {
734         LOGE("[CloudSyncer] Failed to batch delete, %d", errCode);
735         storageProxy_->FillCloudGidIfSuccess(OpType::DELETE, uploadData);
736         return errCode;
737     }
738     errCode = storageProxy_->FillCloudLogAndAsset(OpType::DELETE, uploadData);
739     if (errCode != E_OK) {
740         LOGE("[CloudSyncer] Failed to fill back when doing upload delData, %d.", errCode);
741     }
742     return errCode;
743 }
744 
IsCompensatedTask(TaskId taskId)745 bool CloudSyncer::IsCompensatedTask(TaskId taskId)
746 {
747     std::lock_guard<std::mutex> autoLock(dataLock_);
748     return cloudTaskInfos_[taskId].compensatedTask;
749 }
750 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)751 int CloudSyncer::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
752 {
753     return cloudDB_.SetCloudDB(cloudDBs);
754 }
755 
CleanAllWaterMark()756 void CloudSyncer::CleanAllWaterMark()
757 {
758     storageProxy_->CleanAllWaterMark();
759 }
760 
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)761 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
762 {
763     downloadItem.gid = std::get<CloudSyncUtils::GID_INDEX>(downloadList[i]);
764     downloadItem.prefix = std::get<CloudSyncUtils::PREFIX_INDEX>(downloadList[i]);
765     downloadItem.strategy = std::get<CloudSyncUtils::STRATEGY_INDEX>(downloadList[i]);
766     downloadItem.assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadList[i]);
767     downloadItem.hashKey = std::get<CloudSyncUtils::HASH_KEY_INDEX>(downloadList[i]);
768     downloadItem.primaryKeyValList = std::get<CloudSyncUtils::PRIMARY_KEY_INDEX>(downloadList[i]);
769     downloadItem.timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(downloadList[i]);
770 }
771 
DoNotifyInNeed(const CloudSyncer::TaskId & taskId,const std::vector<std::string> & needNotifyTables,const bool isFirstDownload)772 void CloudSyncer::DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
773     const bool isFirstDownload)
774 {
775     bool isNeedNotify = false;
776     {
777         std::lock_guard<std::mutex> autoLock(dataLock_);
778         // only when the first download and the task no need upload actually, notify the process, otherwise,
779         // the process will notify in the upload procedure, which can guarantee the notify order of the tables
780         isNeedNotify = isFirstDownload && !currentContext_.isNeedUpload;
781     }
782     if (!isNeedNotify) {
783         return;
784     }
785     for (size_t i = 0; i < needNotifyTables.size(); ++i) {
786         UpdateProcessInfoWithoutUpload(taskId, needNotifyTables[i], i != (needNotifyTables.size() - 1u));
787     }
788 }
789 
GetUploadCountByTable(const CloudSyncer::TaskId & taskId,int64_t & count)790 int CloudSyncer::GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count)
791 {
792     std::string tableName;
793     int ret = GetCurrentTableName(tableName);
794     if (ret != E_OK) {
795         LOGE("[CloudSyncer] Invalid table name for get local water mark: %d", ret);
796         return ret;
797     }
798 
799     ret = storageProxy_->StartTransaction();
800     if (ret != E_OK) {
801         LOGE("[CloudSyncer] start transaction failed before getting upload count.");
802         return ret;
803     }
804 
805     ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
806         IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
807     if (ret != E_OK) {
808         // GetUploadCount will return E_OK when upload count is zero.
809         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
810     }
811     // No need Rollback when GetUploadCount failed
812     storageProxy_->Commit();
813     return ret;
814 }
815 
UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId,const std::string & tableName,bool needNotify)816 void CloudSyncer::UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName,
817     bool needNotify)
818 {
819     LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
820     InnerProcessInfo innerProcessInfo;
821     innerProcessInfo.tableName = tableName;
822     innerProcessInfo.upLoadInfo.total = 0;  // count is zero
823     innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
824     {
825         std::lock_guard<std::mutex> autoLock(dataLock_);
826         if (!needNotify) {
827             currentContext_.notifier->UpdateProcess(innerProcessInfo);
828         } else {
829             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
830         }
831     }
832 }
833 
SetNeedUpload(bool isNeedUpload)834 void CloudSyncer::SetNeedUpload(bool isNeedUpload)
835 {
836     std::lock_guard<std::mutex> autoLock(dataLock_);
837     currentContext_.isNeedUpload = isNeedUpload;
838 }
839 
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)840 int CloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
841 {
842     std::vector<std::string> needNotifyTables;
843     for (size_t i = 0; i < taskInfo.table.size(); ++i) {
844         std::string table;
845         {
846             std::lock_guard<std::mutex> autoLock(dataLock_);
847             if (currentContext_.processRecorder == nullptr) {
848                 LOGE("[CloudSyncer] process recorder of current context is nullptr.");
849                 return -E_INTERNAL_ERROR;
850             }
851             if (currentContext_.processRecorder->IsDownloadFinish(currentContext_.currentUserIndex,
852                 taskInfo.table[i])) {
853                 continue;
854             }
855             LOGD("[CloudSyncer] try download table, index: %zu, table name: %s, length: %u",
856                 i, DBCommon::StringMiddleMasking(taskInfo.table[i]).c_str(), taskInfo.table[i].length());
857             currentContext_.tableName = taskInfo.table[i];
858             table = currentContext_.tableName;
859         }
860         int errCode = PrepareAndDownload(table, taskInfo, isFirstDownload);
861         if (errCode != E_OK) {
862             return errCode;
863         }
864         CheckDataAfterDownload(table);
865         MarkDownloadFinishIfNeed(table);
866         // needUpload indicate that the syncMode need push
867         if (needUpload) {
868             int64_t count = 0;
869             errCode = GetUploadCountByTable(taskInfo.taskId, count);
870             if (errCode != E_OK) {
871                 LOGE("[CloudSyncer] GetUploadCountByTable failed %d", errCode);
872                 return errCode;
873             }
874             // count > 0 means current table need upload actually
875             if (count > 0) {
876                 SetNeedUpload(true);
877                 continue;
878             }
879             needNotifyTables.emplace_back(table);
880         }
881         errCode = SaveCloudWaterMark(taskInfo.table[i], taskInfo.taskId);
882         if (errCode != E_OK) {
883             LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
884             return errCode;
885         }
886     }
887     DoNotifyInNeed(taskInfo.taskId, needNotifyTables, isFirstDownload);
888     TriggerAsyncDownloadAssetsInTaskIfNeed(isFirstDownload);
889     return E_OK;
890 }
891 
IsNeedGetLocalWater(TaskId taskId)892 bool CloudSyncer::IsNeedGetLocalWater(TaskId taskId)
893 {
894     return !IsModeForcePush(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId)) &&
895         !IsCompensatedTask(taskId);
896 }
897 
TryToAddSyncTask(CloudTaskInfo && taskInfo)898 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
899 {
900     if (closed_) {
901         LOGW("[CloudSyncer] syncer is closed, should not sync now");
902         return -E_DB_CLOSED;
903     }
904     std::shared_ptr<DataBaseSchema> cloudSchema;
905     int errCode = storageProxy_->GetCloudDbSchema(cloudSchema);
906     if (errCode != E_OK) {
907         LOGE("[CloudSyncer] Get cloud schema failed %d when add task", errCode);
908         return errCode;
909     }
910     std::lock_guard<std::mutex> autoLock(dataLock_);
911     taskInfo.priorityLevel = (!taskInfo.priorityTask || taskInfo.compensatedTask)
912                                  ? CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL
913                                  : taskInfo.priorityLevel;
914     errCode = CheckQueueSizeWithNoLock(taskInfo.priorityTask);
915     if (errCode != E_OK) {
916         return errCode;
917     }
918     errCode = GenerateTaskIdIfNeed(taskInfo);
919     if (errCode != E_OK) {
920         return errCode;
921     }
922     auto taskId = taskInfo.taskId;
923     cloudTaskInfos_[taskId] = std::move(taskInfo);
924     taskQueue_.insert({cloudTaskInfos_[taskId].priorityLevel, taskId});
925     LOGI("[CloudSyncer]Add task ok, storeId %.3s, priority %d, priorityLevel %" PRId32 ", taskId %" PRIu64 " async %d",
926         cloudTaskInfos_[taskId].storeId.c_str(), cloudTaskInfos_[taskId].priorityTask,
927         cloudTaskInfos_[taskId].priorityLevel, cloudTaskInfos_[taskId].taskId,
928         static_cast<int>(cloudTaskInfos_[taskId].asyncDownloadAssets));
929     MarkCurrentTaskPausedIfNeed(taskInfo);
930     if (!cloudTaskInfos_[taskId].priorityTask) {
931         MergeTaskInfo(cloudSchema, taskId);
932     }
933     return E_OK;
934 }
935 
MergeTaskInfo(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId taskId)936 void CloudSyncer::MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId)
937 {
938     if (!cloudTaskInfos_[taskId].merge) { // LCOV_EXCL_BR_LINE
939         return;
940     }
941     bool isMerge = false;
942     TaskId checkTaskId = taskId;
943     do {
944         std::tie(isMerge, checkTaskId) = TryMergeTask(cloudSchema, checkTaskId);
945     } while (isMerge);
946 }
947 
RemoveTaskFromQueue(int32_t priorityLevel,TaskId taskId)948 void CloudSyncer::RemoveTaskFromQueue(int32_t priorityLevel, TaskId taskId)
949 {
950     for (auto it = taskQueue_.find(priorityLevel); it != taskQueue_.end(); ++it) {
951         if (it->second == taskId) {
952             taskQueue_.erase(it);
953             return;
954         }
955     }
956 }
957 
TryMergeTask(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId tryTaskId)958 std::pair<bool, TaskId> CloudSyncer::TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId)
959 {
960     std::pair<bool, TaskId> res;
961     auto &[merge, nextTryTask] = res;
962     TaskId beMergeTask = INVALID_TASK_ID;
963     TaskId runningTask = currentContext_.currentTaskId;
964     auto commonLevelTask = taskQueue_.equal_range(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
965     for (auto it = commonLevelTask.first; it != commonLevelTask.second; ++it) {
966         TaskId taskId = it->second;
967         if (taskId == runningTask || taskId == tryTaskId) {  // LCOV_EXCL_BR_LINE
968             continue;
969         }
970         if (!IsTasksCanMerge(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
971             continue;
972         }
973         if (MergeTaskTablesIfConsistent(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
974             beMergeTask = taskId;
975             nextTryTask = tryTaskId;
976             merge = true;
977             break;
978         }
979         if (MergeTaskTablesIfConsistent(tryTaskId, taskId)) { // LCOV_EXCL_BR_LINE
980             beMergeTask = tryTaskId;
981             nextTryTask = taskId;
982             merge = true;
983             break;
984         }
985     }
986     if (!merge) { // LCOV_EXCL_BR_LINE
987         return res;
988     }
989     if (beMergeTask > nextTryTask) { // LCOV_EXCL_BR_LINE
990         std::tie(beMergeTask, nextTryTask) = SwapTwoTaskAndCopyTable(beMergeTask, nextTryTask);
991     }
992     AdjustTableBasedOnSchema(cloudSchema, cloudTaskInfos_[nextTryTask]);
993     auto processNotifier = std::make_shared<ProcessNotifier>(this);
994     processNotifier->Init(cloudTaskInfos_[beMergeTask].table, cloudTaskInfos_[beMergeTask].devices,
995         cloudTaskInfos_[beMergeTask].users);
996     cloudTaskInfos_[beMergeTask].errCode = -E_CLOUD_SYNC_TASK_MERGED;
997     cloudTaskInfos_[beMergeTask].status = ProcessStatus::FINISHED;
998     processNotifier->SetAllTableFinish();
999     processNotifier->NotifyProcess(cloudTaskInfos_[beMergeTask], {}, true);
1000     RemoveTaskFromQueue(cloudTaskInfos_[beMergeTask].priorityLevel, beMergeTask);
1001     cloudTaskInfos_.erase(beMergeTask);
1002     LOGW("[CloudSyncer] TaskId %" PRIu64 " has been merged", beMergeTask);
1003     return res;
1004 }
1005 
IsTaskCanMerge(const CloudTaskInfo & taskInfo)1006 bool CloudSyncer::IsTaskCanMerge(const CloudTaskInfo &taskInfo)
1007 {
1008     return !taskInfo.compensatedTask && !taskInfo.priorityTask &&
1009         taskInfo.merge && taskInfo.mode == SYNC_MODE_CLOUD_MERGE;
1010 }
1011 
IsTasksCanMerge(TaskId taskId,TaskId tryMergeTaskId)1012 bool CloudSyncer::IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId)
1013 {
1014     const auto &taskInfo = cloudTaskInfos_[taskId];
1015     const auto &tryMergeTaskInfo = cloudTaskInfos_[tryMergeTaskId];
1016     return IsTaskCanMerge(taskInfo) && IsTaskCanMerge(tryMergeTaskInfo) &&
1017         taskInfo.devices == tryMergeTaskInfo.devices &&
1018         taskInfo.asyncDownloadAssets == tryMergeTaskInfo.asyncDownloadAssets;
1019 }
1020 
MergeTaskTablesIfConsistent(TaskId sourceId,TaskId targetId)1021 bool CloudSyncer::MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId)
1022 {
1023     const auto &source = cloudTaskInfos_[sourceId];
1024     const auto &target = cloudTaskInfos_[targetId];
1025     bool isMerge = true;
1026     for (const auto &table : source.table) {
1027         if (std::find(target.table.begin(), target.table.end(), table) == target.table.end()) { // LCOV_EXCL_BR_LINE
1028             isMerge = false;
1029             break;
1030         }
1031     }
1032     return isMerge;
1033 }
1034 
AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> & cloudSchema,CloudTaskInfo & taskInfo)1035 void CloudSyncer::AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema,
1036     CloudTaskInfo &taskInfo)
1037 {
1038     std::vector<std::string> tmpTables = taskInfo.table;
1039     taskInfo.table.clear();
1040     taskInfo.queryList.clear();
1041     for (const auto &table : cloudSchema->tables) {
1042         if (std::find(tmpTables.begin(), tmpTables.end(), table.name) != tmpTables.end()) { // LCOV_EXCL_BR_LINE
1043             taskInfo.table.push_back(table.name);
1044             QuerySyncObject querySyncObject;
1045             querySyncObject.SetTableName(table.name);
1046             taskInfo.queryList.push_back(querySyncObject);
1047         }
1048     }
1049 }
1050 
SwapTwoTaskAndCopyTable(TaskId source,TaskId target)1051 std::pair<TaskId, TaskId> CloudSyncer::SwapTwoTaskAndCopyTable(TaskId source, TaskId target)
1052 {
1053     cloudTaskInfos_[source].table = cloudTaskInfos_[target].table;
1054     cloudTaskInfos_[source].queryList = cloudTaskInfos_[target].queryList;
1055     std::set<std::string> users;
1056     users.insert(cloudTaskInfos_[source].users.begin(), cloudTaskInfos_[source].users.end());
1057     users.insert(cloudTaskInfos_[target].users.begin(), cloudTaskInfos_[target].users.end());
1058     cloudTaskInfos_[source].users = std::vector<std::string>(users.begin(), users.end());
1059     return {target, source};
1060 }
1061 
IsQueryListEmpty(TaskId taskId)1062 bool CloudSyncer::IsQueryListEmpty(TaskId taskId)
1063 {
1064     std::lock_guard<std::mutex> autoLock(dataLock_);
1065     return !std::any_of(cloudTaskInfos_[taskId].queryList.begin(), cloudTaskInfos_[taskId].queryList.end(),
1066         [](const auto &item) {
1067             return item.IsContainQueryNodes();
1068     });
1069 }
1070 
IsNeedLock(const UploadParam & param)1071 bool CloudSyncer::IsNeedLock(const UploadParam &param)
1072 {
1073     return param.lockAction == LockAction::INSERT && param.mode == CloudWaterType::INSERT;
1074 }
1075 
GetLocalWater(const std::string & tableName,UploadParam & uploadParam)1076 std::pair<int, Timestamp> CloudSyncer::GetLocalWater(const std::string &tableName, UploadParam &uploadParam)
1077 {
1078     std::pair<int, Timestamp> res = { E_OK, 0u };
1079     if (IsNeedGetLocalWater(uploadParam.taskId)) {
1080         res.first = storageProxy_->GetLocalWaterMarkByMode(tableName, uploadParam.mode, res.second);
1081     }
1082     uploadParam.localMark = res.second;
1083     return res;
1084 }
1085 
HandleBatchUpload(UploadParam & uploadParam,InnerProcessInfo & info,CloudSyncData & uploadData,ContinueToken & continueStmtToken,std::vector<ReviseModTimeInfo> & revisedData)1086 int CloudSyncer::HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info,
1087     CloudSyncData &uploadData, ContinueToken &continueStmtToken, std::vector<ReviseModTimeInfo> &revisedData)
1088 {
1089     int ret = E_OK;
1090     uint32_t batchIndex = GetCurrentTableUploadBatchIndex();
1091     bool isLocked = false;
1092     while (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) {
1093         revisedData.insert(revisedData.end(), uploadData.revisedData.begin(), uploadData.revisedData.end());
1094         ret = PreProcessBatchUpload(uploadParam, info, uploadData);
1095         if (ret != E_OK) {
1096             break;
1097         }
1098         info.upLoadInfo.batchIndex = ++batchIndex;
1099         if (IsNeedLock(uploadParam) && !isLocked) {
1100             ret = LockCloudIfNeed(uploadParam.taskId);
1101             if (ret != E_OK) {
1102                 break;
1103             }
1104             isLocked = true;
1105         }
1106         ret = DoBatchUpload(uploadData, uploadParam, info);
1107         if (ret != E_OK) {
1108             break;
1109         }
1110         uploadData = CloudSyncData(uploadData.tableName, uploadParam.mode);
1111         if (continueStmtToken == nullptr) {
1112             break;
1113         }
1114         SetUploadDataFlag(uploadParam.taskId, uploadData);
1115         LOGI("[CloudSyncer] Write local water after upload one batch, table[%s length[%u]], water[%llu]",
1116             DBCommon::StringMiddleMasking(uploadData.tableName).c_str(), uploadData.tableName.length(),
1117             uploadParam.localMark);
1118         RecordWaterMark(uploadParam.taskId, uploadParam.localMark);
1119         ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
1120         if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1121             LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
1122             break;
1123         }
1124         ChkIgnoredProcess(info, uploadData, uploadParam);
1125     }
1126     if ((ret != E_OK) && (ret != -E_UNFINISHED) && (ret != -E_TASK_PAUSED)) {
1127         NotifyUploadFailed(ret, info);
1128     }
1129     if (isLocked && IsNeedLock(uploadParam)) {
1130         UnlockIfNeed();
1131     }
1132     return ret;
1133 }
1134 
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)1135 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
1136 {
1137     InnerProcessInfo info = GetInnerProcessInfo(tableName, uploadParam);
1138     static std::vector<CloudWaterType> waterTypes = DBCommon::GetWaterTypeVec();
1139     int errCode = E_OK;
1140     for (const auto &waterType: waterTypes) {
1141         uploadParam.mode = waterType;
1142         errCode = DoUploadByMode(tableName, uploadParam, info);
1143         if (errCode != E_OK) {
1144             break;
1145         }
1146     }
1147     int ret = E_OK;
1148     if (info.upLoadInfo.successCount > 0) {
1149         ret = UploadVersionRecordIfNeed(uploadParam);
1150     }
1151     return errCode != E_OK ? errCode : ret;
1152 }
1153 
UploadVersionRecordIfNeed(const UploadParam & uploadParam)1154 int CloudSyncer::UploadVersionRecordIfNeed(const UploadParam &uploadParam)
1155 {
1156     if (uploadParam.count == 0) {
1157         // no record upload
1158         return E_OK;
1159     }
1160     if (!cloudDB_.IsExistCloudVersionCallback()) {
1161         return E_OK;
1162     }
1163     auto [errCode, uploadData] = storageProxy_->GetLocalCloudVersion();
1164     if (errCode != E_OK) {
1165         return errCode;
1166     }
1167     bool isInsert = !uploadData.insData.record.empty();
1168     CloudSyncBatch &batchData = isInsert ? uploadData.insData : uploadData.updData;
1169     if (batchData.record.empty()) {
1170         LOGE("[CloudSyncer] Get invalid cloud version record");
1171         return -E_INTERNAL_ERROR;
1172     }
1173     std::string oriVersion;
1174     CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, batchData.record[0], oriVersion);
1175     std::string newVersion;
1176     std::tie(errCode, newVersion) = cloudDB_.GetCloudVersion(oriVersion);
1177     if (errCode != E_OK) {
1178         LOGE("[CloudSyncer] Get cloud version error %d", errCode);
1179         return errCode;
1180     }
1181     batchData.record[0][CloudDbConstant::CLOUD_KV_FIELD_VALUE] = newVersion;
1182     InnerProcessInfo processInfo;
1183     Info info;
1184     std::vector<VBucket> copyRecord = batchData.record;
1185     WaterMark waterMark;
1186     CloudSyncUtils::GetWaterMarkAndUpdateTime(batchData.extend, waterMark);
1187     errCode = isInsert ? BatchInsert(info, uploadData, processInfo) : BatchUpdate(info, uploadData, processInfo);
1188     batchData.record = copyRecord;
1189     CloudSyncUtils::ModifyCloudDataTime(batchData.extend[0]);
1190     auto ret = storageProxy_->FillCloudLogAndAsset(isInsert ? OpType::INSERT : OpType::UPDATE, uploadData);
1191     return errCode != E_OK ? errCode : ret;
1192 }
1193 
TagUploadAssets(CloudSyncData & uploadData)1194 void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1195 {
1196     if (!IsDataContainAssets()) {
1197         return;
1198     }
1199     std::vector<Field> assetFields;
1200     {
1201         std::lock_guard<std::mutex> autoLock(dataLock_);
1202         assetFields = currentContext_.assetFields[currentContext_.tableName];
1203     }
1204 
1205     for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1206         for (const Field &assetField : assetFields) {
1207             (void)TagAssetsInSingleCol(assetField, true, uploadData.insData.record[i]);
1208         }
1209     }
1210     for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1211         for (const Field &assetField : assetFields) {
1212             (void)TagAssetsInSingleCol(assetField, false, uploadData.updData.record[i]);
1213         }
1214     }
1215 }
1216 
IsLockInDownload()1217 bool CloudSyncer::IsLockInDownload()
1218 {
1219     std::lock_guard<std::mutex> autoLock(dataLock_);
1220     if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1221         return false;
1222     }
1223     auto currentLockAction = static_cast<uint32_t>(cloudTaskInfos_[currentContext_.currentTaskId].lockAction);
1224     return (currentLockAction & static_cast<uint32_t>(LockAction::DOWNLOAD)) != 0;
1225 }
1226 
SetCurrentTaskFailedInMachine(int errCode)1227 CloudSyncEvent CloudSyncer::SetCurrentTaskFailedInMachine(int errCode)
1228 {
1229     std::lock_guard<std::mutex> autoLock(dataLock_);
1230     cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1231     return CloudSyncEvent::ERROR_EVENT;
1232 }
1233 
InitCloudSyncStateMachine()1234 void CloudSyncer::InitCloudSyncStateMachine()
1235 {
1236     CloudSyncStateMachine::Initialize();
1237     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_DOWNLOAD, [this]() {
1238         return SyncMachineDoDownload();
1239     });
1240     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_UPLOAD, [this]() {
1241         return SyncMachineDoUpload();
1242     });
1243     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_FINISHED, [this]() {
1244         return SyncMachineDoFinished();
1245     });
1246     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_REPEAT_CHECK, [this]() {
1247         return SyncMachineDoRepeatCheck();
1248     });
1249 }
1250 
SyncMachineDoRepeatCheck()1251 CloudSyncEvent CloudSyncer::SyncMachineDoRepeatCheck()
1252 {
1253     auto config = storageProxy_->GetCloudSyncConfig();
1254     {
1255         std::lock_guard<std::mutex> autoLock(dataLock_);
1256         if (config.maxRetryConflictTimes < 0) { // unlimited repeat counts
1257             return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1258         }
1259         currentContext_.repeatCount++;
1260         if (currentContext_.repeatCount > config.maxRetryConflictTimes) {
1261             LOGD("[CloudSyncer] Repeat too much times current %d limit %" PRId32, currentContext_.repeatCount,
1262                 config.maxRetryConflictTimes);
1263             SetCurrentTaskFailedWithoutLock(-E_CLOUD_VERSION_CONFLICT);
1264             return CloudSyncEvent::ERROR_EVENT;
1265         }
1266         LOGD("[CloudSyncer] Repeat taskId %" PRIu64 " download current %d", currentContext_.currentTaskId,
1267             currentContext_.repeatCount);
1268     }
1269     return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1270 }
1271 
MarkDownloadFinishIfNeed(const std::string & downloadTable,bool isFinish)1272 void CloudSyncer::MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish)
1273 {
1274     // table exist reference should download every times
1275     if (IsLockInDownload() || storageProxy_->IsTableExistReferenceOrReferenceBy(downloadTable)) {
1276         return;
1277     }
1278     std::lock_guard<std::mutex> autoLock(dataLock_);
1279     currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex, downloadTable, isFinish);
1280 }
1281 
DoUploadByMode(const std::string & tableName,UploadParam & uploadParam,InnerProcessInfo & info)1282 int CloudSyncer::DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info)
1283 {
1284     CloudSyncData uploadData(tableName, uploadParam.mode);
1285     SetUploadDataFlag(uploadParam.taskId, uploadData);
1286     auto [err, localWater] = GetLocalWater(tableName, uploadParam);
1287     LOGI("[CloudSyncer] Get local water before upload result: %d, table[%s length[%u]], water[%llu]", err,
1288         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), localWater);
1289     if (err != E_OK) {
1290         return err;
1291     }
1292     ContinueToken continueStmtToken = nullptr;
1293     int ret = storageProxy_->GetCloudData(GetQuerySyncObject(tableName), localWater, continueStmtToken, uploadData);
1294     if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1295         LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1296         return ret;
1297     }
1298     uploadParam.count -= uploadData.ignoredCount;
1299     info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
1300     std::vector<ReviseModTimeInfo> revisedData;
1301     ret = HandleBatchUpload(uploadParam, info, uploadData, continueStmtToken, revisedData);
1302     if (ret != -E_TASK_PAUSED) {
1303         // reset watermark to zero when task no paused
1304         RecordWaterMark(uploadParam.taskId, 0u);
1305     }
1306     if (continueStmtToken != nullptr) {
1307         storageProxy_->ReleaseContinueToken(continueStmtToken);
1308     }
1309     if (!revisedData.empty()) {
1310         int errCode = storageProxy_->ReviseLocalModTime(tableName, revisedData);
1311         if (errCode != E_OK) {
1312             LOGE("[CloudSyncer] Failed to revise local modify time: %d, table: %s",
1313                 errCode, DBCommon::StringMiddleMasking(tableName).c_str());
1314         }
1315     }
1316     return ret;
1317 }
1318 
IsTableFinishInUpload(const std::string & table)1319 bool CloudSyncer::IsTableFinishInUpload(const std::string &table)
1320 {
1321     std::lock_guard<std::mutex> autoLock(dataLock_);
1322     return currentContext_.processRecorder->IsUploadFinish(currentContext_.currentUserIndex, table);
1323 }
1324 
MarkUploadFinishIfNeed(const std::string & table)1325 void CloudSyncer::MarkUploadFinishIfNeed(const std::string &table)
1326 {
1327     // table exist reference or reference by should upload every times
1328     if (storageProxy_->IsTableExistReferenceOrReferenceBy(table)) {
1329         return;
1330     }
1331     std::lock_guard<std::mutex> autoLock(dataLock_);
1332     currentContext_.processRecorder->MarkUploadFinish(currentContext_.currentUserIndex, table, true);
1333 }
1334 
GetCloudTaskStatus(uint64_t taskId) const1335 SyncProcess CloudSyncer::GetCloudTaskStatus(uint64_t taskId) const
1336 {
1337     std::lock_guard<std::mutex> autoLock(dataLock_);
1338     auto iter = cloudTaskInfos_.find(taskId);
1339     SyncProcess syncProcess;
1340     if (iter == cloudTaskInfos_.end()) {
1341         syncProcess.process = ProcessStatus::FINISHED;
1342         syncProcess.errCode = NOT_FOUND;
1343         LOGE("[CloudSyncer] Not found task %" PRIu64, taskId);
1344         return syncProcess;
1345     }
1346     syncProcess.process = iter->second.status;
1347     syncProcess.errCode = TransferDBErrno(iter->second.errCode);
1348     std::shared_ptr<ProcessNotifier> notifier = nullptr;
1349     if (currentContext_.currentTaskId == taskId) {
1350         notifier = currentContext_.notifier;
1351     }
1352     bool hasNotifier = notifier != nullptr;
1353     if (hasNotifier) {
1354         syncProcess.tableProcess = notifier->GetCurrentTableProcess();
1355     }
1356     LOGI("[CloudSyncer] Found task %" PRIu64 " storeId %.3s status %d has notifier %d", taskId,
1357         iter->second.storeId.c_str(), static_cast<int64_t>(syncProcess.process), static_cast<int>(hasNotifier));
1358     return syncProcess;
1359 }
1360 
GenerateTaskIdIfNeed(CloudTaskInfo & taskInfo)1361 int CloudSyncer::GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo)
1362 {
1363     if (taskInfo.taskId != INVALID_TASK_ID) {
1364         if (cloudTaskInfos_.find(taskInfo.taskId) != cloudTaskInfos_.end()) {
1365             LOGE("[CloudSyncer] Sync with exist taskId %" PRIu64 " storeId %.3s", taskInfo.taskId,
1366                 taskInfo.storeId.c_str());
1367             return -E_INVALID_ARGS;
1368         }
1369         LOGI("[CloudSyncer] Sync with taskId %" PRIu64 " storeId %.3s", taskInfo.taskId, taskInfo.storeId.c_str());
1370         return E_OK;
1371     }
1372     lastTaskId_--;
1373     if (lastTaskId_ == INVALID_TASK_ID) {
1374         lastTaskId_ = UINT64_MAX;
1375     }
1376     taskInfo.taskId = lastTaskId_;
1377     return E_OK;
1378 }
1379 
ProcessVersionConflictInfo(InnerProcessInfo & innerProcessInfo,uint32_t retryCount)1380 void CloudSyncer::ProcessVersionConflictInfo(InnerProcessInfo &innerProcessInfo, uint32_t retryCount)
1381 {
1382     innerProcessInfo.retryInfo.uploadBatchRetryCount = retryCount;
1383     CloudSyncConfig config = storageProxy_->GetCloudSyncConfig();
1384     {
1385         std::lock_guard<std::mutex> autoLock(dataLock_);
1386         if (config.maxRetryConflictTimes >= 0 &&
1387             currentContext_.repeatCount + 1 > config.maxRetryConflictTimes) {
1388             innerProcessInfo.upLoadInfo.failCount =
1389                 innerProcessInfo.upLoadInfo.total - innerProcessInfo.upLoadInfo.successCount;
1390         }
1391     }
1392 }
1393 
GetStoreIdByTask(TaskId taskId)1394 std::string CloudSyncer::GetStoreIdByTask(TaskId taskId)
1395 {
1396     std::lock_guard<std::mutex> autoLock(dataLock_);
1397     return cloudTaskInfos_[taskId].storeId;
1398 }
1399 
CleanKvCloudData(std::function<int (void)> & removeFunc)1400 int CloudSyncer::CleanKvCloudData(std::function<int(void)> &removeFunc)
1401 {
1402     hasKvRemoveTask = true;
1403     CloudSyncer::TaskId currentTask;
1404     {
1405         // stop task if exist
1406         std::lock_guard<std::mutex> autoLock(dataLock_);
1407         currentTask = currentContext_.currentTaskId;
1408     }
1409     if (currentTask != INVALID_TASK_ID) {
1410         StopAllTasks(-E_CLOUD_ERROR);
1411     }
1412     int errCode = E_OK;
1413     {
1414         std::lock_guard<std::mutex> lock(syncMutex_);
1415         errCode = removeFunc();
1416         hasKvRemoveTask = false;
1417     }
1418     if (errCode != E_OK) {
1419         LOGE("[CloudSyncer] removeFunc execute failed errCode: %d.", errCode);
1420     }
1421     return errCode;
1422 }
1423 
StopAllTasks(int errCode)1424 void CloudSyncer::StopAllTasks(int errCode)
1425 {
1426     CloudSyncer::TaskId currentTask;
1427     {
1428         std::lock_guard<std::mutex> autoLock(dataLock_);
1429         currentTask = currentContext_.currentTaskId;
1430     }
1431     // mark current task user_change
1432     SetTaskFailed(currentTask, errCode);
1433     UnlockIfNeed();
1434     WaitCurTaskFinished();
1435 
1436     std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
1437     for (auto &info: infoList) {
1438         LOGI("[CloudSyncer] finished taskId %" PRIu64 " with errCode %d, isPriority %d.", info.taskId, errCode,
1439             info.priorityTask);
1440         auto processNotifier = std::make_shared<ProcessNotifier>(this);
1441         processNotifier->Init(info.table, info.devices, info.users);
1442         info.errCode = errCode;
1443         info.status = ProcessStatus::FINISHED;
1444         processNotifier->NotifyProcess(info, {}, true);
1445     }
1446 }
1447 
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)1448 int CloudSyncer::TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
1449 {
1450     OpType strategyOpResult = OpType::NOT_HANDLE;
1451     int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
1452     if (errCode != E_OK) {
1453         return errCode;
1454     }
1455     param.downloadData.opType[idx] = strategyOpResult;
1456     if (!IsDataContainAssets()) {
1457         return E_OK;
1458     }
1459     Key hashKey;
1460     if (isExist) {
1461         hashKey = dataInfo.localInfo.logInfo.hashKey;
1462     }
1463     if (param.isAssetsOnly) {
1464         return strategyOpResult == OpType::LOCKED_NOT_HANDLE ?
1465             E_OK : TagDownloadAssetsForAssetOnly(hashKey, idx, param, dataInfo, localAssetInfo);
1466     }
1467     return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
1468 }
1469 
CloudDbBatchDownloadAssets(TaskId taskId,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,InnerProcessInfo & info,ChangedData & changedAssets)1470 int CloudSyncer::CloudDbBatchDownloadAssets(TaskId taskId, const DownloadList &downloadList,
1471     const std::set<Key> &dupHashKeySet, InnerProcessInfo &info, ChangedData &changedAssets)
1472 {
1473     int errorCode = CheckTaskIdValid(taskId);
1474     if (errorCode != E_OK) {
1475         return errorCode;
1476     }
1477     bool isSharedTable = false;
1478     errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
1479     if (errorCode != E_OK) {
1480         LOGE("[CloudSyncer] check is shared table failed %d", errorCode);
1481         return errorCode;
1482     }
1483     // prepare download data
1484     auto [downloadRecord, removeAssets, downloadAssets] =
1485         GetDownloadRecords(downloadList, dupHashKeySet, isSharedTable, IsAsyncDownloadAssets(taskId), info);
1486     std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> detail = {
1487         std::move(downloadRecord), std::move(removeAssets), std::move(downloadAssets), isSharedTable
1488     };
1489     return BatchDownloadAndCommitRes(downloadList, dupHashKeySet, info, changedAssets, detail);
1490 }
1491 
FillDownloadItem(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,const InnerProcessInfo & info,bool isSharedTable,DownloadItems & record)1492 void CloudSyncer::FillDownloadItem(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
1493     const InnerProcessInfo &info, bool isSharedTable, DownloadItems &record)
1494 {
1495     CloudStorageUtils::EraseNoChangeAsset(record.downloadItem.assets);
1496     if (record.downloadItem.assets.empty()) { // Download data (include deleting)
1497         return;
1498     }
1499     if (isSharedTable) {
1500         // share table will not download asset, need to reset the status
1501         for (auto &entry: record.downloadItem.assets) {
1502             for (auto &asset: entry.second) {
1503                 asset.status = AssetStatus::NORMAL;
1504             }
1505         }
1506         return;
1507     }
1508     int errCode = E_OK;
1509     VBucket dbAssets;
1510     if (!IsNeedSkipDownload(isSharedTable, errCode, info, record.downloadItem, dbAssets)) {
1511         GetAssetsToRemove(record.downloadItem.assets, dbAssets, isSharedTable, record.assetsToRemove);
1512         GetAssetsToDownload(record.downloadItem.assets, dbAssets, isSharedTable, record.assetsToDownload, record.flags);
1513     }
1514 }
1515 
GetDownloadRecords(const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,bool isSharedTable,bool isAsyncDownloadAssets,const InnerProcessInfo & info)1516 CloudSyncer::DownloadAssetDetail CloudSyncer::GetDownloadRecords(const DownloadList &downloadList,
1517     const std::set<Key> &dupHashKeySet, bool isSharedTable, bool isAsyncDownloadAssets, const InnerProcessInfo &info)
1518 {
1519     DownloadItemRecords downloadRecord;
1520     RemoveAssetsRecords removeAssets;
1521     DownloadAssetsRecords downloadAssets;
1522     for (size_t i = 0; i < downloadList.size(); i++) {
1523         DownloadItems record;
1524         GetDownloadItem(downloadList, i, record.downloadItem);
1525         FillDownloadItem(dupHashKeySet, downloadList, info, isSharedTable, record);
1526 
1527         IAssetLoader::AssetRecord removeAsset = {
1528             record.downloadItem.gid, record.downloadItem.prefix, std::move(record.assetsToRemove)
1529         };
1530         removeAssets.push_back(std::move(removeAsset));
1531         if (isAsyncDownloadAssets) {
1532             record.assetsToDownload.clear();
1533         }
1534         IAssetLoader::AssetRecord downloadAsset = {
1535             record.downloadItem.gid, record.downloadItem.prefix, std::move(record.assetsToDownload)
1536         };
1537         downloadAssets.push_back(std::move(downloadAsset));
1538         downloadRecord.push_back(std::move(record));
1539     }
1540     return {downloadRecord, removeAssets, downloadAssets};
1541 }
1542 
BatchDownloadAndCommitRes(const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,InnerProcessInfo & info,ChangedData & changedAssets,std::tuple<DownloadItemRecords,RemoveAssetsRecords,DownloadAssetsRecords,bool> & downloadDetail)1543 int CloudSyncer::BatchDownloadAndCommitRes(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
1544     InnerProcessInfo &info, ChangedData &changedAssets,
1545     std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> &downloadDetail)
1546 {
1547     auto &[downloadRecord, removeAssets, downloadAssets, isSharedTable] = downloadDetail;
1548     // download and remove in batch
1549     auto deleteRes = cloudDB_.BatchRemoveLocalAssets(info.tableName, removeAssets);
1550     auto downloadRes = cloudDB_.BatchDownload(info.tableName, downloadAssets);
1551     if (deleteRes == -E_NOT_SET || downloadRes == -E_NOT_SET) {
1552         return -E_NOT_SET;
1553     }
1554     int errorCode = E_OK;
1555     int index = 0;
1556     for (auto &item : downloadRecord) {
1557         auto deleteCode = removeAssets[index].status == OK ? E_OK : -E_REMOVE_ASSETS_FAILED;
1558         auto downloadCode = CloudDBProxy::GetInnerErrorCode(downloadAssets[index].status);
1559         downloadCode = downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT ? E_OK : downloadCode;
1560         if (!isSharedTable) {
1561             item.downloadItem.assets = BackFillAssetsAfterDownload(downloadCode, deleteCode, item.flags,
1562                 downloadAssets[index].assets, removeAssets[index].assets);
1563         }
1564         StatisticDownloadRes(downloadAssets[index], removeAssets[index], info, item.downloadItem);
1565         AddNotifyDataFromDownloadAssets(dupHashKeySet, item.downloadItem, changedAssets);
1566         if (item.downloadItem.strategy == OpType::DELETE) {
1567             item.downloadItem.assets = {};
1568             item.downloadItem.gid = "";
1569         }
1570         // commit download res
1571         DownloadCommitList commitList;
1572         // Process result of each asset
1573         downloadCode = downloadAssets[index].status == SKIP_ASSET ? E_OK : downloadCode;
1574         commitList.push_back(std::make_tuple(item.downloadItem.gid, std::move(item.downloadItem.assets),
1575             deleteCode == E_OK && downloadCode == E_OK));
1576         errorCode = (errorCode != E_OK) ? errorCode : deleteCode;
1577         errorCode = (errorCode != E_OK) ? errorCode : downloadCode;
1578         int currErrorCode = (deleteCode != E_OK) ? deleteCode : downloadCode;
1579         int ret = CommitDownloadResult(item.downloadItem, info, commitList, currErrorCode);
1580         if (ret != E_OK) {
1581             errorCode = errorCode == E_OK ? ret : errorCode;
1582         }
1583         index++;
1584     }
1585     storageProxy_->PrintCursorChange(info.tableName);
1586     return errorCode;
1587 }
1588 
StatisticDownloadRes(const IAssetLoader::AssetRecord & downloadRecord,const IAssetLoader::AssetRecord & removeRecord,InnerProcessInfo & info,DownloadItem & downloadItem)1589 void CloudSyncer::StatisticDownloadRes(const IAssetLoader::AssetRecord &downloadRecord,
1590     const IAssetLoader::AssetRecord &removeRecord, InnerProcessInfo &info, DownloadItem &downloadItem)
1591 {
1592     if ((downloadRecord.status == OK || downloadRecord.status == SKIP_ASSET) && (removeRecord.status == OK)) {
1593         return;
1594     }
1595     if ((downloadRecord.status == CLOUD_RECORD_EXIST_CONFLICT) ||
1596         (removeRecord.status == CLOUD_RECORD_EXIST_CONFLICT)) {
1597         downloadItem.recordConflict = true;
1598         return;
1599     }
1600     info.downLoadInfo.failCount += 1;
1601     if (info.downLoadInfo.successCount == 0) {
1602         LOGW("[CloudSyncer] Invalid successCount");
1603     } else {
1604         info.downLoadInfo.successCount -= 1;
1605     }
1606 }
1607 
AddNotifyDataFromDownloadAssets(const std::set<Key> & dupHashKeySet,DownloadItem & downloadItem,ChangedData & changedAssets)1608 void CloudSyncer::AddNotifyDataFromDownloadAssets(const std::set<Key> &dupHashKeySet, DownloadItem &downloadItem,
1609     ChangedData &changedAssets)
1610 {
1611     if (downloadItem.assets.empty()) {
1612         return;
1613     }
1614     if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
1615         if (CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy) == OP_BUTT) {
1616             LOGW("[CloudSyncer] [AddNotifyDataFromDownloadAssets] strategy is invalid.");
1617         } else {
1618             changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
1619                 downloadItem.primaryKeyValList);
1620         }
1621     } else if (downloadItem.strategy == OpType::INSERT) {
1622         changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
1623     }
1624 }
1625 
CheckDataAfterDownload(const std::string & tableName)1626 void CloudSyncer::CheckDataAfterDownload(const std::string &tableName)
1627 {
1628     int dataCount = 0;
1629     int logicDeleteDataCount = 0;
1630     int errCode = storageProxy_->GetLocalDataCount(tableName, dataCount, logicDeleteDataCount);
1631     if (errCode == E_OK) {
1632         LOGI("[CloudSyncer] Check local data after download[%s[%u]], data count: %d, logic delete data count: %d",
1633             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), dataCount, logicDeleteDataCount);
1634     } else {
1635         LOGW("[CloudSyncer] Get local data after download fail: %d", errCode);
1636     }
1637 }
1638 
WaitCurTaskFinished()1639 void CloudSyncer::WaitCurTaskFinished()
1640 {
1641     CancelBackgroundDownloadAssetsTask();
1642     std::unique_lock<std::mutex> uniqueLock(dataLock_);
1643     if (currentContext_.currentTaskId != INVALID_TASK_ID) {
1644         LOGI("[CloudSyncer] begin wait current task %" PRIu64 " finished", currentContext_.currentTaskId);
1645         contextCv_.wait(uniqueLock, [this]() {
1646             return currentContext_.currentTaskId == INVALID_TASK_ID;
1647         });
1648         LOGI("[CloudSyncer] current task has been finished");
1649     }
1650 }
1651 
IsAsyncDownloadAssets(TaskId taskId)1652 bool CloudSyncer::IsAsyncDownloadAssets(TaskId taskId)
1653 {
1654     std::lock_guard<std::mutex> autoLock(dataLock_);
1655     return cloudTaskInfos_[taskId].asyncDownloadAssets;
1656 }
1657 
TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload)1658 void CloudSyncer::TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload)
1659 {
1660     {
1661         std::lock_guard<std::mutex> autoLock(dataLock_);
1662         if (!cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets) {
1663             return;
1664         }
1665     }
1666     if (isFirstDownload) {
1667         std::lock_guard<std::mutex> autoLock(dataLock_);
1668         if (currentContext_.isNeedUpload) {
1669             return;
1670         }
1671     }
1672     TriggerAsyncDownloadAssetsIfNeed();
1673 }
1674 
TriggerAsyncDownloadAssetsIfNeed()1675 void CloudSyncer::TriggerAsyncDownloadAssetsIfNeed()
1676 {
1677     if (!storageProxy_->IsExistTableContainAssets()) {
1678         LOGD("[CloudSyncer] No exist table contain assets, skip async download asset check");
1679         return;
1680     }
1681     TaskId taskId = INVALID_TASK_ID;
1682     {
1683         std::lock_guard<std::mutex> autoLock(dataLock_);
1684         if (asyncTaskId_ != INVALID_TASK_ID || closed_) {
1685             LOGI("[CloudSyncer] No need generate async task now asyncTaskId %" PRIu64 " closed %d",
1686                 static_cast<int>(asyncTaskId_), static_cast<int>(closed_));
1687             return;
1688         }
1689         lastTaskId_--;
1690         if (lastTaskId_ == INVALID_TASK_ID) {
1691             lastTaskId_ = UINT64_MAX;
1692         }
1693         asyncTaskId_ = lastTaskId_;
1694         taskId = asyncTaskId_;
1695         IncObjRef(this);
1696     }
1697     int errCode = RuntimeContext::GetInstance()->ScheduleTask([taskId, this]() {
1698         LOGI("[CloudSyncer] Exec asyncTaskId %" PRIu64 " begin", taskId);
1699         BackgroundDownloadAssetsTask();
1700         LOGI("[CloudSyncer] Exec asyncTaskId %" PRIu64 " finished", taskId);
1701         {
1702             std::lock_guard<std::mutex> autoLock(dataLock_);
1703             asyncTaskId_ = INVALID_TASK_ID;
1704         }
1705         asyncTaskCv_.notify_all();
1706         DecObjRef(this);
1707     });
1708     if (errCode == E_OK) {
1709         LOGI("[CloudSyncer] Schedule asyncTaskId %" PRIu64 " success", taskId);
1710     } else {
1711         LOGW("[CloudSyncer] Schedule BackgroundDownloadAssetsTask failed %d", errCode);
1712         DecObjRef(this);
1713     }
1714 }
1715 
BackgroundDownloadAssetsTask()1716 void CloudSyncer::BackgroundDownloadAssetsTask()
1717 {
1718     // remove listener first
1719     CancelDownloadListener();
1720     // add download count and register listener if failed
1721     auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
1722     IncObjRef(this);
1723     auto [errCode, listener] = manager->BeginDownloadWithListener([this](void *) {
1724         TriggerAsyncDownloadAssetsIfNeed();
1725     }, [this]() {
1726         DecObjRef(this);
1727     });
1728     if (errCode == E_OK) {
1729         // increase download count success
1730         CancelDownloadListener();
1731         DoBackgroundDownloadAssets();
1732         RuntimeContext::GetInstance()->GetAssetsDownloadManager()->FinishDownload();
1733         DecObjRef(this);
1734         return;
1735     }
1736     if (listener != nullptr) {
1737         std::lock_guard<std::mutex> autoLock(listenerMutex_);
1738         waitDownloadListener_ = listener;
1739         return;
1740     }
1741     LOGW("[CloudSyncer] BeginDownloadWithListener failed %d", errCode);
1742     DecObjRef(this);
1743 }
1744 
CancelDownloadListener()1745 void CloudSyncer::CancelDownloadListener()
1746 {
1747     NotificationChain::Listener *waitDownloadListener = nullptr;
1748     {
1749         std::lock_guard<std::mutex> autoLock(listenerMutex_);
1750         if (waitDownloadListener_ != nullptr) {
1751             waitDownloadListener = waitDownloadListener_;
1752             waitDownloadListener_ = nullptr;
1753         }
1754     }
1755     if (waitDownloadListener != nullptr) {
1756         waitDownloadListener->Drop(true);
1757         waitDownloadListener = nullptr;
1758     }
1759 }
1760 
DoBackgroundDownloadAssets()1761 void CloudSyncer::DoBackgroundDownloadAssets()
1762 {
1763     bool allDownloadFinish = true;
1764     std::map<std::string, int64_t> downloadBeginTime;
1765     do {
1766         auto [errCode, tables] = storageProxy_->GetDownloadAssetTable();
1767         if (errCode != E_OK) {
1768             LOGE("[CloudSyncer] Get download asset table failed %d", errCode);
1769             return;
1770         }
1771         allDownloadFinish = true;
1772         std::list<std::string> tableQueue(tables.begin(), tables.end());
1773         while (!tableQueue.empty()) {
1774             if (cancelAsyncTask_ || closed_) {
1775                 LOGW("[CloudSyncer] exit task by cancel %d closed %d", static_cast<int>(cancelAsyncTask_),
1776                     static_cast<int>(closed_));
1777                 return;
1778             }
1779             errCode = BackgroundDownloadAssetsByTable(tableQueue.front(), downloadBeginTime);
1780             if (errCode == E_OK) {
1781                 allDownloadFinish = false;
1782             } else if (errCode == -E_FINISHED) {
1783                 tableQueue.pop_front();
1784                 errCode = E_OK;
1785             } else {
1786                 LOGW("[CloudSyncer] BackgroundDownloadAssetsByTable table %s failed %d",
1787                     DBCommon::StringMiddleMasking(tableQueue.front()).c_str(), errCode);
1788                 allDownloadFinish = false;
1789             }
1790         }
1791     } while (!allDownloadFinish);
1792 }
1793 
CancelBackgroundDownloadAssetsTaskIfNeed()1794 void CloudSyncer::CancelBackgroundDownloadAssetsTaskIfNeed()
1795 {
1796     bool cancelDownload = true;
1797     {
1798         std::unique_lock<std::mutex> uniqueLock(dataLock_);
1799         if (cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets || asyncTaskId_ == INVALID_TASK_ID) {
1800             return;
1801         }
1802         if (cloudTaskInfos_[currentContext_.currentTaskId].compensatedTask) {
1803             cancelDownload = false;
1804         }
1805     }
1806     CancelBackgroundDownloadAssetsTask(cancelDownload);
1807 }
1808 
CancelBackgroundDownloadAssetsTask(bool cancelDownload)1809 void CloudSyncer::CancelBackgroundDownloadAssetsTask(bool cancelDownload)
1810 {
1811     cancelAsyncTask_ = true;
1812     if (cancelDownload) {
1813         cloudDB_.CancelDownload();
1814     }
1815     std::unique_lock<std::mutex> uniqueLock(dataLock_);
1816     if (asyncTaskId_ != INVALID_TASK_ID) {
1817         LOGI("[CloudSyncer] begin wait async download task % " PRIu64 " finished", asyncTaskId_);
1818         asyncTaskCv_.wait(uniqueLock, [this]() {
1819             return asyncTaskId_ == INVALID_TASK_ID;
1820         });
1821         LOGI("[CloudSyncer] async download task has been finished");
1822     }
1823     cancelAsyncTask_ = false;
1824 }
1825 
BackgroundDownloadAssetsByTable(const std::string & table,std::map<std::string,int64_t> & downloadBeginTime)1826 int CloudSyncer::BackgroundDownloadAssetsByTable(const std::string &table,
1827     std::map<std::string, int64_t> &downloadBeginTime)
1828 {
1829     auto [errCode, downloadData] = storageProxy_->GetDownloadAssetRecords(table, downloadBeginTime[table]);
1830     if (errCode != E_OK) {
1831         return errCode;
1832     }
1833     if (downloadData.empty()) {
1834         LOGD("[CloudSyncer] table %s async download finished", DBCommon::StringMiddleMasking(table).c_str());
1835         return -E_FINISHED;
1836     }
1837 
1838     bool isSharedTable = false;
1839     errCode = storageProxy_->IsSharedTable(table, isSharedTable);
1840     if (errCode != E_OK) {
1841         LOGE("[CloudSyncer] check is shared table failed %d", errCode);
1842         return errCode;
1843     }
1844     DownloadList downloadList;
1845     ChangedData changedAssets;
1846     std::tie(errCode, downloadList, changedAssets) = CloudSyncUtils::GetDownloadListByGid(storageProxy_, downloadData,
1847         table);
1848     if (errCode != E_OK) {
1849         return errCode;
1850     }
1851     CloudSyncUtils::UpdateMaxTimeWithDownloadList(downloadList, table, downloadBeginTime);
1852     std::set<Key> dupHashKeySet;
1853     InnerProcessInfo info;
1854     info.tableName = table;
1855     info.isAsyncDownload = true;
1856 
1857     // prepare download data
1858     auto [downloadRecord, removeAssets, downloadAssets] =
1859         GetDownloadRecords(downloadList, dupHashKeySet, isSharedTable, false, info);
1860     std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> detail = {
1861         std::move(downloadRecord), std::move(removeAssets), std::move(downloadAssets), isSharedTable
1862     };
1863     errCode = BatchDownloadAndCommitRes(downloadList, dupHashKeySet, info, changedAssets, detail);
1864     NotifyChangedDataWithDefaultDev(std::move(changedAssets));
1865     return errCode;
1866 }
1867 
TagDownloadAssetsForAssetOnly(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)1868 int CloudSyncer::TagDownloadAssetsForAssetOnly(
1869     const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo, VBucket &localAssetInfo)
1870 {
1871     Type prefix;
1872     std::vector<Type> pkVals;
1873     int ret = CloudSyncUtils::GetCloudPkVals(
1874         param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
1875     if (ret != E_OK) {
1876         // if get pk vals failed, mean cloud data is deteled.
1877         LOGE("[CloudSyncer] TagDownloadAssetsForAssetOnly cannot get primary key value list. %d",
1878             -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1879         return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1880     }
1881     prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
1882     if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
1883         LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
1884         return -E_INTERNAL_ERROR;
1885     }
1886     std::map<std::string, Assets> downloadAssetsMap{};
1887     ret = CloudSyncUtils::GetDownloadAssetsOnlyMapFromDownLoadData(idx, param, downloadAssetsMap);
1888     if (ret != E_OK) {
1889         return ret;
1890     }
1891     param.assetsDownloadList.push_back(
1892         std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, OpType::UPDATE, downloadAssetsMap, hashKey,
1893         pkVals, dataInfo.cloudLogInfo.timestamp));
1894     return ret;
1895 }
1896 
PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam & param,std::vector<VBucket> & localInfo)1897 int CloudSyncer::PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam &param, std::vector<VBucket> &localInfo)
1898 {
1899     int ret = E_OK;
1900     if (param.isAssetsOnly) {
1901         // download and save only asset, ignore other data.
1902         for (auto &item : localInfo) {
1903             ret = storageProxy_->UpdateAssetStatusForAssetOnly(param.tableName, item);
1904             if (ret != E_OK) {
1905                 LOGE("[CloudSyncer] Cannot save asset data due to error code %d", ret);
1906                 return ret;
1907             }
1908         }
1909     } else {
1910         ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
1911         if (ret != E_OK) {
1912             param.info.downLoadInfo.failCount += param.downloadData.data.size();
1913             LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
1914         }
1915     }
1916     return ret;
1917 }
1918 
QueryCloudGidForAssetsOnly(TaskId taskId,SyncParam & param,int64_t groupIdx,std::vector<std::string> & cloudGid)1919 int CloudSyncer::QueryCloudGidForAssetsOnly(
1920     TaskId taskId, SyncParam &param, int64_t groupIdx, std::vector<std::string> &cloudGid)
1921 {
1922     auto tableName = param.info.tableName;
1923     QuerySyncObject syncObj = GetQuerySyncObject(tableName);
1924     VBucket extend = {{CloudDbConstant::CURSOR_FIELD, param.cloudWaterMarkForAssetsOnly}};
1925     QuerySyncObject obj;
1926     int ret = syncObj.GetQuerySyncObjectFromGroup(groupIdx, obj);
1927     if (ret != E_OK) {
1928         LOGE("Get query obj from group fail, errCode = %d", ret);
1929         return ret;
1930     }
1931     ret = GetCloudGid(taskId, tableName, obj, cloudGid);
1932     if (ret != E_OK) {
1933         LOGE("Get cloud gid fail, errCode = %d", ret);
1934     }
1935     return ret;
1936 }
1937 
GetEmptyGidAssetsMapFromDownloadData(const std::vector<VBucket> & data,std::map<std::string,AssetsMap> & gidAssetsMap)1938 int CloudSyncer::GetEmptyGidAssetsMapFromDownloadData(
1939     const std::vector<VBucket> &data, std::map<std::string, AssetsMap> &gidAssetsMap)
1940 {
1941     for (uint32_t i = 0; i < data.size(); i++) {
1942         std::string gidStr;
1943         int errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, data[i], gidStr);
1944         if (errCode != E_OK) {
1945             LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", errCode);
1946             return errCode;
1947         }
1948         gidAssetsMap[gidStr] = AssetsMap{};
1949     }
1950     return E_OK;
1951 }
1952 
SetAssetsMapAndEraseDataForAssetsOnly(TaskId taskId,SyncParam & param,std::vector<VBucket> & downloadData,std::map<std::string,AssetsMap> & gidAssetsMap)1953 int CloudSyncer::SetAssetsMapAndEraseDataForAssetsOnly(
1954     TaskId taskId, SyncParam &param, std::vector<VBucket> &downloadData, std::map<std::string, AssetsMap> &gidAssetsMap)
1955 {
1956     for (uint32_t i = 0; i < param.groupNum; i++) {
1957         std::vector<std::string> cloudGid;
1958         int ret = QueryCloudGidForAssetsOnly(taskId, param, i, cloudGid);
1959         if ((ret != E_OK && ret != -E_QUERY_END) || cloudGid.empty()) {
1960             LOGE("[CloudSyncer] Cannot get the %u group data, error code: %d.", i, -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1961             return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1962         }
1963         if (!CloudSyncUtils::SetAssetsMapByCloudGid(cloudGid, param.assetsGroupMap[i], gidAssetsMap)) {
1964             // if group no match data, return error code.
1965             LOGE("[CloudSyncer] Cannot get the %u group data, error code: %d.", i, -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1966             return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1967         }
1968     }
1969     for (auto iter = downloadData.begin(); iter != downloadData.end();) {
1970         std::string gidStr;
1971         int ret = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, *iter, gidStr);
1972         if (ret != E_OK) {
1973             LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", ret);
1974             return ret;
1975         }
1976 
1977         if (DBCommon::IsRecordDelete(*iter)) {
1978             iter = downloadData.erase(iter);
1979             gidAssetsMap.erase(gidStr);
1980             continue;
1981         }
1982 
1983         auto assetsMap = gidAssetsMap[gidStr];
1984         if (!CloudSyncUtils::IsAssetOnlyData(*iter, assetsMap, false)) {
1985             iter = downloadData.erase(iter);
1986             gidAssetsMap.erase(gidStr);
1987             continue;
1988         }
1989         ++iter;
1990     }
1991     return E_OK;
1992 }
1993 
CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId,SyncParam & param)1994 int CloudSyncer::CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId, SyncParam &param)
1995 {
1996     {
1997         std::lock_guard<std::mutex> autoLock(dataLock_);
1998         if (!param.isAssetsOnly || cloudTaskInfos_[taskId].compensatedTask) {
1999             return E_OK;
2000         }
2001         if (!param.isVaildForAssetsOnly) {
2002             param.downloadData.data.clear();
2003             return E_OK;
2004         }
2005         cloudTaskInfos_[taskId].isAssetsOnly = param.isAssetsOnly;
2006         cloudTaskInfos_[taskId].groupNum = param.groupNum;
2007         cloudTaskInfos_[taskId].assetsGroupMap = param.assetsGroupMap;
2008     }
2009 
2010     std::vector<VBucket> &downloadData = param.downloadData.data;
2011     auto &gidAssetsMap = param.gidAssetsMap;
2012     gidAssetsMap.clear();
2013     int ret = GetEmptyGidAssetsMapFromDownloadData(downloadData, gidAssetsMap);
2014     if (ret != E_OK) {
2015         return ret;
2016     }
2017 
2018     // set assets map for every record and erase not match data.
2019     ret = SetAssetsMapAndEraseDataForAssetsOnly(taskId, param, downloadData, gidAssetsMap);
2020     if (ret != E_OK) {
2021         return ret;
2022     }
2023 
2024     for (uint32_t i = 0; i < param.groupNum; i++) {
2025         bool isEmpty = CloudSyncUtils::CheckAssetsOnlyIsEmptyInGroup(gidAssetsMap, param.assetsGroupMap[i]);
2026         if (isEmpty) {
2027             LOGE("[CloudSyncer] query assets failed, error code: %d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2028             return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2029         }
2030     }
2031     return E_OK;
2032 }
2033 
CheckLocalQueryAssetsOnlyIfNeed(VBucket & localAssetInfo,SyncParam & param,DataInfoWithLog & logInfo)2034 int CloudSyncer::CheckLocalQueryAssetsOnlyIfNeed(VBucket &localAssetInfo, SyncParam &param, DataInfoWithLog &logInfo)
2035 {
2036     if (!param.isAssetsOnly) {
2037         return E_OK;
2038     }
2039     std::string gid = logInfo.logInfo.cloudGid;
2040     auto iter = param.gidAssetsMap.find(gid);
2041     if (iter == param.gidAssetsMap.end()) {
2042         LOGE("query assets failed, error code:%d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2043         return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2044     }
2045 
2046     auto assetsMap = iter->second;
2047     if (!CloudSyncUtils::IsAssetOnlyData(localAssetInfo, assetsMap, true)) {
2048         LOGE("[CloudSyncer] query assets failed, error code: %d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2049         return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2050     }
2051     return E_OK;
2052 }
2053 
IsCurrentAsyncDownloadTask()2054 bool CloudSyncer::IsCurrentAsyncDownloadTask()
2055 {
2056     std::lock_guard<std::mutex> autoLock(dataLock_);
2057     return cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets;
2058 }
2059 
CanStartAsyncDownload() const2060 bool CloudSyncer::CanStartAsyncDownload() const
2061 {
2062     if (!RuntimeContext::GetInstance()->GetAssetsDownloadManager()->CanStartNewTask()) {
2063         LOGW("[CloudSyncer] Too many download tasks");
2064         return false;
2065     }
2066     return asyncTaskId_ == INVALID_TASK_ID;
2067 }
2068 
NotifyChangedDataWithDefaultDev(ChangedData && changedData)2069 void CloudSyncer::NotifyChangedDataWithDefaultDev(ChangedData &&changedData)
2070 {
2071     auto table = changedData.tableName;
2072     int ret = CloudSyncUtils::NotifyChangeData(CloudDbConstant::DEFAULT_CLOUD_DEV, storageProxy_,
2073         std::move(changedData));
2074     if (ret != E_OK) {
2075         LOGW("[CloudSyncer] Notify %s change data failed %d", DBCommon::StringMiddleMasking(table).c_str(), ret);
2076     }
2077 }
2078 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)2079 void CloudSyncer::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
2080 {
2081     cloudDB_.SetGenCloudVersionCallback(callback);
2082 }
2083 
GetDownloadAssetIndex(TaskId taskId)2084 size_t CloudSyncer::GetDownloadAssetIndex(TaskId taskId)
2085 {
2086     size_t index = 0u;
2087     std::lock_guard<std::mutex> autoLock(dataLock_);
2088     if (resumeTaskInfos_[taskId].lastDownloadIndex != 0u) {
2089         index = resumeTaskInfos_[taskId].lastDownloadIndex;
2090         resumeTaskInfos_[taskId].lastDownloadIndex = 0u;
2091     }
2092     return index;
2093 }
2094 
GetCurrentTableUploadBatchIndex()2095 uint32_t CloudSyncer::GetCurrentTableUploadBatchIndex()
2096 {
2097     std::lock_guard<std::mutex> autoLock(dataLock_);
2098     return currentContext_.notifier->GetUploadBatchIndex(currentContext_.tableName);
2099 }
2100 
ResetCurrentTableUploadBatchIndex()2101 void CloudSyncer::ResetCurrentTableUploadBatchIndex()
2102 {
2103     std::lock_guard<std::mutex> autoLock(dataLock_);
2104     if (currentContext_.notifier == nullptr) {
2105         return;
2106     }
2107     currentContext_.notifier->ResetUploadBatchIndex(currentContext_.tableName);
2108 }
2109 
RecordWaterMark(TaskId taskId,Timestamp waterMark)2110 void CloudSyncer::RecordWaterMark(TaskId taskId, Timestamp waterMark)
2111 {
2112     std::lock_guard<std::mutex> autoLock(dataLock_);
2113     resumeTaskInfos_[taskId].lastLocalWatermark = waterMark;
2114 }
2115 
GetInnerProcessInfo(const std::string & tableName,UploadParam & uploadParam)2116 CloudSyncer::InnerProcessInfo CloudSyncer::GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam)
2117 {
2118     InnerProcessInfo info;
2119     info.tableName = tableName;
2120     info.tableStatus = ProcessStatus::PROCESSING;
2121     ReloadUploadInfoIfNeed(uploadParam, info);
2122     return info;
2123 }
2124 
CopyAndClearTaskInfos()2125 std::vector<CloudSyncer::CloudTaskInfo> CloudSyncer::CopyAndClearTaskInfos()
2126 {
2127     std::vector<CloudTaskInfo> infoList;
2128     std::lock_guard<std::mutex> autoLock(dataLock_);
2129     for (const auto &item: cloudTaskInfos_) {
2130         infoList.push_back(item.second);
2131     }
2132     taskQueue_.clear();
2133     cloudTaskInfos_.clear();
2134     resumeTaskInfos_.clear();
2135     currentContext_.notifier = nullptr;
2136     return infoList;
2137 }
2138 
TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId)2139 bool CloudSyncer::TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId)
2140 {
2141     std::vector<QuerySyncObject> syncQuery;
2142     std::vector<std::string> users;
2143     int errCode =
2144         CloudSyncUtils::GetQueryAndUsersForCompensatedSync(CanStartAsyncDownload(), storageProxy_, users, syncQuery);
2145     if (errCode != E_OK) {
2146         LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
2147         // if failed, finshed the task directly.
2148         DoFinished(triggerTaskId, errCode);
2149         return false;
2150     }
2151     if (syncQuery.empty()) {
2152         // if quey is empty, finshed the task directly.
2153         DoFinished(triggerTaskId, E_OK);
2154         return false;
2155     }
2156     std::vector<std::string> userList;
2157     CloudSyncUtils::GetUserListForCompensatedSync(cloudDB_, users, userList);
2158     std::lock_guard<std::mutex> autoLock(dataLock_);
2159     if (cloudTaskInfos_.find(triggerTaskId) == cloudTaskInfos_.end()) {
2160         return false;
2161     }
2162     cloudTaskInfos_[triggerTaskId].users = userList;
2163     cloudTaskInfos_[triggerTaskId].table.clear();
2164     cloudTaskInfos_[triggerTaskId].queryList.clear();
2165     cloudTaskInfos_[triggerTaskId].table.push_back(syncQuery[0].GetRelationTableName());
2166     cloudTaskInfos_[triggerTaskId].queryList.push_back(syncQuery[0]);
2167     return true;
2168 }
2169 }