1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef CLOUD_SYNCER_H 17 #define CLOUD_SYNCER_H 18 #include <atomic> 19 #include <condition_variable> 20 #include <mutex> 21 #include <utility> 22 23 #include "cloud/cloud_db_proxy.h" 24 #include "cloud/cloud_store_types.h" 25 #include "cloud/cloud_sync_state_machine.h" 26 #include "cloud/cloud_sync_strategy.h" 27 #include "cloud/icloud_db.h" 28 #include "cloud/icloud_syncer.h" 29 #include "cloud/process_notifier.h" 30 #include "cloud/process_recorder.h" 31 #include "cloud_locker.h" 32 #include "data_transformer.h" 33 #include "db_common.h" 34 #include "ref_object.h" 35 #include "runtime_context.h" 36 #include "storage_proxy.h" 37 #include "store_observer.h" 38 39 namespace DistributedDB { 40 using DownloadCommitList = std::vector<std::tuple<std::string, std::map<std::string, Assets>, bool>>; 41 const std::string CLOUD_PRIORITY_TASK_STRING = "priority"; 42 const std::string CLOUD_COMMON_TASK_STRING = "common"; 43 class CloudSyncer : public ICloudSyncer { 44 public: 45 explicit CloudSyncer(std::shared_ptr<StorageProxy> storageProxy, bool isKvScene = false, 46 SingleVerConflictResolvePolicy policy = SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN); 47 void InitCloudSyncStateMachine(); 48 ~CloudSyncer() override = default; 49 DISABLE_COPY_ASSIGN_MOVE(CloudSyncer); 50 51 int Sync(const std::vector<DeviceID> &devices, SyncMode mode, const std::vector<std::string> &tables, 52 const SyncProcessCallback &callback, int64_t waitTime); 53 54 int Sync(const CloudTaskInfo &taskInfo); 55 56 void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB); 57 58 void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader); 59 60 int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList, 61 const RelationalSchemaObject &localSchema); 62 63 int CleanKvCloudData(std::function<int(void)> &removeFunc); 64 65 int CleanWaterMarkInMemory(const std::set<std::string> &tableNameList); 66 67 int32_t GetCloudSyncTaskCount(); 68 69 void Close(); 70 71 void StopAllTasks(int errCode = -E_USER_CHANGE); 72 73 std::string GetIdentify() const override; 74 75 bool IsClosed() const override; 76 77 void GenerateCompensatedSync(CloudTaskInfo &taskInfo); 78 79 int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs); 80 81 const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const; 82 83 void CleanAllWaterMark(); 84 85 CloudSyncEvent SyncMachineDoDownload(); 86 87 CloudSyncEvent SyncMachineDoUpload(); 88 89 CloudSyncEvent SyncMachineDoFinished(); 90 91 void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback); 92 93 SyncProcess GetCloudTaskStatus(uint64_t taskId) const; 94 protected: 95 struct TaskContext { 96 TaskId currentTaskId = 0u; 97 std::string tableName; 98 std::shared_ptr<ProcessNotifier> notifier; 99 std::shared_ptr<CloudSyncStrategy> strategy; 100 std::shared_ptr<ProcessRecorder> processRecorder; 101 std::map<TableName, std::vector<Field>> assetFields; 102 // should be cleared after each Download 103 DownloadList assetDownloadList; 104 // store GID and assets, using in upload procedure 105 std::map<TableName, std::map<std::string, std::map<std::string, Assets>>> assetsInfo; 106 // struct: <currentUserIndex, <tableName, waterMark>> 107 std::map<int, std::map<TableName, std::string>> cloudWaterMarks; 108 std::shared_ptr<CloudLocker> locker; 109 bool isNeedUpload = false; // whether the current task need to do upload 110 bool isRealNeedUpload = false; 111 bool isFirstDownload = false; 112 int currentUserIndex = 0; 113 int repeatCount = 0; 114 }; 115 struct UploadParam { 116 int64_t count = 0; 117 TaskId taskId = 0u; 118 Timestamp localMark = 0u; 119 bool lastTable = false; 120 CloudWaterType mode = CloudWaterType::DELETE; 121 LockAction lockAction = LockAction::INSERT; 122 }; 123 struct DownloadItem { 124 std::string gid; 125 Type prefix; 126 OpType strategy; 127 std::map<std::string, Assets> assets; 128 Key hashKey; 129 std::vector<Type> primaryKeyValList; 130 Timestamp timestamp; 131 bool recordConflict = false; 132 }; 133 struct ResumeTaskInfo { 134 TaskContext context; 135 SyncParam syncParam; 136 bool upload = false; // task pause when upload 137 size_t lastDownloadIndex = 0u; 138 Timestamp lastLocalWatermark = 0u; 139 int downloadStatus = E_OK; 140 }; 141 struct DownloadItems { 142 DownloadItem downloadItem; 143 std::map<std::string, Assets> assetsToRemove; 144 std::map<std::string, Assets> assetsToDownload; 145 std::map<std::string, std::vector<uint32_t>> flags; 146 }; 147 148 int TriggerSync(); 149 150 void DoSyncIfNeed(); 151 152 int DoSync(TaskId taskId); 153 154 int PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index); 155 156 int DoSyncInner(const CloudTaskInfo &taskInfo); 157 158 int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload); 159 160 void DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables, 161 const bool isFirstDownload); 162 163 int GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count); 164 165 void UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName, bool needNotify); 166 167 virtual int DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload); 168 169 void SetNeedUpload(bool isNeedUpload); 170 171 void DoFinished(TaskId taskId, int errCode); 172 173 virtual int DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload); 174 175 int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload); 176 177 void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info); 178 179 int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark); 180 181 int PreCheck(TaskId &taskId, const TableName &tableName); 182 183 int SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData, 184 InnerProcessInfo &innerProcessInfo); 185 186 int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo); 187 188 int PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, 189 CloudSyncData &uploadData); 190 191 int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam); 192 193 virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction); 194 195 void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData); 196 197 bool IsModeForcePush(TaskId taskId); 198 199 bool IsModeForcePull(const TaskId taskId); 200 201 bool IsPriorityTask(TaskId taskId); 202 203 bool IsCompensatedTask(TaskId taskId); 204 205 int DoUploadInner(const std::string &tableName, UploadParam &uploadParam); 206 207 int DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info); 208 209 int PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames); 210 211 int QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark, 212 DownloadData &downloadData); 213 214 size_t GetCurrentCommonTaskNum(); 215 216 int CheckTaskIdValid(TaskId taskId); 217 218 int GetCurrentTableName(std::string &tableName); 219 220 int TryToAddSyncTask(CloudTaskInfo &&taskInfo); 221 222 int CheckQueueSizeWithNoLock(bool priorityTask); 223 224 int PrepareSync(TaskId taskId); 225 226 int LockCloud(TaskId taskId); 227 228 int UnlockCloud(); 229 230 int StartHeartBeatTimer(int period, TaskId taskId); 231 232 void FinishHeartBeatTimer(); 233 234 void HeartBeat(TimerId timerId, TaskId taskId); 235 236 void HeartBeatFailed(TaskId taskId, int errCode); 237 238 void SetTaskFailed(TaskId taskId, int errCode); 239 240 int SaveDatum(SyncParam ¶m, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList, 241 std::map<std::string, LogInfo> &localLogInfoCache, std::vector<VBucket> &localInfo); 242 243 int SaveData(CloudSyncer::TaskId taskId, SyncParam ¶m); 244 245 void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload); 246 247 int SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m); 248 249 int DoDownloadAssets(SyncParam ¶m); 250 251 int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m, bool downloadAssetOnly); 252 253 void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch); 254 255 bool NeedNotifyChangedData(const ChangedData &changedData); 256 257 int NotifyChangedDataInCurrentTask(ChangedData &&changedData); 258 259 std::map<std::string, Assets> TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData, 260 bool setNormalStatus, bool isForcePullAseets, int &errCode); 261 262 int TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo); 263 264 int HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam ¶m, 265 VBucket &localAssetInfo); 266 267 int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo, 268 VBucket &localAssetInfo); 269 270 int TagDownloadAssetsForAssetOnly( 271 const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo, VBucket &localAssetInfo); 272 273 void TagUploadAssets(CloudSyncData &uploadData); 274 275 int FillCloudAssets(InnerProcessInfo &info, VBucket &normalAssets, VBucket &failedAssets); 276 277 int HandleDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info, 278 DownloadCommitList &commitList, uint32_t &successCount); 279 280 int FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark, 281 VBucket &extend); 282 283 int GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj); 284 285 int GetCloudGid( 286 TaskId taskId, const std::string &tableName, QuerySyncObject &obj, std::vector<std::string> &cloudGid); 287 288 int DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames, 289 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets); 290 291 int CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList, 292 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets); 293 294 void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem); 295 296 bool IsDataContainAssets(); 297 298 int SaveCloudWaterMark(const TableName &tableName, const TaskId taskId); 299 300 bool IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data); 301 302 int UpdateChangedData(SyncParam ¶m, DownloadList &assetsDownloadList); 303 304 void UpdateCloudWaterMark(TaskId taskId, const SyncParam ¶m); 305 306 int TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult); 307 308 int CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info, 309 DownloadCommitList &commitList, int errCode); 310 311 int GetLocalInfo(size_t index, SyncParam ¶m, DataInfoWithLog &logInfo, 312 std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo); 313 314 TaskId GetNextTaskId(); 315 316 void MarkCurrentTaskPausedIfNeed(const CloudTaskInfo &taskInfo); 317 318 void SetCurrentTaskFailedWithoutLock(int errCode); 319 320 int LockCloudIfNeed(TaskId taskId); 321 322 void UnlockIfNeed(); 323 324 void ClearCurrentContextWithoutLock(); 325 326 void ClearContextAndNotify(TaskId taskId, int errCode); 327 328 int DownloadOneBatch(TaskId taskId, SyncParam ¶m, bool isFirstDownload); 329 330 int DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList, 331 DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets); 332 333 int GetSyncParamForDownload(TaskId taskId, SyncParam ¶m); 334 335 bool IsCurrentTaskResume(TaskId taskId); 336 337 bool IsCurrentTableResume(TaskId taskId, bool upload); 338 339 int DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool isFirstDownload); 340 341 size_t GetDownloadAssetIndex(TaskId taskId); 342 343 uint32_t GetCurrentTableUploadBatchIndex(); 344 345 void ResetCurrentTableUploadBatchIndex(); 346 347 void RecordWaterMark(TaskId taskId, Timestamp waterMark); 348 349 Timestamp GetResumeWaterMark(TaskId taskId); 350 351 void ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark); 352 353 void ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark); 354 355 void ReloadUploadInfoIfNeed(const UploadParam ¶m, InnerProcessInfo &info); 356 357 void GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo, UploadRetryInfo &retryInfo); 358 359 QuerySyncObject GetQuerySyncObject(const std::string &tableName); 360 361 InnerProcessInfo GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam); 362 363 void NotifyUploadFailed(int errCode, InnerProcessInfo &info); 364 365 void UpdateProcessWhenUploadFailed(InnerProcessInfo &info); 366 367 int BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 368 369 int BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 370 371 int BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo, 372 bool isInsert); 373 374 int BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInsert, int batchUploadResult); 375 376 int BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 377 378 int DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem, 379 std::map<std::string, Assets> &downloadAssets); 380 381 std::pair<int, uint32_t> GetDBAssets(bool isSharedTable, const InnerProcessInfo &info, 382 const DownloadItem &downloadItem, VBucket &dbAssets); 383 384 std::map<std::string, Assets>& BackFillAssetsAfterDownload(int downloadCode, int deleteCode, 385 std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload, 386 std::map<std::string, Assets> &tmpAssetsToDelete); 387 388 int IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info, 389 const DownloadItem &downloadItem, VBucket &dbAssets); 390 391 bool CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem); 392 393 int DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info, DownloadItem &downloadItem, 394 std::map<std::string, Assets> &downloadAssets); 395 396 int CommitDownloadAssets(const DownloadItem &downloadItem, InnerProcessInfo &info, 397 DownloadCommitList &commitList, uint32_t &successCount); 398 399 int CommitDownloadAssetsForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info, 400 DownloadCommitList &commitList, uint32_t &successCount); 401 402 void SeparateNormalAndFailAssets(const std::map<std::string, Assets> &assetsMap, VBucket &normalAssets, 403 VBucket &failedAssets); 404 405 void ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam); 406 407 int SaveCursorIfNeed(const std::string &tableName); 408 409 int PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload); 410 411 int UpdateFlagForSavedRecord(const SyncParam ¶m); 412 413 bool IsNeedGetLocalWater(TaskId taskId); 414 415 void SetProxyUser(const std::string &user); 416 417 void MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId); 418 419 void RemoveTaskFromQueue(int32_t priorityLevel, TaskId taskId); 420 421 std::pair<bool, TaskId> TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId); 422 423 bool IsTaskCanMerge(const CloudTaskInfo &taskInfo); 424 425 bool IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId); 426 427 bool MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId); 428 429 void AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema, CloudTaskInfo &taskInfo); 430 431 std::pair<TaskId, TaskId> SwapTwoTaskAndCopyTable(TaskId source, TaskId target); 432 433 bool IsQueryListEmpty(TaskId taskId); 434 435 std::pair<int, Timestamp> GetLocalWater(const std::string &tableName, UploadParam &uploadParam); 436 437 int HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info, CloudSyncData &uploadData, 438 ContinueToken &continueStmtToken, std::vector<ReviseModTimeInfo> &revisedData); 439 440 bool IsNeedLock(const UploadParam ¶m); 441 442 int UploadVersionRecordIfNeed(const UploadParam &uploadParam); 443 444 std::vector<CloudTaskInfo> CopyAndClearTaskInfos(); 445 446 void WaitCurTaskFinished(); 447 448 bool IsLockInDownload(); 449 450 CloudSyncEvent SetCurrentTaskFailedInMachine(int errCode); 451 452 CloudSyncEvent SyncMachineDoRepeatCheck(); 453 454 void MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish = true); 455 456 bool IsTableFinishInUpload(const std::string &table); 457 458 void MarkUploadFinishIfNeed(const std::string &table); 459 460 int GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo); 461 462 void ProcessVersionConflictInfo(InnerProcessInfo &innerProcessInfo, uint32_t retryCount); 463 464 std::string GetStoreIdByTask(TaskId taskId); 465 466 int CloudDbBatchDownloadAssets(TaskId taskId, const DownloadList &downloadList, const std::set<Key> &dupHashKeySet, 467 InnerProcessInfo &info, ChangedData &changedAssets); 468 469 void FillDownloadItem(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList, 470 const InnerProcessInfo &info, bool isSharedTable, DownloadItems &record); 471 472 using DownloadItemRecords = std::vector<DownloadItems>; 473 using RemoveAssetsRecords = std::vector<IAssetLoader::AssetRecord>; 474 using DownloadAssetsRecords = std::vector<IAssetLoader::AssetRecord>; 475 using DownloadAssetDetail = std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords>; 476 DownloadAssetDetail GetDownloadRecords(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet, 477 bool isSharedTable, bool isAsyncDownloadAssets, const InnerProcessInfo &info); 478 479 int BatchDownloadAndCommitRes(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet, 480 InnerProcessInfo &info, ChangedData &changedAssets, 481 std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> &downloadDetail); 482 483 static void StatisticDownloadRes(const IAssetLoader::AssetRecord &downloadRecord, 484 const IAssetLoader::AssetRecord &removeRecord, InnerProcessInfo &info, DownloadItem &downloadItem); 485 486 static void AddNotifyDataFromDownloadAssets(const std::set<Key> &dupHashKeySet, DownloadItem &downloadItem, 487 ChangedData &changedAssets); 488 489 void CheckDataAfterDownload(const std::string &tableName); 490 491 bool IsAsyncDownloadAssets(TaskId taskId); 492 493 void TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload); 494 495 void TriggerAsyncDownloadAssetsIfNeed(); 496 497 void BackgroundDownloadAssetsTask(); 498 499 void CancelDownloadListener(); 500 501 void DoBackgroundDownloadAssets(); 502 503 void CancelBackgroundDownloadAssetsTaskIfNeed(); 504 505 void CancelBackgroundDownloadAssetsTask(bool cancelDownload = true); 506 507 int BackgroundDownloadAssetsByTable(const std::string &table, std::map<std::string, int64_t> &downloadBeginTime); 508 509 int CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId, SyncParam ¶m); 510 511 int CheckLocalQueryAssetsOnlyIfNeed(VBucket &localAssetInfo, SyncParam ¶m, DataInfoWithLog &logInfo); 512 513 int PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam ¶m, std::vector<VBucket> &localInfo); 514 515 bool IsCurrentAsyncDownloadTask(); 516 517 bool CanStartAsyncDownload() const; 518 519 int GetCloudGidAndFillExtend(TaskId taskId, const std::string &tableName, QuerySyncObject &obj, VBucket &extend); 520 521 int QueryCloudGidForAssetsOnly( 522 TaskId taskId, SyncParam ¶m, int64_t groupIdx, std::vector<std::string> &cloudGid); 523 524 int GetEmptyGidAssetsMapFromDownloadData( 525 const std::vector<VBucket> &data, std::map<std::string, AssetsMap> &gidAssetsMap); 526 527 int SetAssetsMapAndEraseDataForAssetsOnly(TaskId taskId, SyncParam ¶m, std::vector<VBucket> &downloadData, 528 std::map<std::string, AssetsMap> &gidAssetsMap); 529 530 void NotifyChangedDataWithDefaultDev(ChangedData &&changedData); 531 532 bool IsAlreadyHaveCompensatedSyncTask(); 533 534 bool TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId); 535 536 int FillCloudAssetsForOneRecord(const std::string &gid, const std::map<std::string, Assets> &assetsMap, 537 InnerProcessInfo &info, bool setAllNormal, bool &isExistAssetDownloadFail); 538 539 int UpdateRecordFlagForOneRecord(const std::string &gid, const DownloadItem &downloadItem, InnerProcessInfo &info, 540 bool isExistAssetDownloadFail); 541 542 static void ModifyDownLoadInfoCount(const int errorCode, InnerProcessInfo &info); 543 544 mutable std::mutex dataLock_; 545 TaskId lastTaskId_; 546 std::multimap<int, TaskId, std::greater<int>> taskQueue_; 547 std::map<TaskId, CloudTaskInfo> cloudTaskInfos_; 548 std::map<TaskId, ResumeTaskInfo> resumeTaskInfos_; 549 550 TaskContext currentContext_; 551 std::condition_variable contextCv_; 552 std::mutex syncMutex_; // Clean Cloud Data and Sync are mutually exclusive 553 554 CloudDBProxy cloudDB_; 555 556 std::shared_ptr<StorageProxy> storageProxy_; 557 std::atomic<int32_t> queuedManualSyncLimit_; 558 559 std::atomic<bool> closed_; 560 std::atomic<bool> hasKvRemoveTask; 561 std::atomic<TimerId> timerId_; 562 std::mutex heartbeatMutex_; 563 std::condition_variable heartbeatCv_; 564 std::map<TaskId, int32_t> heartbeatCount_; 565 std::map<TaskId, int32_t> failedHeartbeatCount_; 566 567 std::string id_; 568 569 // isKvScene_ is used to distinguish between the KV and RDB in the following scenarios: 570 // 1. Whether upload to the cloud after delete local data that does not have a gid. 571 // 2. Whether the local data need update for different flag when the local time is larger. 572 bool isKvScene_; 573 std::atomic<SingleVerConflictResolvePolicy> policy_; 574 std::condition_variable asyncTaskCv_; 575 TaskId asyncTaskId_; 576 std::atomic<bool> cancelAsyncTask_; 577 std::mutex listenerMutex_; 578 NotificationChain::Listener *waitDownloadListener_; 579 580 static constexpr const TaskId INVALID_TASK_ID = 0u; 581 static constexpr const int MAX_HEARTBEAT_FAILED_LIMIT = 2; 582 static constexpr const int HEARTBEAT_PERIOD = 3; 583 584 CloudSyncStateMachine cloudSyncStateMachine_; 585 }; 586 } 587 #endif // CLOUD_SYNCER_H 588