• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "network_adapter.h"
17 #include "db_constant.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "runtime_context.h"
21 
22 namespace DistributedDB {
namespace {
// Label given to an adapter that is default-constructed, i.e. without a caller-supplied process label.
const std::string DEFAULT_PROCESS_LABEL("Distributeddb_Anonymous_Process");
// Queue tag handed to RuntimeContext::ScheduleQueuedTask for the async tasks posted by this file.
const std::string SCHEDULE_QUEUE_TAG("NetworkAdapter");
}
27 
NetworkAdapter()28 NetworkAdapter::NetworkAdapter()
29     : processLabel_(DEFAULT_PROCESS_LABEL), processCommunicator_(nullptr)
30 {
31 }
32 
NetworkAdapter(const std::string & inProcessLabel)33 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel)
34     : processLabel_(inProcessLabel), processCommunicator_(nullptr)
35 {
36 }
37 
NetworkAdapter(const std::string & inProcessLabel,const std::shared_ptr<IProcessCommunicator> & inCommunicator)38 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel,
39     const std::shared_ptr<IProcessCommunicator> &inCommunicator)
40     : processLabel_(inProcessLabel), processCommunicator_(inCommunicator)
41 {
42 }
43 
~NetworkAdapter()44 NetworkAdapter::~NetworkAdapter()
45 {
46 }
47 
StartAdapter()48 int NetworkAdapter::StartAdapter()
49 {
50     LOGI("[NAdapt][Start] Enter, ProcessLabel=%s.", processLabel_.c_str());
51     if (processLabel_.empty()) {
52         return -E_INVALID_ARGS;
53     }
54     if (!processCommunicator_) {
55         LOGE("[NAdapt][Start] ProcessCommunicator not be designated yet.");
56         return -E_INVALID_ARGS;
57     }
58     DBStatus errCode = processCommunicator_->Start(processLabel_);
59     if (errCode != DBStatus::OK) {
60         LOGE("[NAdapt][Start] Start Fail, errCode=%d.", static_cast<int>(errCode));
61         return -E_PERIPHERAL_INTERFACE_FAIL;
62     }
63     errCode = processCommunicator_->RegOnDataReceive(std::bind(&NetworkAdapter::OnDataReceiveHandler, this,
64         std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
65     if (errCode != DBStatus::OK) {
66         LOGE("[NAdapt][Start] RegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
67         // DO ROLLBACK
68         errCode = processCommunicator_->Stop();
69         LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
70         return -E_PERIPHERAL_INTERFACE_FAIL;
71     }
72     errCode = processCommunicator_->RegOnDeviceChange(std::bind(&NetworkAdapter::OnDeviceChangeHandler, this,
73         std::placeholders::_1, std::placeholders::_2));
74     if (errCode != DBStatus::OK) {
75         LOGE("[NAdapt][Start] RegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
76         // DO ROLLBACK
77         errCode = processCommunicator_->RegOnDataReceive(nullptr);
78         LOGI("[NAdapt][Start] ROLLBACK: UnRegOnDataReceive errCode=%d.", static_cast<int>(errCode));
79         errCode = processCommunicator_->Stop();
80         LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
81         return -E_PERIPHERAL_INTERFACE_FAIL;
82     }
83     // These code is compensation for the probable defect of IProcessCommunicator implementation.
84     // As described in the agreement, for the missed online situation, we search for the online devices at beginning.
85     // OnDeviceChangeHandler is reused to check the existence of peer process.
86     // Since at this point, the CommunicatorAggregator had not been fully initialized,
87     // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
88     SearchOnlineRemoteDeviceAtStartup();
89     LOGI("[NAdapt][Start] Exit.");
90     return E_OK;
91 }
92 
93 // StartAdapter and StopAdapter are all innerly called by ICommunicatorAggregator
94 // If StopAdapter is called, the StartAdapter must have been called successfully before,
95 // so processCommunicator_ won't be null
StopAdapter()96 void NetworkAdapter::StopAdapter()
97 {
98     LOGI("[NAdapt][Stop] Enter, ProcessLabel=%s.", processLabel_.c_str());
99     DBStatus errCode = processCommunicator_->RegOnDeviceChange(nullptr);
100     if (errCode != DBStatus::OK) {
101         LOGE("[NAdapt][Stop] UnRegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
102     }
103     errCode = processCommunicator_->RegOnDataReceive(nullptr);
104     if (errCode != DBStatus::OK) {
105         LOGE("[NAdapt][Stop] UnRegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
106     }
107     errCode = processCommunicator_->Stop();
108     if (errCode != DBStatus::OK) {
109         LOGE("[NAdapt][Stop] Stop Fail, errCode=%d.", static_cast<int>(errCode));
110     }
111     // We don't reset the shared_ptr of commProvider here, the release of commProvider is done by deconstruct of adapter
112     // In this way, the adapter can be start again after stop it, since it still hold the an valid commProvider
113     // The async task is dependent on this Object. we have to wait until all async task finished.
114     LOGI("[NAdapt][Stop] Wait all async task done.");
115     std::unique_lock<std::mutex> asyncTaskDoneLock(asyncTaskDoneMutex_);
116     asyncTaskDoneCv_.wait(asyncTaskDoneLock, [this]{ return pendingAsyncTaskCount_ <= 0; });
117     LOGI("[NAdapt][Stop] Exit.");
118 }
119 
120 namespace {
CheckAndAdjustMtuSize(uint32_t inMtuSize)121 uint32_t CheckAndAdjustMtuSize(uint32_t inMtuSize)
122 {
123     if (inMtuSize < DBConstant::MIN_MTU_SIZE) {
124         return DBConstant::MIN_MTU_SIZE;
125     } else if (inMtuSize > DBConstant::MAX_MTU_SIZE) {
126         return DBConstant::MAX_MTU_SIZE;
127     } else {
128         return (inMtuSize - (inMtuSize % sizeof(uint64_t))); // Octet alignment
129     }
130 }
131 
CheckAndAdjustTimeout(uint32_t inTimeout)132 uint32_t CheckAndAdjustTimeout(uint32_t inTimeout)
133 {
134     if (inTimeout < DBConstant::MIN_TIMEOUT) {
135         return DBConstant::MIN_TIMEOUT;
136     } else if (inTimeout > DBConstant::MAX_TIMEOUT) {
137         return DBConstant::MAX_TIMEOUT;
138     } else {
139         return inTimeout;
140     }
141 }
142 }
143 
GetMtuSize()144 uint32_t NetworkAdapter::GetMtuSize()
145 {
146     std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
147     if (!isMtuSizeValid_) {
148         mtuSize_ = processCommunicator_->GetMtuSize();
149         LOGI("[NAdapt][GetMtu] mtuSize=%u.", mtuSize_);
150         mtuSize_ = CheckAndAdjustMtuSize(mtuSize_);
151         isMtuSizeValid_ = true;
152     }
153     return mtuSize_;
154 }
155 
GetMtuSize(const std::string & target)156 uint32_t NetworkAdapter::GetMtuSize(const std::string &target)
157 {
158 #ifndef OMIT_MTU_CACHE
159     DeviceInfos devInfo;
160     devInfo.identifier = target;
161     uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
162     return CheckAndAdjustMtuSize(oriMtuSize);
163 #else
164     std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
165     if (devMapMtuSize_.count(target) == 0) {
166         DeviceInfos devInfo;
167         devInfo.identifier = target;
168         uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
169         LOGI("[NAdapt][GetMtu] mtuSize=%u of target=%s{private}.", oriMtuSize, target.c_str());
170         devMapMtuSize_[target] = CheckAndAdjustMtuSize(oriMtuSize);
171     }
172     return devMapMtuSize_[target];
173 #endif
174 }
175 
GetTimeout()176 uint32_t NetworkAdapter::GetTimeout()
177 {
178     uint32_t timeout = processCommunicator_->GetTimeout();
179     LOGI("[NAdapt][GetTimeout] timeout_=%u ms.", timeout);
180     return CheckAndAdjustTimeout(timeout);
181 }
182 
GetTimeout(const std::string & target)183 uint32_t NetworkAdapter::GetTimeout(const std::string &target)
184 {
185     DeviceInfos devInfos;
186     devInfos.identifier = target;
187     uint32_t timeout = processCommunicator_->GetTimeout(devInfos);
188     LOGI("[NAdapt][GetTimeout] timeout=%u ms of target=%s{private}.", timeout, target.c_str());
189     return CheckAndAdjustTimeout(timeout);
190 }
191 
GetLocalIdentity(std::string & outTarget)192 int NetworkAdapter::GetLocalIdentity(std::string &outTarget)
193 {
194     std::lock_guard<std::mutex> identityLockGuard(identityMutex_);
195     DeviceInfos devInfo = processCommunicator_->GetLocalDeviceInfos();
196     if (devInfo.identifier.empty()) {
197         return -E_PERIPHERAL_INTERFACE_FAIL;
198     }
199     if (devInfo.identifier != localIdentity_) {
200         LOGI("[NAdapt][GetLocal] localIdentity=%s{private}.", devInfo.identifier.c_str());
201     }
202     localIdentity_ = devInfo.identifier;
203     outTarget = localIdentity_;
204     return E_OK;
205 }
206 
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length)207 int NetworkAdapter::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length)
208 {
209     if (bytes == nullptr || length == 0) {
210         return -E_INVALID_ARGS;
211     }
212     LOGI("[NAdapt][SendBytes] Enter, to=%s{private}, length=%u", dstTarget.c_str(), length);
213     DeviceInfos dstDevInfo;
214     dstDevInfo.identifier = dstTarget;
215     DBStatus errCode = processCommunicator_->SendData(dstDevInfo, bytes, length);
216     if (errCode != DBStatus::OK) {
217         LOGE("[NAdapt][SendBytes] SendData Fail, errCode=%d.", static_cast<int>(errCode));
218         // These code is compensation for the probable defect of IProcessCommunicator implementation.
219         // As described in the agreement, for the missed offline situation, we check if still online at send fail.
220         // OnDeviceChangeHandler is reused but check the existence of peer process is done outerly.
221         // Since this thread is the sending_thread of the CommunicatorAggregator,
222         // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
223         CheckDeviceOfflineAfterSendFail(dstDevInfo);
224         return -E_PERIPHERAL_INTERFACE_FAIL;
225     }
226     return E_OK;
227 }
228 
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)229 int NetworkAdapter::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
230 {
231     std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
232     return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
233 }
234 
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)235 int NetworkAdapter::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
236 {
237     std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
238     return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
239 }
240 
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)241 int NetworkAdapter::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
242 {
243     std::lock_guard<std::mutex> onSendableLockGard(onSendableMutex_);
244     return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
245 }
246 
OnDataReceiveHandler(const DeviceInfos & srcDevInfo,const uint8_t * data,uint32_t length)247 void NetworkAdapter::OnDataReceiveHandler(const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length)
248 {
249     if (data == nullptr || length == 0) {
250         LOGE("[NAdapt][OnDataRecv] data nullptr or length = %u.", length);
251         return;
252     }
253     uint32_t headLength = 0;
254     std::vector<std::string> userId;
255     DBStatus errCode = processCommunicator_->CheckAndGetDataHeadInfo(data, length, headLength, userId);
256     LOGI("[NAdapt][OnDataRecv] Enter, from=%s{private}, extendHeadLength=%u, totalLength=%u",
257         srcDevInfo.identifier.c_str(), headLength, length);
258     if (errCode == NO_PERMISSION) {
259         LOGI("[NAdapt][OnDataRecv] userId dismatched, drop packet");
260         return;
261     }
262     {
263         std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
264         if (!onReceiveHandle_) {
265             LOGE("[NAdapt][OnDataRecv] onReceiveHandle invalid.");
266             return;
267         }
268         std::string currentUserId;
269         if (userId.size() >= 1) {
270             currentUserId = userId[0];
271         }
272         onReceiveHandle_(srcDevInfo.identifier, data + headLength, length - headLength, currentUserId);
273     }
274     // These code is compensation for the probable defect of IProcessCommunicator implementation.
275     // As described in the agreement, for the missed online situation, we check the source dev when received.
276     // OnDeviceChangeHandler is reused to check the existence of peer process.
277     // Since this thread is the callback_thread of IProcessCommunicator, we do this check task directly in this thread.
278     CheckDeviceOnlineAfterReception(srcDevInfo);
279 }
280 
OnDeviceChangeHandler(const DeviceInfos & devInfo,bool isOnline)281 void NetworkAdapter::OnDeviceChangeHandler(const DeviceInfos &devInfo, bool isOnline)
282 {
283     LOGI("[NAdapt][OnDeviceChange] Enter, dev=%s{private}, isOnline=%d", devInfo.identifier.c_str(), isOnline);
284     // These code is compensation for the probable defect of IProcessCommunicator implementation.
285     // As described in the agreement, for the mistake online situation, we check the existence of peer process.
286     // The IProcessCommunicator implementation guarantee that no mistake offline will happen.
287     if (isOnline) {
288         if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
289             LOGI("[NAdapt][OnDeviceChange] ######## Detect Not Really Online ########.");
290             std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
291             onlineRemoteDev_.erase(devInfo.identifier);
292             return;
293         }
294         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
295         onlineRemoteDev_.insert(devInfo.identifier);
296     } else {
297         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
298         onlineRemoteDev_.erase(devInfo.identifier);
299     }
300     // End compensation, do callback.
301     std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
302     if (!onChangeHandle_) {
303         LOGE("[NAdapt][OnDeviceChange] onChangeHandle_ invalid.");
304         return;
305     }
306     onChangeHandle_(devInfo.identifier, isOnline);
307 }
308 
SearchOnlineRemoteDeviceAtStartup()309 void NetworkAdapter::SearchOnlineRemoteDeviceAtStartup()
310 {
311     std::vector<DeviceInfos> onlineDev = processCommunicator_->GetRemoteOnlineDeviceInfosList();
312     LOGE("[NAdapt][SearchOnline] onlineDev count = %zu.", onlineDev.size());
313     if (!onlineDev.empty()) {
314         pendingAsyncTaskCount_.fetch_add(1);
315         // Note: onlineDev should be captured by value (must not by reference)
316         TaskAction callbackTask = [onlineDev, this]() {
317             LOGI("[NAdapt][SearchOnline] Begin Callback In Async Task.");
318             std::string localIdentity;
319             GetLocalIdentity(localIdentity); // It doesn't matter if getlocal fail and localIdentity be an empty string
320             for (auto &entry : onlineDev) {
321                 if (entry.identifier == localIdentity) {
322                     LOGW("[NAdapt][SearchOnline] ######## Detect Local Device in Remote Device List ########.");
323                     continue;
324                 }
325                 OnDeviceChangeHandler(entry, true);
326             }
327             pendingAsyncTaskCount_.fetch_sub(1);
328             asyncTaskDoneCv_.notify_all();
329             LOGI("[NAdapt][SearchOnline] End Callback In Async Task.");
330         };
331         // Use ScheduleQueuedTask to keep order
332         int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
333         if (errCode != E_OK) {
334             LOGE("[NAdapt][SearchOnline] ScheduleQueuedTask failed, errCode = %d.", errCode);
335             pendingAsyncTaskCount_.fetch_sub(1);
336             asyncTaskDoneCv_.notify_all();
337         }
338     }
339 }
340 
CheckDeviceOnlineAfterReception(const DeviceInfos & devInfo)341 void NetworkAdapter::CheckDeviceOnlineAfterReception(const DeviceInfos &devInfo)
342 {
343     bool isAlreadyOnline = true;
344     {
345         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
346         if (onlineRemoteDev_.count(devInfo.identifier) == 0) {
347             isAlreadyOnline = false;
348         }
349     }
350 
351     // Seem offline but receive data from it, let OnDeviceChangeHandler check whether it is really online
352     if (!isAlreadyOnline) {
353         OnDeviceChangeHandler(devInfo, true);
354     }
355 }
356 
CheckDeviceOfflineAfterSendFail(const DeviceInfos & devInfo)357 void NetworkAdapter::CheckDeviceOfflineAfterSendFail(const DeviceInfos &devInfo)
358 {
359     // Note: only the identifier field of devInfo is valid, enough to call IsSameProcessLabelStartedOnPeerDevice
360     bool isAlreadyOffline = true;
361     {
362         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
363         if (onlineRemoteDev_.count(devInfo.identifier) != 0) {
364             isAlreadyOffline = false;
365         }
366     }
367 
368     // Seem online but send fail, we have to check whether still online
369     if (!isAlreadyOffline) {
370         if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
371             LOGW("[NAdapt][CheckAfterSend] ######## Missed Offline Detected ########.");
372             {
373                 // Mark this device not online immediately to avoid repeatedly miss-offline detect when send continually
374                 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
375                 onlineRemoteDev_.erase(devInfo.identifier);
376             }
377             pendingAsyncTaskCount_.fetch_add(1);
378             // Note: devInfo should be captured by value (must not by reference)
379             TaskAction callbackTask = [devInfo, this]() {
380                 LOGI("[NAdapt][CheckAfterSend] In Async Task, devInfo=%s{private}.", devInfo.identifier.c_str());
381                 OnDeviceChangeHandler(devInfo, false);
382                 pendingAsyncTaskCount_.fetch_sub(1);
383                 asyncTaskDoneCv_.notify_all();
384             };
385             // Use ScheduleQueuedTask to keep order
386             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
387             if (errCode != E_OK) {
388                 LOGE("[NAdapt][CheckAfterSend] ScheduleQueuedTask failed, errCode = %d.", errCode);
389                 pendingAsyncTaskCount_.fetch_sub(1);
390                 asyncTaskDoneCv_.notify_all();
391             }
392         }
393     }
394 }
395 
IsDeviceOnline(const std::string & device)396 bool NetworkAdapter::IsDeviceOnline(const std::string &device)
397 {
398     std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
399     return (onlineRemoteDev_.find(device) != onlineRemoteDev_.end());
400 }
401 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)402 std::shared_ptr<ExtendHeaderHandle> NetworkAdapter::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
403 {
404     return processCommunicator_->GetExtendHeaderHandle(paramInfo);
405 }
406 } // namespace DistributedDB
407