1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "generic_virtual_device.h"
16
17 #include "kv_store_errno.h"
18 #include "multi_ver_sync_task_context.h"
19 #include "single_ver_kv_sync_task_context.h"
20 #include "single_ver_relational_sync_task_context.h"
21
22 namespace DistributedDB {
GenericVirtualDevice(std::string deviceId)23 GenericVirtualDevice::GenericVirtualDevice(std::string deviceId)
24 : communicateHandle_(nullptr),
25 communicatorAggregator_(nullptr),
26 storage_(nullptr),
27 metadata_(nullptr),
28 deviceId_(std::move(deviceId)),
29 remoteDeviceId_("real_device"),
30 context_(nullptr),
31 onRemoteDataChanged_(nullptr),
32 subManager_(nullptr),
33 executor_(nullptr)
34 {
35 }
36
~GenericVirtualDevice()37 GenericVirtualDevice::~GenericVirtualDevice()
38 {
39 std::mutex cvMutex;
40 std::condition_variable cv;
41 bool finished = false;
42 Offline();
43
44 if (communicateHandle_ != nullptr) {
45 communicateHandle_->RegOnMessageCallback(nullptr, nullptr);
46 communicatorAggregator_->ReleaseCommunicator(communicateHandle_);
47 communicateHandle_ = nullptr;
48 }
49 communicatorAggregator_ = nullptr;
50
51 if (context_ != nullptr) {
52 ISyncInterface *storage = storage_;
53 context_->OnLastRef([storage, &cv, &cvMutex, &finished]() {
54 delete storage;
55 {
56 std::lock_guard<std::mutex> lock(cvMutex);
57 finished = true;
58 }
59 cv.notify_one();
60 });
61 RefObject::KillAndDecObjRef(context_);
62 std::unique_lock<std::mutex> lock(cvMutex);
63 cv.wait(lock, [&finished] { return finished; });
64 } else {
65 delete storage_;
66 }
67 context_ = nullptr;
68 metadata_ = nullptr;
69 storage_ = nullptr;
70 if (executor_ != nullptr) {
71 RefObject::KillAndDecObjRef(executor_);
72 executor_ = nullptr;
73 }
74 }
75
Initialize(VirtualCommunicatorAggregator * comAggregator,ISyncInterface * syncInterface)76 int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface)
77 {
78 if ((comAggregator == nullptr) || (syncInterface == nullptr)) {
79 return -E_INVALID_ARGS;
80 }
81
82 communicatorAggregator_ = comAggregator;
83 int errCode = E_OK;
84 communicateHandle_ = communicatorAggregator_->AllocCommunicator(deviceId_, errCode);
85 if (communicateHandle_ == nullptr) {
86 return errCode;
87 }
88
89 storage_ = syncInterface;
90 metadata_ = std::make_shared<Metadata>();
91 if (metadata_->Initialize(storage_) != E_OK) {
92 LOGE("metadata_ init failed");
93 return -E_NOT_SUPPORT;
94 }
95 if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_SVD) {
96 context_ = new (std::nothrow) SingleVerKvSyncTaskContext;
97 subManager_ = std::make_shared<SubscribeManager>();
98 static_cast<SingleVerSyncTaskContext *>(context_)->SetSubscribeManager(subManager_);
99 } else if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_RELATION) {
100 context_ = new (std::nothrow) SingleVerRelationalSyncTaskContext;
101 } else {
102 context_ = new (std::nothrow) MultiVerSyncTaskContext;
103 }
104 if (context_ == nullptr) {
105 return -E_OUT_OF_MEMORY;
106 }
107 communicateHandle_->RegOnMessageCallback(
108 std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
109 context_->Initialize(remoteDeviceId_, storage_, metadata_, communicateHandle_);
110 context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
111 context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
112
113 executor_ = new (std::nothrow) RemoteExecutor();
114 if (executor_ == nullptr) {
115 return -E_OUT_OF_MEMORY;
116 }
117 executor_->Initialize(syncInterface, communicateHandle_);
118 return E_OK;
119 }
120
SetDeviceId(const std::string & deviceId)121 void GenericVirtualDevice::SetDeviceId(const std::string &deviceId)
122 {
123 deviceId_ = deviceId;
124 }
125
GetDeviceId() const126 std::string GenericVirtualDevice::GetDeviceId() const
127 {
128 return deviceId_;
129 }
130
MessageCallback(const std::string & deviceId,Message * inMsg)131 int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
132 {
133 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
134 if (onRemoteDataChanged_) {
135 onRemoteDataChanged_(deviceId);
136 delete inMsg;
137 inMsg = nullptr;
138 return E_OK;
139 }
140 delete inMsg;
141 inMsg = nullptr;
142 return -E_INVALID_ARGS;
143 }
144
145 LOGD("[GenericVirtualDevice] onMessage, src %s id %u", deviceId.c_str(), inMsg->GetMessageId());
146 if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor_ != nullptr) {
147 RefObject::IncObjRef(executor_);
148 executor_->ReceiveMessage(deviceId, inMsg);
149 RefObject::DecObjRef(executor_);
150 return E_OK;
151 }
152
153 RefObject::IncObjRef(context_);
154 RefObject::IncObjRef(communicateHandle_);
155 SyncTaskContext *context = context_;
156 ICommunicator *communicateHandle = communicateHandle_;
157 std::thread thread([context, communicateHandle, inMsg]() {
158 int errCode = context->ReceiveMessageCallback(inMsg);
159 if (errCode != -E_NOT_NEED_DELETE_MSG) {
160 delete inMsg;
161 }
162 RefObject::DecObjRef(context);
163 RefObject::DecObjRef(communicateHandle);
164 });
165 thread.detach();
166 return E_OK;
167 }
168
OnRemoteDataChanged(const std::function<void (const std::string &)> & callback)169 void GenericVirtualDevice::OnRemoteDataChanged(const std::function<void(const std::string &)> &callback)
170 {
171 onRemoteDataChanged_ = callback;
172 }
173
Online()174 void GenericVirtualDevice::Online()
175 {
176 static_cast<VirtualCommunicator *>(communicateHandle_)->Enable();
177 communicatorAggregator_->OnlineDevice(deviceId_);
178 }
179
Offline()180 void GenericVirtualDevice::Offline()
181 {
182 static_cast<VirtualCommunicator *>(communicateHandle_)->Disable();
183 communicatorAggregator_->OfflineDevice(deviceId_);
184 }
185
StartResponseTask()186 int GenericVirtualDevice::StartResponseTask()
187 {
188 LOGD("[KvVirtualDevice] StartResponseTask");
189 RefObject::AutoLock lockGuard(context_);
190 int status = context_->GetTaskExecStatus();
191 if ((status == SyncTaskContext::RUNNING) || context_->IsKilled()) {
192 LOGD("[KvVirtualDevice] StartResponseTask status:%d", status);
193 return -E_NOT_SUPPORT;
194 }
195 if (context_->IsTargetQueueEmpty()) {
196 LOGD("[KvVirtualDevice] StartResponseTask IsTargetQueueEmpty is empty");
197 return E_OK;
198 }
199 context_->SetTaskExecStatus(ISyncTaskContext::RUNNING);
200 context_->MoveToNextTarget();
201 LOGI("[KvVirtualDevice] machine StartSync");
202 context_->UnlockObj();
203 int errCode = context_->StartStateMachine();
204 context_->LockObj();
205 if (errCode != E_OK) {
206 LOGE("[KvVirtualDevice] machine StartSync failed");
207 context_->SetOperationStatus(SyncOperation::OP_FAILED);
208 }
209 return errCode;
210 }
211
GetLocalTimeOffset() const212 TimeOffset GenericVirtualDevice::GetLocalTimeOffset() const
213 {
214 return metadata_->GetLocalTimeOffset();
215 }
216
Sync(SyncMode mode,bool wait)217 int GenericVirtualDevice::Sync(SyncMode mode, bool wait)
218 {
219 auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, nullptr, wait);
220 if (operation == nullptr) {
221 return -E_OUT_OF_MEMORY;
222 }
223 operation->Initialize();
224 operation->SetOnSyncFinished([operation](int id) {
225 operation->NotifyIfNeed();
226 });
227 context_->AddSyncOperation(operation);
228 operation->WaitIfNeed();
229 RefObject::KillAndDecObjRef(operation);
230 return E_OK;
231 }
232
Sync(SyncMode mode,const Query & query,bool wait)233 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query, bool wait)
234 {
235 return Sync(mode, query, nullptr, wait);
236 }
237
Sync(SyncMode mode,const Query & query,const SyncOperation::UserCallback & callBack,bool wait)238 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query,
239 const SyncOperation::UserCallback &callBack, bool wait)
240 {
241 auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, callBack, wait);
242 if (operation == nullptr) {
243 return -E_OUT_OF_MEMORY;
244 }
245 operation->Initialize();
246 operation->SetOnSyncFinished([operation](int id) {
247 operation->NotifyIfNeed();
248 });
249 QuerySyncObject querySyncObject(query);
250 int errCode = querySyncObject.Init();
251 if (errCode != E_OK) {
252 return errCode;
253 }
254 operation->SetQuery(querySyncObject);
255 context_->AddSyncOperation(operation);
256 operation->WaitIfNeed();
257 RefObject::KillAndDecObjRef(operation);
258 return errCode;
259 }
260
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)261 int GenericVirtualDevice::RemoteQuery(const std::string &device, const RemoteCondition &condition,
262 uint64_t timeout, std::shared_ptr<ResultSet> &result)
263 {
264 if (executor_ == nullptr) {
265 result = nullptr;
266 return TransferDBErrno(-E_BUSY);
267 }
268 int errCode = executor_->RemoteQuery(device, condition, timeout, 1u, result);
269 if (errCode != E_OK) {
270 result = nullptr;
271 }
272 return TransferDBErrno(errCode);
273 }
274
SetClearRemoteStaleData(bool isStaleData)275 void GenericVirtualDevice::SetClearRemoteStaleData(bool isStaleData)
276 {
277 if (context_ != nullptr) {
278 static_cast<SingleVerSyncTaskContext *>(context_)->EnableClearRemoteStaleData(isStaleData);
279 LOGD("set clear remote stale data mark");
280 }
281 }
282 } // DistributedDB