1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef CLOUDSYNCER_TEST_H 16 #define CLOUDSYNCER_TEST_H 17 18 #include "cloud_merge_strategy.h" 19 #include "cloud_syncer.h" 20 #include "cloud_syncer_test.h" 21 #include "mock_iclouddb.h" 22 23 namespace DistributedDB { 24 25 class TestStorageProxy : public StorageProxy { 26 public: TestStorageProxy(ICloudSyncStorageInterface * iCloud)27 explicit TestStorageProxy(ICloudSyncStorageInterface *iCloud) : StorageProxy(iCloud) 28 { 29 Init(); 30 } 31 }; 32 33 class TestCloudSyncer : public CloudSyncer { 34 public: TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy)35 explicit TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy) : CloudSyncer(storageProxy) 36 { 37 } 38 ~TestCloudSyncer() override = default; 39 DISABLE_COPY_ASSIGN_MOVE(TestCloudSyncer); 40 InitCloudSyncer(TaskId taskId,SyncMode mode)41 void InitCloudSyncer(TaskId taskId, SyncMode mode) 42 { 43 cloudTaskInfos_.insert(std::pair<TaskId, CloudTaskInfo>{taskId, CloudTaskInfo()}); 44 cloudTaskInfos_[taskId].mode = mode; 45 cloudTaskInfos_[taskId].taskId = taskId; 46 currentContext_.currentTaskId = taskId; 47 currentContext_.tableName = "TestTable" + std::to_string(taskId); 48 currentContext_.notifier = std::make_shared<ProcessNotifier>(this); 49 currentContext_.notifier->Init({currentContext_.tableName}, { "cloud" }); 50 currentContext_.strategy = std::make_shared<CloudMergeStrategy>(); 51 closed_ = false; 52 cloudTaskInfos_[taskId].callback = [this, taskId](const std::map<std::string, SyncProcess> &process) { 53 if (process.size() >= 1u) { 54 process_[taskId] = process.begin()->second; 55 } else { 56 SyncProcess tmpProcess; 57 process_[taskId] = tmpProcess; 58 } 59 }; 60 } 61 SetCurrentCloudTaskInfos(std::vector<std::string> tables,const SyncProcessCallback & onProcess)62 void SetCurrentCloudTaskInfos(std::vector<std::string> tables, const SyncProcessCallback &onProcess) 63 { 64 cloudTaskInfos_[currentContext_.currentTaskId].table = tables; 65 cloudTaskInfos_[currentContext_.currentTaskId].callback = onProcess; 66 } 67 GetCurrentCloudTaskInfos()68 CloudTaskInfo GetCurrentCloudTaskInfos() 69 { 70 return cloudTaskInfos_[currentContext_.currentTaskId]; 71 } 72 CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & onProcess,int64_t waitTime)73 int CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode, const std::vector<std::string> &tables, 74 const SyncProcessCallback &onProcess, int64_t waitTime) 75 { 76 CloudTaskInfo taskInfo; 77 taskInfo.mode = mode; 78 taskInfo.table = tables; 79 taskInfo.callback = onProcess; 80 taskInfo.timeout = waitTime; 81 return TryToAddSyncTask(std::move(taskInfo)); 82 } 83 CallClose()84 void CallClose() 85 { 86 currentContext_.currentTaskId = 0u; 87 Close(); 88 } SetTimeOut(TaskId taskId,int64_t timeout)89 void SetTimeOut(TaskId taskId, int64_t timeout) 90 { 91 this->cloudTaskInfos_[taskId].timeout = timeout; 92 } 93 InitCloudSyncerForSync()94 void InitCloudSyncerForSync() 95 { 96 this->closed_ = false; 97 this->cloudTaskInfos_[this->currentTaskId_].callback = [this]( 98 const std::map<std::string, SyncProcess> &process) { 99 if (process.size() == 1u) { 100 process_[this->currentTaskId_] = process.begin()->second; 101 } else { 102 SyncProcess tmpProcess; 103 process_[this->currentTaskId_] = tmpProcess; 104 } 105 }; 106 } 107 CallDoSyncInner(const CloudTaskInfo & taskInfo,const bool & needUpload)108 int CallDoSyncInner(const CloudTaskInfo &taskInfo, const bool &needUpload) 109 { 110 return DoSyncInner(taskInfo, needUpload); 111 } 112 getCallback(TaskId taskId)113 SyncProcessCallback getCallback(TaskId taskId) 114 { 115 return cloudTaskInfos_[taskId].callback; 116 } 117 getCurrentTaskId()118 TaskId getCurrentTaskId() 119 { 120 return currentContext_.currentTaskId; 121 } 122 123 int CallDoUpload(TaskId taskId, bool lastTable = false) 124 { 125 storageProxy_->StartTransaction(); 126 int ret = CloudSyncer::DoUpload(taskId, lastTable); 127 storageProxy_->Commit(); 128 return ret; 129 } 130 CallDoDownload(TaskId taskId)131 int CallDoDownload(TaskId taskId) 132 { 133 return CloudSyncer::DoDownload(taskId); 134 } 135 GetCurrentContextTableName()136 std::string GetCurrentContextTableName() 137 { 138 return this->currentContext_.tableName; 139 } 140 SetCurrentContextTableName(std::string name)141 void SetCurrentContextTableName(std::string name) 142 { 143 this->currentContext_.tableName = name; 144 } 145 CallClearCloudSyncData(CloudSyncData & uploadData)146 void CallClearCloudSyncData(CloudSyncData& uploadData) 147 { 148 this->ClearCloudSyncData(uploadData); 149 } 150 GetUploadSuccessCount(TaskId taskId)151 int32_t GetUploadSuccessCount(TaskId taskId) 152 { 153 return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.successCount; 154 } 155 GetUploadFailCount(TaskId taskId)156 int32_t GetUploadFailCount(TaskId taskId) 157 { 158 return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.failCount; 159 } 160 SetMockICloudDB(MockICloudDB * icloudDB)161 void SetMockICloudDB(MockICloudDB *icloudDB) 162 { 163 this->cloudDB_.SetCloudDB(std::shared_ptr<MockICloudDB>(icloudDB)); 164 } 165 SetMockICloudDB(std::shared_ptr<MockICloudDB> & icloudDB)166 void SetMockICloudDB(std::shared_ptr<MockICloudDB> &icloudDB) 167 { 168 this->cloudDB_.SetCloudDB(icloudDB); 169 } 170 CallDoFinished(TaskId taskId,int errCode,const InnerProcessInfo & processInfo)171 void CallDoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo) 172 { 173 DoFinished(taskId, errCode, processInfo); 174 } 175 SetAndGetCloudTaskInfo(SyncMode mode,std::vector<std::string> table,SyncProcessCallback callback,int64_t timeout)176 CloudTaskInfo SetAndGetCloudTaskInfo(SyncMode mode, std::vector<std::string> table, 177 SyncProcessCallback callback, int64_t timeout) 178 { 179 CloudTaskInfo taskInfo; 180 taskInfo.mode = mode; 181 taskInfo.table = table; 182 taskInfo.callback = callback; 183 taskInfo.timeout = timeout; 184 return taskInfo; 185 } 186 initFullCloudSyncData(CloudSyncData & uploadData,int size)187 void initFullCloudSyncData(CloudSyncData &uploadData, int size) 188 { 189 VBucket tmp = {std::pair<std::string, int64_t>(CloudDbConstant::MODIFY_FIELD, 1), 190 std::pair<std::string, int64_t>(CloudDbConstant::CREATE_FIELD, 1)}; 191 uploadData.insData.record = std::vector<VBucket>(size, tmp); 192 uploadData.insData.extend = std::vector<VBucket>(size, tmp); 193 uploadData.updData.record = std::vector<VBucket>(size, tmp); 194 uploadData.updData.extend = std::vector<VBucket>(size, tmp); 195 uploadData.delData.record = std::vector<VBucket>(size, tmp); 196 uploadData.delData.extend = std::vector<VBucket>(size, tmp); 197 } 198 CallTryToAddSyncTask(CloudTaskInfo && taskInfo)199 int CallTryToAddSyncTask(CloudTaskInfo &&taskInfo) 200 { 201 return TryToAddSyncTask(std::move(taskInfo)); 202 } 203 PopTaskQueue()204 void PopTaskQueue() 205 { 206 taskQueue_.pop_back(); 207 } 208 CallPrepareSync(TaskId taskId)209 int CallPrepareSync(TaskId taskId) 210 { 211 return PrepareSync(taskId); 212 } 213 CallNotify()214 void CallNotify() 215 { 216 auto info = cloudTaskInfos_[currentContext_.currentTaskId]; 217 currentContext_.notifier->NotifyProcess(info, {}); 218 } 219 SetAssetFields(const TableName & tableName,const std::vector<Field> & assetFields)220 void SetAssetFields(const TableName &tableName, const std::vector<Field> &assetFields) 221 { 222 currentContext_.tableName = tableName; 223 currentContext_.assetFields[currentContext_.tableName] = assetFields; 224 } 225 226 std::map<std::string, Assets> TestTagAssetsInSingleRecord( 227 VBucket &coveredData, VBucket &beCoveredData, bool setNormalStatus = false) 228 { 229 int ret = E_OK; 230 return TagAssetsInSingleRecord(coveredData, beCoveredData, setNormalStatus, ret); 231 } 232 TestIsDataContainDuplicateAsset(std::vector<Field> & assetFields,VBucket & data)233 bool TestIsDataContainDuplicateAsset(std::vector<Field> &assetFields, VBucket &data) 234 { 235 return IsDataContainDuplicateAsset(assetFields, data); 236 } 237 SetCloudWaterMarks(const TableName & tableName,const std::string & mark)238 void SetCloudWaterMarks(const TableName &tableName, const std::string &mark) 239 { 240 currentContext_.tableName = tableName; 241 currentContext_.cloudWaterMarks[tableName] = mark; 242 } 243 244 CloudTaskInfo taskInfo_; 245 private: 246 std::map<TaskId, SyncProcess> process_; 247 }; 248 249 } 250 #endif // #define CLOUDSYNCER_TEST_H