/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #ifndef CLOUDSYNCER_TEST_H
16 #define CLOUDSYNCER_TEST_H
17 
18 #include "cloud_merge_strategy.h"
19 #include "cloud_syncer.h"
20 #include "cloud_syncer_test.h"
21 #include "mock_iclouddb.h"
22 
23 namespace DistributedDB {
24 
25 class TestStorageProxy : public StorageProxy {
26 public:
TestStorageProxy(ICloudSyncStorageInterface * iCloud)27     explicit TestStorageProxy(ICloudSyncStorageInterface *iCloud) : StorageProxy(iCloud)
28     {
29         Init();
30     }
31 };
32 
33 class TestCloudSyncer : public CloudSyncer {
34 public:
TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy)35     explicit TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy) : CloudSyncer(storageProxy)
36     {
37     }
38     ~TestCloudSyncer() override = default;
39     DISABLE_COPY_ASSIGN_MOVE(TestCloudSyncer);
40 
InitCloudSyncer(TaskId taskId,SyncMode mode)41     void InitCloudSyncer(TaskId taskId, SyncMode mode)
42     {
43         cloudTaskInfos_.insert(std::pair<TaskId, CloudTaskInfo>{taskId, CloudTaskInfo()});
44         cloudTaskInfos_[taskId].mode = mode;
45         cloudTaskInfos_[taskId].taskId = taskId;
46         currentContext_.currentTaskId = taskId;
47         currentContext_.tableName = "TestTable" + std::to_string(taskId);
48         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
49         currentContext_.notifier->Init({currentContext_.tableName}, { "cloud" });
50         currentContext_.strategy = std::make_shared<CloudMergeStrategy>();
51         closed_ = false;
52         cloudTaskInfos_[taskId].callback = [this, taskId](const std::map<std::string, SyncProcess> &process) {
53             if (process.size() >= 1u) {
54                 process_[taskId] = process.begin()->second;
55             } else {
56                 SyncProcess tmpProcess;
57                 process_[taskId] = tmpProcess;
58             }
59         };
60     }
61 
SetCurrentCloudTaskInfos(std::vector<std::string> tables,const SyncProcessCallback & onProcess)62     void SetCurrentCloudTaskInfos(std::vector<std::string> tables, const SyncProcessCallback &onProcess)
63     {
64         cloudTaskInfos_[currentContext_.currentTaskId].table = tables;
65         cloudTaskInfos_[currentContext_.currentTaskId].callback = onProcess;
66     }
67 
GetCurrentCloudTaskInfos()68     CloudTaskInfo GetCurrentCloudTaskInfos()
69     {
70         return cloudTaskInfos_[currentContext_.currentTaskId];
71     }
72 
CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & onProcess,int64_t waitTime)73     int CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode, const std::vector<std::string> &tables,
74         const SyncProcessCallback &onProcess, int64_t waitTime)
75     {
76         CloudTaskInfo taskInfo;
77         taskInfo.mode = mode;
78         taskInfo.table = tables;
79         taskInfo.callback = onProcess;
80         taskInfo.timeout = waitTime;
81         return TryToAddSyncTask(std::move(taskInfo));
82     }
83 
CallClose()84     void CallClose()
85     {
86         currentContext_.currentTaskId = 0u;
87         Close();
88     }
SetTimeOut(TaskId taskId,int64_t timeout)89     void SetTimeOut(TaskId taskId, int64_t timeout)
90     {
91         this->cloudTaskInfos_[taskId].timeout = timeout;
92     }
93 
InitCloudSyncerForSync()94     void InitCloudSyncerForSync()
95     {
96         this->closed_ = false;
97         this->cloudTaskInfos_[this->currentTaskId_].callback = [this](
98             const std::map<std::string, SyncProcess> &process) {
99             if (process.size() == 1u) {
100                 process_[this->currentTaskId_] = process.begin()->second;
101             } else {
102                 SyncProcess tmpProcess;
103                 process_[this->currentTaskId_] = tmpProcess;
104             }
105         };
106     }
107 
CallDoSyncInner(const CloudTaskInfo & taskInfo,const bool & needUpload)108     int CallDoSyncInner(const CloudTaskInfo &taskInfo, const bool &needUpload)
109     {
110         return DoSyncInner(taskInfo, needUpload);
111     }
112 
getCallback(TaskId taskId)113     SyncProcessCallback getCallback(TaskId taskId)
114     {
115         return cloudTaskInfos_[taskId].callback;
116     }
117 
getCurrentTaskId()118     TaskId getCurrentTaskId()
119     {
120         return currentContext_.currentTaskId;
121     }
122 
123     int CallDoUpload(TaskId taskId, bool lastTable = false)
124     {
125         storageProxy_->StartTransaction();
126         int ret = CloudSyncer::DoUpload(taskId, lastTable);
127         storageProxy_->Commit();
128         return ret;
129     }
130 
CallDoDownload(TaskId taskId)131     int CallDoDownload(TaskId taskId)
132     {
133         return CloudSyncer::DoDownload(taskId);
134     }
135 
GetCurrentContextTableName()136     std::string GetCurrentContextTableName()
137     {
138         return this->currentContext_.tableName;
139     }
140 
SetCurrentContextTableName(std::string name)141     void SetCurrentContextTableName(std::string name)
142     {
143         this->currentContext_.tableName = name;
144     }
145 
CallClearCloudSyncData(CloudSyncData & uploadData)146     void CallClearCloudSyncData(CloudSyncData& uploadData)
147     {
148         this->ClearCloudSyncData(uploadData);
149     }
150 
GetUploadSuccessCount(TaskId taskId)151     int32_t GetUploadSuccessCount(TaskId taskId)
152     {
153         return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.successCount;
154     }
155 
GetUploadFailCount(TaskId taskId)156     int32_t GetUploadFailCount(TaskId taskId)
157     {
158         return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.failCount;
159     }
160 
SetMockICloudDB(MockICloudDB * icloudDB)161     void SetMockICloudDB(MockICloudDB *icloudDB)
162     {
163         this->cloudDB_.SetCloudDB(std::shared_ptr<MockICloudDB>(icloudDB));
164     }
165 
SetMockICloudDB(std::shared_ptr<MockICloudDB> & icloudDB)166     void SetMockICloudDB(std::shared_ptr<MockICloudDB> &icloudDB)
167     {
168         this->cloudDB_.SetCloudDB(icloudDB);
169     }
170 
CallDoFinished(TaskId taskId,int errCode,const InnerProcessInfo & processInfo)171     void CallDoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo)
172     {
173         DoFinished(taskId,  errCode,  processInfo);
174     }
175 
SetAndGetCloudTaskInfo(SyncMode mode,std::vector<std::string> table,SyncProcessCallback callback,int64_t timeout)176     CloudTaskInfo SetAndGetCloudTaskInfo(SyncMode mode, std::vector<std::string> table,
177         SyncProcessCallback callback, int64_t timeout)
178     {
179         CloudTaskInfo taskInfo;
180         taskInfo.mode = mode;
181         taskInfo.table = table;
182         taskInfo.callback = callback;
183         taskInfo.timeout = timeout;
184         return taskInfo;
185     }
186 
initFullCloudSyncData(CloudSyncData & uploadData,int size)187     void initFullCloudSyncData(CloudSyncData &uploadData, int size)
188     {
189         VBucket tmp = {std::pair<std::string, int64_t>(CloudDbConstant::MODIFY_FIELD, 1),
190                        std::pair<std::string, int64_t>(CloudDbConstant::CREATE_FIELD, 1)};
191         uploadData.insData.record = std::vector<VBucket>(size, tmp);
192         uploadData.insData.extend = std::vector<VBucket>(size, tmp);
193         uploadData.updData.record = std::vector<VBucket>(size, tmp);
194         uploadData.updData.extend = std::vector<VBucket>(size, tmp);
195         uploadData.delData.record = std::vector<VBucket>(size, tmp);
196         uploadData.delData.extend = std::vector<VBucket>(size, tmp);
197     }
198 
CallTryToAddSyncTask(CloudTaskInfo && taskInfo)199     int CallTryToAddSyncTask(CloudTaskInfo &&taskInfo)
200     {
201         return TryToAddSyncTask(std::move(taskInfo));
202     }
203 
PopTaskQueue()204     void PopTaskQueue()
205     {
206         taskQueue_.pop_back();
207     }
208 
CallPrepareSync(TaskId taskId)209     int CallPrepareSync(TaskId taskId)
210     {
211         return PrepareSync(taskId);
212     }
213 
CallNotify()214     void CallNotify()
215     {
216         auto info = cloudTaskInfos_[currentContext_.currentTaskId];
217         currentContext_.notifier->NotifyProcess(info, {});
218     }
219 
SetAssetFields(const TableName & tableName,const std::vector<Field> & assetFields)220     void SetAssetFields(const TableName &tableName, const std::vector<Field> &assetFields)
221     {
222         currentContext_.tableName = tableName;
223         currentContext_.assetFields[currentContext_.tableName] = assetFields;
224     }
225 
226     std::map<std::string, Assets> TestTagAssetsInSingleRecord(
227         VBucket &coveredData, VBucket &beCoveredData, bool setNormalStatus = false)
228     {
229         int ret = E_OK;
230         return TagAssetsInSingleRecord(coveredData, beCoveredData, setNormalStatus, ret);
231     }
232 
TestIsDataContainDuplicateAsset(std::vector<Field> & assetFields,VBucket & data)233     bool TestIsDataContainDuplicateAsset(std::vector<Field> &assetFields, VBucket &data)
234     {
235         return IsDataContainDuplicateAsset(assetFields, data);
236     }
237 
SetCloudWaterMarks(const TableName & tableName,const std::string & mark)238     void SetCloudWaterMarks(const TableName &tableName, const std::string &mark)
239     {
240         currentContext_.tableName = tableName;
241         currentContext_.cloudWaterMarks[tableName] = mark;
242     }
243 
244     CloudTaskInfo taskInfo_;
245 private:
246     std::map<TaskId, SyncProcess> process_;
247 };
248 
249 }
250 #endif // #define CLOUDSYNCER_TEST_H