1 /* 2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H 17 #define OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H 18 19 #include "cloud/cloud_event.h" 20 #include "cloud/cloud_info.h" 21 #include "cloud/cloud_last_sync_info.h" 22 #include "cloud/sync_strategy.h" 23 #include "cloud_types.h" 24 #include "cloud/sync_event.h" 25 #include "concurrent_map.h" 26 #include "dfx/radar_reporter.h" 27 #include "eventcenter/event.h" 28 #include "executor_pool.h" 29 #include "metadata/store_meta_data_local.h" 30 #include "store/auto_cache.h" 31 #include "store/general_store.h" 32 #include "store/general_value.h" 33 #include "utils/ref_count.h" 34 35 namespace OHOS::CloudData { 36 class SyncManager { 37 public: 38 using CloudLastSyncInfo = DistributedData::CloudLastSyncInfo; 39 using GenAsync = DistributedData::GenAsync; 40 using GenStore = DistributedData::GeneralStore; 41 using GenQuery = DistributedData::GenQuery; 42 using RefCount = DistributedData::RefCount; 43 using AutoCache = DistributedData::AutoCache; 44 using StoreMetaData = DistributedData::StoreMetaData; 45 using SchemaMeta = DistributedData::SchemaMeta; 46 using TraceIds = std::map<std::string, std::string>; 47 using SyncStage = DistributedData::SyncStage; 48 using ReportParam = DistributedData::ReportParam; 49 class SyncInfo final { 50 public: 51 using Store = std::string; 52 using Stores = std::vector<Store>; 53 using Tables = std::vector<std::string>; 54 struct Param { 55 int32_t user; 56 std::string bundleName; 57 Store store; 58 Tables tables; 59 int32_t triggerMode = 0; 60 std::string prepareTraceId; 61 }; 62 using MutliStoreTables = std::map<Store, Tables>; 63 explicit SyncInfo(int32_t user, const std::string &bundleName = "", const Store &store = "", 64 const Tables &tables = {}, int32_t triggerMode = 0); 65 SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores); 66 SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables); 67 explicit SyncInfo(const Param ¶m); 68 void SetMode(int32_t mode); 69 void SetWait(int32_t wait); 70 void SetAsyncDetail(GenAsync asyncDetail); 71 void SetQuery(std::shared_ptr<GenQuery> query); 72 void SetError(int32_t code) const; 73 void SetCompensation(bool isCompensation); 74 void SetTriggerMode(int32_t triggerMode); 75 void SetPrepareTraceId(const std::string &prepareTraceId); 76 std::shared_ptr<GenQuery> GenerateQuery(const std::string &store, const Tables &tables); 77 bool Contains(const std::string &storeName); 78 inline static constexpr const char *DEFAULT_ID = "default"; 79 80 private: 81 friend SyncManager; 82 uint64_t syncId_ = 0; 83 int32_t mode_ = GenStore::MixMode(GenStore::CLOUD_TIME_FIRST, GenStore::AUTO_SYNC_MODE); 84 int32_t user_ = 0; 85 int32_t wait_ = 0; 86 std::string id_ = DEFAULT_ID; 87 std::string bundleName_; 88 std::map<std::string, std::vector<std::string>> tables_; 89 GenAsync async_; 90 std::shared_ptr<GenQuery> query_; 91 bool isCompensation_ = false; 92 int32_t triggerMode_ = 0; 93 std::string prepareTraceId_; 94 }; 95 SyncManager(); 96 ~SyncManager(); 97 static void Report(const ReportParam &reportParam); 98 static std::pair<int32_t, AutoCache::Store> GetStore(const StoreMetaData &meta, int32_t user, bool mustBind = true); 99 int32_t Bind(std::shared_ptr<ExecutorPool> executor); 100 int32_t DoCloudSync(SyncInfo syncInfo); 101 int32_t StopCloudSync(int32_t user = 0); 102 std::pair<int32_t, std::map<std::string, CloudLastSyncInfo>> QueryLastSyncInfo( 103 const std::vector<QueryKey> &queryKeys); 104 void OnScreenUnlocked(int32_t user); 105 void CleanCompensateSync(int32_t userId); 106 static std::string GetPath(const StoreMetaData &meta); 107 108 private: 109 using Event = DistributedData::Event; 110 using Task = ExecutorPool::Task; 111 using TaskId = ExecutorPool::TaskId; 112 using Duration = ExecutorPool::Duration; 113 using Retryer = 114 std::function<bool(Duration interval, int32_t status, int32_t dbCode, const std::string &prepareTraceId)>; 115 using CloudInfo = DistributedData::CloudInfo; 116 using StoreInfo = DistributedData::StoreInfo; 117 using SyncStrategy = DistributedData::SyncStrategy; 118 using SyncId = uint64_t; 119 using GeneralError = DistributedData::GeneralError; 120 using GenProgress = DistributedData::GenProgress; 121 using GenDetails = DistributedData::GenDetails; 122 123 static constexpr ExecutorPool::Duration RETRY_INTERVAL = std::chrono::seconds(10); // second 124 static constexpr ExecutorPool::Duration LOCKED_INTERVAL = std::chrono::seconds(30); // second 125 static constexpr ExecutorPool::Duration BUSY_INTERVAL = std::chrono::seconds(180); // second 126 static constexpr int32_t RETRY_TIMES = 6; // normal retry 127 static constexpr int32_t CLIENT_RETRY_TIMES = 3; // normal retry 128 static constexpr uint64_t USER_MARK = 0xFFFFFFFF00000000; // high 32 bit 129 static constexpr int32_t MV_BIT = 32; 130 static constexpr int32_t EXPIRATION_TIME = 6 * 60 * 60 * 1000; // 6 hours 131 132 static uint64_t GenerateId(int32_t user); 133 static ExecutorPool::Duration GetInterval(int32_t code); 134 static std::map<uint32_t, GenStore::BindInfo> GetBindInfos( 135 const StoreMetaData &meta, const std::vector<int32_t> &users, const DistributedData::Database &schemaDatabase); 136 static std::string GetAccountId(int32_t user); 137 static std::vector<std::tuple<QueryKey, uint64_t>> GetCloudSyncInfo(const SyncInfo &info, CloudInfo &cloud); 138 static std::vector<SchemaMeta> GetSchemaMeta(const CloudInfo &cloud, const std::string &bundleName); 139 static bool NeedGetCloudInfo(CloudInfo &cloud); 140 static GeneralError IsValid(SyncInfo &info, CloudInfo &cloud); 141 static std::function<void(const Event &)> GetLockChangeHandler(); 142 static TraceIds GetPrepareTraceId(const SyncInfo &info, const CloudInfo &cloud); 143 static void Report( 144 const std::string &faultType, const std::string &bundleName, int32_t errCode, const std::string &appendix); 145 static std::pair<int32_t, CloudLastSyncInfo> GetLastResults(std::map<SyncId, CloudLastSyncInfo> &infos); 146 static std::pair<int32_t, CloudLastSyncInfo> GetLastSyncInfoFromMeta(const QueryKey &queryKey); 147 static void SaveLastSyncInfo(const QueryKey &queryKey, CloudLastSyncInfo &&info); 148 static void BatchReport(int32_t userId, const TraceIds &traceIds, SyncStage syncStage, int32_t errCode, 149 const std::string &message = ""); 150 static void ReportSyncEvent(const DistributedData::SyncEvent &evt, DistributedDataDfx::BizState bizState, 151 int32_t code); 152 static bool HandleRetryFinished(const SyncInfo &info, int32_t user, int32_t code, int32_t dbCode, 153 const std::string &prepareTraceId); 154 Task GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo); 155 void UpdateSchema(const SyncInfo &syncInfo); 156 std::function<void(const Event &)> GetSyncHandler(Retryer retryer); 157 std::function<void(const Event &)> GetClientChangeHandler(); 158 Retryer GetRetryer(int32_t times, const SyncInfo &syncInfo, int32_t user); 159 RefCount GenSyncRef(uint64_t syncId); 160 int32_t Compare(uint64_t syncId, int32_t user); 161 void UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos); 162 void UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code); 163 std::function<void(const DistributedData::GenDetails &result)> GetCallback(const GenAsync &async, 164 const StoreInfo &storeInfo, int32_t triggerMode, const std::string &prepareTraceId, int32_t user); 165 std::function<void()> GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud, SyncInfo &info, 166 bool retry, const TraceIds &traceIds); 167 void DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo, 168 const ReportParam ¶m); 169 bool InitDefaultUser(int32_t &user); 170 std::function<void(const DistributedData::GenDetails &result)> RetryCallback(const StoreInfo &storeInfo, 171 Retryer retryer, int32_t triggerMode, const std::string &prepareTraceId, int32_t user); 172 void BatchUpdateFinishState(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos, int32_t code); 173 bool NeedSaveSyncInfo(const QueryKey &queryKey); 174 void StartCloudSync(const DistributedData::SyncEvent &evt, const StoreMetaData &meta, 175 const AutoCache::Store &store, Retryer retryer, DistributedData::GenDetails &details); 176 std::pair<bool, StoreMetaData> GetMetaData(const StoreInfo &storeInfo); 177 void AddCompensateSync(const StoreMetaData &meta); 178 static DistributedData::GenDetails ConvertGenDetailsCode(const GenDetails &details); 179 static int32_t ConvertValidGeneralCode(int32_t code); 180 static std::atomic<uint32_t> genId_; 181 std::shared_ptr<ExecutorPool> executor_; 182 ConcurrentMap<uint64_t, TaskId> actives_; 183 ConcurrentMap<uint64_t, uint64_t> activeInfos_; 184 std::shared_ptr<SyncStrategy> syncStrategy_; 185 ConcurrentMap<QueryKey, std::map<SyncId, CloudLastSyncInfo>> lastSyncInfos_; 186 std::set<std::string> kvApps_; 187 ConcurrentMap<int32_t, std::map<std::string, std::set<std::string>>> compensateSyncInfos_; 188 }; 189 } // namespace OHOS::CloudData 190 #endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H