• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef CLOUDSYNCER_TEST_H
16 #define CLOUDSYNCER_TEST_H
17 
18 #include "cloud_merge_strategy.h"
19 #include "cloud_syncer.h"
20 #include "cloud_syncer_test.h"
21 #include "cloud_sync_utils.h"
22 #include "mock_iclouddb.h"
23 
24 namespace DistributedDB {
25 
26 const Asset ASSET_COPY = { .version = 1,
27     .name = "Phone",
28     .assetId = "0",
29     .subpath = "/local/sync",
30     .uri = "/local/sync",
31     .modifyTime = "123456",
32     .createTime = "",
33     .size = "256",
34     .hash = "ASE" };
35 
36 class TestStorageProxy : public StorageProxy {
37 public:
TestStorageProxy(ICloudSyncStorageInterface * iCloud)38     explicit TestStorageProxy(ICloudSyncStorageInterface *iCloud) : StorageProxy(iCloud)
39     {
40         Init();
41     }
42 };
43 
44 class TestCloudSyncer : public CloudSyncer {
45 public:
TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy)46     explicit TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy) : CloudSyncer(storageProxy)
47     {
48     }
49     ~TestCloudSyncer() override = default;
50     DISABLE_COPY_ASSIGN_MOVE(TestCloudSyncer);
51 
InitCloudSyncer(TaskId taskId,SyncMode mode)52     void InitCloudSyncer(TaskId taskId, SyncMode mode)
53     {
54         cloudTaskInfos_.insert(std::pair<TaskId, CloudTaskInfo>{taskId, CloudTaskInfo()});
55         cloudTaskInfos_[taskId].mode = mode;
56         cloudTaskInfos_[taskId].taskId = taskId;
57         currentContext_.tableName = "TestTable" + std::to_string(taskId);
58         cloudTaskInfos_[taskId].table.push_back(currentContext_.tableName);
59         currentContext_.currentTaskId = taskId;
60         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
61         currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
62         currentContext_.notifier->Init({currentContext_.tableName}, { "cloud" }, cloudTaskInfos_[taskId].users);
63         currentContext_.strategy = std::make_shared<CloudMergeStrategy>();
64         currentContext_.strategy->SetIsKvScene(isKvScene_);
65         closed_ = false;
66         cloudTaskInfos_[taskId].callback = [this, taskId](const std::map<std::string, SyncProcess> &process) {
67             if (process.size() >= 1u) {
68                 process_[taskId] = process.begin()->second;
69             } else {
70                 SyncProcess tmpProcess;
71                 process_[taskId] = tmpProcess;
72             }
73         };
74     }
75 
SetCurrentCloudTaskInfos(std::vector<std::string> tables,const SyncProcessCallback & onProcess)76     void SetCurrentCloudTaskInfos(std::vector<std::string> tables, const SyncProcessCallback &onProcess)
77     {
78         cloudTaskInfos_[currentContext_.currentTaskId].table = tables;
79         cloudTaskInfos_[currentContext_.currentTaskId].callback = onProcess;
80     }
81 
GetCurrentCloudTaskInfos()82     CloudTaskInfo GetCurrentCloudTaskInfos()
83     {
84         return cloudTaskInfos_[currentContext_.currentTaskId];
85     }
86 
CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & onProcess,int64_t waitTime)87     int CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode, const std::vector<std::string> &tables,
88         const SyncProcessCallback &onProcess, int64_t waitTime)
89     {
90         CloudTaskInfo taskInfo;
91         taskInfo.mode = mode;
92         taskInfo.table = tables;
93         taskInfo.callback = onProcess;
94         taskInfo.timeout = waitTime;
95         return TryToAddSyncTask(std::move(taskInfo));
96     }
97 
CallClose()98     void CallClose()
99     {
100         currentContext_.currentTaskId = 0u;
101         currentContext_.strategy = nullptr;
102         currentContext_.notifier = nullptr;
103         Close();
104     }
SetTimeOut(TaskId taskId,int64_t timeout)105     void SetTimeOut(TaskId taskId, int64_t timeout)
106     {
107         this->cloudTaskInfos_[taskId].timeout = timeout;
108     }
109 
InitCloudSyncerForSync()110     void InitCloudSyncerForSync()
111     {
112         this->closed_ = false;
113         this->cloudTaskInfos_[this->lastTaskId_].callback = [this](
114             const std::map<std::string, SyncProcess> &process) {
115             if (process.size() == 1u) {
116                 process_[this->lastTaskId_] = process.begin()->second;
117             } else {
118                 SyncProcess tmpProcess;
119                 process_[this->lastTaskId_] = tmpProcess;
120             }
121         };
122     }
123 
CallDoSyncInner(const CloudTaskInfo & taskInfo)124     int CallDoSyncInner(const CloudTaskInfo &taskInfo)
125     {
126         return DoSyncInner(taskInfo);
127     }
128 
getCallback(TaskId taskId)129     SyncProcessCallback getCallback(TaskId taskId)
130     {
131         return cloudTaskInfos_[taskId].callback;
132     }
133 
getCurrentTaskId()134     TaskId getCurrentTaskId()
135     {
136         return currentContext_.currentTaskId;
137     }
138 
139     int CallDoUpload(TaskId taskId, bool lastTable = false, LockAction lockAction = LockAction::INSERT)
140     {
141         storageProxy_->StartTransaction();
142         int ret = CloudSyncer::DoUpload(taskId, lastTable, lockAction);
143         storageProxy_->Commit();
144         return ret;
145     }
146 
CallDoDownload(TaskId taskId)147     int CallDoDownload(TaskId taskId)
148     {
149         return CloudSyncer::DoDownload(taskId, true);
150     }
151 
GetCurrentContextTableName()152     std::string GetCurrentContextTableName()
153     {
154         return this->currentContext_.tableName;
155     }
156 
SetCurrentContextTableName(std::string name)157     void SetCurrentContextTableName(std::string name)
158     {
159         this->currentContext_.tableName = name;
160     }
161 
CallClearCloudSyncData(CloudSyncData & uploadData)162     void CallClearCloudSyncData(CloudSyncData& uploadData)
163     {
164         CloudSyncUtils::ClearCloudSyncData(uploadData);
165     }
166 
GetUploadSuccessCount(TaskId taskId)167     int32_t GetUploadSuccessCount(TaskId taskId)
168     {
169         return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.successCount;
170     }
171 
GetUploadFailCount(TaskId taskId)172     int32_t GetUploadFailCount(TaskId taskId)
173     {
174         return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.failCount;
175     }
176 
SetMockICloudDB(MockICloudDB * icloudDB)177     void SetMockICloudDB(MockICloudDB *icloudDB)
178     {
179         this->cloudDB_.SetCloudDB(std::shared_ptr<MockICloudDB>(icloudDB));
180     }
181 
SetMockICloudDB(std::shared_ptr<MockICloudDB> & icloudDB)182     void SetMockICloudDB(std::shared_ptr<MockICloudDB> &icloudDB)
183     {
184         this->cloudDB_.SetCloudDB(icloudDB);
185     }
186 
SetAndGetCloudTaskInfo(SyncMode mode,std::vector<std::string> table,SyncProcessCallback callback,int64_t timeout)187     CloudTaskInfo SetAndGetCloudTaskInfo(SyncMode mode, std::vector<std::string> table,
188         SyncProcessCallback callback, int64_t timeout)
189     {
190         CloudTaskInfo taskInfo;
191         taskInfo.mode = mode;
192         taskInfo.table = table;
193         taskInfo.callback = callback;
194         taskInfo.timeout = timeout;
195         return taskInfo;
196     }
197 
initFullCloudSyncData(CloudSyncData & uploadData,int size)198     void initFullCloudSyncData(CloudSyncData &uploadData, int size)
199     {
200         VBucket tmp = { std::pair<std::string, int64_t>(CloudDbConstant::MODIFY_FIELD, 1),
201                         std::pair<std::string, int64_t>(CloudDbConstant::CREATE_FIELD, 1),
202                         std::pair<std::string, std::string>(CloudDbConstant::GID_FIELD, "0"),
203                         std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) };
204         VBucket asset = { std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) };
205         uploadData.insData.record = std::vector<VBucket>(size, tmp);
206         uploadData.insData.extend = std::vector<VBucket>(size, tmp);
207         uploadData.insData.assets = std::vector<VBucket>(size, asset);
208         uploadData.updData.record = std::vector<VBucket>(size, tmp);
209         uploadData.updData.extend = std::vector<VBucket>(size, tmp);
210         uploadData.updData.assets = std::vector<VBucket>(size, asset);
211         uploadData.delData.record = std::vector<VBucket>(size, tmp);
212         uploadData.delData.extend = std::vector<VBucket>(size, tmp);
213     }
214 
CallTryToAddSyncTask(CloudTaskInfo && taskInfo)215     int CallTryToAddSyncTask(CloudTaskInfo &&taskInfo)
216     {
217         return TryToAddSyncTask(std::move(taskInfo));
218     }
219 
CallIsAlreadyHaveCompensatedSyncTask()220     bool CallIsAlreadyHaveCompensatedSyncTask()
221     {
222         return IsAlreadyHaveCompensatedSyncTask();
223     }
224 
PopTaskQueue()225     void PopTaskQueue()
226     {
227         if (!taskQueue_.empty()) {
228             taskQueue_.erase(--taskQueue_.end());
229         }
230     }
231 
CallPrepareSync(TaskId taskId)232     int CallPrepareSync(TaskId taskId)
233     {
234         return PrepareSync(taskId);
235     }
236 
CallNotify()237     void CallNotify()
238     {
239         auto info = cloudTaskInfos_[currentContext_.currentTaskId];
240         currentContext_.notifier->NotifyProcess(info, {});
241     }
242 
SetAssetFields(const TableName & tableName,const std::vector<Field> & assetFields)243     void SetAssetFields(const TableName &tableName, const std::vector<Field> &assetFields)
244     {
245         currentContext_.tableName = tableName;
246         currentContext_.assetFields[currentContext_.tableName] = assetFields;
247     }
248 
249     std::map<std::string, Assets> TestTagAssetsInSingleRecord(
250         VBucket &coveredData, VBucket &beCoveredData, bool setNormalStatus = false)
251     {
252         int ret = E_OK;
253         return TagAssetsInSingleRecord(coveredData, beCoveredData, setNormalStatus, false, ret);
254     }
255 
TestIsDataContainDuplicateAsset(std::vector<Field> & assetFields,VBucket & data)256     bool TestIsDataContainDuplicateAsset(std::vector<Field> &assetFields, VBucket &data)
257     {
258         return IsDataContainDuplicateAsset(assetFields, data);
259     }
260 
SetCloudWaterMarks(const TableName & tableName,const std::string & mark)261     void SetCloudWaterMarks(const TableName &tableName, const std::string &mark)
262     {
263         currentContext_.tableName = tableName;
264         currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName] = mark;
265     }
266 
CallDownloadAssets()267     int CallDownloadAssets()
268     {
269         InnerProcessInfo info;
270         std::vector<std::string> pKColNames;
271         std::set<Key> dupHashKeySet;
272         ChangedData changedAssets;
273         return CloudSyncer::DownloadAssets(info, pKColNames, dupHashKeySet, changedAssets);
274     }
275 
SetCurrentContext(TaskId taskId)276     void SetCurrentContext(TaskId taskId)
277     {
278         currentContext_.currentTaskId = taskId;
279     }
280 
SetLastTaskId(TaskId taskId)281     void SetLastTaskId(TaskId taskId)
282     {
283         lastTaskId_ = taskId;
284     }
285 
SetCurrentTaskPause()286     void SetCurrentTaskPause()
287     {
288         cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
289     }
290 
SetAssetDownloadList(int downloadCount)291     void SetAssetDownloadList(int downloadCount)
292     {
293         for (int i = 0; i < downloadCount; ++i) {
294             currentContext_.assetDownloadList.push_back({});
295         }
296     }
297 
SetQuerySyncObject(TaskId taskId,const QuerySyncObject & query)298     void SetQuerySyncObject(TaskId taskId, const QuerySyncObject &query)
299     {
300         std::vector<QuerySyncObject> queryList;
301         queryList.push_back(query);
302         cloudTaskInfos_[taskId].queryList = queryList;
303     }
304 
CallGetQuerySyncObject(const std::string & tableName)305     QuerySyncObject CallGetQuerySyncObject(const std::string &tableName)
306     {
307         return CloudSyncer::GetQuerySyncObject(tableName);
308     }
309 
CallReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)310     void CallReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
311     {
312         CloudSyncer::ReloadWaterMarkIfNeed(taskId, waterMark);
313     }
314 
CallRecordWaterMark(TaskId taskId,Timestamp waterMark)315     void CallRecordWaterMark(TaskId taskId, Timestamp waterMark)
316     {
317         CloudSyncer::RecordWaterMark(taskId, waterMark);
318     }
319 
SetResumeSyncParam(TaskId taskId,const SyncParam & syncParam)320     void SetResumeSyncParam(TaskId taskId, const SyncParam &syncParam)
321     {
322         resumeTaskInfos_[taskId].syncParam = syncParam;
323         resumeTaskInfos_[taskId].context.tableName = syncParam.tableName;
324     }
325 
ClearResumeTaskInfo(TaskId taskId)326     void ClearResumeTaskInfo(TaskId taskId)
327     {
328         resumeTaskInfos_.erase(taskId);
329     }
330 
SetTaskResume(TaskId taskId,bool resume)331     void SetTaskResume(TaskId taskId, bool resume)
332     {
333         cloudTaskInfos_[taskId].resume = resume;
334     }
335 
CallGetSyncParamForDownload(TaskId taskId,SyncParam & param)336     int CallGetSyncParamForDownload(TaskId taskId, SyncParam &param)
337     {
338         return CloudSyncer::GetSyncParamForDownload(taskId, param);
339     }
340 
SetResumeTaskUpload(TaskId taskId,bool upload)341     void SetResumeTaskUpload(TaskId taskId, bool upload)
342     {
343         resumeTaskInfos_[taskId].upload = upload;
344     }
345 
IsResumeTaskUpload(TaskId taskId)346     bool IsResumeTaskUpload(TaskId taskId)
347     {
348         return resumeTaskInfos_[taskId].upload;
349     }
350 
CallHandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)351     int CallHandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
352         VBucket &localAssetInfo)
353     {
354         return CloudSyncer::HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
355     }
356 
GetProcessRecorder()357     std::shared_ptr<ProcessRecorder> GetProcessRecorder()
358     {
359         std::lock_guard<std::mutex> autoLock(dataLock_);
360         return currentContext_.processRecorder;
361     }
362 
CallDoDownloadInNeed(bool needUpload,bool isFirstDownload)363     int CallDoDownloadInNeed(bool needUpload, bool isFirstDownload)
364     {
365         CloudTaskInfo taskInfo;
366         {
367             std::lock_guard<std::mutex> autoLock(dataLock_);
368             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
369         }
370         return DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
371     }
372 
CallDoUploadInNeed()373     int CallDoUploadInNeed()
374     {
375         CloudTaskInfo taskInfo;
376         {
377             std::lock_guard<std::mutex> autoLock(dataLock_);
378             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
379         }
380         return DoUploadInNeed(taskInfo, true);
381     }
382     CloudTaskInfo taskInfo_;
383 private:
384     std::map<TaskId, SyncProcess> process_;
385 };
386 
387 }
#endif // CLOUDSYNCER_TEST_H