• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CLOUD_SYNCER_H
17 #define CLOUD_SYNCER_H
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include "cloud_db_proxy.h"
#include "cloud/cloud_store_types.h"
#include "cloud/cloud_sync_strategy.h"
#include "cloud/icloud_syncer.h"
#include "cloud/process_notifier.h"
#include "data_transformer.h"
#include "db_common.h"
#include "cloud/icloud_db.h"
#include "ref_object.h"
#include "runtime_context.h"
#include "storage_proxy.h"
#include "store_observer.h"
35 
36 namespace DistributedDB {
37 using DownloadList = std::vector<std::tuple<std::string, Type, OpType, std::map<std::string, Assets>, Key,
38     std::vector<Type>>>;
39 using DownloadCommitList = std::vector<std::tuple<std::string, std::map<std::string, Assets>, bool>>;
40 class CloudSyncer : public ICloudSyncer {
41 public:
42     explicit CloudSyncer(std::shared_ptr<StorageProxy> storageProxy);
43     ~CloudSyncer() override = default;
44     DISABLE_COPY_ASSIGN_MOVE(CloudSyncer);
45 
46     int Sync(const std::vector<DeviceID> &devices, SyncMode mode, const std::vector<std::string> &tables,
47         const SyncProcessCallback &callback, int64_t waitTime);
48 
49     void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB);
50 
51     void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);
52 
53     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
54         const RelationalSchemaObject &localSchema);
55 
56     int32_t GetCloudSyncTaskCount();
57 
58     void Close();
59 
60     void IncSyncCallbackTaskCount() override;
61 
62     void DecSyncCallbackTaskCount() override;
63 
64     std::string GetIdentify() const override;
65 protected:
66     struct DataInfo {
67         DataInfoWithLog localInfo;
68         LogInfo cloudLogInfo;
69     };
70     struct AssetDownloadList {
71         // assets in following list will fill STATUS and timestamp after calling downloading
72         DownloadList downloadList = {};
73         // assets in following list won't fill STATUS and timestamp after calling downloading
74         DownloadList completeDownloadList = {};
75     };
76     struct WithoutRowIdData {
77         std::vector<size_t> insertData = {};
78         std::vector<std::tuple<size_t, size_t>> updateData = {};
79         std::vector<std::tuple<size_t, size_t>> assetInsertData = {};
80     };
81     struct SyncParam {
82         DownloadData downloadData;
83         ChangedData changedData;
84         InnerProcessInfo info;
85         AssetDownloadList assetsDownloadList;
86         std::string cloudWaterMark;
87         std::vector<std::string> pkColNames;
88         std::set<Key> deletePrimaryKeySet;
89         std::set<Key> dupHashKeySet;
90         std::string tableName;
91         bool isSinglePrimaryKey;
92         bool isLastBatch = false;
93         WithoutRowIdData withoutRowIdData;
94     };
95     struct TaskContext {
96         TaskId currentTaskId = 0u;
97         std::string tableName;
98         std::shared_ptr<ProcessNotifier> notifier;
99         std::shared_ptr<CloudSyncStrategy> strategy;
100         std::map<TableName, std::vector<Field>> assetFields;
101         // should be cleared after each Download
102         AssetDownloadList assetDownloadList;
103         // store GID and assets, using in upload procedure
104         std::map<TableName, std::map<std::string, std::map<std::string, Assets>>> assetsInfo;
105         std::map<TableName, std::string> cloudWaterMarks;
106     };
107     struct UploadParam {
108         int64_t count = 0;
109         TaskId taskId = 0u;
110         Timestamp localMark = 0u;
111         bool lastTable = false;
112     };
113     struct DownloadItem {
114         std::string gid;
115         Type prefix;
116         OpType strategy;
117         std::map<std::string, Assets> assets;
118         Key hashKey;
119         std::vector<Type> primaryKeyValList;
120     };
121 
122     int TriggerSync();
123 
124     void DoSyncIfNeed();
125 
126     int DoSync(TaskId taskId);
127 
128     int DoSyncInner(const CloudTaskInfo &taskInfo, const bool needUpload);
129 
130     int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload);
131 
132     void DoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo);
133 
134     virtual int DoDownload(CloudSyncer::TaskId taskId);
135 
136     int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param);
137 
138     void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info);
139 
140     int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark);
141 
142     int PreCheck(TaskId &taskId, const TableName &tableName);
143 
144     int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo);
145 
146     int CheckCloudSyncDataValid(CloudSyncData uploadData, const std::string &tableName, const int64_t &count,
147         TaskId &taskId);
148 
149     static bool CheckCloudSyncDataEmpty(const CloudSyncData &uploadData);
150 
151     int GetWaterMarkAndUpdateTime(std::vector<VBucket>& extend, Timestamp &waterMark);
152 
153     int UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, TaskId taskId,
154         Timestamp &waterMark);
155 
156     void ClearCloudSyncData(CloudSyncData &uploadData);
157 
158     int PreProcessBatchUpload(TaskId taskId, const InnerProcessInfo &innerProcessInfo,
159         CloudSyncData &uploadData, Timestamp &localMark);
160 
161     int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam);
162 
163     virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable);
164 
165     void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData);
166 
167     bool IsModeForcePush(const TaskId taskId);
168 
169     bool IsModeForcePull(const TaskId taskId);
170 
171     int DoUploadInner(const std::string &tableName, UploadParam &uploadParam);
172 
173     int PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames);
174 
175     int QueryCloudData(const std::string &tableName, std::string &cloudWaterMark, DownloadData &downloadData);
176 
177     int CheckTaskIdValid(TaskId taskId);
178 
179     int GetCurrentTableName(std::string &tableName);
180 
181     int TryToAddSyncTask(CloudTaskInfo &&taskInfo);
182 
183     int CheckQueueSizeWithNoLock();
184 
185     int PrepareSync(TaskId taskId);
186 
187     int LockCloud(TaskId taskId);
188 
189     int UnlockCloud();
190 
191     int StartHeartBeatTimer(int period, TaskId taskId);
192 
193     void FinishHeartBeatTimer();
194 
195     void WaitAllHeartBeatTaskExit();
196 
197     void HeartBeat(TimerId timerId, TaskId taskId);
198 
199     void HeartBeatFailed(TaskId taskId, int errCode);
200 
201     void SetTaskFailed(TaskId taskId, int errCode);
202 
203     int SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList);
204 
205     int SaveData(SyncParam &param);
206 
207     void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param);
208 
209     int SaveDataInTransaction(CloudSyncer::TaskId taskId,  SyncParam &param);
210 
211     int FindDeletedListIndex(const std::vector<std::pair<Key, size_t>> &deletedList, const Key &hashKey,
212         size_t &delIdx);
213 
214     int SaveChangedData(SyncParam &param, int dataIndex, const DataInfo &dataInfo,
215         std::vector<std::pair<Key, size_t>> &deletedList);
216 
217     int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param);
218 
219     void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch);
220 
221     bool NeedNotifyChangedData(const ChangedData &changedData);
222 
223     int NotifyChangedData(ChangedData &&changedData);
224 
225     std::map<std::string, Assets> GetAssetsFromVBucket(VBucket &data);
226 
227     std::map<std::string, Assets> TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
228         bool setNormalStatus, int &errCode);
229 
230     int TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo);
231 
232     int HandleTagAssets(const Key &hashKey, size_t idx, SyncParam &param, DataInfo &dataInfo, VBucket &localAssetInfo);
233 
234     int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, DataInfo &dataInfo,
235         VBucket &localAssetInfo);
236 
237     int TagUploadAssets(CloudSyncData &uploadData);
238 
239     int FillCloudAssets(const std::string &tableName, VBucket &normalAssets, VBucket &failedAssets);
240 
241     int HandleDownloadResult(const std::string &tableName, DownloadCommitList &commitList, uint32_t &successCount);
242 
243     int DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
244         const std::set<Key> &dupHashKeySet, ChangedData &changedAssets);
245 
246     int CloudDbDownloadAssets(InnerProcessInfo &info, DownloadList &downloadList, bool willHandleResult,
247         const std::set<Key> &dupHashKeySet, ChangedData &changedAssets);
248 
249     void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem);
250 
251     bool IsDataContainAssets();
252 
253     void ModifyCloudDataTime(VBucket &data);
254 
255     int SaveCloudWaterMark(const TableName &tableName);
256 
257     bool IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data);
258 
259     int UpdateChangedData(SyncParam &param, AssetDownloadList &assetsDownloadList);
260 
261     void WaitAllSyncCallbackTaskFinish();
262 
263     void UpdateCloudWaterMark(const SyncParam &param);
264 
265     int TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult);
266 
267     int CommitDownloadResult(InnerProcessInfo &info, DownloadCommitList &commitList);
268 
269     static int CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode);
270 
271     void ClearWithoutData(SyncParam &param);
272 
273     std::mutex queueLock_;
274     TaskId currentTaskId_;
275     std::list<TaskId> taskQueue_;
276     std::map<TaskId, CloudTaskInfo> cloudTaskInfos_;
277 
278     std::mutex contextLock_;
279     TaskContext currentContext_;
280     std::condition_variable contextCv_;
281     std::mutex syncMutex_;  // Clean Cloud Data and Sync are mutually exclusive
282 
283     CloudDBProxy cloudDB_;
284 
285     std::shared_ptr<StorageProxy> storageProxy_;
286     std::atomic<int32_t> queuedManualSyncLimit_;
287 
288     std::atomic<bool> closed_;
289 
290     std::atomic<TimerId> timerId_;
291     std::mutex heartbeatMutex_;
292     std::condition_variable heartbeatCv_;
293     int32_t heartBeatCount_;
294     std::atomic<int32_t> failedHeartBeatCount_;
295 
296     std::mutex syncCallbackMutex_;
297     std::condition_variable syncCallbackCv_;
298     int32_t syncCallbackCount_;
299 
300     std::string id_;
301 };
302 }
303 #endif // CLOUD_SYNCER_H
304