1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "network_adapter.h"
17 #include "db_constant.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "runtime_context.h"
21
22 namespace DistributedDB {
namespace {
// Process label used by the default constructor when the caller does not supply one.
const std::string DEFAULT_PROCESS_LABEL = "Distributeddb_Anonymous_Process";
// Queue tag handed to RuntimeContext::ScheduleQueuedTask for the async tasks created by this adapter.
const std::string SCHEDULE_QUEUE_TAG = "NetworkAdapter";
}
27
NetworkAdapter()28 NetworkAdapter::NetworkAdapter()
29 : processLabel_(DEFAULT_PROCESS_LABEL), processCommunicator_(nullptr)
30 {
31 }
32
NetworkAdapter(const std::string & inProcessLabel)33 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel)
34 : processLabel_(inProcessLabel), processCommunicator_(nullptr)
35 {
36 }
37
NetworkAdapter(const std::string & inProcessLabel,const std::shared_ptr<IProcessCommunicator> & inCommunicator)38 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel,
39 const std::shared_ptr<IProcessCommunicator> &inCommunicator)
40 : processLabel_(inProcessLabel), processCommunicator_(inCommunicator)
41 {
42 }
43
~NetworkAdapter()44 NetworkAdapter::~NetworkAdapter()
45 {
46 }
47
StartAdapter()48 int NetworkAdapter::StartAdapter()
49 {
50 LOGI("[NAdapt][Start] Enter, ProcessLabel=%s.", processLabel_.c_str());
51 if (processLabel_.empty()) {
52 return -E_INVALID_ARGS;
53 }
54 if (!processCommunicator_) {
55 LOGE("[NAdapt][Start] ProcessCommunicator not be designated yet.");
56 return -E_INVALID_ARGS;
57 }
58 DBStatus errCode = processCommunicator_->Start(processLabel_);
59 if (errCode != DBStatus::OK) {
60 LOGE("[NAdapt][Start] Start Fail, errCode=%d.", static_cast<int>(errCode));
61 return -E_PERIPHERAL_INTERFACE_FAIL;
62 }
63 errCode = processCommunicator_->RegOnDataReceive(std::bind(&NetworkAdapter::OnDataReceiveHandler, this,
64 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
65 if (errCode != DBStatus::OK) {
66 LOGE("[NAdapt][Start] RegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
67 // DO ROLLBACK
68 errCode = processCommunicator_->Stop();
69 LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
70 return -E_PERIPHERAL_INTERFACE_FAIL;
71 }
72 errCode = processCommunicator_->RegOnDeviceChange(std::bind(&NetworkAdapter::OnDeviceChangeHandler, this,
73 std::placeholders::_1, std::placeholders::_2));
74 if (errCode != DBStatus::OK) {
75 LOGE("[NAdapt][Start] RegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
76 // DO ROLLBACK
77 errCode = processCommunicator_->RegOnDataReceive(nullptr);
78 LOGI("[NAdapt][Start] ROLLBACK: UnRegOnDataReceive errCode=%d.", static_cast<int>(errCode));
79 errCode = processCommunicator_->Stop();
80 LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
81 return -E_PERIPHERAL_INTERFACE_FAIL;
82 }
83 // These code is compensation for the probable defect of IProcessCommunicator implementation.
84 // As described in the agreement, for the missed online situation, we search for the online devices at beginning.
85 // OnDeviceChangeHandler is reused to check the existence of peer process.
86 // Since at this point, the CommunicatorAggregator had not been fully initialized,
87 // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
88 SearchOnlineRemoteDeviceAtStartup();
89 LOGI("[NAdapt][Start] Exit.");
90 return E_OK;
91 }
92
93 // StartAdapter and StopAdapter are all innerly called by ICommunicatorAggregator
94 // If StopAdapter is called, the StartAdapter must have been called successfully before,
95 // so processCommunicator_ won't be null
StopAdapter()96 void NetworkAdapter::StopAdapter()
97 {
98 LOGI("[NAdapt][Stop] Enter, ProcessLabel=%s.", processLabel_.c_str());
99 DBStatus errCode = processCommunicator_->RegOnDeviceChange(nullptr);
100 if (errCode != DBStatus::OK) {
101 LOGE("[NAdapt][Stop] UnRegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
102 }
103 errCode = processCommunicator_->RegOnDataReceive(nullptr);
104 if (errCode != DBStatus::OK) {
105 LOGE("[NAdapt][Stop] UnRegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
106 }
107 errCode = processCommunicator_->Stop();
108 if (errCode != DBStatus::OK) {
109 LOGE("[NAdapt][Stop] Stop Fail, errCode=%d.", static_cast<int>(errCode));
110 }
111 // We don't reset the shared_ptr of commProvider here, the release of commProvider is done by deconstruct of adapter
112 // In this way, the adapter can be start again after stop it, since it still hold the an valid commProvider
113 // The async task is dependent on this Object. we have to wait until all async task finished.
114 LOGI("[NAdapt][Stop] Wait all async task done.");
115 std::unique_lock<std::mutex> asyncTaskDoneLock(asyncTaskDoneMutex_);
116 asyncTaskDoneCv_.wait(asyncTaskDoneLock, [this]{ return pendingAsyncTaskCount_ <= 0; });
117 LOGI("[NAdapt][Stop] Exit.");
118 }
119
120 namespace {
CheckAndAdjustMtuSize(uint32_t inMtuSize)121 uint32_t CheckAndAdjustMtuSize(uint32_t inMtuSize)
122 {
123 if (inMtuSize < DBConstant::MIN_MTU_SIZE) {
124 return DBConstant::MIN_MTU_SIZE;
125 } else if (inMtuSize > DBConstant::MAX_MTU_SIZE) {
126 return DBConstant::MAX_MTU_SIZE;
127 } else {
128 return (inMtuSize - (inMtuSize % sizeof(uint64_t))); // Octet alignment
129 }
130 }
131
CheckAndAdjustTimeout(uint32_t inTimeout)132 uint32_t CheckAndAdjustTimeout(uint32_t inTimeout)
133 {
134 if (inTimeout < DBConstant::MIN_TIMEOUT) {
135 return DBConstant::MIN_TIMEOUT;
136 } else if (inTimeout > DBConstant::MAX_TIMEOUT) {
137 return DBConstant::MAX_TIMEOUT;
138 } else {
139 return inTimeout;
140 }
141 }
142 }
143
GetMtuSize()144 uint32_t NetworkAdapter::GetMtuSize()
145 {
146 std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
147 if (!isMtuSizeValid_) {
148 mtuSize_ = processCommunicator_->GetMtuSize();
149 LOGI("[NAdapt][GetMtu] mtuSize=%u.", mtuSize_);
150 mtuSize_ = CheckAndAdjustMtuSize(mtuSize_);
151 isMtuSizeValid_ = true;
152 }
153 return mtuSize_;
154 }
155
GetMtuSize(const std::string & target)156 uint32_t NetworkAdapter::GetMtuSize(const std::string &target)
157 {
158 #ifndef OMIT_MTU_CACHE
159 DeviceInfos devInfo;
160 devInfo.identifier = target;
161 uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
162 return CheckAndAdjustMtuSize(oriMtuSize);
163 #else
164 std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
165 if (devMapMtuSize_.count(target) == 0) {
166 DeviceInfos devInfo;
167 devInfo.identifier = target;
168 uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
169 LOGI("[NAdapt][GetMtu] mtuSize=%u of target=%s{private}.", oriMtuSize, target.c_str());
170 devMapMtuSize_[target] = CheckAndAdjustMtuSize(oriMtuSize);
171 }
172 return devMapMtuSize_[target];
173 #endif
174 }
175
GetTimeout()176 uint32_t NetworkAdapter::GetTimeout()
177 {
178 uint32_t timeout = processCommunicator_->GetTimeout();
179 LOGI("[NAdapt][GetTimeout] timeout_=%u ms.", timeout);
180 return CheckAndAdjustTimeout(timeout);
181 }
182
GetTimeout(const std::string & target)183 uint32_t NetworkAdapter::GetTimeout(const std::string &target)
184 {
185 DeviceInfos devInfos;
186 devInfos.identifier = target;
187 uint32_t timeout = processCommunicator_->GetTimeout(devInfos);
188 LOGI("[NAdapt][GetTimeout] timeout=%u ms of target=%s{private}.", timeout, target.c_str());
189 return CheckAndAdjustTimeout(timeout);
190 }
191
GetLocalIdentity(std::string & outTarget)192 int NetworkAdapter::GetLocalIdentity(std::string &outTarget)
193 {
194 std::lock_guard<std::mutex> identityLockGuard(identityMutex_);
195 DeviceInfos devInfo = processCommunicator_->GetLocalDeviceInfos();
196 if (devInfo.identifier.empty()) {
197 return -E_PERIPHERAL_INTERFACE_FAIL;
198 }
199 if (devInfo.identifier != localIdentity_) {
200 LOGI("[NAdapt][GetLocal] localIdentity=%s{private}.", devInfo.identifier.c_str());
201 }
202 localIdentity_ = devInfo.identifier;
203 outTarget = localIdentity_;
204 return E_OK;
205 }
206
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length)207 int NetworkAdapter::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length)
208 {
209 if (bytes == nullptr || length == 0) {
210 return -E_INVALID_ARGS;
211 }
212 LOGI("[NAdapt][SendBytes] Enter, to=%s{private}, length=%u", dstTarget.c_str(), length);
213 DeviceInfos dstDevInfo;
214 dstDevInfo.identifier = dstTarget;
215 DBStatus errCode = processCommunicator_->SendData(dstDevInfo, bytes, length);
216 if (errCode != DBStatus::OK) {
217 LOGE("[NAdapt][SendBytes] SendData Fail, errCode=%d.", static_cast<int>(errCode));
218 // These code is compensation for the probable defect of IProcessCommunicator implementation.
219 // As described in the agreement, for the missed offline situation, we check if still online at send fail.
220 // OnDeviceChangeHandler is reused but check the existence of peer process is done outerly.
221 // Since this thread is the sending_thread of the CommunicatorAggregator,
222 // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
223 CheckDeviceOfflineAfterSendFail(dstDevInfo);
224 return -E_PERIPHERAL_INTERFACE_FAIL;
225 }
226 return E_OK;
227 }
228
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)229 int NetworkAdapter::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
230 {
231 std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
232 return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
233 }
234
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)235 int NetworkAdapter::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
236 {
237 std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
238 return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
239 }
240
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)241 int NetworkAdapter::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
242 {
243 std::lock_guard<std::mutex> onSendableLockGard(onSendableMutex_);
244 return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
245 }
246
OnDataReceiveHandler(const DeviceInfos & srcDevInfo,const uint8_t * data,uint32_t length)247 void NetworkAdapter::OnDataReceiveHandler(const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length)
248 {
249 if (data == nullptr || length == 0) {
250 LOGE("[NAdapt][OnDataRecv] data nullptr or length = %u.", length);
251 return;
252 }
253 uint32_t headLength = 0;
254 std::vector<std::string> userId;
255 DBStatus errCode = processCommunicator_->CheckAndGetDataHeadInfo(data, length, headLength, userId);
256 LOGI("[NAdapt][OnDataRecv] Enter, from=%s{private}, extendHeadLength=%u, totalLength=%u",
257 srcDevInfo.identifier.c_str(), headLength, length);
258 if (errCode == NO_PERMISSION) {
259 LOGI("[NAdapt][OnDataRecv] userId dismatched, drop packet");
260 return;
261 }
262 {
263 std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
264 if (!onReceiveHandle_) {
265 LOGE("[NAdapt][OnDataRecv] onReceiveHandle invalid.");
266 return;
267 }
268 std::string currentUserId;
269 if (userId.size() >= 1) {
270 currentUserId = userId[0];
271 }
272 onReceiveHandle_(srcDevInfo.identifier, data + headLength, length - headLength, currentUserId);
273 }
274 // These code is compensation for the probable defect of IProcessCommunicator implementation.
275 // As described in the agreement, for the missed online situation, we check the source dev when received.
276 // OnDeviceChangeHandler is reused to check the existence of peer process.
277 // Since this thread is the callback_thread of IProcessCommunicator, we do this check task directly in this thread.
278 CheckDeviceOnlineAfterReception(srcDevInfo);
279 }
280
OnDeviceChangeHandler(const DeviceInfos & devInfo,bool isOnline)281 void NetworkAdapter::OnDeviceChangeHandler(const DeviceInfos &devInfo, bool isOnline)
282 {
283 LOGI("[NAdapt][OnDeviceChange] Enter, dev=%s{private}, isOnline=%d", devInfo.identifier.c_str(), isOnline);
284 // These code is compensation for the probable defect of IProcessCommunicator implementation.
285 // As described in the agreement, for the mistake online situation, we check the existence of peer process.
286 // The IProcessCommunicator implementation guarantee that no mistake offline will happen.
287 if (isOnline) {
288 if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
289 LOGI("[NAdapt][OnDeviceChange] ######## Detect Not Really Online ########.");
290 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
291 onlineRemoteDev_.erase(devInfo.identifier);
292 return;
293 }
294 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
295 onlineRemoteDev_.insert(devInfo.identifier);
296 } else {
297 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
298 onlineRemoteDev_.erase(devInfo.identifier);
299 }
300 // End compensation, do callback.
301 std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
302 if (!onChangeHandle_) {
303 LOGE("[NAdapt][OnDeviceChange] onChangeHandle_ invalid.");
304 return;
305 }
306 onChangeHandle_(devInfo.identifier, isOnline);
307 }
308
SearchOnlineRemoteDeviceAtStartup()309 void NetworkAdapter::SearchOnlineRemoteDeviceAtStartup()
310 {
311 std::vector<DeviceInfos> onlineDev = processCommunicator_->GetRemoteOnlineDeviceInfosList();
312 LOGE("[NAdapt][SearchOnline] onlineDev count = %zu.", onlineDev.size());
313 if (!onlineDev.empty()) {
314 pendingAsyncTaskCount_.fetch_add(1);
315 // Note: onlineDev should be captured by value (must not by reference)
316 TaskAction callbackTask = [onlineDev, this]() {
317 LOGI("[NAdapt][SearchOnline] Begin Callback In Async Task.");
318 std::string localIdentity;
319 GetLocalIdentity(localIdentity); // It doesn't matter if getlocal fail and localIdentity be an empty string
320 for (auto &entry : onlineDev) {
321 if (entry.identifier == localIdentity) {
322 LOGW("[NAdapt][SearchOnline] ######## Detect Local Device in Remote Device List ########.");
323 continue;
324 }
325 OnDeviceChangeHandler(entry, true);
326 }
327 pendingAsyncTaskCount_.fetch_sub(1);
328 asyncTaskDoneCv_.notify_all();
329 LOGI("[NAdapt][SearchOnline] End Callback In Async Task.");
330 };
331 // Use ScheduleQueuedTask to keep order
332 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
333 if (errCode != E_OK) {
334 LOGE("[NAdapt][SearchOnline] ScheduleQueuedTask failed, errCode = %d.", errCode);
335 pendingAsyncTaskCount_.fetch_sub(1);
336 asyncTaskDoneCv_.notify_all();
337 }
338 }
339 }
340
CheckDeviceOnlineAfterReception(const DeviceInfos & devInfo)341 void NetworkAdapter::CheckDeviceOnlineAfterReception(const DeviceInfos &devInfo)
342 {
343 bool isAlreadyOnline = true;
344 {
345 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
346 if (onlineRemoteDev_.count(devInfo.identifier) == 0) {
347 isAlreadyOnline = false;
348 }
349 }
350
351 // Seem offline but receive data from it, let OnDeviceChangeHandler check whether it is really online
352 if (!isAlreadyOnline) {
353 OnDeviceChangeHandler(devInfo, true);
354 }
355 }
356
CheckDeviceOfflineAfterSendFail(const DeviceInfos & devInfo)357 void NetworkAdapter::CheckDeviceOfflineAfterSendFail(const DeviceInfos &devInfo)
358 {
359 // Note: only the identifier field of devInfo is valid, enough to call IsSameProcessLabelStartedOnPeerDevice
360 bool isAlreadyOffline = true;
361 {
362 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
363 if (onlineRemoteDev_.count(devInfo.identifier) != 0) {
364 isAlreadyOffline = false;
365 }
366 }
367
368 // Seem online but send fail, we have to check whether still online
369 if (!isAlreadyOffline) {
370 if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
371 LOGW("[NAdapt][CheckAfterSend] ######## Missed Offline Detected ########.");
372 {
373 // Mark this device not online immediately to avoid repeatedly miss-offline detect when send continually
374 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
375 onlineRemoteDev_.erase(devInfo.identifier);
376 }
377 pendingAsyncTaskCount_.fetch_add(1);
378 // Note: devInfo should be captured by value (must not by reference)
379 TaskAction callbackTask = [devInfo, this]() {
380 LOGI("[NAdapt][CheckAfterSend] In Async Task, devInfo=%s{private}.", devInfo.identifier.c_str());
381 OnDeviceChangeHandler(devInfo, false);
382 pendingAsyncTaskCount_.fetch_sub(1);
383 asyncTaskDoneCv_.notify_all();
384 };
385 // Use ScheduleQueuedTask to keep order
386 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
387 if (errCode != E_OK) {
388 LOGE("[NAdapt][CheckAfterSend] ScheduleQueuedTask failed, errCode = %d.", errCode);
389 pendingAsyncTaskCount_.fetch_sub(1);
390 asyncTaskDoneCv_.notify_all();
391 }
392 }
393 }
394 }
395
IsDeviceOnline(const std::string & device)396 bool NetworkAdapter::IsDeviceOnline(const std::string &device)
397 {
398 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
399 return (onlineRemoteDev_.find(device) != onlineRemoteDev_.end());
400 }
401
GetExtendHeaderHandle(const ExtendInfo & paramInfo)402 std::shared_ptr<ExtendHeaderHandle> NetworkAdapter::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
403 {
404 return processCommunicator_->GetExtendHeaderHandle(paramInfo);
405 }
406 } // namespace DistributedDB
407