1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef CLOUD_SYNCER_H 17 #define CLOUD_SYNCER_H 18 #include <atomic> 19 #include <condition_variable> 20 #include <mutex> 21 #include <utility> 22 23 #include "cloud_db_proxy.h" 24 #include "cloud/cloud_store_types.h" 25 #include "cloud/cloud_sync_strategy.h" 26 #include "cloud/icloud_syncer.h" 27 #include "cloud/process_notifier.h" 28 #include "data_transformer.h" 29 #include "db_common.h" 30 #include "cloud/icloud_db.h" 31 #include "ref_object.h" 32 #include "runtime_context.h" 33 #include "storage_proxy.h" 34 #include "store_observer.h" 35 36 namespace DistributedDB { 37 using DownloadList = std::vector<std::tuple<std::string, Type, OpType, std::map<std::string, Assets>, Key, 38 std::vector<Type>>>; 39 using DownloadCommitList = std::vector<std::tuple<std::string, std::map<std::string, Assets>, bool>>; 40 class CloudSyncer : public ICloudSyncer { 41 public: 42 explicit CloudSyncer(std::shared_ptr<StorageProxy> storageProxy); 43 ~CloudSyncer() override = default; 44 DISABLE_COPY_ASSIGN_MOVE(CloudSyncer); 45 46 int Sync(const std::vector<DeviceID> &devices, SyncMode mode, const std::vector<std::string> &tables, 47 const SyncProcessCallback &callback, int64_t waitTime); 48 49 void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB); 50 51 void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader); 52 53 int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList, 54 const RelationalSchemaObject &localSchema); 55 56 int32_t GetCloudSyncTaskCount(); 57 58 void Close(); 59 60 void IncSyncCallbackTaskCount() override; 61 62 void DecSyncCallbackTaskCount() override; 63 64 std::string GetIdentify() const override; 65 protected: 66 struct DataInfo { 67 DataInfoWithLog localInfo; 68 LogInfo cloudLogInfo; 69 }; 70 struct AssetDownloadList { 71 // assets in following list will fill STATUS and timestamp after calling downloading 72 DownloadList downloadList = {}; 73 // assets in following list won't fill STATUS and timestamp after calling downloading 74 DownloadList completeDownloadList = {}; 75 }; 76 struct WithoutRowIdData { 77 std::vector<size_t> insertData = {}; 78 std::vector<std::tuple<size_t, size_t>> updateData = {}; 79 std::vector<std::tuple<size_t, size_t>> assetInsertData = {}; 80 }; 81 struct SyncParam { 82 DownloadData downloadData; 83 ChangedData changedData; 84 InnerProcessInfo info; 85 AssetDownloadList assetsDownloadList; 86 std::string cloudWaterMark; 87 std::vector<std::string> pkColNames; 88 std::set<Key> deletePrimaryKeySet; 89 std::set<Key> dupHashKeySet; 90 std::string tableName; 91 bool isSinglePrimaryKey; 92 bool isLastBatch = false; 93 WithoutRowIdData withoutRowIdData; 94 }; 95 struct TaskContext { 96 TaskId currentTaskId = 0u; 97 std::string tableName; 98 std::shared_ptr<ProcessNotifier> notifier; 99 std::shared_ptr<CloudSyncStrategy> strategy; 100 std::map<TableName, std::vector<Field>> assetFields; 101 // should be cleared after each Download 102 AssetDownloadList assetDownloadList; 103 // store GID and assets, using in upload procedure 104 std::map<TableName, std::map<std::string, std::map<std::string, Assets>>> assetsInfo; 105 std::map<TableName, std::string> cloudWaterMarks; 106 }; 107 struct UploadParam { 108 int64_t count = 0; 109 TaskId taskId = 0u; 110 Timestamp localMark = 0u; 111 bool lastTable = false; 112 }; 113 struct DownloadItem { 114 std::string gid; 115 Type prefix; 116 OpType strategy; 117 std::map<std::string, Assets> assets; 118 Key hashKey; 119 std::vector<Type> primaryKeyValList; 120 }; 121 122 int TriggerSync(); 123 124 void DoSyncIfNeed(); 125 126 int DoSync(TaskId taskId); 127 128 int DoSyncInner(const CloudTaskInfo &taskInfo, const bool needUpload); 129 130 int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload); 131 132 void DoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo); 133 134 virtual int DoDownload(CloudSyncer::TaskId taskId); 135 136 int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m); 137 138 void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info); 139 140 int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark); 141 142 int PreCheck(TaskId &taskId, const TableName &tableName); 143 144 int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo); 145 146 int CheckCloudSyncDataValid(CloudSyncData uploadData, const std::string &tableName, const int64_t &count, 147 TaskId &taskId); 148 149 static bool CheckCloudSyncDataEmpty(const CloudSyncData &uploadData); 150 151 int GetWaterMarkAndUpdateTime(std::vector<VBucket>& extend, Timestamp &waterMark); 152 153 int UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, TaskId taskId, 154 Timestamp &waterMark); 155 156 void ClearCloudSyncData(CloudSyncData &uploadData); 157 158 int PreProcessBatchUpload(TaskId taskId, const InnerProcessInfo &innerProcessInfo, 159 CloudSyncData &uploadData, Timestamp &localMark); 160 161 int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam); 162 163 virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable); 164 165 void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData); 166 167 bool IsModeForcePush(const TaskId taskId); 168 169 bool IsModeForcePull(const TaskId taskId); 170 171 int DoUploadInner(const std::string &tableName, UploadParam &uploadParam); 172 173 int PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames); 174 175 int QueryCloudData(const std::string &tableName, std::string &cloudWaterMark, DownloadData &downloadData); 176 177 int CheckTaskIdValid(TaskId taskId); 178 179 int GetCurrentTableName(std::string &tableName); 180 181 int TryToAddSyncTask(CloudTaskInfo &&taskInfo); 182 183 int CheckQueueSizeWithNoLock(); 184 185 int PrepareSync(TaskId taskId); 186 187 int LockCloud(TaskId taskId); 188 189 int UnlockCloud(); 190 191 int StartHeartBeatTimer(int period, TaskId taskId); 192 193 void FinishHeartBeatTimer(); 194 195 void WaitAllHeartBeatTaskExit(); 196 197 void HeartBeat(TimerId timerId, TaskId taskId); 198 199 void HeartBeatFailed(TaskId taskId, int errCode); 200 201 void SetTaskFailed(TaskId taskId, int errCode); 202 203 int SaveDatum(SyncParam ¶m, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList); 204 205 int SaveData(SyncParam ¶m); 206 207 void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m); 208 209 int SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m); 210 211 int FindDeletedListIndex(const std::vector<std::pair<Key, size_t>> &deletedList, const Key &hashKey, 212 size_t &delIdx); 213 214 int SaveChangedData(SyncParam ¶m, int dataIndex, const DataInfo &dataInfo, 215 std::vector<std::pair<Key, size_t>> &deletedList); 216 217 int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m); 218 219 void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch); 220 221 bool NeedNotifyChangedData(const ChangedData &changedData); 222 223 int NotifyChangedData(ChangedData &&changedData); 224 225 std::map<std::string, Assets> GetAssetsFromVBucket(VBucket &data); 226 227 std::map<std::string, Assets> TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData, 228 bool setNormalStatus, int &errCode); 229 230 int TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo); 231 232 int HandleTagAssets(const Key &hashKey, size_t idx, SyncParam ¶m, DataInfo &dataInfo, VBucket &localAssetInfo); 233 234 int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, DataInfo &dataInfo, 235 VBucket &localAssetInfo); 236 237 int TagUploadAssets(CloudSyncData &uploadData); 238 239 int FillCloudAssets(const std::string &tableName, VBucket &normalAssets, VBucket &failedAssets); 240 241 int HandleDownloadResult(const std::string &tableName, DownloadCommitList &commitList, uint32_t &successCount); 242 243 int DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames, 244 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets); 245 246 int CloudDbDownloadAssets(InnerProcessInfo &info, DownloadList &downloadList, bool willHandleResult, 247 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets); 248 249 void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem); 250 251 bool IsDataContainAssets(); 252 253 void ModifyCloudDataTime(VBucket &data); 254 255 int SaveCloudWaterMark(const TableName &tableName); 256 257 bool IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data); 258 259 int UpdateChangedData(SyncParam ¶m, AssetDownloadList &assetsDownloadList); 260 261 void WaitAllSyncCallbackTaskFinish(); 262 263 void UpdateCloudWaterMark(const SyncParam ¶m); 264 265 int TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult); 266 267 int CommitDownloadResult(InnerProcessInfo &info, DownloadCommitList &commitList); 268 269 static int CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode); 270 271 void ClearWithoutData(SyncParam ¶m); 272 273 std::mutex queueLock_; 274 TaskId currentTaskId_; 275 std::list<TaskId> taskQueue_; 276 std::map<TaskId, CloudTaskInfo> cloudTaskInfos_; 277 278 std::mutex contextLock_; 279 TaskContext currentContext_; 280 std::condition_variable contextCv_; 281 std::mutex syncMutex_; // Clean Cloud Data and Sync are mutually exclusive 282 283 CloudDBProxy cloudDB_; 284 285 std::shared_ptr<StorageProxy> storageProxy_; 286 std::atomic<int32_t> queuedManualSyncLimit_; 287 288 std::atomic<bool> closed_; 289 290 std::atomic<TimerId> timerId_; 291 std::mutex heartbeatMutex_; 292 std::condition_variable heartbeatCv_; 293 int32_t heartBeatCount_; 294 std::atomic<int32_t> failedHeartBeatCount_; 295 296 std::mutex syncCallbackMutex_; 297 std::condition_variable syncCallbackCv_; 298 int32_t syncCallbackCount_; 299 300 std::string id_; 301 }; 302 } 303 #endif // CLOUD_SYNCER_H 304