• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "generic_virtual_device.h"
16 
17 #include "kv_store_errno.h"
18 #include "multi_ver_sync_task_context.h"
19 #include "single_ver_kv_sync_task_context.h"
20 #include "single_ver_relational_sync_task_context.h"
21 
22 namespace DistributedDB {
GenericVirtualDevice(std::string deviceId)23 GenericVirtualDevice::GenericVirtualDevice(std::string deviceId)
24     : communicateHandle_(nullptr),
25       communicatorAggregator_(nullptr),
26       storage_(nullptr),
27       metadata_(nullptr),
28       deviceId_(std::move(deviceId)),
29       remoteDeviceId_("real_device"),
30       context_(nullptr),
31       onRemoteDataChanged_(nullptr),
32       subManager_(nullptr),
33       executor_(nullptr)
34 {
35 }
36 
~GenericVirtualDevice()37 GenericVirtualDevice::~GenericVirtualDevice()
38 {
39     std::mutex cvMutex;
40     std::condition_variable cv;
41     bool finished = false;
42     Offline();
43 
44     if (communicateHandle_ != nullptr) {
45         communicateHandle_->RegOnMessageCallback(nullptr, nullptr);
46         communicatorAggregator_->ReleaseCommunicator(communicateHandle_);
47         communicateHandle_ = nullptr;
48     }
49     communicatorAggregator_ = nullptr;
50 
51     if (context_ != nullptr) {
52         ISyncInterface *storage = storage_;
53         context_->OnLastRef([storage, &cv, &cvMutex, &finished]() {
54             delete storage;
55             {
56                 std::lock_guard<std::mutex> lock(cvMutex);
57                 finished = true;
58             }
59             cv.notify_one();
60         });
61         RefObject::KillAndDecObjRef(context_);
62         std::unique_lock<std::mutex> lock(cvMutex);
63         cv.wait(lock, [&finished] { return finished; });
64     } else {
65         delete storage_;
66     }
67     context_ = nullptr;
68     metadata_ = nullptr;
69     storage_ = nullptr;
70     if (executor_ != nullptr) {
71         RefObject::KillAndDecObjRef(executor_);
72         executor_ = nullptr;
73     }
74 }
75 
Initialize(VirtualCommunicatorAggregator * comAggregator,ISyncInterface * syncInterface)76 int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface)
77 {
78     if ((comAggregator == nullptr) || (syncInterface == nullptr)) {
79         return -E_INVALID_ARGS;
80     }
81 
82     communicatorAggregator_ = comAggregator;
83     int errCode = E_OK;
84     communicateHandle_ = communicatorAggregator_->AllocCommunicator(deviceId_, errCode);
85     if (communicateHandle_ == nullptr) {
86         return errCode;
87     }
88 
89     storage_ = syncInterface;
90     metadata_ = std::make_shared<Metadata>();
91     if (metadata_->Initialize(storage_) != E_OK) {
92         LOGE("metadata_ init failed");
93         return -E_NOT_SUPPORT;
94     }
95     if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_SVD) {
96         context_ = new (std::nothrow) SingleVerKvSyncTaskContext;
97         subManager_ = std::make_shared<SubscribeManager>();
98         static_cast<SingleVerSyncTaskContext *>(context_)->SetSubscribeManager(subManager_);
99     } else if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_RELATION) {
100         context_ = new (std::nothrow) SingleVerRelationalSyncTaskContext;
101     } else {
102 #ifndef OMIT_MULTI_VER
103         context_ = new (std::nothrow) MultiVerSyncTaskContext;
104 #else
105 	return -E_NOT_SUPPORT;
106 #endif // OMIT_MULTI_VER
107     }
108     if (context_ == nullptr) {
109         return -E_OUT_OF_MEMORY;
110     }
111     communicateHandle_->RegOnMessageCallback(
112         std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
113     context_->Initialize(remoteDeviceId_, storage_, metadata_, communicateHandle_);
114     context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
115     context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
116 
117     executor_ = new (std::nothrow) RemoteExecutor();
118     if (executor_ == nullptr) {
119         return -E_OUT_OF_MEMORY;
120     }
121     executor_->Initialize(syncInterface, communicateHandle_);
122     return E_OK;
123 }
124 
SetDeviceId(const std::string & deviceId)125 void GenericVirtualDevice::SetDeviceId(const std::string &deviceId)
126 {
127     deviceId_ = deviceId;
128 }
129 
GetDeviceId() const130 std::string GenericVirtualDevice::GetDeviceId() const
131 {
132     return deviceId_;
133 }
134 
MessageCallback(const std::string & deviceId,Message * inMsg)135 int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
136 {
137     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
138         if (onRemoteDataChanged_) {
139             onRemoteDataChanged_(deviceId);
140             delete inMsg;
141             inMsg = nullptr;
142             return E_OK;
143         }
144         delete inMsg;
145         inMsg = nullptr;
146         return -E_INVALID_ARGS;
147     }
148 
149     LOGD("[GenericVirtualDevice] onMessage, src %s id %u", deviceId.c_str(), inMsg->GetMessageId());
150     if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor_ != nullptr) {
151         RefObject::IncObjRef(executor_);
152         executor_->ReceiveMessage(deviceId, inMsg);
153         RefObject::DecObjRef(executor_);
154         return E_OK;
155     }
156 
157     RefObject::IncObjRef(context_);
158     RefObject::IncObjRef(communicateHandle_);
159     SyncTaskContext *context = context_;
160     ICommunicator *communicateHandle = communicateHandle_;
161     std::thread thread([context, communicateHandle, inMsg]() {
162         int errCode = context->ReceiveMessageCallback(inMsg);
163         if (errCode != -E_NOT_NEED_DELETE_MSG) {
164             delete inMsg;
165         }
166         RefObject::DecObjRef(context);
167         RefObject::DecObjRef(communicateHandle);
168     });
169     thread.detach();
170     return E_OK;
171 }
172 
OnRemoteDataChanged(const std::function<void (const std::string &)> & callback)173 void GenericVirtualDevice::OnRemoteDataChanged(const std::function<void(const std::string &)> &callback)
174 {
175     onRemoteDataChanged_ = callback;
176 }
177 
Online()178 void GenericVirtualDevice::Online()
179 {
180     static_cast<VirtualCommunicator *>(communicateHandle_)->Enable();
181     communicatorAggregator_->OnlineDevice(deviceId_);
182 }
183 
Offline()184 void GenericVirtualDevice::Offline()
185 {
186     static_cast<VirtualCommunicator *>(communicateHandle_)->Disable();
187     communicatorAggregator_->OfflineDevice(deviceId_);
188 }
189 
StartResponseTask()190 int GenericVirtualDevice::StartResponseTask()
191 {
192     LOGD("[KvVirtualDevice] StartResponseTask");
193     RefObject::AutoLock lockGuard(context_);
194     int status = context_->GetTaskExecStatus();
195     if ((status == SyncTaskContext::RUNNING) || context_->IsKilled()) {
196         LOGD("[KvVirtualDevice] StartResponseTask status:%d", status);
197         return -E_NOT_SUPPORT;
198     }
199     if (context_->IsTargetQueueEmpty()) {
200         LOGD("[KvVirtualDevice] StartResponseTask IsTargetQueueEmpty is empty");
201         return E_OK;
202     }
203     context_->SetTaskExecStatus(ISyncTaskContext::RUNNING);
204     context_->MoveToNextTarget(DBConstant::MIN_TIMEOUT);
205     LOGI("[KvVirtualDevice] machine StartSync");
206     context_->UnlockObj();
207     int errCode = context_->StartStateMachine();
208     context_->LockObj();
209     if (errCode != E_OK) {
210         LOGE("[KvVirtualDevice] machine StartSync failed");
211         context_->SetOperationStatus(SyncOperation::OP_FAILED);
212     }
213     return errCode;
214 }
215 
GetLocalTimeOffset() const216 TimeOffset GenericVirtualDevice::GetLocalTimeOffset() const
217 {
218     return metadata_->GetLocalTimeOffset();
219 }
220 
OnDeviceSyncProcess(const std::map<std::string,DeviceSyncProcess> & processMap,const DeviceSyncProcessCallback & onProcess) const221 void GenericVirtualDevice::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
222     const DeviceSyncProcessCallback &onProcess) const
223 {
224     std::map<std::string, DeviceSyncProcess> result;
225     for (const auto &pair : processMap) {
226         DeviceSyncProcess info = pair.second;
227         int status = info.errCode;
228         info.errCode = SyncOperation::DBStatusTrans(status);
229         info.process = SyncOperation::DBStatusTransProcess(status);
230         result.insert(std::pair<std::string, DeviceSyncProcess>(pair.first, info));
231     }
232     if (onProcess) {
233         onProcess(result);
234     }
235 }
236 
Sync(const DeviceSyncOption & option,const DeviceSyncProcessCallback & onProcess)237 int GenericVirtualDevice::Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess)
238 {
239     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, option.mode, nullptr, option.isWait);
240     if (operation == nullptr) {
241         return -E_OUT_OF_MEMORY;
242     }
243     DeviceSyncProcessCallback onSyncProcess = std::bind(&GenericVirtualDevice::OnDeviceSyncProcess, this,
244         std::placeholders::_1, onProcess);
245     operation->SetSyncProcessCallFun(onSyncProcess);
246     operation->Initialize();
247     operation->SetOnSyncFinished([operation](int id) {
248         operation->NotifyIfNeed();
249     });
250     int errCode = E_OK;
251     if (option.isQuery) {
252         QuerySyncObject querySyncObject(option.query);
253         errCode = querySyncObject.Init();
254         if (errCode != E_OK) {
255             return errCode;
256         }
257         operation->SetQuery(querySyncObject);
258     }
259     errCode = context_->AddSyncOperation(operation);
260     operation->WaitIfNeed();
261     RefObject::KillAndDecObjRef(operation);
262     return errCode;
263 }
264 
Sync(SyncMode mode,bool wait)265 int GenericVirtualDevice::Sync(SyncMode mode, bool wait)
266 {
267     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, nullptr, wait);
268     if (operation == nullptr) {
269         return -E_OUT_OF_MEMORY;
270     }
271     operation->Initialize();
272     operation->SetOnSyncFinished([operation](int id) {
273         operation->NotifyIfNeed();
274     });
275     context_->AddSyncOperation(operation);
276     operation->WaitIfNeed();
277     RefObject::KillAndDecObjRef(operation);
278     return E_OK;
279 }
280 
Sync(SyncMode mode,const Query & query,bool wait)281 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query, bool wait)
282 {
283     return Sync(mode, query, nullptr, wait);
284 }
285 
Sync(SyncMode mode,const Query & query,const SyncOperation::UserCallback & callBack,bool wait)286 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query,
287     const SyncOperation::UserCallback &callBack, bool wait)
288 {
289     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, callBack, wait);
290     if (operation == nullptr) {
291         return -E_OUT_OF_MEMORY;
292     }
293     operation->Initialize();
294     operation->SetOnSyncFinished([operation](int id) {
295         operation->NotifyIfNeed();
296     });
297     QuerySyncObject querySyncObject(query);
298     int errCode = querySyncObject.Init();
299     if (errCode != E_OK) {
300         return errCode;
301     }
302     operation->SetQuery(querySyncObject);
303     context_->AddSyncOperation(operation);
304     operation->WaitIfNeed();
305     RefObject::KillAndDecObjRef(operation);
306     return errCode;
307 }
308 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)309 int GenericVirtualDevice::RemoteQuery(const std::string &device, const RemoteCondition &condition,
310     uint64_t timeout, std::shared_ptr<ResultSet> &result)
311 {
312     if (executor_ == nullptr) {
313         result = nullptr;
314         return TransferDBErrno(-E_BUSY);
315     }
316     int errCode = executor_->RemoteQuery(device, condition, timeout, 1u, result);
317     if (errCode != E_OK) {
318         result = nullptr;
319     }
320     return TransferDBErrno(errCode);
321 }
322 
SetClearRemoteStaleData(bool isStaleData)323 void GenericVirtualDevice::SetClearRemoteStaleData(bool isStaleData)
324 {
325     if (context_ != nullptr) {
326         static_cast<SingleVerSyncTaskContext *>(context_)->EnableClearRemoteStaleData(isStaleData);
327         LOGD("set clear remote stale data mark");
328     }
329 }
330 } // DistributedDB
331