1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "network_adapter.h"
17 #include "db_constant.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "runtime_context.h"
22
23 namespace DistributedDB {
namespace {
// Process label used by the default constructor when the owner does not supply one.
const std::string DEFAULT_PROCESS_LABEL{"Distributeddb_Anonymous_Process"};
// Queue tag passed to ScheduleQueuedTask so that this adapter's async tasks keep their order.
const std::string SCHEDULE_QUEUE_TAG{"NetworkAdapter"};
} // namespace
28
NetworkAdapter()29 NetworkAdapter::NetworkAdapter()
30 : processLabel_(DEFAULT_PROCESS_LABEL), processCommunicator_(nullptr)
31 {
32 }
33
NetworkAdapter(const std::string & inProcessLabel)34 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel)
35 : processLabel_(inProcessLabel), processCommunicator_(nullptr)
36 {
37 }
38
NetworkAdapter(const std::string & inProcessLabel,const std::shared_ptr<IProcessCommunicator> & inCommunicator)39 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel,
40 const std::shared_ptr<IProcessCommunicator> &inCommunicator)
41 : processLabel_(inProcessLabel), processCommunicator_(inCommunicator)
42 {
43 }
44
~NetworkAdapter()45 NetworkAdapter::~NetworkAdapter()
46 {
47 }
48
StartAdapter()49 int NetworkAdapter::StartAdapter()
50 {
51 LOGI("[NAdapt][Start] Enter, ProcessLabel=%s.", STR_MASK(DBCommon::TransferStringToHex(processLabel_)));
52 if (processLabel_.empty()) {
53 return -E_INVALID_ARGS;
54 }
55 if (!processCommunicator_) {
56 LOGE("[NAdapt][Start] ProcessCommunicator not be designated yet.");
57 return -E_INVALID_ARGS;
58 }
59 DBStatus errCode = processCommunicator_->Start(processLabel_);
60 if (errCode != DBStatus::OK) {
61 LOGE("[NAdapt][Start] Start Fail, errCode=%d.", static_cast<int>(errCode));
62 return -E_PERIPHERAL_INTERFACE_FAIL;
63 }
64 errCode = processCommunicator_->RegOnDataReceive(std::bind(&NetworkAdapter::OnDataReceiveHandler, this,
65 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
66 if (errCode != DBStatus::OK) {
67 LOGE("[NAdapt][Start] RegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
68 // DO ROLLBACK
69 errCode = processCommunicator_->Stop();
70 LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
71 return -E_PERIPHERAL_INTERFACE_FAIL;
72 }
73 errCode = processCommunicator_->RegOnDeviceChange(std::bind(&NetworkAdapter::OnDeviceChangeHandler, this,
74 std::placeholders::_1, std::placeholders::_2));
75 if (errCode != DBStatus::OK) {
76 LOGE("[NAdapt][Start] RegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
77 // DO ROLLBACK
78 errCode = processCommunicator_->RegOnDataReceive(nullptr);
79 LOGI("[NAdapt][Start] ROLLBACK: UnRegOnDataReceive errCode=%d.", static_cast<int>(errCode));
80 errCode = processCommunicator_->Stop();
81 LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
82 return -E_PERIPHERAL_INTERFACE_FAIL;
83 }
84 // These code is compensation for the probable defect of IProcessCommunicator implementation.
85 // As described in the agreement, for the missed online situation, we search for the online devices at beginning.
86 // OnDeviceChangeHandler is reused to check the existence of peer process.
87 // Since at this point, the CommunicatorAggregator had not been fully initialized,
88 // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
89 SearchOnlineRemoteDeviceAtStartup();
90 LOGI("[NAdapt][Start] Exit.");
91 return E_OK;
92 }
93
// StartAdapter and StopAdapter are both called internally by ICommunicatorAggregator.
// If StopAdapter is called, StartAdapter must already have been called successfully,
// so processCommunicator_ won't be null.
StopAdapter()97 void NetworkAdapter::StopAdapter()
98 {
99 LOGI("[NAdapt][Stop] Enter, ProcessLabel=%s.", processLabel_.c_str());
100 DBStatus errCode = processCommunicator_->RegOnDeviceChange(nullptr);
101 if (errCode != DBStatus::OK) {
102 LOGE("[NAdapt][Stop] UnRegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
103 }
104 errCode = processCommunicator_->RegOnDataReceive(nullptr);
105 if (errCode != DBStatus::OK) {
106 LOGE("[NAdapt][Stop] UnRegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
107 }
108 errCode = processCommunicator_->Stop();
109 if (errCode != DBStatus::OK) {
110 LOGE("[NAdapt][Stop] Stop Fail, errCode=%d.", static_cast<int>(errCode));
111 }
112 // We don't reset the shared_ptr of commProvider here, the release of commProvider is done by deconstruct of adapter
113 // In this way, the adapter can be start again after stop it, since it still hold the an valid commProvider
114 // The async task is dependent on this Object. we have to wait until all async task finished.
115 LOGI("[NAdapt][Stop] Wait all async task done.");
116 std::unique_lock<std::mutex> asyncTaskDoneLock(asyncTaskDoneMutex_);
117 asyncTaskDoneCv_.wait(asyncTaskDoneLock, [this]{ return pendingAsyncTaskCount_ <= 0; });
118 LOGI("[NAdapt][Stop] Exit.");
119 }
120
121 namespace {
CheckAndAdjustMtuSize(uint32_t inMtuSize)122 uint32_t CheckAndAdjustMtuSize(uint32_t inMtuSize)
123 {
124 if (inMtuSize < DBConstant::MIN_MTU_SIZE) {
125 return DBConstant::MIN_MTU_SIZE;
126 } else if (inMtuSize > DBConstant::MAX_MTU_SIZE) {
127 return DBConstant::MAX_MTU_SIZE;
128 } else {
129 return (inMtuSize - (inMtuSize % sizeof(uint64_t))); // Octet alignment
130 }
131 }
132
CheckAndAdjustTimeout(uint32_t inTimeout)133 uint32_t CheckAndAdjustTimeout(uint32_t inTimeout)
134 {
135 if (inTimeout < DBConstant::MIN_TIMEOUT) {
136 return DBConstant::MIN_TIMEOUT;
137 } else if (inTimeout > DBConstant::MAX_TIMEOUT) {
138 return DBConstant::MAX_TIMEOUT;
139 } else {
140 return inTimeout;
141 }
142 }
143 }
144
GetMtuSize()145 uint32_t NetworkAdapter::GetMtuSize()
146 {
147 std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
148 if (!isMtuSizeValid_) {
149 mtuSize_ = processCommunicator_->GetMtuSize();
150 LOGD("[NAdapt][GetMtu] mtuSize=%" PRIu32 ".", mtuSize_);
151 mtuSize_ = CheckAndAdjustMtuSize(mtuSize_);
152 isMtuSizeValid_ = true;
153 }
154 return mtuSize_;
155 }
156
GetMtuSize(const std::string & target)157 uint32_t NetworkAdapter::GetMtuSize(const std::string &target)
158 {
159 #ifndef OMIT_MTU_CACHE
160 DeviceInfos devInfo;
161 devInfo.identifier = target;
162 uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
163 return CheckAndAdjustMtuSize(oriMtuSize);
164 #else
165 std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
166 if (devMapMtuSize_.count(target) == 0) {
167 DeviceInfos devInfo;
168 devInfo.identifier = target;
169 uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
170 LOGD("[NAdapt][GetMtu] mtuSize=%" PRIu32 " of target=%s{private}.", oriMtuSize, target.c_str());
171 devMapMtuSize_[target] = CheckAndAdjustMtuSize(oriMtuSize);
172 }
173 return devMapMtuSize_[target];
174 #endif
175 }
176
GetTimeout()177 uint32_t NetworkAdapter::GetTimeout()
178 {
179 uint32_t timeout = processCommunicator_->GetTimeout();
180 LOGD("[NAdapt][GetTimeout] timeout_=%" PRIu32 " ms.", timeout);
181 return CheckAndAdjustTimeout(timeout);
182 }
183
GetTimeout(const std::string & target)184 uint32_t NetworkAdapter::GetTimeout(const std::string &target)
185 {
186 DeviceInfos devInfos;
187 devInfos.identifier = target;
188 uint32_t timeout = processCommunicator_->GetTimeout(devInfos);
189 LOGD("[NAdapt][GetTimeout] timeout=%" PRIu32 " ms of target=%s{private}.", timeout, target.c_str());
190 return CheckAndAdjustTimeout(timeout);
191 }
192
GetLocalIdentity(std::string & outTarget)193 int NetworkAdapter::GetLocalIdentity(std::string &outTarget)
194 {
195 std::lock_guard<std::mutex> identityLockGuard(identityMutex_);
196 DeviceInfos devInfo = processCommunicator_->GetLocalDeviceInfos();
197 if (devInfo.identifier.empty()) {
198 return -E_PERIPHERAL_INTERFACE_FAIL;
199 }
200 if (devInfo.identifier != localIdentity_) {
201 LOGI("[NAdapt][GetLocal] localIdentity=%s{private}.", devInfo.identifier.c_str());
202 }
203 localIdentity_ = devInfo.identifier;
204 outTarget = localIdentity_;
205 return E_OK;
206 }
207
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length,uint32_t totalLength)208 int NetworkAdapter::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length, uint32_t totalLength)
209 {
210 if (bytes == nullptr || length == 0) {
211 return -E_INVALID_ARGS;
212 }
213 LOGI("[NAdapt][SendBytes] Enter, to=%s{private}, length=%u", dstTarget.c_str(), length);
214 DeviceInfos dstDevInfo;
215 dstDevInfo.identifier = dstTarget;
216 DBStatus errCode = processCommunicator_->SendData(dstDevInfo, bytes, length, totalLength);
217 if (errCode == DBStatus::RATE_LIMIT) {
218 LOGD("[NAdapt][SendBytes] rate limit!");
219 return -E_WAIT_RETRY;
220 }
221 if (errCode != DBStatus::OK) {
222 LOGE("[NAdapt][SendBytes] SendData Fail, errCode=%d.", static_cast<int>(errCode));
223 // These code is compensation for the probable defect of IProcessCommunicator implementation.
224 // As described in the agreement, for the missed offline situation, we check if still online at send fail.
225 // OnDeviceChangeHandler is reused but check the existence of peer process is done outerly.
226 // Since this thread is the sending_thread of the CommunicatorAggregator,
227 // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
228 CheckDeviceOfflineAfterSendFail(dstDevInfo);
229 return -E_PERIPHERAL_INTERFACE_FAIL;
230 }
231 return E_OK;
232 }
233
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)234 int NetworkAdapter::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
235 {
236 std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
237 return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
238 }
239
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)240 int NetworkAdapter::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
241 {
242 std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
243 return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
244 }
245
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)246 int NetworkAdapter::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
247 {
248 std::lock_guard<std::mutex> onSendableLockGard(onSendableMutex_);
249 return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
250 }
251
OnDataReceiveHandler(const DeviceInfos & srcDevInfo,const uint8_t * data,uint32_t length)252 void NetworkAdapter::OnDataReceiveHandler(const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length)
253 {
254 if (data == nullptr || length == 0) {
255 LOGE("[NAdapt][OnDataRecv] data nullptr or length = %u.", length);
256 return;
257 }
258 uint32_t headLength = 0;
259 std::vector<std::string> userId;
260 DBStatus errCode = processCommunicator_->CheckAndGetDataHeadInfo(data, length, headLength, userId);
261 LOGI("[NAdapt][OnDataRecv] Enter, from=%s{private}, extendHeadLength=%u, totalLength=%u",
262 srcDevInfo.identifier.c_str(), headLength, length);
263 if (errCode == NO_PERMISSION) {
264 LOGI("[NAdapt][OnDataRecv] userId dismatched, drop packet");
265 return;
266 }
267 if (headLength >= length) {
268 LOGW("[NAdapt][OnDataRecv] head len is too big, drop packet");
269 return;
270 }
271 {
272 std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
273 if (!onReceiveHandle_) {
274 LOGE("[NAdapt][OnDataRecv] onReceiveHandle invalid.");
275 return;
276 }
277 std::string currentUserId;
278 if (userId.size() >= 1) {
279 currentUserId = userId[0];
280 }
281 onReceiveHandle_(srcDevInfo.identifier, data + headLength, length - headLength, currentUserId);
282 }
283 // These code is compensation for the probable defect of IProcessCommunicator implementation.
284 // As described in the agreement, for the missed online situation, we check the source dev when received.
285 // OnDeviceChangeHandler is reused to check the existence of peer process.
286 // Since this thread is the callback_thread of IProcessCommunicator, we do this check task directly in this thread.
287 CheckDeviceOnlineAfterReception(srcDevInfo);
288 }
289
OnDeviceChangeHandler(const DeviceInfos & devInfo,bool isOnline)290 void NetworkAdapter::OnDeviceChangeHandler(const DeviceInfos &devInfo, bool isOnline)
291 {
292 LOGI("[NAdapt][OnDeviceChange] Enter, dev=%s{private}, isOnline=%d", devInfo.identifier.c_str(), isOnline);
293 // These code is compensation for the probable defect of IProcessCommunicator implementation.
294 // As described in the agreement, for the mistake online situation, we check the existence of peer process.
295 // The IProcessCommunicator implementation guarantee that no mistake offline will happen.
296 if (isOnline) {
297 if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
298 LOGI("[NAdapt][OnDeviceChange] ######## Detect Not Really Online ########.");
299 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
300 onlineRemoteDev_.erase(devInfo.identifier);
301 return;
302 }
303 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
304 onlineRemoteDev_.insert(devInfo.identifier);
305 } else {
306 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
307 onlineRemoteDev_.erase(devInfo.identifier);
308 }
309 // End compensation, do callback.
310 std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
311 if (!onChangeHandle_) {
312 LOGE("[NAdapt][OnDeviceChange] onChangeHandle_ invalid.");
313 return;
314 }
315 onChangeHandle_(devInfo.identifier, isOnline);
316 }
317
SearchOnlineRemoteDeviceAtStartup()318 void NetworkAdapter::SearchOnlineRemoteDeviceAtStartup()
319 {
320 std::vector<DeviceInfos> onlineDev = processCommunicator_->GetRemoteOnlineDeviceInfosList();
321 LOGE("[NAdapt][SearchOnline] onlineDev count = %zu.", onlineDev.size());
322 if (!onlineDev.empty()) {
323 pendingAsyncTaskCount_.fetch_add(1);
324 // Note: onlineDev should be captured by value (must not by reference)
325 TaskAction callbackTask = [onlineDev, this]() {
326 LOGI("[NAdapt][SearchOnline] Begin Callback In Async Task.");
327 std::string localIdentity;
328 GetLocalIdentity(localIdentity); // It doesn't matter if getlocal fail and localIdentity be an empty string
329 for (auto &entry : onlineDev) {
330 if (entry.identifier == localIdentity) {
331 LOGW("[NAdapt][SearchOnline] ######## Detect Local Device in Remote Device List ########.");
332 continue;
333 }
334 OnDeviceChangeHandler(entry, true);
335 }
336 pendingAsyncTaskCount_.fetch_sub(1);
337 asyncTaskDoneCv_.notify_all();
338 LOGI("[NAdapt][SearchOnline] End Callback In Async Task.");
339 };
340 // Use ScheduleQueuedTask to keep order
341 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
342 if (errCode != E_OK) {
343 LOGE("[NAdapt][SearchOnline] ScheduleQueuedTask failed, errCode = %d.", errCode);
344 pendingAsyncTaskCount_.fetch_sub(1);
345 asyncTaskDoneCv_.notify_all();
346 }
347 }
348 }
349
CheckDeviceOnlineAfterReception(const DeviceInfos & devInfo)350 void NetworkAdapter::CheckDeviceOnlineAfterReception(const DeviceInfos &devInfo)
351 {
352 bool isAlreadyOnline = true;
353 {
354 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
355 if (onlineRemoteDev_.count(devInfo.identifier) == 0) {
356 isAlreadyOnline = false;
357 }
358 }
359
360 // Seem offline but receive data from it, let OnDeviceChangeHandler check whether it is really online
361 if (!isAlreadyOnline) {
362 OnDeviceChangeHandler(devInfo, true);
363 }
364 }
365
CheckDeviceOfflineAfterSendFail(const DeviceInfos & devInfo)366 void NetworkAdapter::CheckDeviceOfflineAfterSendFail(const DeviceInfos &devInfo)
367 {
368 // Note: only the identifier field of devInfo is valid, enough to call IsSameProcessLabelStartedOnPeerDevice
369 bool isAlreadyOffline = true;
370 {
371 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
372 if (onlineRemoteDev_.count(devInfo.identifier) != 0) {
373 isAlreadyOffline = false;
374 }
375 }
376
377 // Seem online but send fail, we have to check whether still online
378 if (!isAlreadyOffline) {
379 if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
380 LOGW("[NAdapt][CheckAfterSend] ######## Missed Offline Detected ########.");
381 {
382 // Mark this device not online immediately to avoid repeatedly miss-offline detect when send continually
383 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
384 onlineRemoteDev_.erase(devInfo.identifier);
385 }
386 pendingAsyncTaskCount_.fetch_add(1);
387 // Note: devInfo should be captured by value (must not by reference)
388 TaskAction callbackTask = [devInfo, this]() {
389 LOGI("[NAdapt][CheckAfterSend] In Async Task, devInfo=%s{private}.", devInfo.identifier.c_str());
390 OnDeviceChangeHandler(devInfo, false);
391 pendingAsyncTaskCount_.fetch_sub(1);
392 asyncTaskDoneCv_.notify_all();
393 };
394 // Use ScheduleQueuedTask to keep order
395 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
396 if (errCode != E_OK) {
397 LOGE("[NAdapt][CheckAfterSend] ScheduleQueuedTask failed, errCode = %d.", errCode);
398 pendingAsyncTaskCount_.fetch_sub(1);
399 asyncTaskDoneCv_.notify_all();
400 }
401 }
402 }
403 }
404
IsDeviceOnline(const std::string & device)405 bool NetworkAdapter::IsDeviceOnline(const std::string &device)
406 {
407 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
408 return (onlineRemoteDev_.find(device) != onlineRemoteDev_.end());
409 }
410
GetExtendHeaderHandle(const ExtendInfo & paramInfo)411 std::shared_ptr<ExtendHeaderHandle> NetworkAdapter::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
412 {
413 return processCommunicator_->GetExtendHeaderHandle(paramInfo);
414 }
415 } // namespace DistributedDB
416