1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef CLOUDSYNCER_TEST_H 16 #define CLOUDSYNCER_TEST_H 17 18 #include "cloud_merge_strategy.h" 19 #include "cloud_syncer.h" 20 #include "cloud_syncer_test.h" 21 #include "cloud_sync_utils.h" 22 #include "mock_iclouddb.h" 23 24 namespace DistributedDB { 25 26 const Asset ASSET_COPY = { .version = 1, 27 .name = "Phone", 28 .assetId = "0", 29 .subpath = "/local/sync", 30 .uri = "/local/sync", 31 .modifyTime = "123456", 32 .createTime = "", 33 .size = "256", 34 .hash = "ASE" }; 35 36 class TestStorageProxy : public StorageProxy { 37 public: TestStorageProxy(ICloudSyncStorageInterface * iCloud)38 explicit TestStorageProxy(ICloudSyncStorageInterface *iCloud) : StorageProxy(iCloud) 39 { 40 Init(); 41 } 42 }; 43 44 class TestCloudSyncer : public CloudSyncer { 45 public: TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy)46 explicit TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy) : CloudSyncer(storageProxy) 47 { 48 } 49 ~TestCloudSyncer() override = default; 50 DISABLE_COPY_ASSIGN_MOVE(TestCloudSyncer); 51 InitCloudSyncer(TaskId taskId,SyncMode mode)52 void InitCloudSyncer(TaskId taskId, SyncMode mode) 53 { 54 cloudTaskInfos_.insert(std::pair<TaskId, CloudTaskInfo>{taskId, CloudTaskInfo()}); 55 cloudTaskInfos_[taskId].mode = mode; 56 cloudTaskInfos_[taskId].taskId = taskId; 57 currentContext_.tableName = "TestTable" + std::to_string(taskId); 58 cloudTaskInfos_[taskId].table.push_back(currentContext_.tableName); 59 currentContext_.currentTaskId = taskId; 60 currentContext_.notifier = std::make_shared<ProcessNotifier>(this); 61 currentContext_.processRecorder = std::make_shared<ProcessRecorder>(); 62 currentContext_.notifier->Init({currentContext_.tableName}, { "cloud" }, cloudTaskInfos_[taskId].users); 63 currentContext_.strategy = std::make_shared<CloudMergeStrategy>(); 64 currentContext_.strategy->SetIsKvScene(isKvScene_); 65 closed_ = false; 66 cloudTaskInfos_[taskId].callback = [this, taskId](const std::map<std::string, SyncProcess> &process) { 67 if (process.size() >= 1u) { 68 process_[taskId] = process.begin()->second; 69 } else { 70 SyncProcess tmpProcess; 71 process_[taskId] = tmpProcess; 72 } 73 }; 74 } 75 SetCurrentCloudTaskInfos(std::vector<std::string> tables,const SyncProcessCallback & onProcess)76 void SetCurrentCloudTaskInfos(std::vector<std::string> tables, const SyncProcessCallback &onProcess) 77 { 78 cloudTaskInfos_[currentContext_.currentTaskId].table = tables; 79 cloudTaskInfos_[currentContext_.currentTaskId].callback = onProcess; 80 } 81 GetCurrentCloudTaskInfos()82 CloudTaskInfo GetCurrentCloudTaskInfos() 83 { 84 return cloudTaskInfos_[currentContext_.currentTaskId]; 85 } 86 CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & onProcess,int64_t waitTime)87 int CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode, const std::vector<std::string> &tables, 88 const SyncProcessCallback &onProcess, int64_t waitTime) 89 { 90 CloudTaskInfo taskInfo; 91 taskInfo.mode = mode; 92 taskInfo.table = tables; 93 taskInfo.callback = onProcess; 94 taskInfo.timeout = waitTime; 95 return TryToAddSyncTask(std::move(taskInfo)); 96 } 97 CallClose()98 void CallClose() 99 { 100 currentContext_.currentTaskId = 0u; 101 currentContext_.strategy = nullptr; 102 currentContext_.notifier = nullptr; 103 Close(); 104 } SetTimeOut(TaskId taskId,int64_t timeout)105 void SetTimeOut(TaskId taskId, int64_t timeout) 106 { 107 this->cloudTaskInfos_[taskId].timeout = timeout; 108 } 109 InitCloudSyncerForSync()110 void InitCloudSyncerForSync() 111 { 112 this->closed_ = false; 113 this->cloudTaskInfos_[this->lastTaskId_].callback = [this]( 114 const std::map<std::string, SyncProcess> &process) { 115 if (process.size() == 1u) { 116 process_[this->lastTaskId_] = process.begin()->second; 117 } else { 118 SyncProcess tmpProcess; 119 process_[this->lastTaskId_] = tmpProcess; 120 } 121 }; 122 } 123 CallDoSyncInner(const CloudTaskInfo & taskInfo)124 int CallDoSyncInner(const CloudTaskInfo &taskInfo) 125 { 126 return DoSyncInner(taskInfo); 127 } 128 getCallback(TaskId taskId)129 SyncProcessCallback getCallback(TaskId taskId) 130 { 131 return cloudTaskInfos_[taskId].callback; 132 } 133 getCurrentTaskId()134 TaskId getCurrentTaskId() 135 { 136 return currentContext_.currentTaskId; 137 } 138 139 int CallDoUpload(TaskId taskId, bool lastTable = false, LockAction lockAction = LockAction::INSERT) 140 { 141 storageProxy_->StartTransaction(); 142 int ret = CloudSyncer::DoUpload(taskId, lastTable, lockAction); 143 storageProxy_->Commit(); 144 return ret; 145 } 146 CallDoDownload(TaskId taskId)147 int CallDoDownload(TaskId taskId) 148 { 149 return CloudSyncer::DoDownload(taskId, true); 150 } 151 GetCurrentContextTableName()152 std::string GetCurrentContextTableName() 153 { 154 return this->currentContext_.tableName; 155 } 156 SetCurrentContextTableName(std::string name)157 void SetCurrentContextTableName(std::string name) 158 { 159 this->currentContext_.tableName = name; 160 } 161 CallClearCloudSyncData(CloudSyncData & uploadData)162 void CallClearCloudSyncData(CloudSyncData& uploadData) 163 { 164 CloudSyncUtils::ClearCloudSyncData(uploadData); 165 } 166 GetUploadSuccessCount(TaskId taskId)167 int32_t GetUploadSuccessCount(TaskId taskId) 168 { 169 return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.successCount; 170 } 171 GetUploadFailCount(TaskId taskId)172 int32_t GetUploadFailCount(TaskId taskId) 173 { 174 return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.failCount; 175 } 176 SetMockICloudDB(MockICloudDB * icloudDB)177 void SetMockICloudDB(MockICloudDB *icloudDB) 178 { 179 this->cloudDB_.SetCloudDB(std::shared_ptr<MockICloudDB>(icloudDB)); 180 } 181 SetMockICloudDB(std::shared_ptr<MockICloudDB> & icloudDB)182 void SetMockICloudDB(std::shared_ptr<MockICloudDB> &icloudDB) 183 { 184 this->cloudDB_.SetCloudDB(icloudDB); 185 } 186 SetAndGetCloudTaskInfo(SyncMode mode,std::vector<std::string> table,SyncProcessCallback callback,int64_t timeout)187 CloudTaskInfo SetAndGetCloudTaskInfo(SyncMode mode, std::vector<std::string> table, 188 SyncProcessCallback callback, int64_t timeout) 189 { 190 CloudTaskInfo taskInfo; 191 taskInfo.mode = mode; 192 taskInfo.table = table; 193 taskInfo.callback = callback; 194 taskInfo.timeout = timeout; 195 return taskInfo; 196 } 197 initFullCloudSyncData(CloudSyncData & uploadData,int size)198 void initFullCloudSyncData(CloudSyncData &uploadData, int size) 199 { 200 VBucket tmp = { std::pair<std::string, int64_t>(CloudDbConstant::MODIFY_FIELD, 1), 201 std::pair<std::string, int64_t>(CloudDbConstant::CREATE_FIELD, 1), 202 std::pair<std::string, std::string>(CloudDbConstant::GID_FIELD, "0"), 203 std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) }; 204 VBucket asset = { std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) }; 205 uploadData.insData.record = std::vector<VBucket>(size, tmp); 206 uploadData.insData.extend = std::vector<VBucket>(size, tmp); 207 uploadData.insData.assets = std::vector<VBucket>(size, asset); 208 uploadData.updData.record = std::vector<VBucket>(size, tmp); 209 uploadData.updData.extend = std::vector<VBucket>(size, tmp); 210 uploadData.updData.assets = std::vector<VBucket>(size, asset); 211 uploadData.delData.record = std::vector<VBucket>(size, tmp); 212 uploadData.delData.extend = std::vector<VBucket>(size, tmp); 213 } 214 CallTryToAddSyncTask(CloudTaskInfo && taskInfo)215 int CallTryToAddSyncTask(CloudTaskInfo &&taskInfo) 216 { 217 return TryToAddSyncTask(std::move(taskInfo)); 218 } 219 CallIsAlreadyHaveCompensatedSyncTask()220 bool CallIsAlreadyHaveCompensatedSyncTask() 221 { 222 return IsAlreadyHaveCompensatedSyncTask(); 223 } 224 PopTaskQueue()225 void PopTaskQueue() 226 { 227 if (!taskQueue_.empty()) { 228 taskQueue_.erase(--taskQueue_.end()); 229 } 230 } 231 CallPrepareSync(TaskId taskId)232 int CallPrepareSync(TaskId taskId) 233 { 234 return PrepareSync(taskId); 235 } 236 CallNotify()237 void CallNotify() 238 { 239 auto info = cloudTaskInfos_[currentContext_.currentTaskId]; 240 currentContext_.notifier->NotifyProcess(info, {}); 241 } 242 SetAssetFields(const TableName & tableName,const std::vector<Field> & assetFields)243 void SetAssetFields(const TableName &tableName, const std::vector<Field> &assetFields) 244 { 245 currentContext_.tableName = tableName; 246 currentContext_.assetFields[currentContext_.tableName] = assetFields; 247 } 248 249 std::map<std::string, Assets> TestTagAssetsInSingleRecord( 250 VBucket &coveredData, VBucket &beCoveredData, bool setNormalStatus = false) 251 { 252 int ret = E_OK; 253 return TagAssetsInSingleRecord(coveredData, beCoveredData, setNormalStatus, false, ret); 254 } 255 TestIsDataContainDuplicateAsset(std::vector<Field> & assetFields,VBucket & data)256 bool TestIsDataContainDuplicateAsset(std::vector<Field> &assetFields, VBucket &data) 257 { 258 return IsDataContainDuplicateAsset(assetFields, data); 259 } 260 SetCloudWaterMarks(const TableName & tableName,const std::string & mark)261 void SetCloudWaterMarks(const TableName &tableName, const std::string &mark) 262 { 263 currentContext_.tableName = tableName; 264 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName] = mark; 265 } 266 CallDownloadAssets()267 int CallDownloadAssets() 268 { 269 InnerProcessInfo info; 270 std::vector<std::string> pKColNames; 271 std::set<Key> dupHashKeySet; 272 ChangedData changedAssets; 273 return CloudSyncer::DownloadAssets(info, pKColNames, dupHashKeySet, changedAssets); 274 } 275 SetCurrentContext(TaskId taskId)276 void SetCurrentContext(TaskId taskId) 277 { 278 currentContext_.currentTaskId = taskId; 279 } 280 SetLastTaskId(TaskId taskId)281 void SetLastTaskId(TaskId taskId) 282 { 283 lastTaskId_ = taskId; 284 } 285 SetCurrentTaskPause()286 void SetCurrentTaskPause() 287 { 288 cloudTaskInfos_[currentContext_.currentTaskId].pause = true; 289 } 290 SetAssetDownloadList(int downloadCount)291 void SetAssetDownloadList(int downloadCount) 292 { 293 for (int i = 0; i < downloadCount; ++i) { 294 currentContext_.assetDownloadList.push_back({}); 295 } 296 } 297 SetQuerySyncObject(TaskId taskId,const QuerySyncObject & query)298 void SetQuerySyncObject(TaskId taskId, const QuerySyncObject &query) 299 { 300 std::vector<QuerySyncObject> queryList; 301 queryList.push_back(query); 302 cloudTaskInfos_[taskId].queryList = queryList; 303 } 304 CallGetQuerySyncObject(const std::string & tableName)305 QuerySyncObject CallGetQuerySyncObject(const std::string &tableName) 306 { 307 return CloudSyncer::GetQuerySyncObject(tableName); 308 } 309 CallReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)310 void CallReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark) 311 { 312 CloudSyncer::ReloadWaterMarkIfNeed(taskId, waterMark); 313 } 314 CallRecordWaterMark(TaskId taskId,Timestamp waterMark)315 void CallRecordWaterMark(TaskId taskId, Timestamp waterMark) 316 { 317 CloudSyncer::RecordWaterMark(taskId, waterMark); 318 } 319 SetResumeSyncParam(TaskId taskId,const SyncParam & syncParam)320 void SetResumeSyncParam(TaskId taskId, const SyncParam &syncParam) 321 { 322 resumeTaskInfos_[taskId].syncParam = syncParam; 323 resumeTaskInfos_[taskId].context.tableName = syncParam.tableName; 324 } 325 ClearResumeTaskInfo(TaskId taskId)326 void ClearResumeTaskInfo(TaskId taskId) 327 { 328 resumeTaskInfos_.erase(taskId); 329 } 330 SetTaskResume(TaskId taskId,bool resume)331 void SetTaskResume(TaskId taskId, bool resume) 332 { 333 cloudTaskInfos_[taskId].resume = resume; 334 } 335 CallGetSyncParamForDownload(TaskId taskId,SyncParam & param)336 int CallGetSyncParamForDownload(TaskId taskId, SyncParam ¶m) 337 { 338 return CloudSyncer::GetSyncParamForDownload(taskId, param); 339 } 340 SetResumeTaskUpload(TaskId taskId,bool upload)341 void SetResumeTaskUpload(TaskId taskId, bool upload) 342 { 343 resumeTaskInfos_[taskId].upload = upload; 344 } 345 IsResumeTaskUpload(TaskId taskId)346 bool IsResumeTaskUpload(TaskId taskId) 347 { 348 return resumeTaskInfos_[taskId].upload; 349 } 350 CallHandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)351 int CallHandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam ¶m, 352 VBucket &localAssetInfo) 353 { 354 return CloudSyncer::HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo); 355 } 356 GetProcessRecorder()357 std::shared_ptr<ProcessRecorder> GetProcessRecorder() 358 { 359 std::lock_guard<std::mutex> autoLock(dataLock_); 360 return currentContext_.processRecorder; 361 } 362 CallDoDownloadInNeed(bool needUpload,bool isFirstDownload)363 int CallDoDownloadInNeed(bool needUpload, bool isFirstDownload) 364 { 365 CloudTaskInfo taskInfo; 366 { 367 std::lock_guard<std::mutex> autoLock(dataLock_); 368 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId]; 369 } 370 return DoDownloadInNeed(taskInfo, needUpload, isFirstDownload); 371 } 372 CallDoUploadInNeed()373 int CallDoUploadInNeed() 374 { 375 CloudTaskInfo taskInfo; 376 { 377 std::lock_guard<std::mutex> autoLock(dataLock_); 378 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId]; 379 } 380 return DoUploadInNeed(taskInfo, true); 381 } 382 CloudTaskInfo taskInfo_; 383 private: 384 std::map<TaskId, SyncProcess> process_; 385 }; 386 387 } 388 #endif // #define CLOUDSYNCER_TEST_H