• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H
17 #define OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H
18 
19 #include "cloud/cloud_event.h"
20 #include "cloud/cloud_info.h"
21 #include "cloud/cloud_last_sync_info.h"
22 #include "cloud/sync_strategy.h"
23 #include "cloud_types.h"
24 #include "cloud/sync_event.h"
25 #include "concurrent_map.h"
26 #include "dfx/radar_reporter.h"
27 #include "eventcenter/event.h"
28 #include "executor_pool.h"
29 #include "metadata/store_meta_data_local.h"
30 #include "store/auto_cache.h"
31 #include "store/general_store.h"
32 #include "store/general_value.h"
33 #include "utils/ref_count.h"
34 
35 namespace OHOS::CloudData {
36 class SyncManager {
37 public:
38     using CloudLastSyncInfo = DistributedData::CloudLastSyncInfo;
39     using GenAsync = DistributedData::GenAsync;
40     using GenStore = DistributedData::GeneralStore;
41     using GenQuery = DistributedData::GenQuery;
42     using RefCount = DistributedData::RefCount;
43     using AutoCache = DistributedData::AutoCache;
44     using StoreMetaData = DistributedData::StoreMetaData;
45     using SchemaMeta = DistributedData::SchemaMeta;
46     using TraceIds = std::map<std::string, std::string>;
47     using SyncStage = DistributedData::SyncStage;
48     using ReportParam = DistributedData::ReportParam;
49     class SyncInfo final {
50     public:
51         using Store = std::string;
52         using Stores = std::vector<Store>;
53         using Tables = std::vector<std::string>;
54         struct Param {
55             int32_t user;
56             std::string bundleName;
57             Store store;
58             Tables tables;
59             int32_t triggerMode = 0;
60             std::string prepareTraceId;
61         };
62         using MutliStoreTables = std::map<Store, Tables>;
63         explicit SyncInfo(int32_t user, const std::string &bundleName = "", const Store &store = "",
64             const Tables &tables = {}, int32_t triggerMode = 0);
65         SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores);
66         SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables);
67         explicit SyncInfo(const Param &param);
68         void SetMode(int32_t mode);
69         void SetWait(int32_t wait);
70         void SetAsyncDetail(GenAsync asyncDetail);
71         void SetQuery(std::shared_ptr<GenQuery> query);
72         void SetError(int32_t code) const;
73         void SetCompensation(bool isCompensation);
74         void SetTriggerMode(int32_t triggerMode);
75         void SetPrepareTraceId(const std::string &prepareTraceId);
76         std::shared_ptr<GenQuery> GenerateQuery(const std::string &store, const Tables &tables);
77         bool Contains(const std::string &storeName);
78         inline static constexpr const char *DEFAULT_ID = "default";
79 
80     private:
81         friend SyncManager;
82         uint64_t syncId_ = 0;
83         int32_t mode_ = GenStore::MixMode(GenStore::CLOUD_TIME_FIRST, GenStore::AUTO_SYNC_MODE);
84         int32_t user_ = 0;
85         int32_t wait_ = 0;
86         std::string id_ = DEFAULT_ID;
87         std::string bundleName_;
88         std::map<std::string, std::vector<std::string>> tables_;
89         GenAsync async_;
90         std::shared_ptr<GenQuery> query_;
91         bool isCompensation_ = false;
92         int32_t triggerMode_ = 0;
93         std::string prepareTraceId_;
94     };
95     SyncManager();
96     ~SyncManager();
97     static void Report(const ReportParam &reportParam);
98     static std::pair<int32_t, AutoCache::Store> GetStore(const StoreMetaData &meta, int32_t user, bool mustBind = true);
99     int32_t Bind(std::shared_ptr<ExecutorPool> executor);
100     int32_t DoCloudSync(SyncInfo syncInfo);
101     int32_t StopCloudSync(int32_t user = 0);
102     std::pair<int32_t, std::map<std::string, CloudLastSyncInfo>> QueryLastSyncInfo(
103         const std::vector<QueryKey> &queryKeys);
104     void OnScreenUnlocked(int32_t user);
105     void CleanCompensateSync(int32_t userId);
106     static std::string GetPath(const StoreMetaData &meta);
107 
108 private:
109     using Event = DistributedData::Event;
110     using Task = ExecutorPool::Task;
111     using TaskId = ExecutorPool::TaskId;
112     using Duration = ExecutorPool::Duration;
113     using Retryer =
114         std::function<bool(Duration interval, int32_t status, int32_t dbCode, const std::string &prepareTraceId)>;
115     using CloudInfo = DistributedData::CloudInfo;
116     using StoreInfo = DistributedData::StoreInfo;
117     using SyncStrategy = DistributedData::SyncStrategy;
118     using SyncId = uint64_t;
119     using GeneralError = DistributedData::GeneralError;
120     using GenProgress = DistributedData::GenProgress;
121     using GenDetails = DistributedData::GenDetails;
122 
123     static constexpr ExecutorPool::Duration RETRY_INTERVAL = std::chrono::seconds(10);  // second
124     static constexpr ExecutorPool::Duration LOCKED_INTERVAL = std::chrono::seconds(30); // second
125     static constexpr ExecutorPool::Duration BUSY_INTERVAL = std::chrono::seconds(180);  // second
126     static constexpr int32_t RETRY_TIMES = 6;                                           // normal retry
127     static constexpr int32_t CLIENT_RETRY_TIMES = 3;                                    // normal retry
128     static constexpr uint64_t USER_MARK = 0xFFFFFFFF00000000;                           // high 32 bit
129     static constexpr int32_t MV_BIT = 32;
130     static constexpr int32_t EXPIRATION_TIME = 6 * 60 * 60 * 1000;                      // 6 hours
131 
132     static uint64_t GenerateId(int32_t user);
133     static ExecutorPool::Duration GetInterval(int32_t code);
134     static std::map<uint32_t, GenStore::BindInfo> GetBindInfos(
135         const StoreMetaData &meta, const std::vector<int32_t> &users, const DistributedData::Database &schemaDatabase);
136     static std::string GetAccountId(int32_t user);
137     static std::vector<std::tuple<QueryKey, uint64_t>> GetCloudSyncInfo(const SyncInfo &info, CloudInfo &cloud);
138     static std::vector<SchemaMeta> GetSchemaMeta(const CloudInfo &cloud, const std::string &bundleName);
139     static bool NeedGetCloudInfo(CloudInfo &cloud);
140     static GeneralError IsValid(SyncInfo &info, CloudInfo &cloud);
141     static std::function<void(const Event &)> GetLockChangeHandler();
142     static TraceIds GetPrepareTraceId(const SyncInfo &info, const CloudInfo &cloud);
143     static void Report(
144         const std::string &faultType, const std::string &bundleName, int32_t errCode, const std::string &appendix);
145     static std::pair<int32_t, CloudLastSyncInfo> GetLastResults(std::map<SyncId, CloudLastSyncInfo> &infos);
146     static std::pair<int32_t, CloudLastSyncInfo> GetLastSyncInfoFromMeta(const QueryKey &queryKey);
147     static void SaveLastSyncInfo(const QueryKey &queryKey, CloudLastSyncInfo &&info);
148     static void BatchReport(int32_t userId, const TraceIds &traceIds, SyncStage syncStage, int32_t errCode,
149         const std::string &message = "");
150     static void ReportSyncEvent(const DistributedData::SyncEvent &evt, DistributedDataDfx::BizState bizState,
151         int32_t code);
152     static bool HandleRetryFinished(const SyncInfo &info, int32_t user, int32_t code, int32_t dbCode,
153         const std::string &prepareTraceId);
154     Task GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo);
155     void UpdateSchema(const SyncInfo &syncInfo);
156     std::function<void(const Event &)> GetSyncHandler(Retryer retryer);
157     std::function<void(const Event &)> GetClientChangeHandler();
158     Retryer GetRetryer(int32_t times, const SyncInfo &syncInfo, int32_t user);
159     RefCount GenSyncRef(uint64_t syncId);
160     int32_t Compare(uint64_t syncId, int32_t user);
161     void UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos);
162     void UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code);
163     std::function<void(const DistributedData::GenDetails &result)> GetCallback(const GenAsync &async,
164         const StoreInfo &storeInfo, int32_t triggerMode, const std::string &prepareTraceId, int32_t user);
165     std::function<void()> GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud, SyncInfo &info,
166         bool retry, const TraceIds &traceIds);
167     void DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo,
168         const ReportParam &param);
169     bool InitDefaultUser(int32_t &user);
170     std::function<void(const DistributedData::GenDetails &result)> RetryCallback(const StoreInfo &storeInfo,
171         Retryer retryer, int32_t triggerMode, const std::string &prepareTraceId, int32_t user);
172     void BatchUpdateFinishState(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos, int32_t code);
173     bool NeedSaveSyncInfo(const QueryKey &queryKey);
174     void StartCloudSync(const DistributedData::SyncEvent &evt, const StoreMetaData &meta,
175         const AutoCache::Store &store, Retryer retryer, DistributedData::GenDetails &details);
176     std::pair<bool, StoreMetaData> GetMetaData(const StoreInfo &storeInfo);
177     void AddCompensateSync(const StoreMetaData &meta);
178     static DistributedData::GenDetails ConvertGenDetailsCode(const GenDetails &details);
179     static int32_t ConvertValidGeneralCode(int32_t code);
180     static std::atomic<uint32_t> genId_;
181     std::shared_ptr<ExecutorPool> executor_;
182     ConcurrentMap<uint64_t, TaskId> actives_;
183     ConcurrentMap<uint64_t, uint64_t> activeInfos_;
184     std::shared_ptr<SyncStrategy> syncStrategy_;
185     ConcurrentMap<QueryKey, std::map<SyncId, CloudLastSyncInfo>> lastSyncInfos_;
186     std::set<std::string> kvApps_;
187     ConcurrentMap<int32_t, std::map<std::string, std::set<std::string>>> compensateSyncInfos_;
188 };
189 } // namespace OHOS::CloudData
190 #endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H