• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CLOUD_SYNCER_H
17 #define CLOUD_SYNCER_H
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <utility>
22 
23 #include "cloud/cloud_db_proxy.h"
24 #include "cloud/cloud_store_types.h"
25 #include "cloud/cloud_sync_state_machine.h"
26 #include "cloud/cloud_sync_strategy.h"
27 #include "cloud/icloud_db.h"
28 #include "cloud/icloud_syncer.h"
29 #include "cloud/process_notifier.h"
30 #include "cloud/process_recorder.h"
31 #include "cloud_locker.h"
32 #include "data_transformer.h"
33 #include "db_common.h"
34 #include "ref_object.h"
35 #include "runtime_context.h"
36 #include "storage_proxy.h"
37 #include "store_observer.h"
38 
39 namespace DistributedDB {
40 using DownloadCommitList = std::vector<std::tuple<std::string, std::map<std::string, Assets>, bool>>;
// Label strings for the two task kinds ("priority" / "common").
// NOTE(review): where these labels are consumed is not visible in this header - confirm before renaming.
const std::string CLOUD_PRIORITY_TASK_STRING("priority");
const std::string CLOUD_COMMON_TASK_STRING("common");
43 class CloudSyncer : public ICloudSyncer {
44 public:
45     explicit CloudSyncer(std::shared_ptr<StorageProxy> storageProxy, bool isKvScene = false,
46         SingleVerConflictResolvePolicy policy = SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN);
47     void InitCloudSyncStateMachine();
48     ~CloudSyncer() override = default;
49     DISABLE_COPY_ASSIGN_MOVE(CloudSyncer);
50 
51     int Sync(const std::vector<DeviceID> &devices, SyncMode mode, const std::vector<std::string> &tables,
52         const SyncProcessCallback &callback, int64_t waitTime);
53 
54     int Sync(const CloudTaskInfo &taskInfo);
55 
56     void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB);
57 
58     void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);
59 
60     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
61         const RelationalSchemaObject &localSchema);
62 
63     int CleanKvCloudData(std::function<int(void)> &removeFunc);
64 
65     int CleanWaterMarkInMemory(const std::set<std::string> &tableNameList);
66 
67     int32_t GetCloudSyncTaskCount();
68 
69     void Close();
70 
71     void StopAllTasks(int errCode = -E_USER_CHANGE);
72 
73     std::string GetIdentify() const override;
74 
75     bool IsClosed() const override;
76 
77     void GenerateCompensatedSync(CloudTaskInfo &taskInfo);
78 
79     int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs);
80 
81     const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const;
82 
83     void CleanAllWaterMark();
84 
85     CloudSyncEvent SyncMachineDoDownload();
86 
87     CloudSyncEvent SyncMachineDoUpload();
88 
89     CloudSyncEvent SyncMachineDoFinished();
90 
91     void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback);
92 
93     SyncProcess GetCloudTaskStatus(uint64_t taskId) const;
94 protected:
95     struct TaskContext {
96         TaskId currentTaskId = 0u;
97         std::string tableName;
98         std::shared_ptr<ProcessNotifier> notifier;
99         std::shared_ptr<CloudSyncStrategy> strategy;
100         std::shared_ptr<ProcessRecorder> processRecorder;
101         std::map<TableName, std::vector<Field>> assetFields;
102         // should be cleared after each Download
103         DownloadList assetDownloadList;
104         // store GID and assets, using in upload procedure
105         std::map<TableName, std::map<std::string, std::map<std::string, Assets>>> assetsInfo;
106         // struct: <currentUserIndex, <tableName, waterMark>>
107         std::map<int, std::map<TableName, std::string>> cloudWaterMarks;
108         std::shared_ptr<CloudLocker> locker;
109         bool isNeedUpload = false;  // whether the current task need to do upload
110         bool isRealNeedUpload = false;
111         bool isFirstDownload = false;
112         int currentUserIndex = 0;
113         int repeatCount = 0;
114     };
115     struct UploadParam {
116         int64_t count = 0;
117         TaskId taskId = 0u;
118         Timestamp localMark = 0u;
119         bool lastTable = false;
120         CloudWaterType mode = CloudWaterType::DELETE;
121         LockAction lockAction = LockAction::INSERT;
122     };
123     struct DownloadItem {
124         std::string gid;
125         Type prefix;
126         OpType strategy;
127         std::map<std::string, Assets> assets;
128         Key hashKey;
129         std::vector<Type> primaryKeyValList;
130         Timestamp timestamp;
131         bool recordConflict = false;
132     };
133     struct ResumeTaskInfo {
134         TaskContext context;
135         SyncParam syncParam;
136         bool upload = false; // task pause when upload
137         size_t lastDownloadIndex = 0u;
138         Timestamp lastLocalWatermark = 0u;
139         int downloadStatus = E_OK;
140     };
141     struct DownloadItems {
142         DownloadItem downloadItem;
143         std::map<std::string, Assets> assetsToRemove;
144         std::map<std::string, Assets> assetsToDownload;
145         std::map<std::string, std::vector<uint32_t>> flags;
146     };
147 
148     int TriggerSync();
149 
150     void DoSyncIfNeed();
151 
152     int DoSync(TaskId taskId);
153 
154     int PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index);
155 
156     int DoSyncInner(const CloudTaskInfo &taskInfo);
157 
158     int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload);
159 
160     void DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
161         const bool isFirstDownload);
162 
163     int GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count);
164 
165     void UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName, bool needNotify);
166 
167     virtual int DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload);
168 
169     void SetNeedUpload(bool isNeedUpload);
170 
171     void DoFinished(TaskId taskId, int errCode);
172 
173     virtual int DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload);
174 
175     int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload);
176 
177     void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info);
178 
179     int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark);
180 
181     int PreCheck(TaskId &taskId, const TableName &tableName);
182 
183     int SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
184         InnerProcessInfo &innerProcessInfo);
185 
186     int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo);
187 
188     int PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
189         CloudSyncData &uploadData);
190 
191     int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam);
192 
193     virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction);
194 
195     void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData);
196 
197     bool IsModeForcePush(TaskId taskId);
198 
199     bool IsModeForcePull(const TaskId taskId);
200 
201     bool IsPriorityTask(TaskId taskId);
202 
203     bool IsCompensatedTask(TaskId taskId);
204 
205     int DoUploadInner(const std::string &tableName, UploadParam &uploadParam);
206 
207     int DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info);
208 
209     int PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames);
210 
211     int QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
212         DownloadData &downloadData);
213 
214     size_t GetCurrentCommonTaskNum();
215 
216     int CheckTaskIdValid(TaskId taskId);
217 
218     int GetCurrentTableName(std::string &tableName);
219 
220     int TryToAddSyncTask(CloudTaskInfo &&taskInfo);
221 
222     int CheckQueueSizeWithNoLock(bool priorityTask);
223 
224     int PrepareSync(TaskId taskId);
225 
226     int LockCloud(TaskId taskId);
227 
228     int UnlockCloud();
229 
230     int StartHeartBeatTimer(int period, TaskId taskId);
231 
232     void FinishHeartBeatTimer();
233 
234     void HeartBeat(TimerId timerId, TaskId taskId);
235 
236     void HeartBeatFailed(TaskId taskId, int errCode);
237 
238     void SetTaskFailed(TaskId taskId, int errCode);
239 
240     int SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
241         std::map<std::string, LogInfo> &localLogInfoCache, std::vector<VBucket> &localInfo);
242 
243     int SaveData(CloudSyncer::TaskId taskId, SyncParam &param);
244 
245     void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload);
246 
247     int SaveDataInTransaction(CloudSyncer::TaskId taskId,  SyncParam &param);
248 
249     int DoDownloadAssets(SyncParam &param);
250 
251     int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param, bool downloadAssetOnly);
252 
253     void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch);
254 
255     bool NeedNotifyChangedData(const ChangedData &changedData);
256 
257     int NotifyChangedDataInCurrentTask(ChangedData &&changedData);
258 
259     std::map<std::string, Assets> TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
260         bool setNormalStatus, bool isForcePullAseets, int &errCode);
261 
262     int TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo);
263 
264     int HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
265         VBucket &localAssetInfo);
266 
267     int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo,
268         VBucket &localAssetInfo);
269 
270     int TagDownloadAssetsForAssetOnly(
271         const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo, VBucket &localAssetInfo);
272 
273     void TagUploadAssets(CloudSyncData &uploadData);
274 
275     int FillCloudAssets(InnerProcessInfo &info, VBucket &normalAssets, VBucket &failedAssets);
276 
277     int HandleDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
278         DownloadCommitList &commitList, uint32_t &successCount);
279 
280     int FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
281         VBucket &extend);
282 
283     int GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj);
284 
285     int GetCloudGid(
286         TaskId taskId, const std::string &tableName, QuerySyncObject &obj, std::vector<std::string> &cloudGid);
287 
288     int DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
289         const std::set<Key> &dupHashKeySet, ChangedData &changedAssets);
290 
291     int CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
292         const std::set<Key> &dupHashKeySet, ChangedData &changedAssets);
293 
294     void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem);
295 
296     bool IsDataContainAssets();
297 
298     int SaveCloudWaterMark(const TableName &tableName, const TaskId taskId);
299 
300     bool IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data);
301 
302     int UpdateChangedData(SyncParam &param, DownloadList &assetsDownloadList);
303 
304     void UpdateCloudWaterMark(TaskId taskId, const SyncParam &param);
305 
306     int TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult);
307 
308     int CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
309         DownloadCommitList &commitList, int errCode);
310 
311     int GetLocalInfo(size_t index, SyncParam &param, DataInfoWithLog &logInfo,
312         std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo);
313 
314     TaskId GetNextTaskId();
315 
316     void MarkCurrentTaskPausedIfNeed(const CloudTaskInfo &taskInfo);
317 
318     void SetCurrentTaskFailedWithoutLock(int errCode);
319 
320     int LockCloudIfNeed(TaskId taskId);
321 
322     void UnlockIfNeed();
323 
324     void ClearCurrentContextWithoutLock();
325 
326     void ClearContextAndNotify(TaskId taskId, int errCode);
327 
328     int DownloadOneBatch(TaskId taskId, SyncParam &param, bool isFirstDownload);
329 
330     int DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
331         DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets);
332 
333     int GetSyncParamForDownload(TaskId taskId, SyncParam &param);
334 
335     bool IsCurrentTaskResume(TaskId taskId);
336 
337     bool IsCurrentTableResume(TaskId taskId, bool upload);
338 
339     int DownloadDataFromCloud(TaskId taskId, SyncParam &param, bool isFirstDownload);
340 
341     size_t GetDownloadAssetIndex(TaskId taskId);
342 
343     uint32_t GetCurrentTableUploadBatchIndex();
344 
345     void ResetCurrentTableUploadBatchIndex();
346 
347     void RecordWaterMark(TaskId taskId, Timestamp waterMark);
348 
349     Timestamp GetResumeWaterMark(TaskId taskId);
350 
351     void ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark);
352 
353     void ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark);
354 
355     void ReloadUploadInfoIfNeed(const UploadParam &param, InnerProcessInfo &info);
356 
357     void GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo, UploadRetryInfo &retryInfo);
358 
359     QuerySyncObject GetQuerySyncObject(const std::string &tableName);
360 
361     InnerProcessInfo GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam);
362 
363     void NotifyUploadFailed(int errCode, InnerProcessInfo &info);
364 
365     void UpdateProcessWhenUploadFailed(InnerProcessInfo &info);
366 
367     int BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
368 
369     int BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
370 
371     int BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo,
372         bool isInsert);
373 
374     int BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInsert, int batchUploadResult);
375 
376     int BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
377 
378     int DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
379         std::map<std::string, Assets> &downloadAssets);
380 
381     std::pair<int, uint32_t> GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
382         const DownloadItem &downloadItem, VBucket &dbAssets);
383 
384     std::map<std::string, Assets>& BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
385         std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
386         std::map<std::string, Assets> &tmpAssetsToDelete);
387 
388     int IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
389         const DownloadItem &downloadItem, VBucket &dbAssets);
390 
391     bool CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem);
392 
393     int DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info, DownloadItem &downloadItem,
394         std::map<std::string, Assets> &downloadAssets);
395 
396     int CommitDownloadAssets(const DownloadItem &downloadItem, InnerProcessInfo &info,
397         DownloadCommitList &commitList, uint32_t &successCount);
398 
399     int CommitDownloadAssetsForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info,
400         DownloadCommitList &commitList, uint32_t &successCount);
401 
402     void SeparateNormalAndFailAssets(const std::map<std::string, Assets> &assetsMap, VBucket &normalAssets,
403         VBucket &failedAssets);
404 
405     void ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam);
406 
407     int SaveCursorIfNeed(const std::string &tableName);
408 
409     int PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload);
410 
411     int UpdateFlagForSavedRecord(const SyncParam &param);
412 
413     bool IsNeedGetLocalWater(TaskId taskId);
414 
415     void SetProxyUser(const std::string &user);
416 
417     void MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId);
418 
419     void RemoveTaskFromQueue(int32_t priorityLevel, TaskId taskId);
420 
421     std::pair<bool, TaskId> TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId);
422 
423     bool IsTaskCanMerge(const CloudTaskInfo &taskInfo);
424 
425     bool IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId);
426 
427     bool MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId);
428 
429     void AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema, CloudTaskInfo &taskInfo);
430 
431     std::pair<TaskId, TaskId> SwapTwoTaskAndCopyTable(TaskId source, TaskId target);
432 
433     bool IsQueryListEmpty(TaskId taskId);
434 
435     std::pair<int, Timestamp> GetLocalWater(const std::string &tableName, UploadParam &uploadParam);
436 
437     int HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info, CloudSyncData &uploadData,
438         ContinueToken &continueStmtToken, std::vector<ReviseModTimeInfo> &revisedData);
439 
440     bool IsNeedLock(const UploadParam &param);
441 
442     int UploadVersionRecordIfNeed(const UploadParam &uploadParam);
443 
444     std::vector<CloudTaskInfo> CopyAndClearTaskInfos();
445 
446     void WaitCurTaskFinished();
447 
448     bool IsLockInDownload();
449 
450     CloudSyncEvent SetCurrentTaskFailedInMachine(int errCode);
451 
452     CloudSyncEvent SyncMachineDoRepeatCheck();
453 
454     void MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish = true);
455 
456     bool IsTableFinishInUpload(const std::string &table);
457 
458     void MarkUploadFinishIfNeed(const std::string &table);
459 
460     int GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo);
461 
462     void ProcessVersionConflictInfo(InnerProcessInfo &innerProcessInfo, uint32_t retryCount);
463 
464     std::string GetStoreIdByTask(TaskId taskId);
465 
466     int CloudDbBatchDownloadAssets(TaskId taskId, const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
467         InnerProcessInfo &info, ChangedData &changedAssets);
468 
469     void FillDownloadItem(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
470         const InnerProcessInfo &info, bool isSharedTable, DownloadItems &record);
471 
472     using DownloadItemRecords = std::vector<DownloadItems>;
473     using RemoveAssetsRecords = std::vector<IAssetLoader::AssetRecord>;
474     using DownloadAssetsRecords = std::vector<IAssetLoader::AssetRecord>;
475     using DownloadAssetDetail = std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords>;
476     DownloadAssetDetail GetDownloadRecords(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
477         bool isSharedTable, bool isAsyncDownloadAssets, const InnerProcessInfo &info);
478 
479     int BatchDownloadAndCommitRes(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
480         InnerProcessInfo &info, ChangedData &changedAssets,
481         std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> &downloadDetail);
482 
483     static void StatisticDownloadRes(const IAssetLoader::AssetRecord &downloadRecord,
484         const IAssetLoader::AssetRecord &removeRecord, InnerProcessInfo &info, DownloadItem &downloadItem);
485 
486     static void AddNotifyDataFromDownloadAssets(const std::set<Key> &dupHashKeySet, DownloadItem &downloadItem,
487         ChangedData &changedAssets);
488 
489     void CheckDataAfterDownload(const std::string &tableName);
490 
491     bool IsAsyncDownloadAssets(TaskId taskId);
492 
493     void TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload);
494 
495     void TriggerAsyncDownloadAssetsIfNeed();
496 
497     void BackgroundDownloadAssetsTask();
498 
499     void CancelDownloadListener();
500 
501     void DoBackgroundDownloadAssets();
502 
503     void CancelBackgroundDownloadAssetsTaskIfNeed();
504 
505     void CancelBackgroundDownloadAssetsTask(bool cancelDownload = true);
506 
507     int BackgroundDownloadAssetsByTable(const std::string &table, std::map<std::string, int64_t> &downloadBeginTime);
508 
509     int CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId, SyncParam &param);
510 
511     int CheckLocalQueryAssetsOnlyIfNeed(VBucket &localAssetInfo, SyncParam &param, DataInfoWithLog &logInfo);
512 
513     int PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam &param, std::vector<VBucket> &localInfo);
514 
515     bool IsCurrentAsyncDownloadTask();
516 
517     bool CanStartAsyncDownload() const;
518 
519     int GetCloudGidAndFillExtend(TaskId taskId, const std::string &tableName, QuerySyncObject &obj, VBucket &extend);
520 
521     int QueryCloudGidForAssetsOnly(
522         TaskId taskId, SyncParam &param, int64_t groupIdx, std::vector<std::string> &cloudGid);
523 
524     int GetEmptyGidAssetsMapFromDownloadData(
525         const std::vector<VBucket> &data, std::map<std::string, AssetsMap> &gidAssetsMap);
526 
527     int SetAssetsMapAndEraseDataForAssetsOnly(TaskId taskId, SyncParam &param, std::vector<VBucket> &downloadData,
528         std::map<std::string, AssetsMap> &gidAssetsMap);
529 
530     void NotifyChangedDataWithDefaultDev(ChangedData &&changedData);
531 
532     bool IsAlreadyHaveCompensatedSyncTask();
533 
534     bool TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId);
535 
536     int FillCloudAssetsForOneRecord(const std::string &gid, const std::map<std::string, Assets> &assetsMap,
537         InnerProcessInfo &info, bool setAllNormal, bool &isExistAssetDownloadFail);
538 
539     int UpdateRecordFlagForOneRecord(const std::string &gid, const DownloadItem &downloadItem, InnerProcessInfo &info,
540         bool isExistAssetDownloadFail);
541 
542     static void ModifyDownLoadInfoCount(const int errorCode, InnerProcessInfo &info);
543 
544     mutable std::mutex dataLock_;
545     TaskId lastTaskId_;
546     std::multimap<int, TaskId, std::greater<int>> taskQueue_;
547     std::map<TaskId, CloudTaskInfo> cloudTaskInfos_;
548     std::map<TaskId, ResumeTaskInfo> resumeTaskInfos_;
549 
550     TaskContext currentContext_;
551     std::condition_variable contextCv_;
552     std::mutex syncMutex_;  // Clean Cloud Data and Sync are mutually exclusive
553 
554     CloudDBProxy cloudDB_;
555 
556     std::shared_ptr<StorageProxy> storageProxy_;
557     std::atomic<int32_t> queuedManualSyncLimit_;
558 
559     std::atomic<bool> closed_;
560     std::atomic<bool> hasKvRemoveTask;
561     std::atomic<TimerId> timerId_;
562     std::mutex heartbeatMutex_;
563     std::condition_variable heartbeatCv_;
564     std::map<TaskId, int32_t> heartbeatCount_;
565     std::map<TaskId, int32_t> failedHeartbeatCount_;
566 
567     std::string id_;
568 
569     // isKvScene_ is used to distinguish between the KV and RDB in the following scenarios:
570     // 1. Whether upload to the cloud after delete local data that does not have a gid.
571     // 2. Whether the local data need update for different flag when the local time is larger.
572     bool isKvScene_;
573     std::atomic<SingleVerConflictResolvePolicy> policy_;
574     std::condition_variable asyncTaskCv_;
575     TaskId asyncTaskId_;
576     std::atomic<bool> cancelAsyncTask_;
577     std::mutex listenerMutex_;
578     NotificationChain::Listener *waitDownloadListener_;
579 
580     static constexpr const TaskId INVALID_TASK_ID = 0u;
581     static constexpr const int MAX_HEARTBEAT_FAILED_LIMIT = 2;
582     static constexpr const int HEARTBEAT_PERIOD = 3;
583 
584     CloudSyncStateMachine cloudSyncStateMachine_;
585 };
586 }
587 #endif // CLOUD_SYNCER_H
588