1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef CLOUDSYNCER_TEST_H 16 #define CLOUDSYNCER_TEST_H 17 18 #include "cloud_merge_strategy.h" 19 #include "cloud_syncer.h" 20 #include "cloud_syncer_test.h" 21 #include "cloud_sync_utils.h" 22 #include "mock_iclouddb.h" 23 24 namespace DistributedDB { 25 26 const Asset ASSET_COPY = { .version = 1, 27 .name = "Phone", 28 .assetId = "0", 29 .subpath = "/local/sync", 30 .uri = "/local/sync", 31 .modifyTime = "123456", 32 .createTime = "", 33 .size = "256", 34 .hash = "ASE" }; 35 36 class TestStorageProxy : public StorageProxy { 37 public: TestStorageProxy(ICloudSyncStorageInterface * iCloud)38 explicit TestStorageProxy(ICloudSyncStorageInterface *iCloud) : StorageProxy(iCloud) 39 { 40 Init(); 41 } 42 }; 43 44 class TestCloudSyncer : public CloudSyncer { 45 public: TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy)46 explicit TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy) : CloudSyncer(storageProxy) 47 { 48 } 49 ~TestCloudSyncer() override = default; 50 DISABLE_COPY_ASSIGN_MOVE(TestCloudSyncer); 51 InitCloudSyncer(TaskId taskId,SyncMode mode)52 void InitCloudSyncer(TaskId taskId, SyncMode mode) 53 { 54 cloudTaskInfos_.insert(std::pair<TaskId, CloudTaskInfo>{taskId, CloudTaskInfo()}); 55 cloudTaskInfos_[taskId].mode = mode; 56 cloudTaskInfos_[taskId].taskId = taskId; 57 currentContext_.tableName = "TestTable" + std::to_string(taskId); 58 cloudTaskInfos_[taskId].table.push_back(currentContext_.tableName); 59 currentContext_.currentTaskId = taskId; 60 currentContext_.notifier = std::make_shared<ProcessNotifier>(this); 61 currentContext_.processRecorder = std::make_shared<ProcessRecorder>(); 62 currentContext_.notifier->Init({currentContext_.tableName}, { "cloud" }, cloudTaskInfos_[taskId].users); 63 currentContext_.strategy = std::make_shared<CloudMergeStrategy>(); 64 closed_ = false; 65 cloudTaskInfos_[taskId].callback = [this, taskId](const std::map<std::string, SyncProcess> &process) { 66 if (process.size() >= 1u) { 67 process_[taskId] = process.begin()->second; 68 } else { 69 SyncProcess tmpProcess; 70 process_[taskId] = tmpProcess; 71 } 72 }; 73 } 74 SetCurrentCloudTaskInfos(std::vector<std::string> tables,const SyncProcessCallback & onProcess)75 void SetCurrentCloudTaskInfos(std::vector<std::string> tables, const SyncProcessCallback &onProcess) 76 { 77 cloudTaskInfos_[currentContext_.currentTaskId].table = tables; 78 cloudTaskInfos_[currentContext_.currentTaskId].callback = onProcess; 79 } 80 GetCurrentCloudTaskInfos()81 CloudTaskInfo GetCurrentCloudTaskInfos() 82 { 83 return cloudTaskInfos_[currentContext_.currentTaskId]; 84 } 85 CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & onProcess,int64_t waitTime)86 int CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode, const std::vector<std::string> &tables, 87 const SyncProcessCallback &onProcess, int64_t waitTime) 88 { 89 CloudTaskInfo taskInfo; 90 taskInfo.mode = mode; 91 taskInfo.table = tables; 92 taskInfo.callback = onProcess; 93 taskInfo.timeout = waitTime; 94 return TryToAddSyncTask(std::move(taskInfo)); 95 } 96 CallClose()97 void CallClose() 98 { 99 currentContext_.currentTaskId = 0u; 100 currentContext_.strategy = nullptr; 101 currentContext_.notifier = nullptr; 102 Close(); 103 } SetTimeOut(TaskId taskId,int64_t timeout)104 void SetTimeOut(TaskId taskId, int64_t timeout) 105 { 106 this->cloudTaskInfos_[taskId].timeout = timeout; 107 } 108 InitCloudSyncerForSync()109 void InitCloudSyncerForSync() 110 { 111 this->closed_ = false; 112 this->cloudTaskInfos_[this->lastTaskId_].callback = [this]( 113 const std::map<std::string, SyncProcess> &process) { 114 if (process.size() == 1u) { 115 process_[this->lastTaskId_] = process.begin()->second; 116 } else { 117 SyncProcess tmpProcess; 118 process_[this->lastTaskId_] = tmpProcess; 119 } 120 }; 121 } 122 CallDoSyncInner(const CloudTaskInfo & taskInfo)123 int CallDoSyncInner(const CloudTaskInfo &taskInfo) 124 { 125 return DoSyncInner(taskInfo); 126 } 127 getCallback(TaskId taskId)128 SyncProcessCallback getCallback(TaskId taskId) 129 { 130 return cloudTaskInfos_[taskId].callback; 131 } 132 getCurrentTaskId()133 TaskId getCurrentTaskId() 134 { 135 return currentContext_.currentTaskId; 136 } 137 138 int CallDoUpload(TaskId taskId, bool lastTable = false, LockAction lockAction = LockAction::INSERT) 139 { 140 storageProxy_->StartTransaction(); 141 int ret = CloudSyncer::DoUpload(taskId, lastTable, lockAction); 142 storageProxy_->Commit(); 143 return ret; 144 } 145 CallDoDownload(TaskId taskId)146 int CallDoDownload(TaskId taskId) 147 { 148 return CloudSyncer::DoDownload(taskId, true); 149 } 150 GetCurrentContextTableName()151 std::string GetCurrentContextTableName() 152 { 153 return this->currentContext_.tableName; 154 } 155 SetCurrentContextTableName(std::string name)156 void SetCurrentContextTableName(std::string name) 157 { 158 this->currentContext_.tableName = name; 159 } 160 CallClearCloudSyncData(CloudSyncData & uploadData)161 void CallClearCloudSyncData(CloudSyncData& uploadData) 162 { 163 CloudSyncUtils::ClearCloudSyncData(uploadData); 164 } 165 GetUploadSuccessCount(TaskId taskId)166 int32_t GetUploadSuccessCount(TaskId taskId) 167 { 168 return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.successCount; 169 } 170 GetUploadFailCount(TaskId taskId)171 int32_t GetUploadFailCount(TaskId taskId) 172 { 173 return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.failCount; 174 } 175 SetMockICloudDB(MockICloudDB * icloudDB)176 void SetMockICloudDB(MockICloudDB *icloudDB) 177 { 178 this->cloudDB_.SetCloudDB(std::shared_ptr<MockICloudDB>(icloudDB)); 179 } 180 SetMockICloudDB(std::shared_ptr<MockICloudDB> & icloudDB)181 void SetMockICloudDB(std::shared_ptr<MockICloudDB> &icloudDB) 182 { 183 this->cloudDB_.SetCloudDB(icloudDB); 184 } 185 SetAndGetCloudTaskInfo(SyncMode mode,std::vector<std::string> table,SyncProcessCallback callback,int64_t timeout)186 CloudTaskInfo SetAndGetCloudTaskInfo(SyncMode mode, std::vector<std::string> table, 187 SyncProcessCallback callback, int64_t timeout) 188 { 189 CloudTaskInfo taskInfo; 190 taskInfo.mode = mode; 191 taskInfo.table = table; 192 taskInfo.callback = callback; 193 taskInfo.timeout = timeout; 194 return taskInfo; 195 } 196 initFullCloudSyncData(CloudSyncData & uploadData,int size)197 void initFullCloudSyncData(CloudSyncData &uploadData, int size) 198 { 199 VBucket tmp = { std::pair<std::string, int64_t>(CloudDbConstant::MODIFY_FIELD, 1), 200 std::pair<std::string, int64_t>(CloudDbConstant::CREATE_FIELD, 1), 201 std::pair<std::string, std::string>(CloudDbConstant::GID_FIELD, "0"), 202 std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) }; 203 VBucket asset = { std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) }; 204 uploadData.insData.record = std::vector<VBucket>(size, tmp); 205 uploadData.insData.extend = std::vector<VBucket>(size, tmp); 206 uploadData.insData.assets = std::vector<VBucket>(size, asset); 207 uploadData.updData.record = std::vector<VBucket>(size, tmp); 208 uploadData.updData.extend = std::vector<VBucket>(size, tmp); 209 uploadData.updData.assets = std::vector<VBucket>(size, asset); 210 uploadData.delData.record = std::vector<VBucket>(size, tmp); 211 uploadData.delData.extend = std::vector<VBucket>(size, tmp); 212 } 213 CallTryToAddSyncTask(CloudTaskInfo && taskInfo)214 int CallTryToAddSyncTask(CloudTaskInfo &&taskInfo) 215 { 216 return TryToAddSyncTask(std::move(taskInfo)); 217 } 218 PopTaskQueue()219 void PopTaskQueue() 220 { 221 taskQueue_.pop_back(); 222 } 223 CallPrepareSync(TaskId taskId)224 int CallPrepareSync(TaskId taskId) 225 { 226 return PrepareSync(taskId); 227 } 228 CallNotify()229 void CallNotify() 230 { 231 auto info = cloudTaskInfos_[currentContext_.currentTaskId]; 232 currentContext_.notifier->NotifyProcess(info, {}); 233 } 234 SetAssetFields(const TableName & tableName,const std::vector<Field> & assetFields)235 void SetAssetFields(const TableName &tableName, const std::vector<Field> &assetFields) 236 { 237 currentContext_.tableName = tableName; 238 currentContext_.assetFields[currentContext_.tableName] = assetFields; 239 } 240 241 std::map<std::string, Assets> TestTagAssetsInSingleRecord( 242 VBucket &coveredData, VBucket &beCoveredData, bool setNormalStatus = false) 243 { 244 int ret = E_OK; 245 return TagAssetsInSingleRecord(coveredData, beCoveredData, setNormalStatus, ret); 246 } 247 TestIsDataContainDuplicateAsset(std::vector<Field> & assetFields,VBucket & data)248 bool TestIsDataContainDuplicateAsset(std::vector<Field> &assetFields, VBucket &data) 249 { 250 return IsDataContainDuplicateAsset(assetFields, data); 251 } 252 SetCloudWaterMarks(const TableName & tableName,const std::string & mark)253 void SetCloudWaterMarks(const TableName &tableName, const std::string &mark) 254 { 255 currentContext_.tableName = tableName; 256 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName] = mark; 257 } 258 CallDownloadAssets()259 int CallDownloadAssets() 260 { 261 InnerProcessInfo info; 262 std::vector<std::string> pKColNames; 263 std::set<Key> dupHashKeySet; 264 ChangedData changedAssets; 265 return CloudSyncer::DownloadAssets(info, pKColNames, dupHashKeySet, changedAssets); 266 } 267 SetCurrentContext(TaskId taskId)268 void SetCurrentContext(TaskId taskId) 269 { 270 currentContext_.currentTaskId = taskId; 271 } 272 SetLastTaskId(TaskId taskId)273 void SetLastTaskId(TaskId taskId) 274 { 275 lastTaskId_ = taskId; 276 } 277 SetCurrentTaskPause()278 void SetCurrentTaskPause() 279 { 280 cloudTaskInfos_[currentContext_.currentTaskId].pause = true; 281 } 282 SetAssetDownloadList(int downloadCount)283 void SetAssetDownloadList(int downloadCount) 284 { 285 for (int i = 0; i < downloadCount; ++i) { 286 currentContext_.assetDownloadList.push_back({}); 287 } 288 } 289 SetQuerySyncObject(TaskId taskId,const QuerySyncObject & query)290 void SetQuerySyncObject(TaskId taskId, const QuerySyncObject &query) 291 { 292 std::vector<QuerySyncObject> queryList; 293 queryList.push_back(query); 294 cloudTaskInfos_[taskId].queryList = queryList; 295 } 296 CallGetQuerySyncObject(const std::string & tableName)297 QuerySyncObject CallGetQuerySyncObject(const std::string &tableName) 298 { 299 return CloudSyncer::GetQuerySyncObject(tableName); 300 } 301 CallReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)302 void CallReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark) 303 { 304 CloudSyncer::ReloadWaterMarkIfNeed(taskId, waterMark); 305 } 306 CallRecordWaterMark(TaskId taskId,Timestamp waterMark)307 void CallRecordWaterMark(TaskId taskId, Timestamp waterMark) 308 { 309 CloudSyncer::RecordWaterMark(taskId, waterMark); 310 } 311 SetResumeSyncParam(TaskId taskId,const SyncParam & syncParam)312 void SetResumeSyncParam(TaskId taskId, const SyncParam &syncParam) 313 { 314 resumeTaskInfos_[taskId].syncParam = syncParam; 315 resumeTaskInfos_[taskId].context.tableName = syncParam.tableName; 316 } 317 ClearResumeTaskInfo(TaskId taskId)318 void ClearResumeTaskInfo(TaskId taskId) 319 { 320 resumeTaskInfos_.erase(taskId); 321 } 322 SetTaskResume(TaskId taskId,bool resume)323 void SetTaskResume(TaskId taskId, bool resume) 324 { 325 cloudTaskInfos_[taskId].resume = resume; 326 } 327 CallGetSyncParamForDownload(TaskId taskId,SyncParam & param)328 int CallGetSyncParamForDownload(TaskId taskId, SyncParam ¶m) 329 { 330 return CloudSyncer::GetSyncParamForDownload(taskId, param); 331 } 332 IsResumeTaskUpload(TaskId taskId)333 bool IsResumeTaskUpload(TaskId taskId) 334 { 335 return resumeTaskInfos_[taskId].upload; 336 } 337 CallHandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)338 int CallHandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam ¶m, 339 VBucket &localAssetInfo) 340 { 341 return CloudSyncer::HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo); 342 } 343 GetProcessRecorder()344 std::shared_ptr<ProcessRecorder> GetProcessRecorder() 345 { 346 std::lock_guard<std::mutex> autoLock(dataLock_); 347 return currentContext_.processRecorder; 348 } 349 CallDoDownloadInNeed(bool needUpload,bool isFirstDownload)350 int CallDoDownloadInNeed(bool needUpload, bool isFirstDownload) 351 { 352 CloudTaskInfo taskInfo; 353 { 354 std::lock_guard<std::mutex> autoLock(dataLock_); 355 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId]; 356 } 357 return DoDownloadInNeed(taskInfo, needUpload, isFirstDownload); 358 } 359 CallDoUploadInNeed()360 int CallDoUploadInNeed() 361 { 362 CloudTaskInfo taskInfo; 363 { 364 std::lock_guard<std::mutex> autoLock(dataLock_); 365 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId]; 366 } 367 return DoUploadInNeed(taskInfo, true); 368 } 369 CloudTaskInfo taskInfo_; 370 private: 371 std::map<TaskId, SyncProcess> process_; 372 }; 373 374 } 375 #endif // #define CLOUDSYNCER_TEST_H