• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud_sync_tag_assets.h"
25 #include "cloud_sync_utils.h"
26 #include "db_errno.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "runtime_context.h"
30 #include "store_types.h"
31 #include "strategy_factory.h"
32 #include "version.h"
33 
34 namespace DistributedDB {
ReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)35 void CloudSyncer::ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
36 {
37     Timestamp cacheWaterMark = GetResumeWaterMark(taskId);
38     waterMark = cacheWaterMark == 0u ? waterMark : cacheWaterMark;
39     RecordWaterMark(taskId, 0u);
40 }
41 
ReloadCloudWaterMarkIfNeed(const std::string & tableName,std::string & cloudWaterMark)42 void CloudSyncer::ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark)
43 {
44     std::lock_guard<std::mutex> autoLock(dataLock_);
45     std::string cacheCloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
46     cloudWaterMark = cacheCloudWaterMark.empty() ? cloudWaterMark : cacheCloudWaterMark;
47 }
48 
ReloadUploadInfoIfNeed(const UploadParam & param,InnerProcessInfo & info)49 void CloudSyncer::ReloadUploadInfoIfNeed(const UploadParam &param, InnerProcessInfo &info)
50 {
51     info.upLoadInfo.total = static_cast<uint32_t>(param.count);
52     Info lastUploadInfo;
53     UploadRetryInfo retryInfo;
54     GetLastUploadInfo(info.tableName, lastUploadInfo, retryInfo);
55     info.upLoadInfo.total += lastUploadInfo.successCount;
56     info.upLoadInfo.successCount += lastUploadInfo.successCount;
57     info.upLoadInfo.failCount += lastUploadInfo.failCount;
58     info.upLoadInfo.insertCount += lastUploadInfo.insertCount;
59     info.upLoadInfo.updateCount += lastUploadInfo.updateCount;
60     info.upLoadInfo.deleteCount += lastUploadInfo.deleteCount;
61     info.retryInfo.uploadBatchRetryCount = retryInfo.uploadBatchRetryCount;
62     LOGD("[CloudSyncer] resume upload, last success count %" PRIu32 ", last fail count %" PRIu32,
63         lastUploadInfo.successCount, lastUploadInfo.failCount);
64 }
65 
GetLastUploadInfo(const std::string & tableName,Info & lastUploadInfo,UploadRetryInfo & retryInfo)66 void CloudSyncer::GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo, UploadRetryInfo &retryInfo)
67 {
68     std::lock_guard<std::mutex> autoLock(dataLock_);
69     return currentContext_.notifier->GetLastUploadInfo(tableName, lastUploadInfo, retryInfo);
70 }
71 
GetCloudGidAndFillExtend(TaskId taskId,const std::string & tableName,QuerySyncObject & obj,VBucket & extend)72 int CloudSyncer::GetCloudGidAndFillExtend(TaskId taskId, const std::string &tableName, QuerySyncObject &obj,
73     VBucket &extend)
74 {
75     int errCode = GetCloudGid(taskId, tableName, obj);
76     if (errCode != E_OK) {
77         LOGE("[CloudSyncer] Failed to get cloud gid when fill extend, %d.", errCode);
78         return errCode;
79     }
80     return CloudStorageUtils::FillCloudQueryToExtend(obj, extend);
81 }
82 
FillDownloadExtend(TaskId taskId,const std::string & tableName,const std::string & cloudWaterMark,VBucket & extend)83 int CloudSyncer::FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
84     VBucket &extend)
85 {
86     extend = {
87         {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
88     };
89 
90     QuerySyncObject obj = GetQuerySyncObject(tableName);
91     if (obj.IsContainQueryNodes()) {
92         return GetCloudGidAndFillExtend(taskId, tableName, obj, extend);
93     }
94     if (IsCompensatedTask(taskId)) {
95         int errCode = GetCloudGid(taskId, tableName, obj);
96         if (errCode != E_OK) {
97             return errCode;
98         }
99         if (obj.IsContainQueryNodes()) {
100             return CloudStorageUtils::FillCloudQueryToExtend(obj, extend);
101         }
102     }
103     extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::FULL_TABLE);
104     return E_OK;
105 }
106 
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj)107 int CloudSyncer::GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj)
108 {
109     std::vector<std::string> cloudGid;
110     int errCode = storageProxy_->GetCloudGid(obj, IsCloudForcePush(taskId), IsCompensatedTask(taskId), cloudGid);
111     if (errCode != E_OK) {
112         LOGE("[CloudSyncer] Failed to get cloud gid, %d.", errCode);
113     } else if (!cloudGid.empty()) {
114         obj.SetCloudGid(cloudGid);
115     }
116     LOGI("[CloudSyncer] get cloud gid size:%zu", cloudGid.size());
117     return errCode;
118 }
119 
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj,std::vector<std::string> & cloudGid)120 int CloudSyncer::GetCloudGid(
121     TaskId taskId, const std::string &tableName, QuerySyncObject &obj, std::vector<std::string> &cloudGid)
122 {
123     int errCode = storageProxy_->GetCloudGid(obj, IsCloudForcePush(taskId), IsCompensatedTask(taskId), cloudGid);
124     if (errCode != E_OK) {
125         LOGE("[CloudSyncer] Failed to get cloud gid, taskid:%" PRIu64 ", table name: %s, length: %zu, %d.",
126             taskId, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), errCode);
127     }
128     return errCode;
129 }
130 
GetQuerySyncObject(const std::string & tableName)131 QuerySyncObject CloudSyncer::GetQuerySyncObject(const std::string &tableName)
132 {
133     std::lock_guard<std::mutex> autoLock(dataLock_);
134     for (const auto &item : cloudTaskInfos_[currentContext_.currentTaskId].queryList) {
135         if (item.GetTableName() == tableName) {
136             return item;
137         }
138     }
139     LOGW("[CloudSyncer] not found query in cache");
140     QuerySyncObject querySyncObject;
141     querySyncObject.SetTableName(tableName);
142     return querySyncObject;
143 }
144 
UpdateProcessWhenUploadFailed(InnerProcessInfo & info)145 void CloudSyncer::UpdateProcessWhenUploadFailed(InnerProcessInfo &info)
146 {
147     info.tableStatus = ProcessStatus::FINISHED;
148     std::lock_guard<std::mutex> autoLock(dataLock_);
149     currentContext_.notifier->UpdateProcess(info);
150     currentContext_.notifier->UpdateUploadRetryInfo(info);
151 }
152 
NotifyUploadFailed(int errCode,InnerProcessInfo & info)153 void CloudSyncer::NotifyUploadFailed(int errCode, InnerProcessInfo &info)
154 {
155     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
156         LOGI("[CloudSyncer] Stop upload due to version conflict, %d", errCode);
157     } else {
158         LOGE("[CloudSyncer] Failed to do upload, %d", errCode);
159         info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
160     }
161     UpdateProcessWhenUploadFailed(info);
162 }
163 
BatchInsert(Info & insertInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)164 int CloudSyncer::BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
165 {
166     return BatchInsertOrUpdate(insertInfo, uploadData, innerProcessInfo, true);
167 }
168 
BatchUpdate(Info & updateInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)169 int CloudSyncer::BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
170 {
171     return BatchInsertOrUpdate(updateInfo, uploadData, innerProcessInfo, false);
172 }
173 
BatchInsertOrUpdate(Info & uploadInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo,bool isInsert)174 int CloudSyncer::BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo,
175     bool isInsert)
176 {
177     uint32_t retryCount = 0;
178     int errCode = E_OK;
179     if (isInsert) {
180         errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
181             uploadData.insData.extend, uploadInfo, retryCount);
182         innerProcessInfo.upLoadInfo.insertCount += uploadInfo.successCount;
183     } else {
184         errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
185             uploadData.updData.extend, uploadInfo, retryCount);
186         innerProcessInfo.upLoadInfo.updateCount += uploadInfo.successCount;
187     }
188     innerProcessInfo.upLoadInfo.successCount += uploadInfo.successCount;
189     innerProcessInfo.upLoadInfo.failCount += uploadInfo.failCount;
190     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
191         ProcessVersionConflictInfo(innerProcessInfo, retryCount);
192     }
193     if (errCode != E_OK) {
194         LOGE("[CloudSyncer][BatchInsertOrUpdate] BatchInsertOrUpdate with error, ret is %d.", errCode);
195     }
196     if (uploadData.isCloudVersionRecord) {
197         return errCode;
198     }
199     return BackFillAfterBatchUpload(uploadData, isInsert, errCode);
200 }
201 
BackFillAfterBatchUpload(CloudSyncData & uploadData,bool isInsert,int batchUploadResult)202 int CloudSyncer::BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInsert, int batchUploadResult)
203 {
204     bool isSharedTable = false;
205     int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
206     if (ret != E_OK) {
207         LOGE("[CloudSyncer] BackFillAfterBatchUpload cannot judge the table is shared table. %d", ret);
208         return ret;
209     }
210     int errCode = batchUploadResult;
211     if (!isSharedTable) {
212         CloudWaterType type = isInsert ? CloudWaterType::INSERT : CloudWaterType::UPDATE;
213         CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData;
214         ret = CloudSyncUtils::FillAssetIdToAssets(data, errCode, type);
215         if (ret != E_OK) {
216             LOGW("[CloudSyncer][BackFillAfterBatchUpload] FillAssetIdToAssets with error, ret is %d.", ret);
217         }
218     }
219     OpType opType = isInsert ? OpType::INSERT : OpType::UPDATE;
220     if (errCode != E_OK) {
221         storageProxy_->FillCloudGidIfSuccess(opType, uploadData);
222         CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData;
223         bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(data.extend);
224         if (isSkip) {
225             LOGI("[CloudSyncer][BackFillAfterBatchUpload] Try to FillCloudLogAndAsset when assets missing: %d",
226                 errCode);
227             return E_OK;
228         } else {
229             LOGE("[CloudSyncer][BackFillAfterBatchUpload] errCode: %d, can not skip assets missing record.", errCode);
230             return errCode;
231         }
232     }
233     int errorCode = storageProxy_->FillCloudLogAndAsset(opType, uploadData);
234     if ((errorCode != E_OK) || (ret != E_OK)) {
235         LOGE("[CloudSyncer] Failed to fill back when doing upload insData or updData, %d.", errorCode);
236         return ret == E_OK ? errorCode : ret;
237     }
238     return E_OK;
239 }
240 
DownloadAssetsOneByOne(const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)241 int CloudSyncer::DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
242     std::map<std::string, Assets> &downloadAssets)
243 {
244     bool isSharedTable = false;
245     int errCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
246     if (errCode != E_OK) {
247         LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errCode);
248         return errCode;
249     }
250     int transactionCode = E_OK;
251     // shared table don't download, so just begin transaction once
252     if (isSharedTable) {
253         transactionCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
254     }
255     if (transactionCode != E_OK) {
256         LOGE("[CloudSyncer] begin transaction before download failed %d", transactionCode);
257         return transactionCode;
258     }
259     errCode = DownloadAssetsOneByOneInner(isSharedTable, info, downloadItem, downloadAssets);
260     if (isSharedTable) {
261         transactionCode = storageProxy_->Commit();
262         if (transactionCode != E_OK) {
263             LOGW("[CloudSyncer] commit transaction after download failed %d", transactionCode);
264         }
265     }
266     return (errCode == E_OK) ? transactionCode : errCode;
267 }
268 
GetDBAssets(bool isSharedTable,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)269 std::pair<int, uint32_t> CloudSyncer::GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
270     const DownloadItem &downloadItem, VBucket &dbAssets)
271 {
272     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
273     auto &errCode = res.first;
274     if (!isSharedTable) {
275         errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
276     }
277     if (errCode != E_OK) {
278         LOGE("[CloudSyncer] begin transaction before download failed %d", errCode);
279         return res;
280     }
281     res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, info.isAsyncDownload, downloadItem.gid,
282         downloadItem.hashKey, dbAssets);
283     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
284         if (errCode != -E_CLOUD_GID_MISMATCH) {
285             LOGE("[CloudSyncer] get assets from db failed %d", errCode);
286         }
287         if (!isSharedTable) {
288             (void)storageProxy_->Rollback();
289         }
290         return res;
291     }
292     if (!isSharedTable) {
293         errCode = storageProxy_->Commit();
294     }
295     if (errCode != E_OK) {
296         LOGE("[CloudSyncer] commit transaction before download failed %d", errCode);
297     }
298     return res;
299 }
300 
BackFillAssetsAfterDownload(int downloadCode,int deleteCode,std::map<std::string,std::vector<uint32_t>> & tmpFlags,std::map<std::string,Assets> & tmpAssetsToDownload,std::map<std::string,Assets> & tmpAssetsToDelete)301 std::map<std::string, Assets>& CloudSyncer::BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
302     std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
303     std::map<std::string, Assets> &tmpAssetsToDelete)
304 {
305     std::map<std::string, Assets> &downloadAssets = tmpAssetsToDownload;
306     for (auto &[col, assets] : tmpAssetsToDownload) {
307         int i = 0;
308         for (auto &asset : assets) {
309             asset.flag = tmpFlags[col][i++];
310             if (asset.flag == static_cast<uint32_t>(AssetOpType::NO_CHANGE)) {
311                 continue;
312             }
313             if (downloadCode == E_OK) {
314                 asset.status = NORMAL;
315             } else {
316                 asset.status = (asset.status == NORMAL) ? NORMAL : ABNORMAL;
317             }
318         }
319     }
320     for (auto &[col, assets] : tmpAssetsToDelete) {
321         for (auto &asset : assets) {
322             asset.flag = static_cast<uint32_t>(AssetOpType::DELETE);
323             if (deleteCode == E_OK) {
324                 asset.status = NORMAL;
325             } else {
326                 asset.status = ABNORMAL;
327             }
328             downloadAssets[col].push_back(asset);
329         }
330     }
331     return downloadAssets;
332 }
333 
IsNeedSkipDownload(bool isSharedTable,int & errCode,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)334 int CloudSyncer::IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
335     const DownloadItem &downloadItem, VBucket &dbAssets)
336 {
337     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK)};
338     auto &ret = res.first;
339     auto &status = res.second;
340     if (info.isAsyncDownload) {
341         res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, info.isAsyncDownload, downloadItem.gid,
342             downloadItem.hashKey, dbAssets);
343     } else {
344         res = GetDBAssets(isSharedTable, info, downloadItem, dbAssets);
345     }
346     if (ret == -E_CLOUD_GID_MISMATCH) {
347         LOGW("[CloudSyncer] skip download asset because gid mismatch");
348         errCode = E_OK;
349         return true;
350     }
351     if (CloudStorageUtils::IsDataLocked(status)) {
352         LOGI("[CloudSyncer] skip download asset because data lock:%u", status);
353         errCode = E_OK;
354         return true;
355     }
356     if (ret != E_OK) {
357         LOGE("[CloudSyncer]%s download get assets from DB failed: %d, return errCode: %d",
358             info.isAsyncDownload ? "Async" : "Sync", ret, errCode);
359         errCode = (errCode != E_OK) ? errCode : ret;
360         return true;
361     }
362     return false;
363 }
364 
CheckDownloadOrDeleteCode(int & errCode,int downloadCode,int deleteCode,DownloadItem & downloadItem)365 bool CloudSyncer::CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem)
366 {
367     if (downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT || deleteCode == -E_CLOUD_RECORD_EXIST_CONFLICT) {
368         downloadItem.recordConflict = true;
369         errCode = E_OK;
370         return false;
371     }
372     errCode = (errCode != E_OK) ? errCode : deleteCode;
373     errCode = (errCode != E_OK) ? errCode : downloadCode;
374     if (downloadCode == -E_NOT_SET || deleteCode == -E_NOT_SET) {
375         return false;
376     }
377     return true;
378 }
379 
GetAssetsToDownload(std::map<std::string,Assets> & downloadAssets,VBucket & dbAssets,bool isSharedTable,std::map<std::string,Assets> & assetsToDownload,std::map<std::string,std::vector<uint32_t>> & tmpFlags)380 void GetAssetsToDownload(std::map<std::string, Assets> &downloadAssets, VBucket &dbAssets, bool isSharedTable,
381     std::map<std::string, Assets> &assetsToDownload, std::map<std::string, std::vector<uint32_t>> &tmpFlags)
382 {
383     if (isSharedTable) {
384         LOGD("[CloudSyncer] skip download for shared table");
385         return;
386     }
387     for (auto &[col, assets] : downloadAssets) {
388         for (auto &asset : assets) {
389             if (asset.flag != static_cast<uint32_t>(AssetOpType::DELETE) &&
390                 AssetOperationUtils::CalAssetOperation(col, asset, dbAssets,
391                 AssetOperationUtils::CloudSyncAction::START_DOWNLOAD) == AssetOperationUtils::AssetOpType::HANDLE) {
392                 asset.status = asset.flag == static_cast<uint32_t>(AssetOpType::INSERT) ?
393                     static_cast<uint32_t>(AssetStatus::INSERT) : static_cast<uint32_t>(AssetStatus::UPDATE);
394                 assetsToDownload[col].push_back(asset);
395                 tmpFlags[col].push_back(asset.flag);
396             } else {
397                 LOGD("[CloudSyncer] skip download asset...");
398             }
399         }
400     }
401 }
402 
GetAssetsToRemove(std::map<std::string,Assets> & downloadAssets,VBucket & dbAssets,bool isSharedTable,std::map<std::string,Assets> & assetsToRemove)403 void GetAssetsToRemove(std::map<std::string, Assets> &downloadAssets, VBucket &dbAssets, bool isSharedTable,
404     std::map<std::string, Assets> &assetsToRemove)
405 {
406     if (isSharedTable) {
407         LOGD("[CloudSyncer] skip remove for shared table");
408         return;
409     }
410     for (auto &[col, assets] : downloadAssets) {
411         for (auto &asset : assets) {
412             if (asset.flag == static_cast<uint32_t>(AssetOpType::DELETE) &&
413                 AssetOperationUtils::CalAssetRemoveOperation(col, asset, dbAssets) ==
414                 AssetOperationUtils::AssetOpType::HANDLE) {
415                 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
416                 assetsToRemove[col].push_back(asset);
417             }
418         }
419     }
420 }
421 
DownloadAssetsOneByOneInner(bool isSharedTable,const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)422 int CloudSyncer::DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info,
423     DownloadItem &downloadItem, std::map<std::string, Assets> &downloadAssets)
424 {
425     int errCode = E_OK;
426     VBucket dbAssets;
427     std::map<std::string, Assets> tmpAssetsToRemove;
428     if (!IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
429         GetAssetsToRemove(downloadAssets, dbAssets, isSharedTable, tmpAssetsToRemove);
430     }
431     auto deleteCode = cloudDB_.RemoveLocalAssets(info.tableName, downloadItem.gid, downloadItem.prefix,
432         tmpAssetsToRemove);
433 
434     std::map<std::string, Assets> tmpAssetsToDownload;
435     std::map<std::string, std::vector<uint32_t>> tmpFlags;
436     if (!IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
437         GetAssetsToDownload(downloadAssets, dbAssets, isSharedTable, tmpAssetsToDownload, tmpFlags);
438     }
439     auto downloadCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, tmpAssetsToDownload);
440     if (downloadCode != SKIP_ASSET && !CheckDownloadOrDeleteCode(errCode, downloadCode, deleteCode, downloadItem)) {
441         return errCode;
442     }
443 
444     // copy asset back
445     downloadAssets = BackFillAssetsAfterDownload(downloadCode, deleteCode, tmpFlags, tmpAssetsToDownload,
446         tmpAssetsToRemove);
447     return errCode;
448 }
449 
SeparateNormalAndFailAssets(const std::map<std::string,Assets> & assetsMap,VBucket & normalAssets,VBucket & failedAssets)450 void CloudSyncer::SeparateNormalAndFailAssets(const std::map<std::string, Assets> &assetsMap, VBucket &normalAssets,
451     VBucket &failedAssets)
452 {
453     for (auto &[key, assets] : assetsMap) {
454         Assets tempFailedAssets;
455         Assets tempNormalAssets;
456         int32_t otherStatusCnt = 0;
457         for (auto &asset : assets) {
458             if (asset.status == AssetStatus::NORMAL) {
459                 tempNormalAssets.push_back(asset);
460             } else if (asset.status == AssetStatus::ABNORMAL) {
461                 tempFailedAssets.push_back(asset);
462             } else {
463                 otherStatusCnt++;
464                 LOGW("[CloudSyncer] download err, statue %d count %d", asset.status, otherStatusCnt);
465             }
466         }
467         LOGI("[CloudSyncer] download asset times, normalCount %zu, abnormalCount %zu, otherStatusCnt %d",
468             tempNormalAssets.size(), tempFailedAssets.size(), otherStatusCnt);
469         if (tempFailedAssets.size() > 0) {
470             failedAssets[key] = std::move(tempFailedAssets);
471         }
472         if (tempNormalAssets.size() > 0) {
473             normalAssets[key] = std::move(tempNormalAssets);
474         }
475     }
476 }
477 
CommitDownloadAssets(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)478 int CloudSyncer::CommitDownloadAssets(const DownloadItem &downloadItem, InnerProcessInfo &info,
479     DownloadCommitList &commitList, uint32_t &successCount)
480 {
481     int errCode = storageProxy_->SetLogTriggerStatus(false);
482     if (errCode != E_OK) {
483         return errCode;
484     }
485     for (auto &item : commitList) {
486         std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
487         // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
488         std::map<std::string, Assets> assetsMap = std::get<1>(item);
489         bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
490         VBucket normalAssets;
491         VBucket failedAssets;
492         normalAssets[CloudDbConstant::GID_FIELD] = gid;
493         failedAssets[CloudDbConstant::GID_FIELD] = gid;
494         if (setAllNormal) {
495             for (auto &[key, asset] : assetsMap) {
496                 normalAssets[key] = std::move(asset);
497             }
498         } else {
499             SeparateNormalAndFailAssets(assetsMap, normalAssets, failedAssets);
500         }
501         if (!downloadItem.recordConflict) {
502             errCode = FillCloudAssets(info, normalAssets, failedAssets);
503             if (errCode != E_OK) {
504                 break;
505             }
506         }
507         LogInfo logInfo;
508         logInfo.cloudGid = gid;
509         // download must contain gid, just set the default value here.
510         logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
511         logInfo.hashKey = downloadItem.hashKey;
512         logInfo.timestamp = downloadItem.timestamp;
513         // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
514         if (failedAssets.size() > 1) {
515             logInfo.timestamp = 0u;
516         }
517 
518         errCode = storageProxy_->UpdateRecordFlag(
519             info.tableName, info.isAsyncDownload, downloadItem.recordConflict, logInfo);
520         if (errCode != E_OK) {
521             break;
522         }
523         successCount++;
524     }
525     int ret = storageProxy_->SetLogTriggerStatus(true);
526     return errCode == E_OK ? ret : errCode;
527 }
528 
FillCloudAssetsForOneRecord(const std::string & gid,const std::map<std::string,Assets> & assetsMap,InnerProcessInfo & info,bool setAllNormal,bool & isExistAssetDownloadFail)529 int CloudSyncer::FillCloudAssetsForOneRecord(const std::string &gid, const std::map<std::string, Assets> &assetsMap,
530     InnerProcessInfo &info, bool setAllNormal, bool &isExistAssetDownloadFail)
531 {
532     VBucket normalAssets;
533     VBucket failedAssets;
534     normalAssets[CloudDbConstant::GID_FIELD] = gid;
535     failedAssets[CloudDbConstant::GID_FIELD] = gid;
536     if (setAllNormal) {
537         for (auto &[key, asset] : assetsMap) {
538             normalAssets[key] = std::move(asset);
539         }
540     } else {
541         SeparateNormalAndFailAssets(assetsMap, normalAssets, failedAssets);
542     }
543     isExistAssetDownloadFail = failedAssets.size() > 1; // There is initially one element present
544     return FillCloudAssets(info, normalAssets, failedAssets);
545 }
546 
UpdateRecordFlagForOneRecord(const std::string & gid,const DownloadItem & downloadItem,InnerProcessInfo & info,bool isExistAssetDownloadFail)547 int CloudSyncer::UpdateRecordFlagForOneRecord(const std::string &gid, const DownloadItem &downloadItem,
548     InnerProcessInfo &info, bool isExistAssetDownloadFail)
549 {
550     LogInfo logInfo;
551     logInfo.cloudGid = gid;
552     // download must contain gid, just set the default value here.
553     logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
554     logInfo.hashKey = downloadItem.hashKey;
555     logInfo.timestamp = downloadItem.timestamp;
556     // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
557     if (isExistAssetDownloadFail) {
558         logInfo.timestamp = 0u;
559     }
560     return storageProxy_->UpdateRecordFlag(info.tableName, info.isAsyncDownload, downloadItem.recordConflict, logInfo);
561 }
562 
CommitDownloadAssetsForAsyncDownload(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)563 int CloudSyncer::CommitDownloadAssetsForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info,
564     DownloadCommitList &commitList, uint32_t &successCount)
565 {
566     int errCode = storageProxy_->SetLogTriggerStatus(false, true);
567     if (errCode != E_OK) {
568         return errCode;
569     }
570     for (auto &item : commitList) {
571         std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
572         // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
573         std::map<std::string, Assets> assetsMap = std::get<1>(item);
574         bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
575         LockStatus status = LockStatus::BUTT;
576         errCode = storageProxy_->GetLockStatusByGid(info.tableName, gid, status);
577         if (errCode == E_OK && status != LockStatus::UNLOCK) {
578             continue;
579         }
580 
581         bool isExistAssetDownloadFail = false;
582         if (!downloadItem.recordConflict) {
583             errCode = FillCloudAssetsForOneRecord(gid, assetsMap, info, setAllNormal, isExistAssetDownloadFail);
584             if (errCode != E_OK) {
585                 break;
586             }
587         }
588 
589         errCode = UpdateRecordFlagForOneRecord(gid, downloadItem, info, isExistAssetDownloadFail);
590         if (errCode != E_OK) {
591             break;
592         }
593         successCount++;
594     }
595     int ret = storageProxy_->SetLogTriggerStatus(true, true);
596     return errCode == E_OK ? ret : errCode;
597 }
598 
GenerateCompensatedSync(CloudTaskInfo & taskInfo)599 void CloudSyncer::GenerateCompensatedSync(CloudTaskInfo &taskInfo)
600 {
601     std::vector<QuerySyncObject> syncQuery;
602     std::vector<std::string> users;
603     int errCode = CloudSyncUtils::GetQueryAndUsersForCompensatedSync(
604         CloudSyncUtils::CanStartAsyncDownload(scheduleTaskCount_), storageProxy_, users, syncQuery);
605     if (errCode != E_OK) {
606         LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
607         return;
608     }
609     if (syncQuery.empty()) {
610         LOGD("[CloudSyncer] Not need generate compensated sync");
611         return;
612     }
613     for (const auto &it : syncQuery) {
614         CloudTaskInfo compensatedTaskInfo = taskInfo;
615         compensatedTaskInfo.queryList.push_back(it);
616         Sync(compensatedTaskInfo);
617         taskInfo.callback = nullptr;
618         LOGI("[CloudSyncer] Generate compensated sync finished");
619     }
620 }
621 
ChkIgnoredProcess(InnerProcessInfo & info,const CloudSyncData & uploadData,UploadParam & uploadParam)622 void CloudSyncer::ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam)
623 {
624     if (uploadData.ignoredCount == 0) { // LCOV_EXCL_BR_LINE
625         return;
626     }
627     info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
628     if (info.upLoadInfo.successCount + info.upLoadInfo.failCount != info.upLoadInfo.total) { // LCOV_EXCL_BR_LINE
629         return;
630     }
631     if (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) { // LCOV_EXCL_BR_LINE
632         return;
633     }
634     info.tableStatus = ProcessStatus::FINISHED;
635     info.upLoadInfo.batchIndex++;
636     NotifyInBatchUpload(uploadParam, info, true);
637 }
638 
SaveCursorIfNeed(const std::string & tableName)639 int CloudSyncer::SaveCursorIfNeed(const std::string &tableName)
640 {
641     std::string cursor = "";
642     int errCode = storageProxy_->GetCloudWaterMark(tableName, cursor);
643     if (errCode != E_OK) {
644         LOGE("[CloudSyncer] get cloud water mark before download failed %d", errCode);
645         return errCode;
646     }
647     if (!cursor.empty()) {
648         return E_OK;
649     }
650     auto res = cloudDB_.GetEmptyCursor(tableName);
651     if (res.first != E_OK) {
652         LOGE("[CloudSyncer] get empty cursor failed %d", res.first);
653         return res.first;
654     }
655     if (res.second.empty()) {
656         LOGE("[CloudSyncer] get cursor is empty %d", -E_CLOUD_ERROR);
657         return -E_CLOUD_ERROR;
658     }
659     errCode = storageProxy_->SetCloudWaterMark(tableName, res.second);
660     if (errCode != E_OK) {
661         LOGE("[CloudSyncer] set cloud water mark before download failed %d", errCode);
662     }
663     return errCode;
664 }
665 
PrepareAndDownload(const std::string & table,const CloudTaskInfo & taskInfo,bool isFirstDownload)666 int CloudSyncer::PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload)
667 {
668     std::string hashDev;
669     int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
670     if (errCode != E_OK) {
671         LOGE("[CloudSyncer] Failed to get local identity.");
672         return errCode;
673     }
674     errCode = SaveCursorIfNeed(table);
675     if (errCode != E_OK) {
676         return errCode;
677     }
678     errCode = CheckTaskIdValid(taskInfo.taskId);
679     if (errCode != E_OK) {
680         LOGW("[CloudSyncer] task is invalid, abort sync");
681         return errCode;
682     }
683     errCode = DoDownload(taskInfo.taskId, isFirstDownload);
684     if (errCode != E_OK) {
685         LOGE("[CloudSyncer] download failed %d", errCode);
686     }
687     return errCode;
688 }
689 
IsClosed() const690 bool CloudSyncer::IsClosed() const
691 {
692     return closed_ || IsKilled();
693 }
694 
UpdateFlagForSavedRecord(const SyncParam & param)695 int CloudSyncer::UpdateFlagForSavedRecord(const SyncParam &param)
696 {
697     DownloadList downloadList;
698     {
699         std::lock_guard<std::mutex> autoLock(dataLock_);
700         downloadList = currentContext_.assetDownloadList;
701     }
702     std::set<std::string> downloadGid;
703     for (const auto &tuple : downloadList) {
704         if (CloudSyncUtils::IsContainDownloading(tuple)) {
705             downloadGid.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
706         }
707     }
708     if (IsCurrentAsyncDownloadTask()) {
709         int errCode = storageProxy_->MarkFlagAsAssetAsyncDownload(param.tableName, param.downloadData, downloadGid);
710         if (errCode != E_OK) {
711             LOGE("[CloudSyncer] Failed to mark flag consistent errCode %d", errCode);
712             return errCode;
713         }
714     }
715     return storageProxy_->MarkFlagAsConsistent(param.tableName, param.downloadData, downloadGid);
716 }
717 
BatchDelete(Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)718 int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
719 {
720     uint32_t retryCount = 0;
721     int errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
722         uploadData.delData.extend, deleteInfo, retryCount);
723     innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
724     innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount;
725     innerProcessInfo.upLoadInfo.failCount += deleteInfo.failCount;
726     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
727         ProcessVersionConflictInfo(innerProcessInfo, retryCount);
728     }
729     if (errCode != E_OK) {
730         LOGE("[CloudSyncer] Failed to batch delete, %d", errCode);
731         storageProxy_->FillCloudGidIfSuccess(OpType::DELETE, uploadData);
732         return errCode;
733     }
734     errCode = storageProxy_->FillCloudLogAndAsset(OpType::DELETE, uploadData);
735     if (errCode != E_OK) {
736         LOGE("[CloudSyncer] Failed to fill back when doing upload delData, %d.", errCode);
737     }
738     return errCode;
739 }
740 
IsCompensatedTask(TaskId taskId)741 bool CloudSyncer::IsCompensatedTask(TaskId taskId)
742 {
743     std::lock_guard<std::mutex> autoLock(dataLock_);
744     return cloudTaskInfos_[taskId].compensatedTask;
745 }
746 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)747 int CloudSyncer::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
748 {
749     return cloudDB_.SetCloudDB(cloudDBs);
750 }
751 
CleanAllWaterMark()752 void CloudSyncer::CleanAllWaterMark()
753 {
754     storageProxy_->CleanAllWaterMark();
755 }
756 
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)757 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
758 {
759     downloadItem.gid = std::get<CloudSyncUtils::GID_INDEX>(downloadList[i]);
760     downloadItem.prefix = std::get<CloudSyncUtils::PREFIX_INDEX>(downloadList[i]);
761     downloadItem.strategy = std::get<CloudSyncUtils::STRATEGY_INDEX>(downloadList[i]);
762     downloadItem.assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadList[i]);
763     downloadItem.hashKey = std::get<CloudSyncUtils::HASH_KEY_INDEX>(downloadList[i]);
764     downloadItem.primaryKeyValList = std::get<CloudSyncUtils::PRIMARY_KEY_INDEX>(downloadList[i]);
765     downloadItem.timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(downloadList[i]);
766 }
767 
DoNotifyInNeed(const CloudSyncer::TaskId & taskId,const std::vector<std::string> & needNotifyTables,const bool isFirstDownload)768 void CloudSyncer::DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
769     const bool isFirstDownload)
770 {
771     bool isNeedNotify = false;
772     bool isLockAction = IsLockInDownload();
773     {
774         std::lock_guard<std::mutex> autoLock(dataLock_);
775         // only when the first download and the task no need upload actually, notify the process, otherwise,
776         // the process will notify in the upload procedure, which can guarantee the notify order of the tables
777         isNeedNotify = isFirstDownload && !currentContext_.isNeedUpload && isLockAction;
778     }
779     if (!isNeedNotify) {
780         return;
781     }
782     for (size_t i = 0; i < needNotifyTables.size(); ++i) {
783         UpdateProcessInfoWithoutUpload(taskId, needNotifyTables[i], i != (needNotifyTables.size() - 1u));
784     }
785 }
786 
GetUploadCountByTable(const CloudSyncer::TaskId & taskId,int64_t & count)787 int CloudSyncer::GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count)
788 {
789     std::string tableName;
790     int ret = GetCurrentTableName(tableName);
791     if (ret != E_OK) {
792         LOGE("[CloudSyncer] Invalid table name for get local water mark: %d", ret);
793         return ret;
794     }
795 
796     ret = storageProxy_->StartTransaction();
797     if (ret != E_OK) {
798         LOGE("[CloudSyncer] start transaction failed before getting upload count.");
799         return ret;
800     }
801 
802     ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
803         IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
804     if (ret != E_OK) {
805         // GetUploadCount will return E_OK when upload count is zero.
806         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
807     }
808     // No need Rollback when GetUploadCount failed
809     storageProxy_->Commit();
810     return ret;
811 }
812 
UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId,const std::string & tableName,bool needNotify)813 void CloudSyncer::UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName,
814     bool needNotify)
815 {
816     LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
817     InnerProcessInfo innerProcessInfo;
818     innerProcessInfo.tableName = tableName;
819     innerProcessInfo.upLoadInfo.total = 0;  // count is zero
820     innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
821     {
822         std::lock_guard<std::mutex> autoLock(dataLock_);
823         if (!needNotify) {
824             currentContext_.notifier->UpdateProcess(innerProcessInfo);
825         } else {
826             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
827         }
828     }
829 }
830 
SetNeedUpload(bool isNeedUpload)831 void CloudSyncer::SetNeedUpload(bool isNeedUpload)
832 {
833     std::lock_guard<std::mutex> autoLock(dataLock_);
834     currentContext_.isNeedUpload = isNeedUpload;
835 }
836 
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)837 int CloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
838 {
839     std::vector<std::string> needNotifyTables;
840     for (size_t i = 0; i < taskInfo.table.size(); ++i) {
841         std::string table;
842         {
843             std::lock_guard<std::mutex> autoLock(dataLock_);
844             if (currentContext_.processRecorder == nullptr) {
845                 LOGE("[CloudSyncer] process recorder of current context is nullptr.");
846                 return -E_INTERNAL_ERROR;
847             }
848             if (currentContext_.processRecorder->IsDownloadFinish(currentContext_.currentUserIndex,
849                 taskInfo.table[i])) {
850                 continue;
851             }
852             LOGD("[CloudSyncer] try download table, index: %zu, table name: %s, length: %zu",
853                 i, DBCommon::StringMiddleMasking(taskInfo.table[i]).c_str(), taskInfo.table[i].length());
854             currentContext_.tableName = taskInfo.table[i];
855             table = currentContext_.tableName;
856         }
857         int errCode = PrepareAndDownload(table, taskInfo, isFirstDownload);
858         if (errCode != E_OK) {
859             return errCode;
860         }
861         CheckDataAfterDownload(table);
862         MarkDownloadFinishIfNeed(table);
863         // needUpload indicate that the syncMode need push
864         if (needUpload) {
865             int64_t count = 0;
866             errCode = GetUploadCountByTable(taskInfo.taskId, count);
867             if (errCode != E_OK) {
868                 LOGE("[CloudSyncer] GetUploadCountByTable failed %d", errCode);
869                 return errCode;
870             }
871             // count > 0 means current table need upload actually
872             if (count > 0) {
873                 SetNeedUpload(true);
874                 continue;
875             }
876             needNotifyTables.emplace_back(table);
877         }
878         errCode = SaveCloudWaterMark(taskInfo.table[i], taskInfo.taskId);
879         if (errCode != E_OK) {
880             LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
881             return errCode;
882         }
883     }
884     DoNotifyInNeed(taskInfo.taskId, needNotifyTables, isFirstDownload);
885     TriggerAsyncDownloadAssetsInTaskIfNeed(isFirstDownload);
886     return E_OK;
887 }
888 
IsNeedGetLocalWater(TaskId taskId)889 bool CloudSyncer::IsNeedGetLocalWater(TaskId taskId)
890 {
891     return !IsModeForcePush(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId)) &&
892         !IsCompensatedTask(taskId);
893 }
894 
TryToAddSyncTask(CloudTaskInfo && taskInfo)895 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
896 {
897     if (closed_) {
898         LOGW("[CloudSyncer] syncer is closed, should not sync now");
899         return -E_DB_CLOSED;
900     }
901     std::shared_ptr<DataBaseSchema> cloudSchema;
902     int errCode = storageProxy_->GetCloudDbSchema(cloudSchema);
903     if (errCode != E_OK) {
904         LOGE("[CloudSyncer] Get cloud schema failed %d when add task", errCode);
905         return errCode;
906     }
907     std::lock_guard<std::mutex> autoLock(dataLock_);
908     taskInfo.priorityLevel = (!taskInfo.priorityTask || taskInfo.compensatedTask)
909                                  ? CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL
910                                  : taskInfo.priorityLevel;
911     errCode = CheckQueueSizeWithNoLock(taskInfo.priorityTask);
912     if (errCode != E_OK) {
913         return errCode;
914     }
915     errCode = GenerateTaskIdIfNeed(taskInfo);
916     if (errCode != E_OK) {
917         return errCode;
918     }
919     auto taskId = taskInfo.taskId;
920     cloudTaskInfos_[taskId] = std::move(taskInfo);
921     taskQueue_.insert({cloudTaskInfos_[taskId].priorityLevel, taskId});
922     LOGI("[CloudSyncer]Add task ok, storeId %.3s, priority %d, priorityLevel %" PRId32 ", taskId %" PRIu64 " async %d",
923         cloudTaskInfos_[taskId].storeId.c_str(), cloudTaskInfos_[taskId].priorityTask,
924         cloudTaskInfos_[taskId].priorityLevel, cloudTaskInfos_[taskId].taskId,
925         static_cast<int>(cloudTaskInfos_[taskId].asyncDownloadAssets));
926     MarkCurrentTaskPausedIfNeed(taskInfo);
927     if (!cloudTaskInfos_[taskId].priorityTask) {
928         MergeTaskInfo(cloudSchema, taskId);
929     }
930     return E_OK;
931 }
932 
MergeTaskInfo(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId taskId)933 void CloudSyncer::MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId)
934 {
935     if (!cloudTaskInfos_[taskId].merge) { // LCOV_EXCL_BR_LINE
936         return;
937     }
938     bool isMerge = false;
939     TaskId checkTaskId = taskId;
940     do {
941         std::tie(isMerge, checkTaskId) = TryMergeTask(cloudSchema, checkTaskId);
942     } while (isMerge);
943 }
944 
RemoveTaskFromQueue(int32_t priorityLevel,TaskId taskId)945 void CloudSyncer::RemoveTaskFromQueue(int32_t priorityLevel, TaskId taskId)
946 {
947     for (auto it = taskQueue_.find(priorityLevel); it != taskQueue_.end(); ++it) {
948         if (it->second == taskId) {
949             taskQueue_.erase(it);
950             return;
951         }
952     }
953 }
954 
TryMergeTask(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId tryTaskId)955 std::pair<bool, TaskId> CloudSyncer::TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId)
956 {
957     std::pair<bool, TaskId> res;
958     auto &[merge, nextTryTask] = res;
959     TaskId beMergeTask = INVALID_TASK_ID;
960     TaskId runningTask = currentContext_.currentTaskId;
961     auto commonLevelTask = taskQueue_.equal_range(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
962     for (auto it = commonLevelTask.first; it != commonLevelTask.second; ++it) {
963         TaskId taskId = it->second;
964         if (taskId == runningTask || taskId == tryTaskId) {  // LCOV_EXCL_BR_LINE
965             continue;
966         }
967         if (!IsTasksCanMerge(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
968             continue;
969         }
970         if (MergeTaskTablesIfConsistent(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
971             beMergeTask = taskId;
972             nextTryTask = tryTaskId;
973             merge = true;
974             break;
975         }
976         if (MergeTaskTablesIfConsistent(tryTaskId, taskId)) { // LCOV_EXCL_BR_LINE
977             beMergeTask = tryTaskId;
978             nextTryTask = taskId;
979             merge = true;
980             break;
981         }
982     }
983     if (!merge) { // LCOV_EXCL_BR_LINE
984         return res;
985     }
986     if (beMergeTask > nextTryTask) { // LCOV_EXCL_BR_LINE
987         std::tie(beMergeTask, nextTryTask) = SwapTwoTaskAndCopyTable(beMergeTask, nextTryTask);
988     }
989     AdjustTableBasedOnSchema(cloudSchema, cloudTaskInfos_[nextTryTask]);
990     auto processNotifier = std::make_shared<ProcessNotifier>(this);
991     processNotifier->Init(cloudTaskInfos_[beMergeTask].table, cloudTaskInfos_[beMergeTask].devices,
992         cloudTaskInfos_[beMergeTask].users);
993     cloudTaskInfos_[beMergeTask].errCode = -E_CLOUD_SYNC_TASK_MERGED;
994     cloudTaskInfos_[beMergeTask].status = ProcessStatus::FINISHED;
995     processNotifier->SetAllTableFinish();
996     processNotifier->NotifyProcess(cloudTaskInfos_[beMergeTask], {}, true);
997     RemoveTaskFromQueue(cloudTaskInfos_[beMergeTask].priorityLevel, beMergeTask);
998     cloudTaskInfos_.erase(beMergeTask);
999     LOGW("[CloudSyncer] TaskId %" PRIu64 " has been merged", beMergeTask);
1000     return res;
1001 }
1002 
IsTaskCanMerge(const CloudTaskInfo & taskInfo)1003 bool CloudSyncer::IsTaskCanMerge(const CloudTaskInfo &taskInfo)
1004 {
1005     return !taskInfo.compensatedTask && !taskInfo.priorityTask &&
1006         taskInfo.merge && taskInfo.mode == SYNC_MODE_CLOUD_MERGE;
1007 }
1008 
IsTasksCanMerge(TaskId taskId,TaskId tryMergeTaskId)1009 bool CloudSyncer::IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId)
1010 {
1011     const auto &taskInfo = cloudTaskInfos_[taskId];
1012     const auto &tryMergeTaskInfo = cloudTaskInfos_[tryMergeTaskId];
1013     return IsTaskCanMerge(taskInfo) && IsTaskCanMerge(tryMergeTaskInfo) &&
1014         taskInfo.devices == tryMergeTaskInfo.devices &&
1015         taskInfo.asyncDownloadAssets == tryMergeTaskInfo.asyncDownloadAssets;
1016 }
1017 
MergeTaskTablesIfConsistent(TaskId sourceId,TaskId targetId)1018 bool CloudSyncer::MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId)
1019 {
1020     const auto &source = cloudTaskInfos_[sourceId];
1021     const auto &target = cloudTaskInfos_[targetId];
1022     bool isMerge = true;
1023     for (const auto &table : source.table) {
1024         if (std::find(target.table.begin(), target.table.end(), table) == target.table.end()) { // LCOV_EXCL_BR_LINE
1025             isMerge = false;
1026             break;
1027         }
1028     }
1029     return isMerge;
1030 }
1031 
AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> & cloudSchema,CloudTaskInfo & taskInfo)1032 void CloudSyncer::AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema,
1033     CloudTaskInfo &taskInfo)
1034 {
1035     std::vector<std::string> tmpTables = taskInfo.table;
1036     taskInfo.table.clear();
1037     taskInfo.queryList.clear();
1038     for (const auto &table : cloudSchema->tables) {
1039         if (std::find(tmpTables.begin(), tmpTables.end(), table.name) != tmpTables.end()) { // LCOV_EXCL_BR_LINE
1040             taskInfo.table.push_back(table.name);
1041             QuerySyncObject querySyncObject;
1042             querySyncObject.SetTableName(table.name);
1043             taskInfo.queryList.push_back(querySyncObject);
1044         }
1045     }
1046 }
1047 
SwapTwoTaskAndCopyTable(TaskId source,TaskId target)1048 std::pair<TaskId, TaskId> CloudSyncer::SwapTwoTaskAndCopyTable(TaskId source, TaskId target)
1049 {
1050     cloudTaskInfos_[source].table = cloudTaskInfos_[target].table;
1051     cloudTaskInfos_[source].queryList = cloudTaskInfos_[target].queryList;
1052     std::set<std::string> users;
1053     users.insert(cloudTaskInfos_[source].users.begin(), cloudTaskInfos_[source].users.end());
1054     users.insert(cloudTaskInfos_[target].users.begin(), cloudTaskInfos_[target].users.end());
1055     cloudTaskInfos_[source].users = std::vector<std::string>(users.begin(), users.end());
1056     return {target, source};
1057 }
1058 
IsQueryListEmpty(TaskId taskId)1059 bool CloudSyncer::IsQueryListEmpty(TaskId taskId)
1060 {
1061     std::lock_guard<std::mutex> autoLock(dataLock_);
1062     return !std::any_of(cloudTaskInfos_[taskId].queryList.begin(), cloudTaskInfos_[taskId].queryList.end(),
1063         [](const auto &item) {
1064             return item.IsContainQueryNodes();
1065     });
1066 }
1067 
IsNeedProcessCloudCursor(TaskId taskId)1068 bool CloudSyncer::IsNeedProcessCloudCursor(TaskId taskId)
1069 {
1070     // Compensated task no need to save/get cloud cursor
1071     return IsQueryListEmpty(taskId) && !IsCompensatedTask(taskId);
1072 }
1073 
IsNeedLock(const UploadParam & param)1074 bool CloudSyncer::IsNeedLock(const UploadParam &param)
1075 {
1076     return param.lockAction == LockAction::INSERT && param.mode == CloudWaterType::INSERT;
1077 }
1078 
GetLocalWater(const std::string & tableName,UploadParam & uploadParam)1079 std::pair<int, Timestamp> CloudSyncer::GetLocalWater(const std::string &tableName, UploadParam &uploadParam)
1080 {
1081     std::pair<int, Timestamp> res = { E_OK, 0u };
1082     if (IsNeedGetLocalWater(uploadParam.taskId)) {
1083         res.first = storageProxy_->GetLocalWaterMarkByMode(tableName, uploadParam.mode, res.second);
1084     }
1085     uploadParam.localMark = res.second;
1086     return res;
1087 }
1088 
ChangeProcessStatusAndNotifyIfNeed(UploadParam & uploadParam,InnerProcessInfo & info)1089 void CloudSyncer::ChangeProcessStatusAndNotifyIfNeed(UploadParam &uploadParam, InnerProcessInfo &info)
1090 {
1091     if (info.tableStatus == ProcessStatus::FINISHED) {
1092         // if process here, the process should't be finished and notify.
1093         info.tableStatus = ProcessStatus::PROCESSING;
1094         NotifyInBatchUpload(uploadParam, info, false);
1095     }
1096 }
1097 
HandleBatchUpload(UploadParam & uploadParam,InnerProcessInfo & info,CloudSyncData & uploadData,ContinueToken & continueStmtToken,std::vector<ReviseModTimeInfo> & revisedData)1098 int CloudSyncer::HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info,
1099     CloudSyncData &uploadData, ContinueToken &continueStmtToken, std::vector<ReviseModTimeInfo> &revisedData)
1100 {
1101     int ret = E_OK;
1102     uint32_t batchIndex = GetCurrentTableUploadBatchIndex();
1103     bool isLocked = false;
1104     while (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) {
1105         ChangeProcessStatusAndNotifyIfNeed(uploadParam, info);
1106         revisedData.insert(revisedData.end(), uploadData.revisedData.begin(), uploadData.revisedData.end());
1107         ret = PreProcessBatchUpload(uploadParam, info, uploadData);
1108         if (ret != E_OK) {
1109             break;
1110         }
1111         info.upLoadInfo.batchIndex = ++batchIndex;
1112         if (IsNeedLock(uploadParam) && !isLocked) {
1113             ret = LockCloudIfNeed(uploadParam.taskId);
1114             if (ret != E_OK) {
1115                 break;
1116             }
1117             isLocked = true;
1118         }
1119         ret = DoBatchUpload(uploadData, uploadParam, info);
1120         if (ret != E_OK) {
1121             break;
1122         }
1123         uploadData = CloudSyncData(uploadData.tableName, uploadParam.mode);
1124         if (continueStmtToken == nullptr) {
1125             break;
1126         }
1127         SetUploadDataFlag(uploadParam.taskId, uploadData);
1128         LOGI("[CloudSyncer] Write local water after upload one batch, table[%s length[%zu]], water[%llu]",
1129             DBCommon::StringMiddleMasking(uploadData.tableName).c_str(), uploadData.tableName.length(),
1130             uploadParam.localMark);
1131         RecordWaterMark(uploadParam.taskId, uploadParam.localMark);
1132         ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
1133         if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1134             LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
1135             break;
1136         }
1137         ChkIgnoredProcess(info, uploadData, uploadParam);
1138     }
1139     if ((ret != E_OK) && (ret != -E_UNFINISHED) && (ret != -E_TASK_PAUSED)) {
1140         NotifyUploadFailed(ret, info);
1141     }
1142     if (isLocked && IsNeedLock(uploadParam)) {
1143         UnlockIfNeed();
1144     }
1145     return ret;
1146 }
1147 
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)1148 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
1149 {
1150     InnerProcessInfo info = GetInnerProcessInfo(tableName, uploadParam);
1151     static std::vector<CloudWaterType> waterTypes = DBCommon::GetWaterTypeVec();
1152     int errCode = E_OK;
1153     for (const auto &waterType: waterTypes) {
1154         uploadParam.mode = waterType;
1155         errCode = DoUploadByMode(tableName, uploadParam, info);
1156         if (errCode != E_OK) {
1157             break;
1158         }
1159     }
1160     int ret = E_OK;
1161     if (info.upLoadInfo.successCount > 0) {
1162         ret = UploadVersionRecordIfNeed(uploadParam);
1163     }
1164     return errCode != E_OK ? errCode : ret;
1165 }
1166 
UploadVersionRecordIfNeed(const UploadParam & uploadParam)1167 int CloudSyncer::UploadVersionRecordIfNeed(const UploadParam &uploadParam)
1168 {
1169     if (uploadParam.count == 0) {
1170         // no record upload
1171         return E_OK;
1172     }
1173     if (!cloudDB_.IsExistCloudVersionCallback()) {
1174         return E_OK;
1175     }
1176     auto [errCode, uploadData] = storageProxy_->GetLocalCloudVersion();
1177     if (errCode != E_OK) {
1178         return errCode;
1179     }
1180     bool isInsert = !uploadData.insData.record.empty();
1181     CloudSyncBatch &batchData = isInsert ? uploadData.insData : uploadData.updData;
1182     if (batchData.record.empty()) {
1183         LOGE("[CloudSyncer] Get invalid cloud version record");
1184         return -E_INTERNAL_ERROR;
1185     }
1186     std::string oriVersion;
1187     CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, batchData.record[0], oriVersion);
1188     std::string newVersion;
1189     std::tie(errCode, newVersion) = cloudDB_.GetCloudVersion(oriVersion);
1190     if (errCode != E_OK) {
1191         LOGE("[CloudSyncer] Get cloud version error %d", errCode);
1192         return errCode;
1193     }
1194     batchData.record[0][CloudDbConstant::CLOUD_KV_FIELD_VALUE] = newVersion;
1195     InnerProcessInfo processInfo;
1196     Info info;
1197     std::vector<VBucket> copyRecord = batchData.record;
1198     WaterMark waterMark;
1199     CloudSyncUtils::GetWaterMarkAndUpdateTime(batchData.extend, waterMark);
1200     errCode = isInsert ? BatchInsert(info, uploadData, processInfo) : BatchUpdate(info, uploadData, processInfo);
1201     batchData.record = copyRecord;
1202     CloudSyncUtils::ModifyCloudDataTime(batchData.extend[0]);
1203     auto ret = storageProxy_->FillCloudLogAndAsset(isInsert ? OpType::INSERT : OpType::UPDATE, uploadData);
1204     return errCode != E_OK ? errCode : ret;
1205 }
1206 
TagUploadAssets(CloudSyncData & uploadData)1207 void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1208 {
1209     if (!IsDataContainAssets()) {
1210         return;
1211     }
1212     std::vector<Field> assetFields;
1213     {
1214         std::lock_guard<std::mutex> autoLock(dataLock_);
1215         assetFields = currentContext_.assetFields[currentContext_.tableName];
1216     }
1217 
1218     for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1219         for (const Field &assetField : assetFields) {
1220             (void)TagAssetsInSingleCol(assetField, true, uploadData.insData.record[i]);
1221         }
1222     }
1223     for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1224         for (const Field &assetField : assetFields) {
1225             (void)TagAssetsInSingleCol(assetField, false, uploadData.updData.record[i]);
1226         }
1227     }
1228 }
1229 
IsLockInDownload()1230 bool CloudSyncer::IsLockInDownload()
1231 {
1232     std::lock_guard<std::mutex> autoLock(dataLock_);
1233     if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1234         return false;
1235     }
1236     auto currentLockAction = static_cast<uint32_t>(cloudTaskInfos_[currentContext_.currentTaskId].lockAction);
1237     return (currentLockAction & static_cast<uint32_t>(LockAction::DOWNLOAD)) != 0;
1238 }
1239 
SetCurrentTaskFailedInMachine(int errCode)1240 CloudSyncEvent CloudSyncer::SetCurrentTaskFailedInMachine(int errCode)
1241 {
1242     std::lock_guard<std::mutex> autoLock(dataLock_);
1243     cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1244     return CloudSyncEvent::ERROR_EVENT;
1245 }
1246 
InitCloudSyncStateMachine()1247 void CloudSyncer::InitCloudSyncStateMachine()
1248 {
1249     CloudSyncStateMachine::Initialize();
1250     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_DOWNLOAD, [this]() {
1251         return SyncMachineDoDownload();
1252     });
1253     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_UPLOAD, [this]() {
1254         return SyncMachineDoUpload();
1255     });
1256     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_FINISHED, [this]() {
1257         return SyncMachineDoFinished();
1258     });
1259     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_REPEAT_CHECK, [this]() {
1260         return SyncMachineDoRepeatCheck();
1261     });
1262 }
1263 
SyncMachineDoRepeatCheck()1264 CloudSyncEvent CloudSyncer::SyncMachineDoRepeatCheck()
1265 {
1266     auto config = storageProxy_->GetCloudSyncConfig();
1267     {
1268         std::lock_guard<std::mutex> autoLock(dataLock_);
1269         if (config.maxRetryConflictTimes < 0) { // unlimited repeat counts
1270             return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1271         }
1272         currentContext_.repeatCount++;
1273         if (currentContext_.repeatCount > config.maxRetryConflictTimes) {
1274             LOGD("[CloudSyncer] Repeat too much times current %d limit %" PRId32, currentContext_.repeatCount,
1275                 config.maxRetryConflictTimes);
1276             SetCurrentTaskFailedWithoutLock(-E_CLOUD_VERSION_CONFLICT);
1277             return CloudSyncEvent::ERROR_EVENT;
1278         }
1279         LOGD("[CloudSyncer] Repeat taskId %" PRIu64 " download current %d", currentContext_.currentTaskId,
1280             currentContext_.repeatCount);
1281     }
1282     return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1283 }
1284 
MarkDownloadFinishIfNeed(const std::string & downloadTable,bool isFinish)1285 void CloudSyncer::MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish)
1286 {
1287     // table exist reference should download every times
1288     if (IsLockInDownload() || storageProxy_->IsTableExistReferenceOrReferenceBy(downloadTable)) {
1289         return;
1290     }
1291     std::lock_guard<std::mutex> autoLock(dataLock_);
1292     currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex, downloadTable, isFinish);
1293 }
1294 
DoUploadByMode(const std::string & tableName,UploadParam & uploadParam,InnerProcessInfo & info)1295 int CloudSyncer::DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info)
1296 {
1297     CloudSyncData uploadData(tableName, uploadParam.mode);
1298     SetUploadDataFlag(uploadParam.taskId, uploadData);
1299     auto [err, localWater] = GetLocalWater(tableName, uploadParam);
1300     LOGI("[CloudSyncer] Get local water before upload result: %d, table[%s length[%zu]], water[%llu]", err,
1301         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), localWater);
1302     if (err != E_OK) {
1303         return err;
1304     }
1305     ContinueToken continueStmtToken = nullptr;
1306     int ret = storageProxy_->GetCloudData(GetQuerySyncObject(tableName), localWater, continueStmtToken, uploadData);
1307     if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1308         LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1309         return ret;
1310     }
1311     uploadParam.count -= uploadData.ignoredCount;
1312     info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
1313     std::vector<ReviseModTimeInfo> revisedData;
1314     ret = HandleBatchUpload(uploadParam, info, uploadData, continueStmtToken, revisedData);
1315     if (ret != -E_TASK_PAUSED) {
1316         // reset watermark to zero when task no paused
1317         RecordWaterMark(uploadParam.taskId, 0u);
1318     }
1319     if (continueStmtToken != nullptr) {
1320         storageProxy_->ReleaseContinueToken(continueStmtToken);
1321     }
1322     if (!revisedData.empty()) {
1323         int errCode = storageProxy_->ReviseLocalModTime(tableName, revisedData);
1324         if (errCode != E_OK) {
1325             LOGE("[CloudSyncer] Failed to revise local modify time: %d, table: %s",
1326                 errCode, DBCommon::StringMiddleMasking(tableName).c_str());
1327         }
1328     }
1329     return ret;
1330 }
1331 
IsTableFinishInUpload(const std::string & table)1332 bool CloudSyncer::IsTableFinishInUpload(const std::string &table)
1333 {
1334     std::lock_guard<std::mutex> autoLock(dataLock_);
1335     return currentContext_.processRecorder->IsUploadFinish(currentContext_.currentUserIndex, table);
1336 }
1337 
MarkUploadFinishIfNeed(const std::string & table)1338 void CloudSyncer::MarkUploadFinishIfNeed(const std::string &table)
1339 {
1340     // table exist reference or reference by should upload every times
1341     if (storageProxy_->IsTableExistReferenceOrReferenceBy(table)) {
1342         return;
1343     }
1344     std::lock_guard<std::mutex> autoLock(dataLock_);
1345     currentContext_.processRecorder->MarkUploadFinish(currentContext_.currentUserIndex, table, true);
1346 }
1347 
GetCloudTaskStatus(uint64_t taskId) const1348 SyncProcess CloudSyncer::GetCloudTaskStatus(uint64_t taskId) const
1349 {
1350     std::lock_guard<std::mutex> autoLock(dataLock_);
1351     auto iter = cloudTaskInfos_.find(taskId);
1352     SyncProcess syncProcess;
1353     if (iter == cloudTaskInfos_.end()) {
1354         syncProcess.process = ProcessStatus::FINISHED;
1355         syncProcess.errCode = NOT_FOUND;
1356         LOGE("[CloudSyncer] Not found task %" PRIu64, taskId);
1357         return syncProcess;
1358     }
1359     syncProcess.process = iter->second.status;
1360     syncProcess.errCode = TransferDBErrno(iter->second.errCode);
1361     std::shared_ptr<ProcessNotifier> notifier = nullptr;
1362     if (currentContext_.currentTaskId == taskId) {
1363         notifier = currentContext_.notifier;
1364     }
1365     bool hasNotifier = notifier != nullptr;
1366     if (hasNotifier) {
1367         syncProcess.tableProcess = notifier->GetCurrentTableProcess();
1368     }
1369     LOGI("[CloudSyncer] Found task %" PRIu64 " storeId %.3s status %d has notifier %d", taskId,
1370         iter->second.storeId.c_str(), static_cast<int64_t>(syncProcess.process), static_cast<int>(hasNotifier));
1371     return syncProcess;
1372 }
1373 
GenerateTaskIdIfNeed(CloudTaskInfo & taskInfo)1374 int CloudSyncer::GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo)
1375 {
1376     if (taskInfo.taskId != INVALID_TASK_ID) {
1377         if (cloudTaskInfos_.find(taskInfo.taskId) != cloudTaskInfos_.end()) {
1378             LOGE("[CloudSyncer] Sync with exist taskId %" PRIu64 " storeId %.3s", taskInfo.taskId,
1379                 taskInfo.storeId.c_str());
1380             return -E_INVALID_ARGS;
1381         }
1382         LOGI("[CloudSyncer] Sync with taskId %" PRIu64 " storeId %.3s", taskInfo.taskId, taskInfo.storeId.c_str());
1383         return E_OK;
1384     }
1385     lastTaskId_--;
1386     if (lastTaskId_ == INVALID_TASK_ID) {
1387         lastTaskId_ = UINT64_MAX;
1388     }
1389     taskInfo.taskId = lastTaskId_;
1390     return E_OK;
1391 }
1392 
ProcessVersionConflictInfo(InnerProcessInfo & innerProcessInfo,uint32_t retryCount)1393 void CloudSyncer::ProcessVersionConflictInfo(InnerProcessInfo &innerProcessInfo, uint32_t retryCount)
1394 {
1395     innerProcessInfo.retryInfo.uploadBatchRetryCount = retryCount;
1396     CloudSyncConfig config = storageProxy_->GetCloudSyncConfig();
1397     {
1398         std::lock_guard<std::mutex> autoLock(dataLock_);
1399         if (config.maxRetryConflictTimes >= 0 &&
1400             currentContext_.repeatCount + 1 > config.maxRetryConflictTimes) {
1401             innerProcessInfo.upLoadInfo.failCount =
1402                 innerProcessInfo.upLoadInfo.total - innerProcessInfo.upLoadInfo.successCount;
1403         }
1404     }
1405 }
1406 
GetStoreIdByTask(TaskId taskId)1407 std::string CloudSyncer::GetStoreIdByTask(TaskId taskId)
1408 {
1409     std::lock_guard<std::mutex> autoLock(dataLock_);
1410     return cloudTaskInfos_[taskId].storeId;
1411 }
1412 
StopSyncTask(std::function<int (void)> & removeFunc)1413 int CloudSyncer::StopSyncTask(std::function<int(void)> &removeFunc)
1414 {
1415     hasKvRemoveTask = true;
1416     CloudSyncer::TaskId currentTask;
1417     {
1418         // stop task if exist
1419         std::lock_guard<std::mutex> autoLock(dataLock_);
1420         currentTask = currentContext_.currentTaskId;
1421     }
1422     if (currentTask != INVALID_TASK_ID || asyncTaskId_ != INVALID_TASK_ID) {
1423         StopAllTasks(-E_CLOUD_ERROR);
1424     }
1425     int errCode = E_OK;
1426     {
1427         std::lock_guard<std::mutex> lock(syncMutex_);
1428         errCode = removeFunc();
1429         hasKvRemoveTask = false;
1430     }
1431     if (errCode != E_OK) {
1432         LOGE("[CloudSyncer] removeFunc execute failed errCode: %d.", errCode);
1433     }
1434     return errCode;
1435 }
1436 
StopAllTasks(int errCode)1437 void CloudSyncer::StopAllTasks(int errCode)
1438 {
1439     CloudSyncer::TaskId currentTask;
1440     {
1441         std::lock_guard<std::mutex> autoLock(dataLock_);
1442         currentTask = currentContext_.currentTaskId;
1443     }
1444     // mark current task user_change
1445     SetTaskFailed(currentTask, errCode);
1446     UnlockIfNeed();
1447     WaitCurTaskFinished();
1448 
1449     std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
1450     for (auto &info: infoList) {
1451         LOGI("[CloudSyncer] finished taskId %" PRIu64 " with errCode %d, isPriority %d.", info.taskId, errCode,
1452             info.priorityTask);
1453         auto processNotifier = std::make_shared<ProcessNotifier>(this);
1454         processNotifier->Init(info.table, info.devices, info.users);
1455         info.errCode = errCode;
1456         info.status = ProcessStatus::FINISHED;
1457         processNotifier->NotifyProcess(info, {}, true);
1458     }
1459 }
1460 
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)1461 int CloudSyncer::TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
1462 {
1463     OpType strategyOpResult = OpType::NOT_HANDLE;
1464     int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
1465     if (errCode != E_OK) {
1466         return errCode;
1467     }
1468     param.downloadData.opType[idx] = strategyOpResult;
1469     if (!IsDataContainAssets()) {
1470         return E_OK;
1471     }
1472     Key hashKey;
1473     if (isExist) {
1474         hashKey = dataInfo.localInfo.logInfo.hashKey;
1475     }
1476     if (param.isAssetsOnly) {
1477         return strategyOpResult == OpType::LOCKED_NOT_HANDLE ?
1478             E_OK : TagDownloadAssetsForAssetOnly(hashKey, idx, param, dataInfo, localAssetInfo);
1479     }
1480     return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
1481 }
1482 
CloudDbBatchDownloadAssets(TaskId taskId,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,InnerProcessInfo & info,ChangedData & changedAssets)1483 int CloudSyncer::CloudDbBatchDownloadAssets(TaskId taskId, const DownloadList &downloadList,
1484     const std::set<Key> &dupHashKeySet, InnerProcessInfo &info, ChangedData &changedAssets)
1485 {
1486     int errorCode = CheckTaskIdValid(taskId);
1487     if (errorCode != E_OK) {
1488         return errorCode;
1489     }
1490     bool isSharedTable = false;
1491     errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
1492     if (errorCode != E_OK) {
1493         LOGE("[CloudSyncer] check is shared table failed %d", errorCode);
1494         return errorCode;
1495     }
1496     // prepare download data
1497     auto [downloadRecord, removeAssets, downloadAssets] =
1498         GetDownloadRecords(downloadList, dupHashKeySet, isSharedTable, IsAsyncDownloadAssets(taskId), info);
1499     std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> detail = {
1500         std::move(downloadRecord), std::move(removeAssets), std::move(downloadAssets), isSharedTable
1501     };
1502     return BatchDownloadAndCommitRes(downloadList, dupHashKeySet, info, changedAssets, detail);
1503 }
1504 
FillDownloadItem(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,const InnerProcessInfo & info,bool isSharedTable,DownloadItems & record)1505 void CloudSyncer::FillDownloadItem(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
1506     const InnerProcessInfo &info, bool isSharedTable, DownloadItems &record)
1507 {
1508     CloudStorageUtils::EraseNoChangeAsset(record.downloadItem.assets);
1509     if (record.downloadItem.assets.empty()) { // Download data (include deleting)
1510         return;
1511     }
1512     if (isSharedTable) {
1513         // share table will not download asset, need to reset the status
1514         for (auto &entry: record.downloadItem.assets) {
1515             for (auto &asset: entry.second) {
1516                 asset.status = AssetStatus::NORMAL;
1517             }
1518         }
1519         return;
1520     }
1521     int errCode = E_OK;
1522     VBucket dbAssets;
1523     if (!IsNeedSkipDownload(isSharedTable, errCode, info, record.downloadItem, dbAssets)) {
1524         GetAssetsToRemove(record.downloadItem.assets, dbAssets, isSharedTable, record.assetsToRemove);
1525         GetAssetsToDownload(record.downloadItem.assets, dbAssets, isSharedTable, record.assetsToDownload, record.flags);
1526     }
1527 }
1528 
GetDownloadRecords(const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,bool isSharedTable,bool isAsyncDownloadAssets,const InnerProcessInfo & info)1529 CloudSyncer::DownloadAssetDetail CloudSyncer::GetDownloadRecords(const DownloadList &downloadList,
1530     const std::set<Key> &dupHashKeySet, bool isSharedTable, bool isAsyncDownloadAssets, const InnerProcessInfo &info)
1531 {
1532     DownloadItemRecords downloadRecord;
1533     RemoveAssetsRecords removeAssets;
1534     DownloadAssetsRecords downloadAssets;
1535     for (size_t i = 0; i < downloadList.size(); i++) {
1536         DownloadItems record;
1537         GetDownloadItem(downloadList, i, record.downloadItem);
1538         FillDownloadItem(dupHashKeySet, downloadList, info, isSharedTable, record);
1539 
1540         IAssetLoader::AssetRecord removeAsset = {
1541             record.downloadItem.gid, record.downloadItem.prefix, std::move(record.assetsToRemove)
1542         };
1543         removeAssets.push_back(std::move(removeAsset));
1544         if (isAsyncDownloadAssets) {
1545             record.assetsToDownload.clear();
1546         }
1547         IAssetLoader::AssetRecord downloadAsset = {
1548             record.downloadItem.gid, record.downloadItem.prefix, std::move(record.assetsToDownload)
1549         };
1550         downloadAssets.push_back(std::move(downloadAsset));
1551         downloadRecord.push_back(std::move(record));
1552     }
1553     return {downloadRecord, removeAssets, downloadAssets};
1554 }
1555 
BatchDownloadAndCommitRes(const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,InnerProcessInfo & info,ChangedData & changedAssets,std::tuple<DownloadItemRecords,RemoveAssetsRecords,DownloadAssetsRecords,bool> & downloadDetail)1556 int CloudSyncer::BatchDownloadAndCommitRes(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
1557     InnerProcessInfo &info, ChangedData &changedAssets,
1558     std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> &downloadDetail)
1559 {
1560     auto &[downloadRecord, removeAssets, downloadAssets, isSharedTable] = downloadDetail;
1561     // download and remove in batch
1562     auto deleteRes = cloudDB_.BatchRemoveLocalAssets(info.tableName, removeAssets);
1563     auto downloadRes = cloudDB_.BatchDownload(info.tableName, downloadAssets);
1564     if (deleteRes == -E_NOT_SET || downloadRes == -E_NOT_SET) {
1565         return -E_NOT_SET;
1566     }
1567     int errorCode = E_OK;
1568     int index = 0;
1569     for (auto &item : downloadRecord) {
1570         auto deleteCode = removeAssets[index].status == OK ? E_OK : -E_REMOVE_ASSETS_FAILED;
1571         auto downloadCode = CloudDBProxy::GetInnerErrorCode(downloadAssets[index].status);
1572         downloadCode = downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT ? E_OK : downloadCode;
1573         if (!isSharedTable) {
1574             item.downloadItem.assets = BackFillAssetsAfterDownload(downloadCode, deleteCode, item.flags,
1575                 downloadAssets[index].assets, removeAssets[index].assets);
1576         }
1577         StatisticDownloadRes(downloadAssets[index], removeAssets[index], info, item.downloadItem);
1578         AddNotifyDataFromDownloadAssets(dupHashKeySet, item.downloadItem, changedAssets);
1579         if (item.downloadItem.strategy == OpType::DELETE) {
1580             item.downloadItem.assets = {};
1581             item.downloadItem.gid = "";
1582         }
1583         // commit download res
1584         DownloadCommitList commitList;
1585         // Process result of each asset
1586         downloadCode = downloadAssets[index].status == SKIP_ASSET ? E_OK : downloadCode;
1587         commitList.push_back(std::make_tuple(item.downloadItem.gid, std::move(item.downloadItem.assets),
1588             deleteCode == E_OK && downloadCode == E_OK));
1589         errorCode = (errorCode != E_OK) ? errorCode : deleteCode;
1590         errorCode = (errorCode != E_OK) ? errorCode : downloadCode;
1591         int currErrorCode = (deleteCode != E_OK) ? deleteCode : downloadCode;
1592         int ret = CommitDownloadResult(item.downloadItem, info, commitList, currErrorCode);
1593         if (ret != E_OK) {
1594             errorCode = errorCode == E_OK ? ret : errorCode;
1595         }
1596         index++;
1597     }
1598     storageProxy_->PrintCursorChange(info.tableName);
1599     return errorCode;
1600 }
1601 
StatisticDownloadRes(const IAssetLoader::AssetRecord & downloadRecord,const IAssetLoader::AssetRecord & removeRecord,InnerProcessInfo & info,DownloadItem & downloadItem)1602 void CloudSyncer::StatisticDownloadRes(const IAssetLoader::AssetRecord &downloadRecord,
1603     const IAssetLoader::AssetRecord &removeRecord, InnerProcessInfo &info, DownloadItem &downloadItem)
1604 {
1605     if ((downloadRecord.status == OK || downloadRecord.status == SKIP_ASSET) && (removeRecord.status == OK)) {
1606         return;
1607     }
1608     if ((downloadRecord.status == CLOUD_RECORD_EXIST_CONFLICT) ||
1609         (removeRecord.status == CLOUD_RECORD_EXIST_CONFLICT)) {
1610         downloadItem.recordConflict = true;
1611         return;
1612     }
1613     info.downLoadInfo.failCount += 1;
1614     if (info.downLoadInfo.successCount == 0) {
1615         LOGW("[CloudSyncer] Invalid successCount");
1616     } else {
1617         info.downLoadInfo.successCount -= 1;
1618     }
1619 }
1620 
AddNotifyDataFromDownloadAssets(const std::set<Key> & dupHashKeySet,DownloadItem & downloadItem,ChangedData & changedAssets)1621 void CloudSyncer::AddNotifyDataFromDownloadAssets(const std::set<Key> &dupHashKeySet, DownloadItem &downloadItem,
1622     ChangedData &changedAssets)
1623 {
1624     if (downloadItem.assets.empty()) {
1625         return;
1626     }
1627     if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
1628         if (CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy) == OP_BUTT) {
1629             LOGW("[CloudSyncer] [AddNotifyDataFromDownloadAssets] strategy is invalid.");
1630         } else {
1631             changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
1632                 downloadItem.primaryKeyValList);
1633         }
1634     } else if (downloadItem.strategy == OpType::INSERT) {
1635         changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
1636     }
1637 }
1638 
CheckDataAfterDownload(const std::string & tableName)1639 void CloudSyncer::CheckDataAfterDownload(const std::string &tableName)
1640 {
1641     int dataCount = 0;
1642     int logicDeleteDataCount = 0;
1643     int errCode = storageProxy_->GetLocalDataCount(tableName, dataCount, logicDeleteDataCount);
1644     if (errCode == E_OK) {
1645         LOGI("[CloudSyncer] Check local data after download[%s[%zu]], data count: %d, logic delete data count: %d",
1646             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), dataCount, logicDeleteDataCount);
1647     } else {
1648         LOGW("[CloudSyncer] Get local data after download fail: %d", errCode);
1649     }
1650 }
1651 
WaitCurTaskFinished()1652 void CloudSyncer::WaitCurTaskFinished()
1653 {
1654     CancelBackgroundDownloadAssetsTask();
1655     std::unique_lock<std::mutex> uniqueLock(dataLock_);
1656     if (currentContext_.currentTaskId != INVALID_TASK_ID) {
1657         LOGI("[CloudSyncer] begin wait current task %" PRIu64 " finished", currentContext_.currentTaskId);
1658         contextCv_.wait(uniqueLock, [this]() {
1659             return currentContext_.currentTaskId == INVALID_TASK_ID;
1660         });
1661         LOGI("[CloudSyncer] current task has been finished");
1662     }
1663 }
1664 
IsAsyncDownloadAssets(TaskId taskId)1665 bool CloudSyncer::IsAsyncDownloadAssets(TaskId taskId)
1666 {
1667     std::lock_guard<std::mutex> autoLock(dataLock_);
1668     return cloudTaskInfos_[taskId].asyncDownloadAssets;
1669 }
1670 
TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload)1671 void CloudSyncer::TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload)
1672 {
1673     {
1674         std::lock_guard<std::mutex> autoLock(dataLock_);
1675         if (!cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets) {
1676             return;
1677         }
1678     }
1679     if (isFirstDownload) {
1680         bool isLockAction = IsLockInDownload();
1681         std::lock_guard<std::mutex> autoLock(dataLock_);
1682         if (currentContext_.isNeedUpload && isLockAction) {
1683             return;
1684         }
1685     }
1686     TriggerAsyncDownloadAssetsIfNeed();
1687 }
1688 
BackgroundDownloadAssetsTask()1689 void CloudSyncer::BackgroundDownloadAssetsTask()
1690 {
1691     // remove listener first
1692     CancelDownloadListener();
1693     // add download count and register listener if failed
1694     auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
1695     IncObjRef(this);
1696     auto [errCode, listener] = manager->BeginDownloadWithListener([this](void *) {
1697         TriggerAsyncDownloadAssetsIfNeed();
1698     }, [this]() {
1699         DecObjRef(this);
1700     });
1701     if (errCode == E_OK) {
1702         // increase download count success
1703         CancelDownloadListener();
1704         DoBackgroundDownloadAssets();
1705         RuntimeContext::GetInstance()->GetAssetsDownloadManager()->FinishDownload();
1706         DecObjRef(this);
1707         return;
1708     }
1709     if (listener != nullptr) {
1710         std::lock_guard<std::mutex> autoLock(listenerMutex_);
1711         waitDownloadListener_ = listener;
1712         return;
1713     }
1714     LOGW("[CloudSyncer] BeginDownloadWithListener failed %d", errCode);
1715     DecObjRef(this);
1716 }
1717 
CancelDownloadListener()1718 void CloudSyncer::CancelDownloadListener()
1719 {
1720     NotificationChain::Listener *waitDownloadListener = nullptr;
1721     {
1722         std::lock_guard<std::mutex> autoLock(listenerMutex_);
1723         if (waitDownloadListener_ != nullptr) {
1724             waitDownloadListener = waitDownloadListener_;
1725             waitDownloadListener_ = nullptr;
1726         }
1727     }
1728     if (waitDownloadListener != nullptr) {
1729         waitDownloadListener->Drop(true);
1730         waitDownloadListener = nullptr;
1731     }
1732 }
1733 
DoBackgroundDownloadAssets()1734 void CloudSyncer::DoBackgroundDownloadAssets()
1735 {
1736     bool allDownloadFinish = true;
1737     std::map<std::string, int64_t> downloadBeginTime;
1738     do {
1739         auto [errCode, tables] = storageProxy_->GetDownloadAssetTable();
1740         if (errCode != E_OK) {
1741             LOGE("[CloudSyncer] Get download asset table failed %d", errCode);
1742             return;
1743         }
1744         allDownloadFinish = true;
1745         std::list<std::string> tableQueue(tables.begin(), tables.end());
1746         while (!tableQueue.empty()) {
1747             if (cancelAsyncTask_ || closed_) {
1748                 LOGW("[CloudSyncer] exit task by cancel %d closed %d", static_cast<int>(cancelAsyncTask_),
1749                     static_cast<int>(closed_));
1750                 return;
1751             }
1752             errCode = BackgroundDownloadAssetsByTable(tableQueue.front(), downloadBeginTime);
1753             if (errCode == E_OK) {
1754                 allDownloadFinish = false;
1755             } else if (errCode == -E_FINISHED) {
1756                 tableQueue.pop_front();
1757                 errCode = E_OK;
1758             } else {
1759                 LOGW("[CloudSyncer] BackgroundDownloadAssetsByTable table %s failed %d",
1760                     DBCommon::StringMiddleMasking(tableQueue.front()).c_str(), errCode);
1761                 allDownloadFinish = false;
1762             }
1763         }
1764     } while (!allDownloadFinish);
1765 }
1766 
CancelBackgroundDownloadAssetsTaskIfNeed()1767 void CloudSyncer::CancelBackgroundDownloadAssetsTaskIfNeed()
1768 {
1769     bool cancelDownload = true;
1770     {
1771         std::unique_lock<std::mutex> uniqueLock(dataLock_);
1772         if (cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets || asyncTaskId_ == INVALID_TASK_ID) {
1773             return;
1774         }
1775         if (cloudTaskInfos_[currentContext_.currentTaskId].compensatedTask) {
1776             cancelDownload = false;
1777         }
1778     }
1779     CancelBackgroundDownloadAssetsTask(cancelDownload);
1780 }
1781 
CancelBackgroundDownloadAssetsTask(bool cancelDownload)1782 void CloudSyncer::CancelBackgroundDownloadAssetsTask(bool cancelDownload)
1783 {
1784     cancelAsyncTask_ = true;
1785     if (cancelDownload) {
1786         cloudDB_.CancelDownload();
1787     }
1788     std::unique_lock<std::mutex> uniqueLock(dataLock_);
1789     if (asyncTaskId_ != INVALID_TASK_ID) {
1790         LOGI("[CloudSyncer] begin wait async download task % " PRIu64 " finished", asyncTaskId_);
1791         asyncTaskCv_.wait(uniqueLock, [this]() {
1792             return asyncTaskId_ == INVALID_TASK_ID;
1793         });
1794         LOGI("[CloudSyncer] async download task has been finished");
1795     }
1796     cancelAsyncTask_ = false;
1797 }
1798 
BackgroundDownloadAssetsByTable(const std::string & table,std::map<std::string,int64_t> & downloadBeginTime)1799 int CloudSyncer::BackgroundDownloadAssetsByTable(const std::string &table,
1800     std::map<std::string, int64_t> &downloadBeginTime)
1801 {
1802     auto [errCode, downloadData] = storageProxy_->GetDownloadAssetRecords(table, downloadBeginTime[table]);
1803     if (errCode != E_OK) {
1804         return errCode;
1805     }
1806     if (downloadData.empty()) {
1807         LOGD("[CloudSyncer] table %s async download finished", DBCommon::StringMiddleMasking(table).c_str());
1808         return -E_FINISHED;
1809     }
1810 
1811     bool isSharedTable = false;
1812     errCode = storageProxy_->IsSharedTable(table, isSharedTable);
1813     if (errCode != E_OK) {
1814         LOGE("[CloudSyncer] check is shared table failed %d", errCode);
1815         return errCode;
1816     }
1817     DownloadList downloadList;
1818     ChangedData changedAssets;
1819     std::tie(errCode, downloadList, changedAssets) = CloudSyncUtils::GetDownloadListByGid(storageProxy_, downloadData,
1820         table);
1821     if (errCode != E_OK) {
1822         return errCode;
1823     }
1824     CloudSyncUtils::UpdateMaxTimeWithDownloadList(downloadList, table, downloadBeginTime);
1825     std::set<Key> dupHashKeySet;
1826     InnerProcessInfo info;
1827     info.tableName = table;
1828     info.isAsyncDownload = true;
1829 
1830     // prepare download data
1831     auto [downloadRecord, removeAssets, downloadAssets] =
1832         GetDownloadRecords(downloadList, dupHashKeySet, isSharedTable, false, info);
1833     std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> detail = {
1834         std::move(downloadRecord), std::move(removeAssets), std::move(downloadAssets), isSharedTable
1835     };
1836     errCode = BatchDownloadAndCommitRes(downloadList, dupHashKeySet, info, changedAssets, detail);
1837     NotifyChangedDataWithDefaultDev(std::move(changedAssets));
1838     return errCode;
1839 }
1840 
TagDownloadAssetsForAssetOnly(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)1841 int CloudSyncer::TagDownloadAssetsForAssetOnly(
1842     const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo, VBucket &localAssetInfo)
1843 {
1844     Type prefix;
1845     std::vector<Type> pkVals;
1846     int ret = CloudSyncUtils::GetCloudPkVals(
1847         param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
1848     if (ret != E_OK) {
1849         // if get pk vals failed, mean cloud data is deteled.
1850         LOGE("[CloudSyncer] TagDownloadAssetsForAssetOnly cannot get primary key value list. %d",
1851             -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1852         return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1853     }
1854     prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
1855     if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
1856         LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
1857         return -E_INTERNAL_ERROR;
1858     }
1859     std::map<std::string, Assets> downloadAssetsMap{};
1860     ret = CloudSyncUtils::GetDownloadAssetsOnlyMapFromDownLoadData(idx, param, downloadAssetsMap);
1861     if (ret != E_OK) {
1862         return ret;
1863     }
1864     param.assetsDownloadList.push_back(
1865         std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, OpType::UPDATE, downloadAssetsMap, hashKey,
1866         pkVals, dataInfo.cloudLogInfo.timestamp));
1867     return ret;
1868 }
1869 
PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam & param,std::vector<VBucket> & localInfo)1870 int CloudSyncer::PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam &param, std::vector<VBucket> &localInfo)
1871 {
1872     int ret = E_OK;
1873     if (param.isAssetsOnly) {
1874         // download and save only asset, ignore other data.
1875         for (auto &item : localInfo) {
1876             ret = storageProxy_->UpdateAssetStatusForAssetOnly(param.tableName, item);
1877             if (ret != E_OK) {
1878                 LOGE("[CloudSyncer] Cannot save asset data due to error code %d", ret);
1879                 return ret;
1880             }
1881         }
1882     } else {
1883         ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
1884         if (ret != E_OK) {
1885             param.info.downLoadInfo.failCount += param.downloadData.data.size();
1886             LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
1887         }
1888     }
1889     return ret;
1890 }
1891 
QueryCloudGidForAssetsOnly(TaskId taskId,SyncParam & param,int64_t groupIdx,std::vector<std::string> & cloudGid)1892 int CloudSyncer::QueryCloudGidForAssetsOnly(
1893     TaskId taskId, SyncParam &param, int64_t groupIdx, std::vector<std::string> &cloudGid)
1894 {
1895     auto tableName = param.info.tableName;
1896     QuerySyncObject syncObj = GetQuerySyncObject(tableName);
1897     VBucket extend = {{CloudDbConstant::CURSOR_FIELD, param.cloudWaterMarkForAssetsOnly}};
1898     QuerySyncObject obj;
1899     int ret = syncObj.GetQuerySyncObjectFromGroup(groupIdx, obj);
1900     if (ret != E_OK) {
1901         LOGE("Get query obj from group fail, errCode = %d", ret);
1902         return ret;
1903     }
1904     ret = GetCloudGid(taskId, tableName, obj, cloudGid);
1905     if (ret != E_OK) {
1906         LOGE("Get cloud gid fail, errCode = %d", ret);
1907     }
1908     return ret;
1909 }
1910 
GetEmptyGidAssetsMapFromDownloadData(const std::vector<VBucket> & data,std::map<std::string,AssetsMap> & gidAssetsMap)1911 int CloudSyncer::GetEmptyGidAssetsMapFromDownloadData(
1912     const std::vector<VBucket> &data, std::map<std::string, AssetsMap> &gidAssetsMap)
1913 {
1914     for (uint32_t i = 0; i < data.size(); i++) {
1915         std::string gidStr;
1916         int errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, data[i], gidStr);
1917         if (errCode != E_OK) {
1918             LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", errCode);
1919             return errCode;
1920         }
1921         gidAssetsMap[gidStr] = AssetsMap{};
1922     }
1923     return E_OK;
1924 }
1925 
SetAssetsMapAndEraseDataForAssetsOnly(TaskId taskId,SyncParam & param,std::vector<VBucket> & downloadData,std::map<std::string,AssetsMap> & gidAssetsMap)1926 int CloudSyncer::SetAssetsMapAndEraseDataForAssetsOnly(
1927     TaskId taskId, SyncParam &param, std::vector<VBucket> &downloadData, std::map<std::string, AssetsMap> &gidAssetsMap)
1928 {
1929     for (uint32_t i = 0; i < param.groupNum; i++) {
1930         std::vector<std::string> cloudGid;
1931         int ret = QueryCloudGidForAssetsOnly(taskId, param, i, cloudGid);
1932         if ((ret != E_OK && ret != -E_QUERY_END) || cloudGid.empty()) {
1933             LOGE("[CloudSyncer] Cannot get the %u group data, error code: %d.", i, -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1934             return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1935         }
1936         if (!CloudSyncUtils::SetAssetsMapByCloudGid(cloudGid, param.assetsGroupMap[i], gidAssetsMap)) {
1937             // if group no match data, return error code.
1938             LOGE("[CloudSyncer] Cannot get the %u group data, error code: %d.", i, -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1939             return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1940         }
1941     }
1942     for (auto iter = downloadData.begin(); iter != downloadData.end();) {
1943         std::string gidStr;
1944         int ret = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, *iter, gidStr);
1945         if (ret != E_OK) {
1946             LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", ret);
1947             return ret;
1948         }
1949 
1950         if (DBCommon::IsRecordDelete(*iter)) {
1951             iter = downloadData.erase(iter);
1952             gidAssetsMap.erase(gidStr);
1953             continue;
1954         }
1955 
1956         auto assetsMap = gidAssetsMap[gidStr];
1957         if (!CloudSyncUtils::IsAssetOnlyData(*iter, assetsMap, false)) {
1958             iter = downloadData.erase(iter);
1959             gidAssetsMap.erase(gidStr);
1960             continue;
1961         }
1962         ++iter;
1963     }
1964     return E_OK;
1965 }
1966 
CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId,SyncParam & param)1967 int CloudSyncer::CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId, SyncParam &param)
1968 {
1969     {
1970         std::lock_guard<std::mutex> autoLock(dataLock_);
1971         if (!param.isAssetsOnly || cloudTaskInfos_[taskId].compensatedTask) {
1972             return E_OK;
1973         }
1974         if (!param.isVaildForAssetsOnly) {
1975             param.downloadData.data.clear();
1976             return E_OK;
1977         }
1978         cloudTaskInfos_[taskId].isAssetsOnly = param.isAssetsOnly;
1979         cloudTaskInfos_[taskId].groupNum = param.groupNum;
1980         cloudTaskInfos_[taskId].assetsGroupMap = param.assetsGroupMap;
1981     }
1982 
1983     std::vector<VBucket> &downloadData = param.downloadData.data;
1984     auto &gidAssetsMap = param.gidAssetsMap;
1985     gidAssetsMap.clear();
1986     int ret = GetEmptyGidAssetsMapFromDownloadData(downloadData, gidAssetsMap);
1987     if (ret != E_OK) {
1988         return ret;
1989     }
1990 
1991     // set assets map for every record and erase not match data.
1992     ret = SetAssetsMapAndEraseDataForAssetsOnly(taskId, param, downloadData, gidAssetsMap);
1993     if (ret != E_OK) {
1994         return ret;
1995     }
1996 
1997     for (uint32_t i = 0; i < param.groupNum; i++) {
1998         bool isEmpty = CloudSyncUtils::CheckAssetsOnlyIsEmptyInGroup(gidAssetsMap, param.assetsGroupMap[i]);
1999         if (isEmpty) {
2000             LOGE("[CloudSyncer] query assets failed, error code: %d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2001             return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2002         }
2003     }
2004     return E_OK;
2005 }
2006 
CheckLocalQueryAssetsOnlyIfNeed(VBucket & localAssetInfo,SyncParam & param,DataInfoWithLog & logInfo)2007 int CloudSyncer::CheckLocalQueryAssetsOnlyIfNeed(VBucket &localAssetInfo, SyncParam &param, DataInfoWithLog &logInfo)
2008 {
2009     if (!param.isAssetsOnly) {
2010         return E_OK;
2011     }
2012     std::string gid = logInfo.logInfo.cloudGid;
2013     auto iter = param.gidAssetsMap.find(gid);
2014     if (iter == param.gidAssetsMap.end()) {
2015         LOGE("query assets failed, error code:%d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2016         return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2017     }
2018 
2019     auto assetsMap = iter->second;
2020     if (!CloudSyncUtils::IsAssetOnlyData(localAssetInfo, assetsMap, true)) {
2021         LOGE("[CloudSyncer] query assets failed, error code: %d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2022         return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2023     }
2024     return E_OK;
2025 }
2026 
IsCurrentAsyncDownloadTask()2027 bool CloudSyncer::IsCurrentAsyncDownloadTask()
2028 {
2029     std::lock_guard<std::mutex> autoLock(dataLock_);
2030     return cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets;
2031 }
2032 
NotifyChangedDataWithDefaultDev(ChangedData && changedData)2033 void CloudSyncer::NotifyChangedDataWithDefaultDev(ChangedData &&changedData)
2034 {
2035     auto table = changedData.tableName;
2036     int ret = CloudSyncUtils::NotifyChangeData(CloudDbConstant::DEFAULT_CLOUD_DEV, storageProxy_,
2037         std::move(changedData));
2038     if (ret != E_OK) {
2039         LOGW("[CloudSyncer] Notify %s change data failed %d", DBCommon::StringMiddleMasking(table).c_str(), ret);
2040     }
2041 }
2042 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)2043 void CloudSyncer::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
2044 {
2045     cloudDB_.SetGenCloudVersionCallback(callback);
2046 }
2047 
GetDownloadAssetIndex(TaskId taskId)2048 size_t CloudSyncer::GetDownloadAssetIndex(TaskId taskId)
2049 {
2050     size_t index = 0u;
2051     std::lock_guard<std::mutex> autoLock(dataLock_);
2052     if (resumeTaskInfos_[taskId].lastDownloadIndex != 0u) {
2053         index = resumeTaskInfos_[taskId].lastDownloadIndex;
2054         resumeTaskInfos_[taskId].lastDownloadIndex = 0u;
2055     }
2056     return index;
2057 }
2058 
GetCurrentTableUploadBatchIndex()2059 uint32_t CloudSyncer::GetCurrentTableUploadBatchIndex()
2060 {
2061     std::lock_guard<std::mutex> autoLock(dataLock_);
2062     return currentContext_.notifier->GetUploadBatchIndex(currentContext_.tableName);
2063 }
2064 
ResetCurrentTableUploadBatchIndex()2065 void CloudSyncer::ResetCurrentTableUploadBatchIndex()
2066 {
2067     std::lock_guard<std::mutex> autoLock(dataLock_);
2068     if (currentContext_.notifier == nullptr) {
2069         return;
2070     }
2071     currentContext_.notifier->ResetUploadBatchIndex(currentContext_.tableName);
2072 }
2073 
RecordWaterMark(TaskId taskId,Timestamp waterMark)2074 void CloudSyncer::RecordWaterMark(TaskId taskId, Timestamp waterMark)
2075 {
2076     std::lock_guard<std::mutex> autoLock(dataLock_);
2077     resumeTaskInfos_[taskId].lastLocalWatermark = waterMark;
2078 }
2079 
GetResumeWaterMark(TaskId taskId)2080 Timestamp CloudSyncer::GetResumeWaterMark(TaskId taskId)
2081 {
2082     std::lock_guard<std::mutex> autoLock(dataLock_);
2083     return resumeTaskInfos_[taskId].lastLocalWatermark;
2084 }
2085 
GetInnerProcessInfo(const std::string & tableName,UploadParam & uploadParam)2086 CloudSyncer::InnerProcessInfo CloudSyncer::GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam)
2087 {
2088     InnerProcessInfo info;
2089     info.tableName = tableName;
2090     info.tableStatus = ProcessStatus::PROCESSING;
2091     ReloadUploadInfoIfNeed(uploadParam, info);
2092     return info;
2093 }
2094 
CopyAndClearTaskInfos()2095 std::vector<CloudSyncer::CloudTaskInfo> CloudSyncer::CopyAndClearTaskInfos()
2096 {
2097     std::vector<CloudTaskInfo> infoList;
2098     std::lock_guard<std::mutex> autoLock(dataLock_);
2099     for (const auto &item: cloudTaskInfos_) {
2100         infoList.push_back(item.second);
2101     }
2102     taskQueue_.clear();
2103     cloudTaskInfos_.clear();
2104     resumeTaskInfos_.clear();
2105     currentContext_.notifier = nullptr;
2106     return infoList;
2107 }
2108 
TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId)2109 bool CloudSyncer::TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId)
2110 {
2111     std::vector<QuerySyncObject> syncQuery;
2112     std::vector<std::string> users;
2113     int errCode = CloudSyncUtils::GetQueryAndUsersForCompensatedSync(
2114         CloudSyncUtils::CanStartAsyncDownload(scheduleTaskCount_), storageProxy_, users, syncQuery);
2115     if (errCode != E_OK) {
2116         LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
2117         // if failed, finshed the task directly.
2118         DoFinished(triggerTaskId, errCode);
2119         return false;
2120     }
2121     if (syncQuery.empty()) {
2122         // if quey is empty, finshed the task directly.
2123         DoFinished(triggerTaskId, E_OK);
2124         return false;
2125     }
2126     std::vector<std::string> userList;
2127     CloudSyncUtils::GetUserListForCompensatedSync(cloudDB_, users, userList);
2128     std::lock_guard<std::mutex> autoLock(dataLock_);
2129     if (cloudTaskInfos_.find(triggerTaskId) == cloudTaskInfos_.end()) {
2130         return false;
2131     }
2132     cloudTaskInfos_[triggerTaskId].users = userList;
2133     cloudTaskInfos_[triggerTaskId].table.clear();
2134     cloudTaskInfos_[triggerTaskId].queryList.clear();
2135     cloudTaskInfos_[triggerTaskId].table.push_back(syncQuery[0].GetRelationTableName());
2136     cloudTaskInfos_[triggerTaskId].queryList.push_back(syncQuery[0]);
2137     return true;
2138 }
2139 
ClearCloudWatermark(std::function<int (void)> & clearFunc)2140 int CloudSyncer::ClearCloudWatermark(std::function<int(void)> &clearFunc)
2141 {
2142     std::lock_guard<std::mutex> lock(syncMutex_);
2143     return clearFunc();
2144 }
2145 
IsCloudForcePush(TaskId taskId)2146 bool CloudSyncer::IsCloudForcePush(TaskId taskId)
2147 {
2148     std::lock_guard<std::mutex> autoLock(dataLock_);
2149     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
2150 }
2151 }