1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef CLOUD_SYNCER_H 17 #define CLOUD_SYNCER_H 18 #include <atomic> 19 #include <condition_variable> 20 #include <mutex> 21 #include <utility> 22 23 #include "cloud/cloud_db_proxy.h" 24 #include "cloud/cloud_store_types.h" 25 #include "cloud/cloud_sync_state_machine.h" 26 #include "cloud/cloud_sync_strategy.h" 27 #include "cloud/icloud_db.h" 28 #include "cloud/icloud_syncer.h" 29 #include "cloud/process_notifier.h" 30 #include "cloud/process_recorder.h" 31 #include "cloud_locker.h" 32 #include "data_transformer.h" 33 #include "db_common.h" 34 #include "ref_object.h" 35 #include "runtime_context.h" 36 #include "storage_proxy.h" 37 #include "store_observer.h" 38 39 namespace DistributedDB { 40 using DownloadCommitList = std::vector<std::tuple<std::string, std::map<std::string, Assets>, bool>>; 41 const std::string CLOUD_PRIORITY_TASK_STRING = "priority"; 42 const std::string CLOUD_COMMON_TASK_STRING = "common"; 43 class CloudSyncer : public ICloudSyncer { 44 public: 45 explicit CloudSyncer(std::shared_ptr<StorageProxy> storageProxy, bool isKvScene = false, 46 SingleVerConflictResolvePolicy policy = SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN); 47 void InitCloudSyncStateMachine(); 48 ~CloudSyncer() override = default; 49 DISABLE_COPY_ASSIGN_MOVE(CloudSyncer); 50 51 int Sync(const std::vector<DeviceID> &devices, SyncMode mode, const std::vector<std::string> &tables, 52 const SyncProcessCallback &callback, int64_t waitTime); 53 54 int Sync(const CloudTaskInfo &taskInfo); 55 56 void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB); 57 58 void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader); 59 60 int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList, 61 const RelationalSchemaObject &localSchema); 62 63 int ClearCloudWatermark(const std::vector<std::string> &tableNameList); 64 65 int StopSyncTask(std::function<int(void)> &removeFunc); 66 67 int StopTaskBeforeSetReference(std::function<int(void)> &setReferenceFunc); 68 69 int CleanWaterMarkInMemory(const std::set<std::string> &tableNameList); 70 71 int32_t GetCloudSyncTaskCount(); 72 73 void Close(); 74 75 void StopAllTasks(int errCode = -E_USER_CHANGE); 76 77 std::string GetIdentify() const override; 78 79 bool IsClosed() const override; 80 81 void GenerateCompensatedSync(CloudTaskInfo &taskInfo); 82 83 int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs); 84 85 const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const; 86 87 void CleanAllWaterMark(); 88 89 CloudSyncEvent SyncMachineDoDownload(); 90 91 CloudSyncEvent SyncMachineDoUpload(); 92 93 CloudSyncEvent SyncMachineDoFinished(); 94 95 void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback); 96 97 SyncProcess GetCloudTaskStatus(uint64_t taskId) const; 98 99 int ClearCloudWatermark(std::function<int(void)> &clearFunc); 100 protected: 101 struct TaskContext { 102 TaskId currentTaskId = 0u; 103 std::string tableName; 104 std::shared_ptr<ProcessNotifier> notifier; 105 std::shared_ptr<CloudSyncStrategy> strategy; 106 std::shared_ptr<ProcessRecorder> processRecorder; 107 std::map<TableName, std::vector<Field>> assetFields; 108 // should be cleared after each Download 109 DownloadList assetDownloadList; 110 // store GID and assets, using in upload procedure 111 std::map<TableName, std::map<std::string, std::map<std::string, Assets>>> assetsInfo; 112 // struct: <currentUserIndex, <tableName, waterMark>> 113 std::map<int, std::map<TableName, std::string>> cloudWaterMarks; 114 std::shared_ptr<CloudLocker> locker; 115 bool isNeedUpload = false; // whether the current task need to do upload 116 bool isRealNeedUpload = false; 117 bool isFirstDownload = false; 118 int currentUserIndex = 0; 119 int repeatCount = 0; 120 }; 121 struct UploadParam { 122 int64_t count = 0; 123 TaskId taskId = 0u; 124 Timestamp localMark = 0u; 125 bool lastTable = false; 126 CloudWaterType mode = CloudWaterType::DELETE; 127 LockAction lockAction = LockAction::INSERT; 128 }; 129 struct DownloadItem { 130 std::string gid; 131 Type prefix; 132 OpType strategy; 133 std::map<std::string, Assets> assets; 134 Key hashKey; 135 std::vector<Type> primaryKeyValList; 136 Timestamp timestamp; 137 bool recordConflict = false; 138 }; 139 struct ResumeTaskInfo { 140 TaskContext context; 141 SyncParam syncParam; 142 bool upload = false; // task pause when upload 143 size_t lastDownloadIndex = 0u; 144 Timestamp lastLocalWatermark = 0u; 145 int downloadStatus = E_OK; 146 }; 147 struct DownloadItems { 148 DownloadItem downloadItem; 149 std::map<std::string, Assets> assetsToRemove; 150 std::map<std::string, Assets> assetsToDownload; 151 std::map<std::string, std::vector<uint32_t>> flags; 152 }; 153 154 int TriggerSync(); 155 156 void DoSyncIfNeed(); 157 158 int DoSync(TaskId taskId); 159 160 int PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index); 161 162 int DoSyncInner(const CloudTaskInfo &taskInfo); 163 164 int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload); 165 166 void DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables, 167 const bool isFirstDownload); 168 169 int GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count); 170 171 void UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName, bool needNotify); 172 173 virtual int DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload); 174 175 void SetNeedUpload(bool isNeedUpload); 176 177 void DoFinished(TaskId taskId, int errCode); 178 179 virtual int DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload); 180 181 int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload); 182 183 void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info); 184 185 int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark); 186 187 int PreCheck(TaskId &taskId, const TableName &tableName); 188 189 int SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData, 190 InnerProcessInfo &innerProcessInfo); 191 192 int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo); 193 194 int PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, 195 CloudSyncData &uploadData); 196 197 int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam); 198 199 virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction); 200 201 void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData); 202 203 bool IsModeForcePush(TaskId taskId); 204 205 bool IsModeForcePull(const TaskId taskId); 206 207 bool IsPriorityTask(TaskId taskId); 208 209 bool IsCompensatedTask(TaskId taskId); 210 211 int DoUploadInner(const std::string &tableName, UploadParam &uploadParam); 212 213 int DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info); 214 215 int PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames); 216 217 int QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark, 218 DownloadData &downloadData); 219 220 size_t GetCurrentCommonTaskNum(); 221 222 int CheckTaskIdValid(TaskId taskId); 223 224 int GetCurrentTableName(std::string &tableName); 225 226 int TryToAddSyncTask(CloudTaskInfo &&taskInfo); 227 228 int CheckQueueSizeWithNoLock(bool priorityTask); 229 230 int PrepareSync(TaskId taskId); 231 232 int LockCloud(TaskId taskId); 233 234 int UnlockCloud(); 235 236 int StartHeartBeatTimer(int period, TaskId taskId); 237 238 void FinishHeartBeatTimer(); 239 240 void HeartBeat(TimerId timerId, TaskId taskId); 241 242 void HeartBeatFailed(TaskId taskId, int errCode); 243 244 void SetTaskFailed(TaskId taskId, int errCode); 245 246 int SaveDatum(SyncParam ¶m, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList, 247 std::map<std::string, LogInfo> &localLogInfoCache, std::vector<VBucket> &localInfo); 248 249 int SaveData(CloudSyncer::TaskId taskId, SyncParam ¶m); 250 251 void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload); 252 253 int SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m); 254 255 int DoDownloadAssets(SyncParam ¶m); 256 257 int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m, bool downloadAssetOnly); 258 259 void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch); 260 261 bool NeedNotifyChangedData(const ChangedData &changedData); 262 263 int NotifyChangedDataInCurrentTask(ChangedData &&changedData); 264 265 std::map<std::string, Assets> TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData, 266 bool setNormalStatus, bool isForcePullAseets, int &errCode); 267 268 int TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo); 269 270 int HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam ¶m, 271 VBucket &localAssetInfo); 272 273 int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo, 274 VBucket &localAssetInfo); 275 276 int TagDownloadAssetsForAssetOnly( 277 const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo, VBucket &localAssetInfo); 278 279 void TagUploadAssets(CloudSyncData &uploadData); 280 281 int FillCloudAssets(InnerProcessInfo &info, VBucket &normalAssets, VBucket &failedAssets); 282 283 int HandleDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info, 284 DownloadCommitList &commitList, uint32_t &successCount); 285 286 int HandleDownloadResultForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info, 287 DownloadCommitList &commitList, uint32_t &successCount); 288 289 int FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark, 290 VBucket &extend); 291 292 int GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj); 293 294 int GetCloudGid( 295 TaskId taskId, const std::string &tableName, QuerySyncObject &obj, std::vector<std::string> &cloudGid); 296 297 int DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames, 298 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets); 299 300 int CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList, 301 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets); 302 303 void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem); 304 305 bool IsDataContainAssets(); 306 307 int SaveCloudWaterMark(const TableName &tableName, const TaskId taskId); 308 309 bool IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data); 310 311 int UpdateChangedData(SyncParam ¶m, DownloadList &assetsDownloadList); 312 313 void UpdateCloudWaterMark(TaskId taskId, const SyncParam ¶m); 314 315 int TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult); 316 317 int CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info, 318 DownloadCommitList &commitList, int errCode); 319 320 int GetLocalInfo(size_t index, SyncParam ¶m, DataInfoWithLog &logInfo, 321 std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo); 322 323 TaskId GetNextTaskId(); 324 325 void MarkCurrentTaskPausedIfNeed(const CloudTaskInfo &taskInfo); 326 327 void SetCurrentTaskFailedWithoutLock(int errCode); 328 329 int LockCloudIfNeed(TaskId taskId); 330 331 void UnlockIfNeed(); 332 333 void ClearCurrentContextWithoutLock(); 334 335 void ClearContextAndNotify(TaskId taskId, int errCode); 336 337 int DownloadOneBatch(TaskId taskId, SyncParam ¶m, bool isFirstDownload); 338 339 int DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList, 340 DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets); 341 342 int GetSyncParamForDownload(TaskId taskId, SyncParam ¶m); 343 344 bool IsCurrentTaskResume(TaskId taskId); 345 346 bool IsCurrentTableResume(TaskId taskId, bool upload); 347 348 int DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool isFirstDownload); 349 350 size_t GetDownloadAssetIndex(TaskId taskId); 351 352 uint32_t GetCurrentTableUploadBatchIndex(); 353 354 void ResetCurrentTableUploadBatchIndex(); 355 356 void RecordWaterMark(TaskId taskId, Timestamp waterMark); 357 358 Timestamp GetResumeWaterMark(TaskId taskId); 359 360 void ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark); 361 362 void ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark); 363 364 void ReloadUploadInfoIfNeed(const UploadParam ¶m, InnerProcessInfo &info); 365 366 void GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo, UploadRetryInfo &retryInfo); 367 368 QuerySyncObject GetQuerySyncObject(const std::string &tableName); 369 370 InnerProcessInfo GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam); 371 372 void NotifyUploadFailed(int errCode, InnerProcessInfo &info); 373 374 void UpdateProcessWhenUploadFailed(InnerProcessInfo &info); 375 376 int BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 377 378 int BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 379 380 int BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo, 381 bool isInsert); 382 383 int BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInsert, int batchUploadResult); 384 385 int BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 386 387 int DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem, 388 std::map<std::string, Assets> &downloadAssets); 389 390 std::pair<int, uint32_t> GetDBAssets(bool isSharedTable, const InnerProcessInfo &info, 391 const DownloadItem &downloadItem, VBucket &dbAssets); 392 393 std::map<std::string, Assets>& BackFillAssetsAfterDownload(int downloadCode, int deleteCode, 394 std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload, 395 std::map<std::string, Assets> &tmpAssetsToDelete); 396 397 int IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info, 398 const DownloadItem &downloadItem, VBucket &dbAssets); 399 400 bool CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem); 401 402 int DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info, DownloadItem &downloadItem, 403 std::map<std::string, Assets> &downloadAssets); 404 405 int CommitDownloadAssets(const DownloadItem &downloadItem, InnerProcessInfo &info, 406 DownloadCommitList &commitList, uint32_t &successCount); 407 408 int CommitDownloadAssetsForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info, 409 DownloadCommitList &commitList, uint32_t &successCount); 410 411 void SeparateNormalAndFailAssets(const std::map<std::string, Assets> &assetsMap, VBucket &normalAssets, 412 VBucket &failedAssets); 413 414 void ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam); 415 416 int SaveCursorIfNeed(const std::string &tableName); 417 418 int PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload); 419 420 int UpdateFlagForSavedRecord(const SyncParam ¶m); 421 422 bool IsNeedGetLocalWater(TaskId taskId); 423 424 bool IsNeedProcessCloudCursor(TaskId taskId); 425 426 void SetProxyUser(const std::string &user); 427 428 void MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId); 429 430 void RemoveTaskFromQueue(int32_t priorityLevel, TaskId taskId); 431 432 std::pair<bool, TaskId> TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId); 433 434 bool IsTaskCanMerge(const CloudTaskInfo &taskInfo); 435 436 bool IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId); 437 438 bool MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId); 439 440 void AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema, CloudTaskInfo &taskInfo); 441 442 std::pair<TaskId, TaskId> SwapTwoTaskAndCopyTable(TaskId source, TaskId target); 443 444 bool IsQueryListEmpty(TaskId taskId); 445 446 std::pair<int, Timestamp> GetLocalWater(const std::string &tableName, UploadParam &uploadParam); 447 448 int HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info, CloudSyncData &uploadData, 449 ContinueToken &continueStmtToken, std::vector<ReviseModTimeInfo> &revisedData); 450 451 bool IsNeedLock(const UploadParam ¶m); 452 453 int UploadVersionRecordIfNeed(const UploadParam &uploadParam); 454 455 std::vector<CloudTaskInfo> CopyAndClearTaskInfos(); 456 457 void WaitCurTaskFinished(); 458 459 bool IsLockInDownload(); 460 461 CloudSyncEvent SetCurrentTaskFailedInMachine(int errCode); 462 463 CloudSyncEvent SyncMachineDoRepeatCheck(); 464 465 void MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish = true); 466 467 bool IsTableFinishInUpload(const std::string &table); 468 469 void MarkUploadFinishIfNeed(const std::string &table); 470 471 int GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo); 472 473 void ProcessVersionConflictInfo(InnerProcessInfo &innerProcessInfo, uint32_t retryCount); 474 475 std::string GetStoreIdByTask(TaskId taskId); 476 477 int CloudDbBatchDownloadAssets(TaskId taskId, const DownloadList &downloadList, const std::set<Key> &dupHashKeySet, 478 InnerProcessInfo &info, ChangedData &changedAssets); 479 480 void FillDownloadItem(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList, 481 const InnerProcessInfo &info, bool isSharedTable, DownloadItems &record); 482 483 using DownloadItemRecords = std::vector<DownloadItems>; 484 using RemoveAssetsRecords = std::vector<IAssetLoader::AssetRecord>; 485 using DownloadAssetsRecords = std::vector<IAssetLoader::AssetRecord>; 486 using DownloadAssetDetail = std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords>; 487 DownloadAssetDetail GetDownloadRecords(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet, 488 bool isSharedTable, bool isAsyncDownloadAssets, const InnerProcessInfo &info); 489 490 int BatchDownloadAndCommitRes(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet, 491 InnerProcessInfo &info, ChangedData &changedAssets, 492 std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> &downloadDetail); 493 494 static void StatisticDownloadRes(const IAssetLoader::AssetRecord &downloadRecord, 495 const IAssetLoader::AssetRecord &removeRecord, InnerProcessInfo &info, DownloadItem &downloadItem); 496 497 static void AddNotifyDataFromDownloadAssets(const std::set<Key> &dupHashKeySet, DownloadItem &downloadItem, 498 ChangedData &changedAssets); 499 500 void CheckDataAfterDownload(const std::string &tableName); 501 502 bool IsAsyncDownloadAssets(TaskId taskId); 503 504 void TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload); 505 506 void TriggerAsyncDownloadAssetsIfNeed(); 507 508 void BackgroundDownloadAssetsTask(); 509 510 void CancelDownloadListener(); 511 512 void DoBackgroundDownloadAssets(); 513 514 void CancelBackgroundDownloadAssetsTaskIfNeed(); 515 516 void CancelBackgroundDownloadAssetsTask(bool cancelDownload = true); 517 518 int BackgroundDownloadAssetsByTable(const std::string &table, std::map<std::string, int64_t> &downloadBeginTime); 519 520 int CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId, SyncParam ¶m); 521 522 int CheckLocalQueryAssetsOnlyIfNeed(VBucket &localAssetInfo, SyncParam ¶m, DataInfoWithLog &logInfo); 523 524 int PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam ¶m, std::vector<VBucket> &localInfo); 525 526 bool IsCurrentAsyncDownloadTask(); 527 528 int GetCloudGidAndFillExtend(TaskId taskId, const std::string &tableName, QuerySyncObject &obj, VBucket &extend); 529 530 int QueryCloudGidForAssetsOnly( 531 TaskId taskId, SyncParam ¶m, int64_t groupIdx, std::vector<std::string> &cloudGid); 532 533 int GetEmptyGidAssetsMapFromDownloadData( 534 const std::vector<VBucket> &data, std::map<std::string, AssetsMap> &gidAssetsMap); 535 536 int SetAssetsMapAndEraseDataForAssetsOnly(TaskId taskId, SyncParam ¶m, std::vector<VBucket> &downloadData, 537 std::map<std::string, AssetsMap> &gidAssetsMap); 538 539 void NotifyChangedDataWithDefaultDev(ChangedData &&changedData); 540 541 bool IsAlreadyHaveCompensatedSyncTask(); 542 543 bool TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId); 544 545 int FillCloudAssetsForOneRecord(const std::string &gid, const std::map<std::string, Assets> &assetsMap, 546 InnerProcessInfo &info, bool setAllNormal, bool &isExistAssetDownloadFail); 547 548 int UpdateRecordFlagForOneRecord(const std::string &gid, const DownloadItem &downloadItem, InnerProcessInfo &info, 549 bool isExistAssetDownloadFail); 550 551 static void ModifyDownLoadInfoCount(const int errorCode, InnerProcessInfo &info); 552 553 void ChangeProcessStatusAndNotifyIfNeed(UploadParam &uploadParam, InnerProcessInfo &info); 554 555 void ExecuteAsyncDownloadAssets(TaskId taskId); 556 557 bool IsCloudForcePush(TaskId taskId); 558 559 mutable std::mutex dataLock_; 560 TaskId lastTaskId_; 561 std::multimap<int, TaskId, std::greater<int>> taskQueue_; 562 std::map<TaskId, CloudTaskInfo> cloudTaskInfos_; 563 std::map<TaskId, ResumeTaskInfo> resumeTaskInfos_; 564 565 TaskContext currentContext_; 566 std::condition_variable contextCv_; 567 std::mutex syncMutex_; // Clean Cloud Data and Sync are mutually exclusive 568 569 CloudDBProxy cloudDB_; 570 571 std::shared_ptr<StorageProxy> storageProxy_; 572 std::atomic<int32_t> queuedManualSyncLimit_; 573 574 std::atomic<bool> closed_; 575 std::atomic<bool> hasKvRemoveTask; 576 std::atomic<TimerId> timerId_; 577 std::mutex heartbeatMutex_; 578 std::condition_variable heartbeatCv_; 579 std::map<TaskId, int32_t> heartbeatCount_; 580 std::map<TaskId, int32_t> failedHeartbeatCount_; 581 582 std::string id_; 583 584 // isKvScene_ is used to distinguish between the KV and RDB in the following scenarios: 585 // 1. Whether upload to the cloud after delete local data that does not have a gid. 586 // 2. Whether the local data need update for different flag when the local time is larger. 587 bool isKvScene_; 588 std::atomic<SingleVerConflictResolvePolicy> policy_; 589 std::condition_variable asyncTaskCv_; 590 TaskId asyncTaskId_; 591 std::atomic<bool> cancelAsyncTask_; 592 std::atomic<int> scheduleTaskCount_; 593 std::mutex listenerMutex_; 594 NotificationChain::Listener *waitDownloadListener_; 595 596 static constexpr const TaskId INVALID_TASK_ID = 0u; 597 static constexpr const int MAX_HEARTBEAT_FAILED_LIMIT = 2; 598 static constexpr const int HEARTBEAT_PERIOD = 3; 599 600 CloudSyncStateMachine cloudSyncStateMachine_; 601 }; 602 } 603 #endif // CLOUD_SYNCER_H 604