• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef CLOUDSYNCER_TEST_H
16 #define CLOUDSYNCER_TEST_H
17 
18 #include "cloud_merge_strategy.h"
19 #include "cloud_syncer.h"
20 #include "cloud_syncer_test.h"
21 #include "cloud_sync_utils.h"
22 #include "mock_iclouddb.h"
23 
24 namespace DistributedDB {
25 
26 const Asset ASSET_COPY = { .version = 1,
27     .name = "Phone",
28     .assetId = "0",
29     .subpath = "/local/sync",
30     .uri = "/local/sync",
31     .modifyTime = "123456",
32     .createTime = "",
33     .size = "256",
34     .hash = "ASE" };
35 
36 class TestStorageProxy : public StorageProxy {
37 public:
TestStorageProxy(ICloudSyncStorageInterface * iCloud)38     explicit TestStorageProxy(ICloudSyncStorageInterface *iCloud) : StorageProxy(iCloud)
39     {
40         Init();
41     }
42 };
43 
44 class TestCloudSyncer : public CloudSyncer {
45 public:
TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy)46     explicit TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy) : CloudSyncer(storageProxy)
47     {
48     }
49     ~TestCloudSyncer() override = default;
50     DISABLE_COPY_ASSIGN_MOVE(TestCloudSyncer);
51 
InitCloudSyncer(TaskId taskId,SyncMode mode)52     void InitCloudSyncer(TaskId taskId, SyncMode mode)
53     {
54         cloudTaskInfos_.insert(std::pair<TaskId, CloudTaskInfo>{taskId, CloudTaskInfo()});
55         cloudTaskInfos_[taskId].mode = mode;
56         cloudTaskInfos_[taskId].taskId = taskId;
57         currentContext_.tableName = "TestTable" + std::to_string(taskId);
58         cloudTaskInfos_[taskId].table.push_back(currentContext_.tableName);
59         currentContext_.currentTaskId = taskId;
60         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
61         currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
62         currentContext_.notifier->Init({currentContext_.tableName}, { "cloud" }, cloudTaskInfos_[taskId].users);
63         currentContext_.strategy = std::make_shared<CloudMergeStrategy>();
64         closed_ = false;
65         cloudTaskInfos_[taskId].callback = [this, taskId](const std::map<std::string, SyncProcess> &process) {
66             if (process.size() >= 1u) {
67                 process_[taskId] = process.begin()->second;
68             } else {
69                 SyncProcess tmpProcess;
70                 process_[taskId] = tmpProcess;
71             }
72         };
73     }
74 
SetCurrentCloudTaskInfos(std::vector<std::string> tables,const SyncProcessCallback & onProcess)75     void SetCurrentCloudTaskInfos(std::vector<std::string> tables, const SyncProcessCallback &onProcess)
76     {
77         cloudTaskInfos_[currentContext_.currentTaskId].table = tables;
78         cloudTaskInfos_[currentContext_.currentTaskId].callback = onProcess;
79     }
80 
GetCurrentCloudTaskInfos()81     CloudTaskInfo GetCurrentCloudTaskInfos()
82     {
83         return cloudTaskInfos_[currentContext_.currentTaskId];
84     }
85 
CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & onProcess,int64_t waitTime)86     int CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode, const std::vector<std::string> &tables,
87         const SyncProcessCallback &onProcess, int64_t waitTime)
88     {
89         CloudTaskInfo taskInfo;
90         taskInfo.mode = mode;
91         taskInfo.table = tables;
92         taskInfo.callback = onProcess;
93         taskInfo.timeout = waitTime;
94         return TryToAddSyncTask(std::move(taskInfo));
95     }
96 
CallClose()97     void CallClose()
98     {
99         currentContext_.currentTaskId = 0u;
100         currentContext_.strategy = nullptr;
101         currentContext_.notifier = nullptr;
102         Close();
103     }
SetTimeOut(TaskId taskId,int64_t timeout)104     void SetTimeOut(TaskId taskId, int64_t timeout)
105     {
106         this->cloudTaskInfos_[taskId].timeout = timeout;
107     }
108 
InitCloudSyncerForSync()109     void InitCloudSyncerForSync()
110     {
111         this->closed_ = false;
112         this->cloudTaskInfos_[this->lastTaskId_].callback = [this](
113             const std::map<std::string, SyncProcess> &process) {
114             if (process.size() == 1u) {
115                 process_[this->lastTaskId_] = process.begin()->second;
116             } else {
117                 SyncProcess tmpProcess;
118                 process_[this->lastTaskId_] = tmpProcess;
119             }
120         };
121     }
122 
CallDoSyncInner(const CloudTaskInfo & taskInfo)123     int CallDoSyncInner(const CloudTaskInfo &taskInfo)
124     {
125         return DoSyncInner(taskInfo);
126     }
127 
getCallback(TaskId taskId)128     SyncProcessCallback getCallback(TaskId taskId)
129     {
130         return cloudTaskInfos_[taskId].callback;
131     }
132 
getCurrentTaskId()133     TaskId getCurrentTaskId()
134     {
135         return currentContext_.currentTaskId;
136     }
137 
138     int CallDoUpload(TaskId taskId, bool lastTable = false, LockAction lockAction = LockAction::INSERT)
139     {
140         storageProxy_->StartTransaction();
141         int ret = CloudSyncer::DoUpload(taskId, lastTable, lockAction);
142         storageProxy_->Commit();
143         return ret;
144     }
145 
CallDoDownload(TaskId taskId)146     int CallDoDownload(TaskId taskId)
147     {
148         return CloudSyncer::DoDownload(taskId, true);
149     }
150 
GetCurrentContextTableName()151     std::string GetCurrentContextTableName()
152     {
153         return this->currentContext_.tableName;
154     }
155 
SetCurrentContextTableName(std::string name)156     void SetCurrentContextTableName(std::string name)
157     {
158         this->currentContext_.tableName = name;
159     }
160 
CallClearCloudSyncData(CloudSyncData & uploadData)161     void CallClearCloudSyncData(CloudSyncData& uploadData)
162     {
163         CloudSyncUtils::ClearCloudSyncData(uploadData);
164     }
165 
GetUploadSuccessCount(TaskId taskId)166     int32_t GetUploadSuccessCount(TaskId taskId)
167     {
168         return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.successCount;
169     }
170 
GetUploadFailCount(TaskId taskId)171     int32_t GetUploadFailCount(TaskId taskId)
172     {
173         return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.failCount;
174     }
175 
SetMockICloudDB(MockICloudDB * icloudDB)176     void SetMockICloudDB(MockICloudDB *icloudDB)
177     {
178         this->cloudDB_.SetCloudDB(std::shared_ptr<MockICloudDB>(icloudDB));
179     }
180 
SetMockICloudDB(std::shared_ptr<MockICloudDB> & icloudDB)181     void SetMockICloudDB(std::shared_ptr<MockICloudDB> &icloudDB)
182     {
183         this->cloudDB_.SetCloudDB(icloudDB);
184     }
185 
SetAndGetCloudTaskInfo(SyncMode mode,std::vector<std::string> table,SyncProcessCallback callback,int64_t timeout)186     CloudTaskInfo SetAndGetCloudTaskInfo(SyncMode mode, std::vector<std::string> table,
187         SyncProcessCallback callback, int64_t timeout)
188     {
189         CloudTaskInfo taskInfo;
190         taskInfo.mode = mode;
191         taskInfo.table = table;
192         taskInfo.callback = callback;
193         taskInfo.timeout = timeout;
194         return taskInfo;
195     }
196 
initFullCloudSyncData(CloudSyncData & uploadData,int size)197     void initFullCloudSyncData(CloudSyncData &uploadData, int size)
198     {
199         VBucket tmp = { std::pair<std::string, int64_t>(CloudDbConstant::MODIFY_FIELD, 1),
200                         std::pair<std::string, int64_t>(CloudDbConstant::CREATE_FIELD, 1),
201                         std::pair<std::string, std::string>(CloudDbConstant::GID_FIELD, "0"),
202                         std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) };
203         VBucket asset = { std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) };
204         uploadData.insData.record = std::vector<VBucket>(size, tmp);
205         uploadData.insData.extend = std::vector<VBucket>(size, tmp);
206         uploadData.insData.assets = std::vector<VBucket>(size, asset);
207         uploadData.updData.record = std::vector<VBucket>(size, tmp);
208         uploadData.updData.extend = std::vector<VBucket>(size, tmp);
209         uploadData.updData.assets = std::vector<VBucket>(size, asset);
210         uploadData.delData.record = std::vector<VBucket>(size, tmp);
211         uploadData.delData.extend = std::vector<VBucket>(size, tmp);
212     }
213 
CallTryToAddSyncTask(CloudTaskInfo && taskInfo)214     int CallTryToAddSyncTask(CloudTaskInfo &&taskInfo)
215     {
216         return TryToAddSyncTask(std::move(taskInfo));
217     }
218 
PopTaskQueue()219     void PopTaskQueue()
220     {
221         taskQueue_.pop_back();
222     }
223 
CallPrepareSync(TaskId taskId)224     int CallPrepareSync(TaskId taskId)
225     {
226         return PrepareSync(taskId);
227     }
228 
CallNotify()229     void CallNotify()
230     {
231         auto info = cloudTaskInfos_[currentContext_.currentTaskId];
232         currentContext_.notifier->NotifyProcess(info, {});
233     }
234 
SetAssetFields(const TableName & tableName,const std::vector<Field> & assetFields)235     void SetAssetFields(const TableName &tableName, const std::vector<Field> &assetFields)
236     {
237         currentContext_.tableName = tableName;
238         currentContext_.assetFields[currentContext_.tableName] = assetFields;
239     }
240 
241     std::map<std::string, Assets> TestTagAssetsInSingleRecord(
242         VBucket &coveredData, VBucket &beCoveredData, bool setNormalStatus = false)
243     {
244         int ret = E_OK;
245         return TagAssetsInSingleRecord(coveredData, beCoveredData, setNormalStatus, ret);
246     }
247 
TestIsDataContainDuplicateAsset(std::vector<Field> & assetFields,VBucket & data)248     bool TestIsDataContainDuplicateAsset(std::vector<Field> &assetFields, VBucket &data)
249     {
250         return IsDataContainDuplicateAsset(assetFields, data);
251     }
252 
SetCloudWaterMarks(const TableName & tableName,const std::string & mark)253     void SetCloudWaterMarks(const TableName &tableName, const std::string &mark)
254     {
255         currentContext_.tableName = tableName;
256         currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName] = mark;
257     }
258 
CallDownloadAssets()259     int CallDownloadAssets()
260     {
261         InnerProcessInfo info;
262         std::vector<std::string> pKColNames;
263         std::set<Key> dupHashKeySet;
264         ChangedData changedAssets;
265         return CloudSyncer::DownloadAssets(info, pKColNames, dupHashKeySet, changedAssets);
266     }
267 
SetCurrentContext(TaskId taskId)268     void SetCurrentContext(TaskId taskId)
269     {
270         currentContext_.currentTaskId = taskId;
271     }
272 
SetLastTaskId(TaskId taskId)273     void SetLastTaskId(TaskId taskId)
274     {
275         lastTaskId_ = taskId;
276     }
277 
SetCurrentTaskPause()278     void SetCurrentTaskPause()
279     {
280         cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
281     }
282 
SetAssetDownloadList(int downloadCount)283     void SetAssetDownloadList(int downloadCount)
284     {
285         for (int i = 0; i < downloadCount; ++i) {
286             currentContext_.assetDownloadList.push_back({});
287         }
288     }
289 
SetQuerySyncObject(TaskId taskId,const QuerySyncObject & query)290     void SetQuerySyncObject(TaskId taskId, const QuerySyncObject &query)
291     {
292         std::vector<QuerySyncObject> queryList;
293         queryList.push_back(query);
294         cloudTaskInfos_[taskId].queryList = queryList;
295     }
296 
CallGetQuerySyncObject(const std::string & tableName)297     QuerySyncObject CallGetQuerySyncObject(const std::string &tableName)
298     {
299         return CloudSyncer::GetQuerySyncObject(tableName);
300     }
301 
CallReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)302     void CallReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
303     {
304         CloudSyncer::ReloadWaterMarkIfNeed(taskId, waterMark);
305     }
306 
CallRecordWaterMark(TaskId taskId,Timestamp waterMark)307     void CallRecordWaterMark(TaskId taskId, Timestamp waterMark)
308     {
309         CloudSyncer::RecordWaterMark(taskId, waterMark);
310     }
311 
SetResumeSyncParam(TaskId taskId,const SyncParam & syncParam)312     void SetResumeSyncParam(TaskId taskId, const SyncParam &syncParam)
313     {
314         resumeTaskInfos_[taskId].syncParam = syncParam;
315         resumeTaskInfos_[taskId].context.tableName = syncParam.tableName;
316     }
317 
ClearResumeTaskInfo(TaskId taskId)318     void ClearResumeTaskInfo(TaskId taskId)
319     {
320         resumeTaskInfos_.erase(taskId);
321     }
322 
SetTaskResume(TaskId taskId,bool resume)323     void SetTaskResume(TaskId taskId, bool resume)
324     {
325         cloudTaskInfos_[taskId].resume = resume;
326     }
327 
CallGetSyncParamForDownload(TaskId taskId,SyncParam & param)328     int CallGetSyncParamForDownload(TaskId taskId, SyncParam &param)
329     {
330         return CloudSyncer::GetSyncParamForDownload(taskId, param);
331     }
332 
IsResumeTaskUpload(TaskId taskId)333     bool IsResumeTaskUpload(TaskId taskId)
334     {
335         return resumeTaskInfos_[taskId].upload;
336     }
337 
CallHandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)338     int CallHandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
339         VBucket &localAssetInfo)
340     {
341         return CloudSyncer::HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
342     }
343 
GetProcessRecorder()344     std::shared_ptr<ProcessRecorder> GetProcessRecorder()
345     {
346         std::lock_guard<std::mutex> autoLock(dataLock_);
347         return currentContext_.processRecorder;
348     }
349 
CallDoDownloadInNeed(bool needUpload,bool isFirstDownload)350     int CallDoDownloadInNeed(bool needUpload, bool isFirstDownload)
351     {
352         CloudTaskInfo taskInfo;
353         {
354             std::lock_guard<std::mutex> autoLock(dataLock_);
355             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
356         }
357         return DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
358     }
359 
CallDoUploadInNeed()360     int CallDoUploadInNeed()
361     {
362         CloudTaskInfo taskInfo;
363         {
364             std::lock_guard<std::mutex> autoLock(dataLock_);
365             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
366         }
367         return DoUploadInNeed(taskInfo, true);
368     }
369     CloudTaskInfo taskInfo_;
370 private:
371     std::map<TaskId, SyncProcess> process_;
372 };
373 
374 }
375 #endif // #define CLOUDSYNCER_TEST_H