• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "generic_virtual_device.h"
16 
17 #include "kv_store_errno.h"
18 #include "multi_ver_sync_task_context.h"
19 #include "single_ver_kv_sync_task_context.h"
20 #include "single_ver_relational_sync_task_context.h"
21 
22 namespace DistributedDB {
GenericVirtualDevice(std::string deviceId)23 GenericVirtualDevice::GenericVirtualDevice(std::string deviceId)
24     : communicateHandle_(nullptr),
25       communicatorAggregator_(nullptr),
26       storage_(nullptr),
27       metadata_(nullptr),
28       deviceId_(std::move(deviceId)),
29       remoteDeviceId_("real_device"),
30       context_(nullptr),
31       onRemoteDataChanged_(nullptr),
32       subManager_(nullptr),
33       executor_(nullptr)
34 {
35 }
36 
~GenericVirtualDevice()37 GenericVirtualDevice::~GenericVirtualDevice()
38 {
39     std::mutex cvMutex;
40     std::condition_variable cv;
41     bool finished = false;
42     Offline();
43 
44     if (communicateHandle_ != nullptr) {
45         communicateHandle_->RegOnMessageCallback(nullptr, nullptr);
46         communicatorAggregator_->ReleaseCommunicator(communicateHandle_);
47         communicateHandle_ = nullptr;
48     }
49     communicatorAggregator_ = nullptr;
50 
51     if (context_ != nullptr) {
52         ISyncInterface *storage = storage_;
53         context_->OnLastRef([storage, &cv, &cvMutex, &finished]() {
54             delete storage;
55             {
56                 std::lock_guard<std::mutex> lock(cvMutex);
57                 finished = true;
58             }
59             cv.notify_one();
60         });
61         RefObject::KillAndDecObjRef(context_);
62         std::unique_lock<std::mutex> lock(cvMutex);
63         cv.wait(lock, [&finished] { return finished; });
64     } else {
65         delete storage_;
66     }
67     context_ = nullptr;
68     metadata_ = nullptr;
69     storage_ = nullptr;
70     if (executor_ != nullptr) {
71         RefObject::KillAndDecObjRef(executor_);
72         executor_ = nullptr;
73     }
74 }
75 
Initialize(VirtualCommunicatorAggregator * comAggregator,ISyncInterface * syncInterface)76 int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface)
77 {
78     if ((comAggregator == nullptr) || (syncInterface == nullptr)) {
79         return -E_INVALID_ARGS;
80     }
81 
82     communicatorAggregator_ = comAggregator;
83     int errCode = E_OK;
84     communicateHandle_ = communicatorAggregator_->AllocCommunicator(deviceId_, errCode);
85     if (communicateHandle_ == nullptr) {
86         return errCode;
87     }
88 
89     storage_ = syncInterface;
90     metadata_ = std::make_shared<Metadata>();
91     if (metadata_->Initialize(storage_) != E_OK) {
92         LOGE("metadata_ init failed");
93         return -E_NOT_SUPPORT;
94     }
95     if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_SVD) {
96         context_ = new (std::nothrow) SingleVerKvSyncTaskContext;
97         subManager_ = std::make_shared<SubscribeManager>();
98         static_cast<SingleVerSyncTaskContext *>(context_)->SetSubscribeManager(subManager_);
99     } else if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_RELATION) {
100         context_ = new (std::nothrow) SingleVerRelationalSyncTaskContext;
101     } else {
102         context_ = new (std::nothrow) MultiVerSyncTaskContext;
103     }
104     if (context_ == nullptr) {
105         return -E_OUT_OF_MEMORY;
106     }
107     communicateHandle_->RegOnMessageCallback(
108         std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
109     context_->Initialize(remoteDeviceId_, storage_, metadata_, communicateHandle_);
110     context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
111     context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
112 
113     executor_ = new (std::nothrow) RemoteExecutor();
114     if (executor_ == nullptr) {
115         return -E_OUT_OF_MEMORY;
116     }
117     executor_->Initialize(syncInterface, communicateHandle_);
118     return E_OK;
119 }
120 
SetDeviceId(const std::string & deviceId)121 void GenericVirtualDevice::SetDeviceId(const std::string &deviceId)
122 {
123     deviceId_ = deviceId;
124 }
125 
GetDeviceId() const126 std::string GenericVirtualDevice::GetDeviceId() const
127 {
128     return deviceId_;
129 }
130 
MessageCallback(const std::string & deviceId,Message * inMsg)131 int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
132 {
133     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
134         if (onRemoteDataChanged_) {
135             onRemoteDataChanged_(deviceId);
136             delete inMsg;
137             inMsg = nullptr;
138             return E_OK;
139         }
140         delete inMsg;
141         inMsg = nullptr;
142         return -E_INVALID_ARGS;
143     }
144 
145     LOGD("[GenericVirtualDevice] onMessage, src %s id %u", deviceId.c_str(), inMsg->GetMessageId());
146     if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor_ != nullptr) {
147         RefObject::IncObjRef(executor_);
148         executor_->ReceiveMessage(deviceId, inMsg);
149         RefObject::DecObjRef(executor_);
150         return E_OK;
151     }
152 
153     RefObject::IncObjRef(context_);
154     RefObject::IncObjRef(communicateHandle_);
155     SyncTaskContext *context = context_;
156     ICommunicator *communicateHandle = communicateHandle_;
157     std::thread thread([context, communicateHandle, inMsg]() {
158         int errCode = context->ReceiveMessageCallback(inMsg);
159         if (errCode != -E_NOT_NEED_DELETE_MSG) {
160             delete inMsg;
161         }
162         RefObject::DecObjRef(context);
163         RefObject::DecObjRef(communicateHandle);
164     });
165     thread.detach();
166     return E_OK;
167 }
168 
OnRemoteDataChanged(const std::function<void (const std::string &)> & callback)169 void GenericVirtualDevice::OnRemoteDataChanged(const std::function<void(const std::string &)> &callback)
170 {
171     onRemoteDataChanged_ = callback;
172 }
173 
Online()174 void GenericVirtualDevice::Online()
175 {
176     static_cast<VirtualCommunicator *>(communicateHandle_)->Enable();
177     communicatorAggregator_->OnlineDevice(deviceId_);
178 }
179 
Offline()180 void GenericVirtualDevice::Offline()
181 {
182     static_cast<VirtualCommunicator *>(communicateHandle_)->Disable();
183     communicatorAggregator_->OfflineDevice(deviceId_);
184 }
185 
StartResponseTask()186 int GenericVirtualDevice::StartResponseTask()
187 {
188     LOGD("[KvVirtualDevice] StartResponseTask");
189     RefObject::AutoLock lockGuard(context_);
190     int status = context_->GetTaskExecStatus();
191     if ((status == SyncTaskContext::RUNNING) || context_->IsKilled()) {
192         LOGD("[KvVirtualDevice] StartResponseTask status:%d", status);
193         return -E_NOT_SUPPORT;
194     }
195     if (context_->IsTargetQueueEmpty()) {
196         LOGD("[KvVirtualDevice] StartResponseTask IsTargetQueueEmpty is empty");
197         return E_OK;
198     }
199     context_->SetTaskExecStatus(ISyncTaskContext::RUNNING);
200     context_->MoveToNextTarget();
201     LOGI("[KvVirtualDevice] machine StartSync");
202     context_->UnlockObj();
203     int errCode = context_->StartStateMachine();
204     context_->LockObj();
205     if (errCode != E_OK) {
206         LOGE("[KvVirtualDevice] machine StartSync failed");
207         context_->SetOperationStatus(SyncOperation::OP_FAILED);
208     }
209     return errCode;
210 }
211 
GetLocalTimeOffset() const212 TimeOffset GenericVirtualDevice::GetLocalTimeOffset() const
213 {
214     return metadata_->GetLocalTimeOffset();
215 }
216 
Sync(SyncMode mode,bool wait)217 int GenericVirtualDevice::Sync(SyncMode mode, bool wait)
218 {
219     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, nullptr, wait);
220     if (operation == nullptr) {
221         return -E_OUT_OF_MEMORY;
222     }
223     operation->Initialize();
224     operation->SetOnSyncFinished([operation](int id) {
225         operation->NotifyIfNeed();
226     });
227     context_->AddSyncOperation(operation);
228     operation->WaitIfNeed();
229     RefObject::KillAndDecObjRef(operation);
230     return E_OK;
231 }
232 
Sync(SyncMode mode,const Query & query,bool wait)233 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query, bool wait)
234 {
235     return Sync(mode, query, nullptr, wait);
236 }
237 
Sync(SyncMode mode,const Query & query,const SyncOperation::UserCallback & callBack,bool wait)238 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query,
239     const SyncOperation::UserCallback &callBack, bool wait)
240 {
241     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, callBack, wait);
242     if (operation == nullptr) {
243         return -E_OUT_OF_MEMORY;
244     }
245     operation->Initialize();
246     operation->SetOnSyncFinished([operation](int id) {
247         operation->NotifyIfNeed();
248     });
249     QuerySyncObject querySyncObject(query);
250     int errCode = querySyncObject.Init();
251     if (errCode != E_OK) {
252         return errCode;
253     }
254     operation->SetQuery(querySyncObject);
255     context_->AddSyncOperation(operation);
256     operation->WaitIfNeed();
257     RefObject::KillAndDecObjRef(operation);
258     return errCode;
259 }
260 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)261 int GenericVirtualDevice::RemoteQuery(const std::string &device, const RemoteCondition &condition,
262     uint64_t timeout, std::shared_ptr<ResultSet> &result)
263 {
264     if (executor_ == nullptr) {
265         result = nullptr;
266         return TransferDBErrno(-E_BUSY);
267     }
268     int errCode = executor_->RemoteQuery(device, condition, timeout, 1u, result);
269     if (errCode != E_OK) {
270         result = nullptr;
271     }
272     return TransferDBErrno(errCode);
273 }
274 
SetClearRemoteStaleData(bool isStaleData)275 void GenericVirtualDevice::SetClearRemoteStaleData(bool isStaleData)
276 {
277     if (context_ != nullptr) {
278         static_cast<SingleVerSyncTaskContext *>(context_)->EnableClearRemoteStaleData(isStaleData);
279         LOGD("set clear remote stale data mark");
280     }
281 }
282 } // DistributedDB