• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "generic_virtual_device.h"
16 
17 #include "kv_store_errno.h"
18 #include "multi_ver_sync_task_context.h"
19 #include "single_ver_kv_sync_task_context.h"
20 #include "single_ver_relational_sync_task_context.h"
21 
22 namespace DistributedDB {
GenericVirtualDevice(std::string deviceId)23 GenericVirtualDevice::GenericVirtualDevice(std::string deviceId)
24     : communicateHandle_(nullptr),
25       communicatorAggregator_(nullptr),
26       storage_(nullptr),
27       metadata_(nullptr),
28       deviceId_(std::move(deviceId)),
29       remoteDeviceId_("real_device"),
30       targetUserId_("targetUser"),
31       context_(nullptr),
32       onRemoteDataChanged_(nullptr),
33       subManager_(nullptr),
34       executor_(nullptr)
35 {
36 }
37 
~GenericVirtualDevice()38 GenericVirtualDevice::~GenericVirtualDevice()
39 {
40     std::mutex cvMutex;
41     std::condition_variable cv;
42     bool finished = false;
43     Offline();
44 
45     if (communicateHandle_ != nullptr) {
46         communicateHandle_->RegOnMessageCallback(nullptr, nullptr);
47         communicatorAggregator_->ReleaseCommunicator(communicateHandle_);
48         communicateHandle_ = nullptr;
49     }
50     communicatorAggregator_ = nullptr;
51 
52     if (context_ != nullptr) {
53         ISyncInterface *storage = storage_;
54         context_->OnLastRef([storage, &cv, &cvMutex, &finished]() {
55             delete storage;
56             {
57                 std::lock_guard<std::mutex> lock(cvMutex);
58                 finished = true;
59             }
60             cv.notify_one();
61         });
62         RefObject::KillAndDecObjRef(context_);
63         std::unique_lock<std::mutex> lock(cvMutex);
64         cv.wait(lock, [&finished] { return finished; });
65     } else {
66         delete storage_;
67     }
68     context_ = nullptr;
69     metadata_ = nullptr;
70     storage_ = nullptr;
71     if (executor_ != nullptr) {
72         RefObject::KillAndDecObjRef(executor_);
73         executor_ = nullptr;
74     }
75 }
76 
Initialize(VirtualCommunicatorAggregator * comAggregator,ISyncInterface * syncInterface)77 int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface)
78 {
79     if ((comAggregator == nullptr) || (syncInterface == nullptr)) {
80         return -E_INVALID_ARGS;
81     }
82 
83     communicatorAggregator_ = comAggregator;
84     int errCode = E_OK;
85     communicateHandle_ = communicatorAggregator_->AllocCommunicator(deviceId_, errCode);
86     if (communicateHandle_ == nullptr) {
87         return errCode;
88     }
89 
90     storage_ = syncInterface;
91     metadata_ = std::make_shared<Metadata>();
92     if (metadata_->Initialize(storage_) != E_OK) {
93         LOGE("metadata_ init failed");
94         return -E_NOT_SUPPORT;
95     }
96     if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_SVD) {
97         context_ = new (std::nothrow) SingleVerKvSyncTaskContext;
98         subManager_ = std::make_shared<SubscribeManager>();
99         static_cast<SingleVerSyncTaskContext *>(context_)->SetSubscribeManager(subManager_);
100     } else if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_RELATION) {
101         context_ = new (std::nothrow) SingleVerRelationalSyncTaskContext;
102     } else {
103 #ifndef OMIT_MULTI_VER
104         context_ = new (std::nothrow) MultiVerSyncTaskContext;
105 #else
106 	return -E_NOT_SUPPORT;
107 #endif // OMIT_MULTI_VER
108     }
109     if (context_ == nullptr) {
110         return -E_OUT_OF_MEMORY;
111     }
112     communicateHandle_->RegOnMessageCallback(
113         std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
114     context_->Initialize({remoteDeviceId_, targetUserId_}, storage_, metadata_, communicateHandle_);
115     context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
116     context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
117 
118     executor_ = new (std::nothrow) RemoteExecutor();
119     if (executor_ == nullptr) {
120         return -E_OUT_OF_MEMORY;
121     }
122     executor_->Initialize(syncInterface, communicateHandle_);
123     return E_OK;
124 }
125 
SetDeviceId(const std::string & deviceId)126 void GenericVirtualDevice::SetDeviceId(const std::string &deviceId)
127 {
128     deviceId_ = deviceId;
129 }
130 
GetDeviceId() const131 std::string GenericVirtualDevice::GetDeviceId() const
132 {
133     return deviceId_;
134 }
135 
MessageCallback(const std::string & deviceId,Message * inMsg)136 int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
137 {
138     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
139         if (onRemoteDataChanged_) {
140             onRemoteDataChanged_(deviceId);
141             delete inMsg;
142             inMsg = nullptr;
143             return E_OK;
144         }
145         delete inMsg;
146         inMsg = nullptr;
147         return -E_INVALID_ARGS;
148     }
149 
150     LOGD("[GenericVirtualDevice] onMessage, src %s id %u", deviceId.c_str(), inMsg->GetMessageId());
151     if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor_ != nullptr) {
152         RefObject::IncObjRef(executor_);
153         executor_->ReceiveMessage(deviceId, inMsg);
154         RefObject::DecObjRef(executor_);
155         return E_OK;
156     }
157 
158     RefObject::IncObjRef(context_);
159     RefObject::IncObjRef(communicateHandle_);
160     SyncTaskContext *context = context_;
161     ICommunicator *communicateHandle = communicateHandle_;
162     std::thread thread([context, communicateHandle, inMsg]() {
163         int errCode = context->ReceiveMessageCallback(inMsg);
164         if (errCode != -E_NOT_NEED_DELETE_MSG) {
165             delete inMsg;
166         }
167         RefObject::DecObjRef(context);
168         RefObject::DecObjRef(communicateHandle);
169     });
170     thread.detach();
171     return E_OK;
172 }
173 
OnRemoteDataChanged(const std::function<void (const std::string &)> & callback)174 void GenericVirtualDevice::OnRemoteDataChanged(const std::function<void(const std::string &)> &callback)
175 {
176     onRemoteDataChanged_ = callback;
177 }
178 
Online()179 void GenericVirtualDevice::Online()
180 {
181     static_cast<VirtualCommunicator *>(communicateHandle_)->Enable();
182     communicatorAggregator_->OnlineDevice(deviceId_);
183 }
184 
Offline()185 void GenericVirtualDevice::Offline()
186 {
187     static_cast<VirtualCommunicator *>(communicateHandle_)->Disable();
188     communicatorAggregator_->OfflineDevice(deviceId_);
189 }
190 
StartResponseTask()191 int GenericVirtualDevice::StartResponseTask()
192 {
193     LOGD("[KvVirtualDevice] StartResponseTask");
194     RefObject::AutoLock lockGuard(context_);
195     int status = context_->GetTaskExecStatus();
196     if ((status == SyncTaskContext::RUNNING) || context_->IsKilled()) {
197         LOGD("[KvVirtualDevice] StartResponseTask status:%d", status);
198         return -E_NOT_SUPPORT;
199     }
200     if (context_->IsTargetQueueEmpty()) {
201         LOGD("[KvVirtualDevice] StartResponseTask IsTargetQueueEmpty is empty");
202         return E_OK;
203     }
204     context_->SetTaskExecStatus(ISyncTaskContext::RUNNING);
205     context_->MoveToNextTarget(DBConstant::MIN_TIMEOUT);
206     LOGI("[KvVirtualDevice] machine StartSync");
207     context_->UnlockObj();
208     int errCode = context_->StartStateMachine();
209     context_->LockObj();
210     if (errCode != E_OK) {
211         LOGE("[KvVirtualDevice] machine StartSync failed");
212         context_->SetOperationStatus(SyncOperation::OP_FAILED);
213     }
214     return errCode;
215 }
216 
GetLocalTimeOffset() const217 TimeOffset GenericVirtualDevice::GetLocalTimeOffset() const
218 {
219     return metadata_->GetLocalTimeOffset();
220 }
221 
OnDeviceSyncProcess(const std::map<std::string,DeviceSyncProcess> & processMap,const DeviceSyncProcessCallback & onProcess) const222 void GenericVirtualDevice::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
223     const DeviceSyncProcessCallback &onProcess) const
224 {
225     std::map<std::string, DeviceSyncProcess> result;
226     for (const auto &pair : processMap) {
227         DeviceSyncProcess info = pair.second;
228         int status = info.errCode;
229         info.errCode = SyncOperation::DBStatusTrans(status);
230         info.process = SyncOperation::DBStatusTransProcess(status);
231         result.insert(std::pair<std::string, DeviceSyncProcess>(pair.first, info));
232     }
233     if (onProcess) {
234         onProcess(result);
235     }
236 }
237 
Sync(const DeviceSyncOption & option,const DeviceSyncProcessCallback & onProcess)238 int GenericVirtualDevice::Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess)
239 {
240     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, option.mode, nullptr, option.isWait);
241     if (operation == nullptr) {
242         return -E_OUT_OF_MEMORY;
243     }
244     DeviceSyncProcessCallback onSyncProcess = std::bind(&GenericVirtualDevice::OnDeviceSyncProcess, this,
245         std::placeholders::_1, onProcess);
246     operation->SetSyncProcessCallFun(onSyncProcess);
247     operation->Initialize();
248     operation->SetOnSyncFinished([operation](int id) {
249         operation->NotifyIfNeed();
250     });
251     int errCode = E_OK;
252     if (option.isQuery) {
253         QuerySyncObject querySyncObject(option.query);
254         errCode = querySyncObject.Init();
255         if (errCode != E_OK) {
256             return errCode;
257         }
258         operation->SetQuery(querySyncObject);
259     }
260     errCode = context_->AddSyncOperation(operation);
261     operation->WaitIfNeed();
262     RefObject::KillAndDecObjRef(operation);
263     return errCode;
264 }
265 
Sync(SyncMode mode,bool wait)266 int GenericVirtualDevice::Sync(SyncMode mode, bool wait)
267 {
268     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, nullptr, wait);
269     if (operation == nullptr) {
270         return -E_OUT_OF_MEMORY;
271     }
272     operation->Initialize();
273     operation->SetOnSyncFinished([operation](int id) {
274         operation->NotifyIfNeed();
275     });
276     context_->AddSyncOperation(operation);
277     operation->WaitIfNeed();
278     RefObject::KillAndDecObjRef(operation);
279     return E_OK;
280 }
281 
Sync(SyncMode mode,const Query & query,bool wait)282 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query, bool wait)
283 {
284     return Sync(mode, query, nullptr, wait);
285 }
286 
Sync(SyncMode mode,const Query & query,const SyncOperation::UserCallback & callBack,bool wait)287 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query,
288     const SyncOperation::UserCallback &callBack, bool wait)
289 {
290     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, callBack, wait);
291     if (operation == nullptr) {
292         return -E_OUT_OF_MEMORY;
293     }
294     operation->Initialize();
295     operation->SetOnSyncFinished([operation](int id) {
296         operation->NotifyIfNeed();
297     });
298     QuerySyncObject querySyncObject(query);
299     int errCode = querySyncObject.Init();
300     if (errCode != E_OK) {
301         return errCode;
302     }
303     operation->SetQuery(querySyncObject);
304     context_->AddSyncOperation(operation);
305     operation->WaitIfNeed();
306     RefObject::KillAndDecObjRef(operation);
307     return errCode;
308 }
309 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)310 int GenericVirtualDevice::RemoteQuery(const std::string &device, const RemoteCondition &condition,
311     uint64_t timeout, std::shared_ptr<ResultSet> &result)
312 {
313     if (executor_ == nullptr) {
314         result = nullptr;
315         return TransferDBErrno(-E_BUSY);
316     }
317     int errCode = executor_->RemoteQuery(device, condition, timeout, 1u, result);
318     if (errCode != E_OK) {
319         result = nullptr;
320     }
321     return TransferDBErrno(errCode);
322 }
323 
SetClearRemoteStaleData(bool isStaleData)324 void GenericVirtualDevice::SetClearRemoteStaleData(bool isStaleData)
325 {
326     if (context_ != nullptr) {
327         static_cast<SingleVerSyncTaskContext *>(context_)->EnableClearRemoteStaleData(isStaleData);
328         LOGD("set clear remote stale data mark");
329     }
330 }
331 } // DistributedDB
332