• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CLOUD_SYNCER_H
17 #define CLOUD_SYNCER_H
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <utility>
22 
23 #include "cloud/cloud_db_proxy.h"
24 #include "cloud/cloud_store_types.h"
25 #include "cloud/cloud_sync_state_machine.h"
26 #include "cloud/cloud_sync_strategy.h"
27 #include "cloud/icloud_db.h"
28 #include "cloud/icloud_syncer.h"
29 #include "cloud/process_notifier.h"
30 #include "cloud/process_recorder.h"
31 #include "cloud_locker.h"
32 #include "data_transformer.h"
33 #include "db_common.h"
34 #include "ref_object.h"
35 #include "runtime_context.h"
36 #include "storage_proxy.h"
37 #include "store_observer.h"
38 
39 namespace DistributedDB {
40 using DownloadCommitList = std::vector<std::tuple<std::string, std::map<std::string, Assets>, bool>>;
   // DownloadCommitList (above) is a vector of <string, map<string, Assets>, bool> tuples. It is the type of
   // the commitList parameter taken by HandleDownloadResult, CommitDownloadResult and CommitDownloadAssets.
   // NOTE(review): the meaning of each tuple slot is not stated in this header; confirm against the .cpp.
   //
   // The two constants below hold the strings "priority" and "common".
   // NOTE(review): a namespace-scope const std::string defined in a header has internal linkage, so each
   // translation unit that includes this file gets its own copy; confirm that is intended.
41 const std::string CLOUD_PRIORITY_TASK_STRING = "priority";
42 const std::string CLOUD_COMMON_TASK_STRING = "common";
43 class CloudSyncer : public ICloudSyncer {
44 public:
       // Constructs a syncer from a shared StorageProxy. isKvScene defaults to false and the conflict
       // policy defaults to SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN.
45     explicit CloudSyncer(std::shared_ptr<StorageProxy> storageProxy, bool isKvScene = false,
46         SingleVerConflictResolvePolicy policy = SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN);
47     void InitCloudSyncStateMachine();
       // Defaulted destructor; `override` means the base class declares a virtual destructor.
48     ~CloudSyncer() override = default;
       // NOTE(review): macro is defined outside this file; presumably it deletes copy/move operations.
49     DISABLE_COPY_ASSIGN_MOVE(CloudSyncer);
50 
       // Two Sync overloads: one takes the individual arguments (devices, mode, tables, callback, waitTime),
       // the other takes an already populated CloudTaskInfo. Both return an int status code.
51     int Sync(const std::vector<DeviceID> &devices, SyncMode mode, const std::vector<std::string> &tables,
52         const SyncProcessCallback &callback, int64_t waitTime);
53 
54     int Sync(const CloudTaskInfo &taskInfo);
55 
       // Single-database overload; a map-based overload returning int is declared further down.
56     void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB);
57 
58     void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);
59 
60     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
61         const RelationalSchemaObject &localSchema);
62 
       // Overload taking table names; another overload taking a callable is declared at the end of this section.
63     int ClearCloudWatermark(const std::vector<std::string> &tableNameList);
64 
       // The next two take the callable by non-const lvalue reference, so callers must pass a named
       // std::function object rather than a temporary lambda.
65     int StopSyncTask(std::function<int(void)> &removeFunc);
66 
67     int StopTaskBeforeSetReference(std::function<int(void)> &setReferenceFunc);
68 
69     int CleanWaterMarkInMemory(const std::set<std::string> &tableNameList);
70 
71     int32_t GetCloudSyncTaskCount();
72 
73     void Close();
74 
       // errCode defaults to -E_USER_CHANGE when the caller does not supply one.
75     void StopAllTasks(int errCode = -E_USER_CHANGE);
76 
       // GetIdentify and IsClosed override virtual functions of the base class.
77     std::string GetIdentify() const override;
78 
79     bool IsClosed() const override;
80 
81     void GenerateCompensatedSync(CloudTaskInfo &taskInfo);
82 
83     int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs);
84 
       // Returns the map by value.
       // NOTE(review): the top-level const on a by-value return prevents callers from moving out of the result.
85     const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const;
86 
87     void CleanAllWaterMark();
88 
       // The three SyncMachineDo* functions return a CloudSyncEvent.
       // NOTE(review): presumably these are the step callbacks driven by cloudSyncStateMachine_; confirm in the .cpp.
89     CloudSyncEvent SyncMachineDoDownload();
90 
91     CloudSyncEvent SyncMachineDoUpload();
92 
93     CloudSyncEvent SyncMachineDoFinished();
94 
95     void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback);
96 
97     SyncProcess GetCloudTaskStatus(uint64_t taskId) const;
98 
99     int ClearCloudWatermark(std::function<int(void)> &clearFunc);
100 protected:
101     struct TaskContext {
            // Per-task working state. CloudSyncer holds one instance as currentContext_, and ResumeTaskInfo
            // embeds another as its `context` member.
            // currentTaskId defaults to 0u, the same value as INVALID_TASK_ID.
102         TaskId currentTaskId = 0u;
103         std::string tableName;
104         std::shared_ptr<ProcessNotifier> notifier;
105         std::shared_ptr<CloudSyncStrategy> strategy;
106         std::shared_ptr<ProcessRecorder> processRecorder;
            // Asset field descriptors keyed by table name.
107         std::map<TableName, std::vector<Field>> assetFields;
108         // should be cleared after each Download
109         DownloadList assetDownloadList;
110         // store GID and assets, using in upload procedure
111         std::map<TableName, std::map<std::string, std::map<std::string, Assets>>> assetsInfo;
112         // struct: <currentUserIndex, <tableName, waterMark>>
113         std::map<int, std::map<TableName, std::string>> cloudWaterMarks;
114         std::shared_ptr<CloudLocker> locker;
115         bool isNeedUpload = false;  // whether the current task need to do upload
            // NOTE(review): the difference between isRealNeedUpload and isNeedUpload is not visible in this header.
116         bool isRealNeedUpload = false;
117         bool isFirstDownload = false;
            // Integer key type matches the outer key of cloudWaterMarks above.
118         int currentUserIndex = 0;
119         int repeatCount = 0;
120     };
121     struct UploadParam {
            // Parameter bundle passed by reference to the upload helpers (DoBatchUpload, DoUploadInner,
            // DoUploadByMode, HandleBatchUpload, ...). Every field has a default value.
122         int64_t count = 0;
123         TaskId taskId = 0u;
124         Timestamp localMark = 0u;
125         bool lastTable = false;
            // Defaults are CloudWaterType::DELETE and LockAction::INSERT.
            // NOTE(review): presumably callers overwrite these before use; confirm in the .cpp.
126         CloudWaterType mode = CloudWaterType::DELETE;
127         LockAction lockAction = LockAction::INSERT;
128     };
129     struct DownloadItem {
            // One record whose assets are to be downloaded. Filled through GetDownloadItem() and embedded
            // in DownloadItems.
            // Every scalar member now has a default initializer, so a default-constructed DownloadItem
            // never holds an indeterminate value (recordConflict already had one).
130         std::string gid;
131         Type prefix;
            // Value-initialized to the zero value of OpType.
            // NOTE(review): if OpType has a dedicated "not handled" enumerator, prefer it as the default.
132         OpType strategy{};
133         std::map<std::string, Assets> assets;
134         Key hashKey;
135         std::vector<Type> primaryKeyValList;
            // 0u matches the default used for the other Timestamp members in this class.
136         Timestamp timestamp = 0u;
137         bool recordConflict = false;
138     };
139     struct ResumeTaskInfo {
            // Snapshot kept per task in resumeTaskInfos_ (a map keyed by TaskId).
            // NOTE(review): presumably used to continue a paused task; confirm the lifecycle in the .cpp.
140         TaskContext context;
141         SyncParam syncParam;
142         bool upload = false; // task pause when upload
143         size_t lastDownloadIndex = 0u;
144         Timestamp lastLocalWatermark = 0u;
            // Starts as E_OK.
145         int downloadStatus = E_OK;
146     };
147     struct DownloadItems {
            // A DownloadItem together with three maps keyed by string: assets to remove, assets to download,
            // and a vector of uint32_t flags per key. Filled through FillDownloadItem() and collected in
            // DownloadItemRecords.
            // NOTE(review): the map keys are presumably asset field names; confirm in the .cpp.
148         DownloadItem downloadItem;
149         std::map<std::string, Assets> assetsToRemove;
150         std::map<std::string, Assets> assetsToDownload;
151         std::map<std::string, std::vector<uint32_t>> flags;
152     };
153 
        // ---- Sync driving and download entry points (declarations only; bodies live in the .cpp). ----
        // DoDownloadInNeed and DoDownload are virtual, so a derived class can override them.
        // TaskId is spelled both as `TaskId` and `CloudSyncer::TaskId` in this class; the type itself is not
        // declared in this header (presumably inherited from ICloudSyncer).
154     int TriggerSync();
155 
156     void DoSyncIfNeed();
157 
158     int DoSync(TaskId taskId);
159 
160     int PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index);
161 
162     int DoSyncInner(const CloudTaskInfo &taskInfo);
163 
164     int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload);
165 
166     void DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
167         const bool isFirstDownload);
168 
        // `count` is an output parameter (non-const reference); the int return is a status code.
169     int GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count);
170 
171     void UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName, bool needNotify);
172 
173     virtual int DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload);
174 
175     void SetNeedUpload(bool isNeedUpload);
176 
177     void DoFinished(TaskId taskId, int errCode);
178 
179     virtual int DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload);
180 
181     int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload);
182 
183     void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info);
184 
        // PreCheckUpload and PreCheck take taskId by non-const reference, so they are able to modify it.
185     int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark);
186 
187     int PreCheck(TaskId &taskId, const TableName &tableName);
188 
        // ---- Upload path (declarations only). UploadParam and InnerProcessInfo are passed by reference
        // through most of these helpers. DoUpload is virtual, so a derived class can override it. ----
189     int SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
190         InnerProcessInfo &innerProcessInfo);
191 
192     int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo);
193 
194     int PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
195         CloudSyncData &uploadData);
196 
197     int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam);
198 
199     virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction);
200 
201     void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData);
202 
        // Boolean predicates on a task id.
        // NOTE(review): presumably they look the task up in cloudTaskInfos_; confirm in the .cpp.
203     bool IsModeForcePush(TaskId taskId);
204 
205     bool IsModeForcePull(const TaskId taskId);
206 
207     bool IsPriorityTask(TaskId taskId);
208 
209     bool IsCompensatedTask(TaskId taskId);
210 
211     int DoUploadInner(const std::string &tableName, UploadParam &uploadParam);
212 
213     int DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info);
214 
        // ---- Cloud query, task-queue checks, cloud lock and heartbeat timer helpers (declarations only).
        // The grouping is by name; see the .cpp for the actual behavior. ----
215     int PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames);
216 
        // cloudWaterMark and downloadData are non-const references, i.e. in/out parameters.
217     int QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
218         DownloadData &downloadData);
219 
220     size_t GetCurrentCommonTaskNum();
221 
222     int CheckTaskIdValid(TaskId taskId);
223 
        // Writes the result into tableName; the int return is a status code.
224     int GetCurrentTableName(std::string &tableName);
225 
        // Takes the task info by rvalue reference, so callers must pass a temporary or std::move it.
226     int TryToAddSyncTask(CloudTaskInfo &&taskInfo);
227 
        // NOTE(review): the "WithNoLock" suffix suggests the caller must already hold the relevant mutex; confirm.
228     int CheckQueueSizeWithNoLock(bool priorityTask);
229 
230     int PrepareSync(TaskId taskId);
231 
232     int LockCloud(TaskId taskId);
233 
234     int UnlockCloud();
235 
236     int StartHeartBeatTimer(int period, TaskId taskId);
237 
238     void FinishHeartBeatTimer();
239 
240     void HeartBeat(TimerId timerId, TaskId taskId);
241 
242     void HeartBeatFailed(TaskId taskId, int errCode);
243 
244     void SetTaskFailed(TaskId taskId, int errCode);
245 
        // ---- Saving downloaded data, tagging assets and downloading assets (declarations only).
        // Most helpers take a SyncParam by reference and return an int status code. ----
246     int SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
247         std::map<std::string, LogInfo> &localLogInfoCache, std::vector<VBucket> &localInfo);
248 
249     int SaveData(CloudSyncer::TaskId taskId, SyncParam &param);
250 
251     void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload);
252 
        // NOTE(review): the declaration below has a double space after "taskId," (cosmetic only).
253     int SaveDataInTransaction(CloudSyncer::TaskId taskId,  SyncParam &param);
254 
255     int DoDownloadAssets(SyncParam &param);
256 
257     int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param, bool downloadAssetOnly);
258 
259     void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch);
260 
261     bool NeedNotifyChangedData(const ChangedData &changedData);
262 
263     int NotifyChangedDataInCurrentTask(ChangedData &&changedData);
264 
        // Returns a map of string -> Assets by value and reports failure through the errCode out-parameter.
        // NOTE(review): "isForcePullAseets" looks like a misspelling of "isForcePullAssets".
265     std::map<std::string, Assets> TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
266         bool setNormalStatus, bool isForcePullAseets, int &errCode);
267 
268     int TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo);
269 
270     int HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
271         VBucket &localAssetInfo);
272 
273     int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo,
274         VBucket &localAssetInfo);
275 
276     int TagDownloadAssetsForAssetOnly(
277         const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo, VBucket &localAssetInfo);
278 
279     void TagUploadAssets(CloudSyncData &uploadData);
280 
281     int FillCloudAssets(InnerProcessInfo &info, VBucket &normalAssets, VBucket &failedAssets);
282 
        // The two HandleDownloadResult* variants share the same parameter list.
283     int HandleDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
284         DownloadCommitList &commitList, uint32_t &successCount);
285 
        // NOTE(review): the continuation line below is indented one level less than the other wrapped
        // declarations in this class (cosmetic only).
286     int HandleDownloadResultForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info,
287     DownloadCommitList &commitList, uint32_t &successCount);
288 
289     int FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
290         VBucket &extend);
291 
        // Two GetCloudGid overloads; the second one additionally fills a cloudGid vector.
292     int GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj);
293 
294     int GetCloudGid(
295         TaskId taskId, const std::string &tableName, QuerySyncObject &obj, std::vector<std::string> &cloudGid);
296 
297     int DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
298         const std::set<Key> &dupHashKeySet, ChangedData &changedAssets);
299 
300     int CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
301         const std::set<Key> &dupHashKeySet, ChangedData &changedAssets);
302 
        // Writes the i-th entry of downloadList into the downloadItem out-parameter.
        // NOTE(review): inferred from the signature; confirm in the .cpp.
303     void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem);
304 
305     bool IsDataContainAssets();
306 
307     int SaveCloudWaterMark(const TableName &tableName, const TaskId taskId);
308 
309     bool IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data);
310 
311     int UpdateChangedData(SyncParam &param, DownloadList &assetsDownloadList);
312 
313     void UpdateCloudWaterMark(TaskId taskId, const SyncParam &param);
314 
315     int TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult);
316 
317     int CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
318         DownloadCommitList &commitList, int errCode);
319 
320     int GetLocalInfo(size_t index, SyncParam &param, DataInfoWithLog &logInfo,
321         std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo);
322 
        // ---- Task-context management, resume checks and watermark helpers (declarations only). ----
323     TaskId GetNextTaskId();
324 
325     void MarkCurrentTaskPausedIfNeed(const CloudTaskInfo &taskInfo);
326 
        // NOTE(review): the "WithoutLock" suffix (here and on ClearCurrentContextWithoutLock) suggests the
        // caller must already hold the relevant mutex; confirm in the .cpp.
327     void SetCurrentTaskFailedWithoutLock(int errCode);
328 
329     int LockCloudIfNeed(TaskId taskId);
330 
331     void UnlockIfNeed();
332 
333     void ClearCurrentContextWithoutLock();
334 
335     void ClearContextAndNotify(TaskId taskId, int errCode);
336 
337     int DownloadOneBatch(TaskId taskId, SyncParam &param, bool isFirstDownload);
338 
339     int DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
340         DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets);
341 
        // Fills the SyncParam out-parameter; the int return is a status code.
342     int GetSyncParamForDownload(TaskId taskId, SyncParam &param);
343 
344     bool IsCurrentTaskResume(TaskId taskId);
345 
346     bool IsCurrentTableResume(TaskId taskId, bool upload);
347 
348     int DownloadDataFromCloud(TaskId taskId, SyncParam &param, bool isFirstDownload);
349 
350     size_t GetDownloadAssetIndex(TaskId taskId);
351 
352     uint32_t GetCurrentTableUploadBatchIndex();
353 
354     void ResetCurrentTableUploadBatchIndex();
355 
356     void RecordWaterMark(TaskId taskId, Timestamp waterMark);
357 
358     Timestamp GetResumeWaterMark(TaskId taskId);
359 
        // The three Reload*IfNeed helpers update their last argument in place (non-const reference).
360     void ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark);
361 
362     void ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark);
363 
364     void ReloadUploadInfoIfNeed(const UploadParam &param, InnerProcessInfo &info);
365 
366     void GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo, UploadRetryInfo &retryInfo);
367 
368     QuerySyncObject GetQuerySyncObject(const std::string &tableName);
369 
370     InnerProcessInfo GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam);
371 
372     void NotifyUploadFailed(int errCode, InnerProcessInfo &info);
373 
374     void UpdateProcessWhenUploadFailed(InnerProcessInfo &info);
375 
        // ---- Batch upload operations and per-record asset download helpers (declarations only). ----
        // BatchInsert, BatchUpdate and BatchDelete share one parameter shape; BatchInsertOrUpdate adds an
        // isInsert flag.
376     int BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
377 
378     int BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
379 
380     int BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo,
381         bool isInsert);
382 
383     int BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInsert, int batchUploadResult);
384 
385     int BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
386 
387     int DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
388         std::map<std::string, Assets> &downloadAssets);
389 
        // Returns an <int, uint32_t> pair and writes the assets into dbAssets.
        // NOTE(review): the meaning of the two pair members is not visible in this header.
390     std::pair<int, uint32_t> GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
391         const DownloadItem &downloadItem, VBucket &dbAssets);
392 
        // Returns a reference to a map.
        // NOTE(review): which object the reference binds to is not visible here (presumably one of the map
        // arguments); confirm in the .cpp before storing the result.
393     std::map<std::string, Assets>& BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
394         std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
395         std::map<std::string, Assets> &tmpAssetsToDelete);
396 
        // NOTE(review): returns int although the name reads like a predicate; errCode is also an
        // out-parameter. Confirm what each of the two carries.
397     int IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
398         const DownloadItem &downloadItem, VBucket &dbAssets);
399 
400     bool CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem);
401 
402     int DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info, DownloadItem &downloadItem,
403         std::map<std::string, Assets> &downloadAssets);
404 
        // The two CommitDownloadAssets* variants share the same parameter list.
405     int CommitDownloadAssets(const DownloadItem &downloadItem, InnerProcessInfo &info,
406         DownloadCommitList &commitList, uint32_t &successCount);
407 
408     int CommitDownloadAssetsForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info,
409         DownloadCommitList &commitList, uint32_t &successCount);
410 
        // Reads assetsMap and writes into the two VBucket out-parameters.
411     void SeparateNormalAndFailAssets(const std::map<std::string, Assets> &assetsMap, VBucket &normalAssets,
412         VBucket &failedAssets);
413 
414     void ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam);
415 
        // ---- Cursor/watermark bookkeeping, task merging and state-machine helpers (declarations only). ----
416     int SaveCursorIfNeed(const std::string &tableName);
417 
418     int PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload);
419 
420     int UpdateFlagForSavedRecord(const SyncParam &param);
421 
422     bool IsNeedGetLocalWater(TaskId taskId);
423 
424     bool IsNeedProcessCloudCursor(TaskId taskId);
425 
426     void SetProxyUser(const std::string &user);
427 
428     void MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId);
429 
430     void RemoveTaskFromQueue(int32_t priorityLevel, TaskId taskId);
431 
        // Returns a <bool, TaskId> pair.
        // NOTE(review): presumably <merged?, resulting task id>; confirm in the .cpp.
432     std::pair<bool, TaskId> TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId);
433 
434     bool IsTaskCanMerge(const CloudTaskInfo &taskInfo);
435 
436     bool IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId);
437 
438     bool MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId);
439 
440     void AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema, CloudTaskInfo &taskInfo);
441 
442     std::pair<TaskId, TaskId> SwapTwoTaskAndCopyTable(TaskId source, TaskId target);
443 
444     bool IsQueryListEmpty(TaskId taskId);
445 
        // Returns an <int, Timestamp> pair.
        // NOTE(review): presumably <status code, local watermark>; confirm in the .cpp.
446     std::pair<int, Timestamp> GetLocalWater(const std::string &tableName, UploadParam &uploadParam);
447 
448     int HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info, CloudSyncData &uploadData,
449         ContinueToken &continueStmtToken, std::vector<ReviseModTimeInfo> &revisedData);
450 
451     bool IsNeedLock(const UploadParam &param);
452 
453     int UploadVersionRecordIfNeed(const UploadParam &uploadParam);
454 
        // Returns the task infos by value.
455     std::vector<CloudTaskInfo> CopyAndClearTaskInfos();
456 
457     void WaitCurTaskFinished();
458 
459     bool IsLockInDownload();
460 
        // Both of the following return a CloudSyncEvent, like the public SyncMachineDo* functions.
461     CloudSyncEvent SetCurrentTaskFailedInMachine(int errCode);
462 
463     CloudSyncEvent SyncMachineDoRepeatCheck();
464 
        // isFinish defaults to true.
465     void MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish = true);
466 
467     bool IsTableFinishInUpload(const std::string &table);
468 
469     void MarkUploadFinishIfNeed(const std::string &table);
470 
471     int GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo);
472 
473     void ProcessVersionConflictInfo(InnerProcessInfo &innerProcessInfo, uint32_t retryCount);
474 
475     std::string GetStoreIdByTask(TaskId taskId);
476 
        // ---- Batch asset download and asynchronous/background asset download (declarations only). ----
477     int CloudDbBatchDownloadAssets(TaskId taskId, const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
478         InnerProcessInfo &info, ChangedData &changedAssets);
479 
        // Writes into the DownloadItems out-parameter `record`.
480     void FillDownloadItem(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
481         const InnerProcessInfo &info, bool isSharedTable, DownloadItems &record);
482 
        // RemoveAssetsRecords and DownloadAssetsRecords alias the same underlying type
        // (std::vector<IAssetLoader::AssetRecord>); the two names only document intent.
        // DownloadAssetDetail bundles the three record lists into one tuple.
483     using DownloadItemRecords = std::vector<DownloadItems>;
484     using RemoveAssetsRecords = std::vector<IAssetLoader::AssetRecord>;
485     using DownloadAssetsRecords = std::vector<IAssetLoader::AssetRecord>;
486     using DownloadAssetDetail = std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords>;
487     DownloadAssetDetail GetDownloadRecords(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
488         bool isSharedTable, bool isAsyncDownloadAssets, const InnerProcessInfo &info);
489 
        // downloadDetail is a 4-tuple: the three element types of DownloadAssetDetail plus a trailing bool.
        // NOTE(review): the meaning of the bool is not visible in this header.
490     int BatchDownloadAndCommitRes(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
491         InnerProcessInfo &info, ChangedData &changedAssets,
492         std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> &downloadDetail);
493 
        // Static helpers: they take everything they need as arguments and have no `this`.
494     static void StatisticDownloadRes(const IAssetLoader::AssetRecord &downloadRecord,
495         const IAssetLoader::AssetRecord &removeRecord, InnerProcessInfo &info, DownloadItem &downloadItem);
496 
497     static void AddNotifyDataFromDownloadAssets(const std::set<Key> &dupHashKeySet, DownloadItem &downloadItem,
498         ChangedData &changedAssets);
499 
500     void CheckDataAfterDownload(const std::string &tableName);
501 
502     bool IsAsyncDownloadAssets(TaskId taskId);
503 
504     void TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload);
505 
506     void TriggerAsyncDownloadAssetsIfNeed();
507 
508     void BackgroundDownloadAssetsTask();
509 
510     void CancelDownloadListener();
511 
512     void DoBackgroundDownloadAssets();
513 
514     void CancelBackgroundDownloadAssetsTaskIfNeed();
515 
        // cancelDownload defaults to true.
516     void CancelBackgroundDownloadAssetsTask(bool cancelDownload = true);
517 
518     int BackgroundDownloadAssetsByTable(const std::string &table, std::map<std::string, int64_t> &downloadBeginTime);
519 
520     int CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId, SyncParam &param);
521 
522     int CheckLocalQueryAssetsOnlyIfNeed(VBucket &localAssetInfo, SyncParam &param, DataInfoWithLog &logInfo);
523 
524     int PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam &param, std::vector<VBucket> &localInfo);
525 
526     bool IsCurrentAsyncDownloadTask();
527 
528     int GetCloudGidAndFillExtend(TaskId taskId, const std::string &tableName, QuerySyncObject &obj, VBucket &extend);
529 
530     int QueryCloudGidForAssetsOnly(
531         TaskId taskId, SyncParam &param, int64_t groupIdx, std::vector<std::string> &cloudGid);
532 
533     int GetEmptyGidAssetsMapFromDownloadData(
534         const std::vector<VBucket> &data, std::map<std::string, AssetsMap> &gidAssetsMap);
535 
536     int SetAssetsMapAndEraseDataForAssetsOnly(TaskId taskId, SyncParam &param, std::vector<VBucket> &downloadData,
537         std::map<std::string, AssetsMap> &gidAssetsMap);
538 
        // Takes the changed data by rvalue reference, so callers must pass a temporary or std::move it.
539     void NotifyChangedDataWithDefaultDev(ChangedData &&changedData);
540 
541     bool IsAlreadyHaveCompensatedSyncTask();
542 
543     bool TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId);
544 
545     int FillCloudAssetsForOneRecord(const std::string &gid, const std::map<std::string, Assets> &assetsMap,
546         InnerProcessInfo &info, bool setAllNormal, bool &isExistAssetDownloadFail);
547 
548     int UpdateRecordFlagForOneRecord(const std::string &gid, const DownloadItem &downloadItem, InnerProcessInfo &info,
549         bool isExistAssetDownloadFail);
550 
551     static void ModifyDownLoadInfoCount(const int errorCode, InnerProcessInfo &info);
552 
553     void ChangeProcessStatusAndNotifyIfNeed(UploadParam &uploadParam, InnerProcessInfo &info);
554 
555     void ExecuteAsyncDownloadAssets(TaskId taskId);
556 
557     bool IsCloudForcePush(TaskId taskId);
558 
        // ---- Data members. None of them has an in-class initializer.
        // NOTE(review): they are presumably initialized by the constructor in the .cpp; confirm, especially
        // for the plain (non-atomic) scalars lastTaskId_, isKvScene_, asyncTaskId_ and waitDownloadListener_. ----
        // dataLock_ is `mutable` so that const member functions can lock it.
559     mutable std::mutex dataLock_;
560     TaskId lastTaskId_;
        // Multimap keyed by an int with std::greater ordering, so iteration yields larger keys first.
        // NOTE(review): the key is presumably the task priority level (see RemoveTaskFromQueue); confirm.
561     std::multimap<int, TaskId, std::greater<int>> taskQueue_;
562     std::map<TaskId, CloudTaskInfo> cloudTaskInfos_;
563     std::map<TaskId, ResumeTaskInfo> resumeTaskInfos_;
564 
565     TaskContext currentContext_;
566     std::condition_variable contextCv_;
567     std::mutex syncMutex_;  // Clean Cloud Data and Sync are mutually exclusive
568 
569     CloudDBProxy cloudDB_;
570 
571     std::shared_ptr<StorageProxy> storageProxy_;
572     std::atomic<int32_t> queuedManualSyncLimit_;
573 
574     std::atomic<bool> closed_;
        // NOTE(review): unlike the other data members, hasKvRemoveTask has no trailing underscore.
575     std::atomic<bool> hasKvRemoveTask;
576     std::atomic<TimerId> timerId_;
577     std::mutex heartbeatMutex_;
578     std::condition_variable heartbeatCv_;
        // Two per-task int32_t counters keyed by TaskId.
579     std::map<TaskId, int32_t> heartbeatCount_;
580     std::map<TaskId, int32_t> failedHeartbeatCount_;
581 
582     std::string id_;
583 
584     // isKvScene_ is used to distinguish between the KV and RDB in the following scenarios:
585     // 1. Whether upload to the cloud after delete local data that does not have a gid.
586     // 2. Whether the local data need update for different flag when the local time is larger.
587     bool isKvScene_;
588     std::atomic<SingleVerConflictResolvePolicy> policy_;
589     std::condition_variable asyncTaskCv_;
590     TaskId asyncTaskId_;
591     std::atomic<bool> cancelAsyncTask_;
592     std::atomic<int> scheduleTaskCount_;
593     std::mutex listenerMutex_;
        // Raw pointer, while the destructor is defaulted.
        // NOTE(review): ownership and release of the listener are not visible in this header; confirm in the .cpp.
594     NotificationChain::Listener *waitDownloadListener_;
595 
        // Compile-time constants. The extra `const` after `constexpr` is redundant but harmless.
        // NOTE(review): the unit of HEARTBEAT_PERIOD is not stated here.
596     static constexpr const TaskId INVALID_TASK_ID = 0u;
597     static constexpr const int MAX_HEARTBEAT_FAILED_LIMIT = 2;
598     static constexpr const int HEARTBEAT_PERIOD = 3;
599 
600     CloudSyncStateMachine cloudSyncStateMachine_;
601 };
602 }
603 #endif // CLOUD_SYNCER_H
604