1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "generic_virtual_device.h"
16
17 #include "kv_store_errno.h"
18 #include "multi_ver_sync_task_context.h"
19 #include "single_ver_kv_sync_task_context.h"
20 #include "single_ver_relational_sync_task_context.h"
21
22 namespace DistributedDB {
GenericVirtualDevice(std::string deviceId)23 GenericVirtualDevice::GenericVirtualDevice(std::string deviceId)
24 : communicateHandle_(nullptr),
25 communicatorAggregator_(nullptr),
26 storage_(nullptr),
27 metadata_(nullptr),
28 deviceId_(std::move(deviceId)),
29 remoteDeviceId_("real_device"),
30 targetUserId_("targetUser"),
31 context_(nullptr),
32 onRemoteDataChanged_(nullptr),
33 subManager_(nullptr),
34 executor_(nullptr)
35 {
36 }
37
~GenericVirtualDevice()38 GenericVirtualDevice::~GenericVirtualDevice()
39 {
40 std::mutex cvMutex;
41 std::condition_variable cv;
42 bool finished = false;
43 Offline();
44
45 if (communicateHandle_ != nullptr) {
46 communicateHandle_->RegOnMessageCallback(nullptr, nullptr);
47 communicatorAggregator_->ReleaseCommunicator(communicateHandle_);
48 communicateHandle_ = nullptr;
49 }
50 communicatorAggregator_ = nullptr;
51
52 if (context_ != nullptr) {
53 ISyncInterface *storage = storage_;
54 context_->OnLastRef([storage, &cv, &cvMutex, &finished]() {
55 delete storage;
56 {
57 std::lock_guard<std::mutex> lock(cvMutex);
58 finished = true;
59 }
60 cv.notify_one();
61 });
62 RefObject::KillAndDecObjRef(context_);
63 std::unique_lock<std::mutex> lock(cvMutex);
64 cv.wait(lock, [&finished] { return finished; });
65 } else {
66 delete storage_;
67 }
68 context_ = nullptr;
69 metadata_ = nullptr;
70 storage_ = nullptr;
71 if (executor_ != nullptr) {
72 RefObject::KillAndDecObjRef(executor_);
73 executor_ = nullptr;
74 }
75 }
76
Initialize(VirtualCommunicatorAggregator * comAggregator,ISyncInterface * syncInterface)77 int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface)
78 {
79 if ((comAggregator == nullptr) || (syncInterface == nullptr)) {
80 return -E_INVALID_ARGS;
81 }
82
83 communicatorAggregator_ = comAggregator;
84 int errCode = E_OK;
85 communicateHandle_ = communicatorAggregator_->AllocCommunicator(deviceId_, errCode);
86 if (communicateHandle_ == nullptr) {
87 return errCode;
88 }
89
90 storage_ = syncInterface;
91 metadata_ = std::make_shared<Metadata>();
92 if (metadata_->Initialize(storage_) != E_OK) {
93 LOGE("metadata_ init failed");
94 return -E_NOT_SUPPORT;
95 }
96 if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_SVD) {
97 context_ = new (std::nothrow) SingleVerKvSyncTaskContext;
98 subManager_ = std::make_shared<SubscribeManager>();
99 static_cast<SingleVerSyncTaskContext *>(context_)->SetSubscribeManager(subManager_);
100 } else if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_RELATION) {
101 context_ = new (std::nothrow) SingleVerRelationalSyncTaskContext;
102 } else {
103 #ifndef OMIT_MULTI_VER
104 context_ = new (std::nothrow) MultiVerSyncTaskContext;
105 #else
106 return -E_NOT_SUPPORT;
107 #endif // OMIT_MULTI_VER
108 }
109 if (context_ == nullptr) {
110 return -E_OUT_OF_MEMORY;
111 }
112 communicateHandle_->RegOnMessageCallback(
113 std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
114 context_->Initialize({remoteDeviceId_, targetUserId_}, storage_, metadata_, communicateHandle_);
115 context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
116 context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
117
118 executor_ = new (std::nothrow) RemoteExecutor();
119 if (executor_ == nullptr) {
120 return -E_OUT_OF_MEMORY;
121 }
122 executor_->Initialize(syncInterface, communicateHandle_);
123 return E_OK;
124 }
125
SetDeviceId(const std::string & deviceId)126 void GenericVirtualDevice::SetDeviceId(const std::string &deviceId)
127 {
128 deviceId_ = deviceId;
129 }
130
GetDeviceId() const131 std::string GenericVirtualDevice::GetDeviceId() const
132 {
133 return deviceId_;
134 }
135
MessageCallback(const std::string & deviceId,Message * inMsg)136 int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
137 {
138 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
139 if (onRemoteDataChanged_) {
140 onRemoteDataChanged_(deviceId);
141 delete inMsg;
142 inMsg = nullptr;
143 return E_OK;
144 }
145 delete inMsg;
146 inMsg = nullptr;
147 return -E_INVALID_ARGS;
148 }
149
150 LOGD("[GenericVirtualDevice] onMessage, src %s id %u", deviceId.c_str(), inMsg->GetMessageId());
151 if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor_ != nullptr) {
152 RefObject::IncObjRef(executor_);
153 executor_->ReceiveMessage(deviceId, inMsg);
154 RefObject::DecObjRef(executor_);
155 return E_OK;
156 }
157
158 RefObject::IncObjRef(context_);
159 RefObject::IncObjRef(communicateHandle_);
160 SyncTaskContext *context = context_;
161 ICommunicator *communicateHandle = communicateHandle_;
162 std::thread thread([context, communicateHandle, inMsg]() {
163 int errCode = context->ReceiveMessageCallback(inMsg);
164 if (errCode != -E_NOT_NEED_DELETE_MSG) {
165 delete inMsg;
166 }
167 RefObject::DecObjRef(context);
168 RefObject::DecObjRef(communicateHandle);
169 });
170 thread.detach();
171 return E_OK;
172 }
173
OnRemoteDataChanged(const std::function<void (const std::string &)> & callback)174 void GenericVirtualDevice::OnRemoteDataChanged(const std::function<void(const std::string &)> &callback)
175 {
176 onRemoteDataChanged_ = callback;
177 }
178
Online()179 void GenericVirtualDevice::Online()
180 {
181 static_cast<VirtualCommunicator *>(communicateHandle_)->Enable();
182 communicatorAggregator_->OnlineDevice(deviceId_);
183 }
184
Offline()185 void GenericVirtualDevice::Offline()
186 {
187 static_cast<VirtualCommunicator *>(communicateHandle_)->Disable();
188 communicatorAggregator_->OfflineDevice(deviceId_);
189 }
190
StartResponseTask()191 int GenericVirtualDevice::StartResponseTask()
192 {
193 LOGD("[KvVirtualDevice] StartResponseTask");
194 RefObject::AutoLock lockGuard(context_);
195 int status = context_->GetTaskExecStatus();
196 if ((status == SyncTaskContext::RUNNING) || context_->IsKilled()) {
197 LOGD("[KvVirtualDevice] StartResponseTask status:%d", status);
198 return -E_NOT_SUPPORT;
199 }
200 if (context_->IsTargetQueueEmpty()) {
201 LOGD("[KvVirtualDevice] StartResponseTask IsTargetQueueEmpty is empty");
202 return E_OK;
203 }
204 context_->SetTaskExecStatus(ISyncTaskContext::RUNNING);
205 context_->MoveToNextTarget(DBConstant::MIN_TIMEOUT);
206 LOGI("[KvVirtualDevice] machine StartSync");
207 context_->UnlockObj();
208 int errCode = context_->StartStateMachine();
209 context_->LockObj();
210 if (errCode != E_OK) {
211 LOGE("[KvVirtualDevice] machine StartSync failed");
212 context_->SetOperationStatus(SyncOperation::OP_FAILED);
213 }
214 return errCode;
215 }
216
GetLocalTimeOffset() const217 TimeOffset GenericVirtualDevice::GetLocalTimeOffset() const
218 {
219 return metadata_->GetLocalTimeOffset();
220 }
221
OnDeviceSyncProcess(const std::map<std::string,DeviceSyncProcess> & processMap,const DeviceSyncProcessCallback & onProcess) const222 void GenericVirtualDevice::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
223 const DeviceSyncProcessCallback &onProcess) const
224 {
225 std::map<std::string, DeviceSyncProcess> result;
226 for (const auto &pair : processMap) {
227 DeviceSyncProcess info = pair.second;
228 int status = info.errCode;
229 info.errCode = SyncOperation::DBStatusTrans(status);
230 info.process = SyncOperation::DBStatusTransProcess(status);
231 result.insert(std::pair<std::string, DeviceSyncProcess>(pair.first, info));
232 }
233 if (onProcess) {
234 onProcess(result);
235 }
236 }
237
Sync(const DeviceSyncOption & option,const DeviceSyncProcessCallback & onProcess)238 int GenericVirtualDevice::Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess)
239 {
240 auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, option.mode, nullptr, option.isWait);
241 if (operation == nullptr) {
242 return -E_OUT_OF_MEMORY;
243 }
244 DeviceSyncProcessCallback onSyncProcess = std::bind(&GenericVirtualDevice::OnDeviceSyncProcess, this,
245 std::placeholders::_1, onProcess);
246 operation->SetSyncProcessCallFun(onSyncProcess);
247 operation->Initialize();
248 operation->SetOnSyncFinished([operation](int id) {
249 operation->NotifyIfNeed();
250 });
251 int errCode = E_OK;
252 if (option.isQuery) {
253 QuerySyncObject querySyncObject(option.query);
254 errCode = querySyncObject.Init();
255 if (errCode != E_OK) {
256 return errCode;
257 }
258 operation->SetQuery(querySyncObject);
259 }
260 errCode = context_->AddSyncOperation(operation);
261 operation->WaitIfNeed();
262 RefObject::KillAndDecObjRef(operation);
263 return errCode;
264 }
265
Sync(SyncMode mode,bool wait)266 int GenericVirtualDevice::Sync(SyncMode mode, bool wait)
267 {
268 auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, nullptr, wait);
269 if (operation == nullptr) {
270 return -E_OUT_OF_MEMORY;
271 }
272 operation->Initialize();
273 operation->SetOnSyncFinished([operation](int id) {
274 operation->NotifyIfNeed();
275 });
276 context_->AddSyncOperation(operation);
277 operation->WaitIfNeed();
278 RefObject::KillAndDecObjRef(operation);
279 return E_OK;
280 }
281
Sync(SyncMode mode,const Query & query,bool wait)282 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query, bool wait)
283 {
284 return Sync(mode, query, nullptr, wait);
285 }
286
Sync(SyncMode mode,const Query & query,const SyncOperation::UserCallback & callBack,bool wait)287 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query,
288 const SyncOperation::UserCallback &callBack, bool wait)
289 {
290 auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, callBack, wait);
291 if (operation == nullptr) {
292 return -E_OUT_OF_MEMORY;
293 }
294 operation->Initialize();
295 operation->SetOnSyncFinished([operation](int id) {
296 operation->NotifyIfNeed();
297 });
298 QuerySyncObject querySyncObject(query);
299 int errCode = querySyncObject.Init();
300 if (errCode != E_OK) {
301 return errCode;
302 }
303 operation->SetQuery(querySyncObject);
304 context_->AddSyncOperation(operation);
305 operation->WaitIfNeed();
306 RefObject::KillAndDecObjRef(operation);
307 return errCode;
308 }
309
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)310 int GenericVirtualDevice::RemoteQuery(const std::string &device, const RemoteCondition &condition,
311 uint64_t timeout, std::shared_ptr<ResultSet> &result)
312 {
313 if (executor_ == nullptr) {
314 result = nullptr;
315 return TransferDBErrno(-E_BUSY);
316 }
317 int errCode = executor_->RemoteQuery(device, condition, timeout, 1u, result);
318 if (errCode != E_OK) {
319 result = nullptr;
320 }
321 return TransferDBErrno(errCode);
322 }
323
SetClearRemoteStaleData(bool isStaleData)324 void GenericVirtualDevice::SetClearRemoteStaleData(bool isStaleData)
325 {
326 if (context_ != nullptr) {
327 static_cast<SingleVerSyncTaskContext *>(context_)->EnableClearRemoteStaleData(isStaleData);
328 LOGD("set clear remote stale data mark");
329 }
330 }
331 } // DistributedDB
332