1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_cloud_syncer.h"
17
18 namespace DistributedDB {
VirtualCloudSyncer(std::shared_ptr<StorageProxy> storageProxy)19 VirtualCloudSyncer::VirtualCloudSyncer(std::shared_ptr<StorageProxy> storageProxy)
20 : CloudSyncer(storageProxy)
21 {
22 }
23
DoDownload(CloudSyncer::TaskId taskId)24 int VirtualCloudSyncer::DoDownload(CloudSyncer::TaskId taskId)
25 {
26 if (!doDownload_) {
27 LOGI("[VirtualCloudSyncer] download just return ok");
28 return E_OK;
29 }
30 if (downloadFunc_) {
31 return downloadFunc_();
32 }
33 return CloudSyncer::DoDownload(taskId);
34 }
35
DoUpload(CloudSyncer::TaskId taskId,bool lastTable)36 int VirtualCloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable)
37 {
38 if (!doUpload_) {
39 LOGI("[VirtualCloudSyncer] upload just return ok");
40 return E_OK;
41 }
42 return CloudSyncer::DoUpload(taskId, lastTable);
43 }
44
SetSyncAction(bool doDownload,bool doUpload)45 void VirtualCloudSyncer::SetSyncAction(bool doDownload, bool doUpload)
46 {
47 doDownload_ = doDownload;
48 doUpload_ = doUpload;
49 }
50
SetDownloadFunc(const std::function<int ()> & function)51 void VirtualCloudSyncer::SetDownloadFunc(const std::function<int()> &function)
52 {
53 downloadFunc_ = function;
54 }
55
Notify(bool notifyIfError)56 void VirtualCloudSyncer::Notify(bool notifyIfError)
57 {
58 TaskId currentTaskId;
59 {
60 std::lock_guard<std::mutex> autoLock(contextLock_);
61 currentTaskId = currentContext_.currentTaskId;
62 }
63 CloudTaskInfo taskInfo;
64 {
65 std::lock_guard<std::mutex> autoLock(queueLock_);
66 taskInfo = cloudTaskInfos_[currentTaskId];
67 }
68 std::lock_guard<std::mutex> autoLock(contextLock_);
69 currentContext_.notifier->NotifyProcess(taskInfo, {}, notifyIfError);
70 }
71
GetQueueCount()72 size_t VirtualCloudSyncer::GetQueueCount()
73 {
74 std::lock_guard<std::mutex> autoLock(queueLock_);
75 return taskQueue_.size();
76 }
77
SetCurrentTaskInfo(const SyncProcessCallback & callback,CloudSyncer::TaskId taskId)78 void VirtualCloudSyncer::SetCurrentTaskInfo(const SyncProcessCallback &callback,
79 CloudSyncer::TaskId taskId)
80 {
81 {
82 std::lock_guard<std::mutex> autoContextLock(contextLock_);
83 currentContext_.currentTaskId = taskId;
84 currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
85 }
86 std::lock_guard<std::mutex> autoLock(queueLock_);
87 CloudTaskInfo taskInfo;
88 taskInfo.callback = callback;
89 cloudTaskInfos_[taskId] = taskInfo;
90 }
91
CallTagStatusByStrategy(bool isExist,const DataInfoWithLog & localInfo,const LogInfo & cloudInfo,OpType & strategyOpResult)92 int VirtualCloudSyncer::CallTagStatusByStrategy(bool isExist, const DataInfoWithLog &localInfo,
93 const LogInfo &cloudInfo, OpType &strategyOpResult)
94 {
95 SyncParam param;
96 DataInfo dataInfo;
97 dataInfo.localInfo = localInfo;
98 dataInfo.cloudLogInfo = cloudInfo;
99 return CloudSyncer::TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
100 }
101 }