• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "network_adapter.h"
17 #include "db_constant.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "runtime_context.h"
22 
23 namespace DistributedDB {
namespace {
// Process label used by the default constructor of NetworkAdapter.
const std::string DEFAULT_PROCESS_LABEL{"Distributeddb_Anonymous_Process"};
// Queue tag passed to RuntimeContext::ScheduleQueuedTask by this adapter's async tasks.
const std::string SCHEDULE_QUEUE_TAG{"NetworkAdapter"};
} // namespace
28 
NetworkAdapter()29 NetworkAdapter::NetworkAdapter()
30     : processLabel_(DEFAULT_PROCESS_LABEL), processCommunicator_(nullptr)
31 {
32 }
33 
NetworkAdapter(const std::string & inProcessLabel)34 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel)
35     : processLabel_(inProcessLabel), processCommunicator_(nullptr)
36 {
37 }
38 
NetworkAdapter(const std::string & inProcessLabel,const std::shared_ptr<IProcessCommunicator> & inCommunicator)39 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel,
40     const std::shared_ptr<IProcessCommunicator> &inCommunicator)
41     : processLabel_(inProcessLabel), processCommunicator_(inCommunicator)
42 {
43 }
44 
~NetworkAdapter()45 NetworkAdapter::~NetworkAdapter()
46 {
47 }
48 
StartAdapter()49 int NetworkAdapter::StartAdapter()
50 {
51     LOGI("[NAdapt][Start] Enter, ProcessLabel=%s.", STR_MASK(DBCommon::TransferStringToHex(processLabel_)));
52     if (processLabel_.empty()) {
53         return -E_INVALID_ARGS;
54     }
55     if (!processCommunicator_) {
56         LOGE("[NAdapt][Start] ProcessCommunicator not be designated yet.");
57         return -E_INVALID_ARGS;
58     }
59     DBStatus errCode = processCommunicator_->Start(processLabel_);
60     if (errCode != DBStatus::OK) {
61         LOGE("[NAdapt][Start] Start Fail, errCode=%d.", static_cast<int>(errCode));
62         return -E_PERIPHERAL_INTERFACE_FAIL;
63     }
64     errCode = processCommunicator_->RegOnDataReceive(std::bind(&NetworkAdapter::OnDataReceiveHandler, this,
65         std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
66     if (errCode != DBStatus::OK) {
67         LOGE("[NAdapt][Start] RegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
68         // DO ROLLBACK
69         errCode = processCommunicator_->Stop();
70         LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
71         return -E_PERIPHERAL_INTERFACE_FAIL;
72     }
73     errCode = processCommunicator_->RegOnDeviceChange(std::bind(&NetworkAdapter::OnDeviceChangeHandler, this,
74         std::placeholders::_1, std::placeholders::_2));
75     if (errCode != DBStatus::OK) {
76         LOGE("[NAdapt][Start] RegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
77         // DO ROLLBACK
78         errCode = processCommunicator_->RegOnDataReceive(nullptr);
79         LOGI("[NAdapt][Start] ROLLBACK: UnRegOnDataReceive errCode=%d.", static_cast<int>(errCode));
80         errCode = processCommunicator_->Stop();
81         LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
82         return -E_PERIPHERAL_INTERFACE_FAIL;
83     }
84     // These code is compensation for the probable defect of IProcessCommunicator implementation.
85     // As described in the agreement, for the missed online situation, we search for the online devices at beginning.
86     // OnDeviceChangeHandler is reused to check the existence of peer process.
87     // Since at this point, the CommunicatorAggregator had not been fully initialized,
88     // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
89     SearchOnlineRemoteDeviceAtStartup();
90     LOGI("[NAdapt][Start] Exit.");
91     return E_OK;
92 }
93 
94 // StartAdapter and StopAdapter are all innerly called by ICommunicatorAggregator
95 // If StopAdapter is called, the StartAdapter must have been called successfully before,
96 // so processCommunicator_ won't be null
StopAdapter()97 void NetworkAdapter::StopAdapter()
98 {
99     LOGI("[NAdapt][Stop] Enter, ProcessLabel=%s.", processLabel_.c_str());
100     DBStatus errCode = processCommunicator_->RegOnDeviceChange(nullptr);
101     if (errCode != DBStatus::OK) {
102         LOGE("[NAdapt][Stop] UnRegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
103     }
104     errCode = processCommunicator_->RegOnDataReceive(nullptr);
105     if (errCode != DBStatus::OK) {
106         LOGE("[NAdapt][Stop] UnRegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
107     }
108     errCode = processCommunicator_->Stop();
109     if (errCode != DBStatus::OK) {
110         LOGE("[NAdapt][Stop] Stop Fail, errCode=%d.", static_cast<int>(errCode));
111     }
112     // We don't reset the shared_ptr of commProvider here, the release of commProvider is done by deconstruct of adapter
113     // In this way, the adapter can be start again after stop it, since it still hold the an valid commProvider
114     // The async task is dependent on this Object. we have to wait until all async task finished.
115     LOGI("[NAdapt][Stop] Wait all async task done.");
116     std::unique_lock<std::mutex> asyncTaskDoneLock(asyncTaskDoneMutex_);
117     asyncTaskDoneCv_.wait(asyncTaskDoneLock, [this]{ return pendingAsyncTaskCount_ <= 0; });
118     LOGI("[NAdapt][Stop] Exit.");
119 }
120 
121 namespace {
CheckAndAdjustMtuSize(uint32_t inMtuSize)122 uint32_t CheckAndAdjustMtuSize(uint32_t inMtuSize)
123 {
124     if (inMtuSize < DBConstant::MIN_MTU_SIZE) {
125         return DBConstant::MIN_MTU_SIZE;
126     } else if (inMtuSize > DBConstant::MAX_MTU_SIZE) {
127         return DBConstant::MAX_MTU_SIZE;
128     } else {
129         return (inMtuSize - (inMtuSize % sizeof(uint64_t))); // Octet alignment
130     }
131 }
132 
CheckAndAdjustTimeout(uint32_t inTimeout)133 uint32_t CheckAndAdjustTimeout(uint32_t inTimeout)
134 {
135     if (inTimeout < DBConstant::MIN_TIMEOUT) {
136         return DBConstant::MIN_TIMEOUT;
137     } else if (inTimeout > DBConstant::MAX_TIMEOUT) {
138         return DBConstant::MAX_TIMEOUT;
139     } else {
140         return inTimeout;
141     }
142 }
143 }
144 
GetMtuSize()145 uint32_t NetworkAdapter::GetMtuSize()
146 {
147     std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
148     if (!isMtuSizeValid_) {
149         mtuSize_ = processCommunicator_->GetMtuSize();
150         LOGD("[NAdapt][GetMtu] mtuSize=%" PRIu32 ".", mtuSize_);
151         mtuSize_ = CheckAndAdjustMtuSize(mtuSize_);
152         isMtuSizeValid_ = true;
153     }
154     return mtuSize_;
155 }
156 
GetMtuSize(const std::string & target)157 uint32_t NetworkAdapter::GetMtuSize(const std::string &target)
158 {
159 #ifndef OMIT_MTU_CACHE
160     DeviceInfos devInfo;
161     devInfo.identifier = target;
162     uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
163     return CheckAndAdjustMtuSize(oriMtuSize);
164 #else
165     std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
166     if (devMapMtuSize_.count(target) == 0) {
167         DeviceInfos devInfo;
168         devInfo.identifier = target;
169         uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
170         LOGD("[NAdapt][GetMtu] mtuSize=%" PRIu32 " of target=%s{private}.", oriMtuSize, target.c_str());
171         devMapMtuSize_[target] = CheckAndAdjustMtuSize(oriMtuSize);
172     }
173     return devMapMtuSize_[target];
174 #endif
175 }
176 
GetTimeout()177 uint32_t NetworkAdapter::GetTimeout()
178 {
179     uint32_t timeout = processCommunicator_->GetTimeout();
180     LOGD("[NAdapt][GetTimeout] timeout_=%" PRIu32 " ms.", timeout);
181     return CheckAndAdjustTimeout(timeout);
182 }
183 
GetTimeout(const std::string & target)184 uint32_t NetworkAdapter::GetTimeout(const std::string &target)
185 {
186     DeviceInfos devInfos;
187     devInfos.identifier = target;
188     uint32_t timeout = processCommunicator_->GetTimeout(devInfos);
189     LOGD("[NAdapt][GetTimeout] timeout=%" PRIu32 " ms of target=%s{private}.", timeout, target.c_str());
190     return CheckAndAdjustTimeout(timeout);
191 }
192 
GetLocalIdentity(std::string & outTarget)193 int NetworkAdapter::GetLocalIdentity(std::string &outTarget)
194 {
195     std::lock_guard<std::mutex> identityLockGuard(identityMutex_);
196     DeviceInfos devInfo = processCommunicator_->GetLocalDeviceInfos();
197     if (devInfo.identifier.empty()) {
198         return -E_PERIPHERAL_INTERFACE_FAIL;
199     }
200     if (devInfo.identifier != localIdentity_) {
201         LOGI("[NAdapt][GetLocal] localIdentity=%s{private}.", devInfo.identifier.c_str());
202     }
203     localIdentity_ = devInfo.identifier;
204     outTarget = localIdentity_;
205     return E_OK;
206 }
207 
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length,uint32_t totalLength)208 int NetworkAdapter::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length, uint32_t totalLength)
209 {
210     if (bytes == nullptr || length == 0) {
211         return -E_INVALID_ARGS;
212     }
213     LOGI("[NAdapt][SendBytes] Enter, to=%s{private}, length=%u", dstTarget.c_str(), length);
214     DeviceInfos dstDevInfo;
215     dstDevInfo.identifier = dstTarget;
216     DBStatus errCode = processCommunicator_->SendData(dstDevInfo, bytes, length, totalLength);
217     if (errCode == DBStatus::RATE_LIMIT) {
218         LOGD("[NAdapt][SendBytes] rate limit!");
219         return -E_WAIT_RETRY;
220     }
221     if (errCode != DBStatus::OK) {
222         LOGE("[NAdapt][SendBytes] SendData Fail, errCode=%d.", static_cast<int>(errCode));
223         // These code is compensation for the probable defect of IProcessCommunicator implementation.
224         // As described in the agreement, for the missed offline situation, we check if still online at send fail.
225         // OnDeviceChangeHandler is reused but check the existence of peer process is done outerly.
226         // Since this thread is the sending_thread of the CommunicatorAggregator,
227         // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
228         CheckDeviceOfflineAfterSendFail(dstDevInfo);
229         return -E_PERIPHERAL_INTERFACE_FAIL;
230     }
231     return E_OK;
232 }
233 
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)234 int NetworkAdapter::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
235 {
236     std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
237     return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
238 }
239 
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)240 int NetworkAdapter::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
241 {
242     std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
243     return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
244 }
245 
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)246 int NetworkAdapter::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
247 {
248     std::lock_guard<std::mutex> onSendableLockGard(onSendableMutex_);
249     return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
250 }
251 
OnDataReceiveHandler(const DeviceInfos & srcDevInfo,const uint8_t * data,uint32_t length)252 void NetworkAdapter::OnDataReceiveHandler(const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length)
253 {
254     if (data == nullptr || length == 0) {
255         LOGE("[NAdapt][OnDataRecv] data nullptr or length = %u.", length);
256         return;
257     }
258     uint32_t headLength = 0;
259     std::vector<std::string> userId;
260     DBStatus errCode = processCommunicator_->CheckAndGetDataHeadInfo(data, length, headLength, userId);
261     LOGI("[NAdapt][OnDataRecv] Enter, from=%s{private}, extendHeadLength=%u, totalLength=%u",
262         srcDevInfo.identifier.c_str(), headLength, length);
263     if (errCode == NO_PERMISSION) {
264         LOGI("[NAdapt][OnDataRecv] userId dismatched, drop packet");
265         return;
266     }
267     if (headLength >= length) {
268         LOGW("[NAdapt][OnDataRecv] head len is too big, drop packet");
269         return;
270     }
271     {
272         std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
273         if (!onReceiveHandle_) {
274             LOGE("[NAdapt][OnDataRecv] onReceiveHandle invalid.");
275             return;
276         }
277         std::string currentUserId;
278         if (userId.size() >= 1) {
279             currentUserId = userId[0];
280         }
281         onReceiveHandle_(srcDevInfo.identifier, data + headLength, length - headLength, currentUserId);
282     }
283     // These code is compensation for the probable defect of IProcessCommunicator implementation.
284     // As described in the agreement, for the missed online situation, we check the source dev when received.
285     // OnDeviceChangeHandler is reused to check the existence of peer process.
286     // Since this thread is the callback_thread of IProcessCommunicator, we do this check task directly in this thread.
287     CheckDeviceOnlineAfterReception(srcDevInfo);
288 }
289 
OnDeviceChangeHandler(const DeviceInfos & devInfo,bool isOnline)290 void NetworkAdapter::OnDeviceChangeHandler(const DeviceInfos &devInfo, bool isOnline)
291 {
292     LOGI("[NAdapt][OnDeviceChange] Enter, dev=%s{private}, isOnline=%d", devInfo.identifier.c_str(), isOnline);
293     // These code is compensation for the probable defect of IProcessCommunicator implementation.
294     // As described in the agreement, for the mistake online situation, we check the existence of peer process.
295     // The IProcessCommunicator implementation guarantee that no mistake offline will happen.
296     if (isOnline) {
297         if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
298             LOGI("[NAdapt][OnDeviceChange] ######## Detect Not Really Online ########.");
299             std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
300             onlineRemoteDev_.erase(devInfo.identifier);
301             return;
302         }
303         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
304         onlineRemoteDev_.insert(devInfo.identifier);
305     } else {
306         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
307         onlineRemoteDev_.erase(devInfo.identifier);
308     }
309     // End compensation, do callback.
310     std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
311     if (!onChangeHandle_) {
312         LOGE("[NAdapt][OnDeviceChange] onChangeHandle_ invalid.");
313         return;
314     }
315     onChangeHandle_(devInfo.identifier, isOnline);
316 }
317 
SearchOnlineRemoteDeviceAtStartup()318 void NetworkAdapter::SearchOnlineRemoteDeviceAtStartup()
319 {
320     std::vector<DeviceInfos> onlineDev = processCommunicator_->GetRemoteOnlineDeviceInfosList();
321     LOGE("[NAdapt][SearchOnline] onlineDev count = %zu.", onlineDev.size());
322     if (!onlineDev.empty()) {
323         pendingAsyncTaskCount_.fetch_add(1);
324         // Note: onlineDev should be captured by value (must not by reference)
325         TaskAction callbackTask = [onlineDev, this]() {
326             LOGI("[NAdapt][SearchOnline] Begin Callback In Async Task.");
327             std::string localIdentity;
328             GetLocalIdentity(localIdentity); // It doesn't matter if getlocal fail and localIdentity be an empty string
329             for (auto &entry : onlineDev) {
330                 if (entry.identifier == localIdentity) {
331                     LOGW("[NAdapt][SearchOnline] ######## Detect Local Device in Remote Device List ########.");
332                     continue;
333                 }
334                 OnDeviceChangeHandler(entry, true);
335             }
336             pendingAsyncTaskCount_.fetch_sub(1);
337             asyncTaskDoneCv_.notify_all();
338             LOGI("[NAdapt][SearchOnline] End Callback In Async Task.");
339         };
340         // Use ScheduleQueuedTask to keep order
341         int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
342         if (errCode != E_OK) {
343             LOGE("[NAdapt][SearchOnline] ScheduleQueuedTask failed, errCode = %d.", errCode);
344             pendingAsyncTaskCount_.fetch_sub(1);
345             asyncTaskDoneCv_.notify_all();
346         }
347     }
348 }
349 
CheckDeviceOnlineAfterReception(const DeviceInfos & devInfo)350 void NetworkAdapter::CheckDeviceOnlineAfterReception(const DeviceInfos &devInfo)
351 {
352     bool isAlreadyOnline = true;
353     {
354         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
355         if (onlineRemoteDev_.count(devInfo.identifier) == 0) {
356             isAlreadyOnline = false;
357         }
358     }
359 
360     // Seem offline but receive data from it, let OnDeviceChangeHandler check whether it is really online
361     if (!isAlreadyOnline) {
362         OnDeviceChangeHandler(devInfo, true);
363     }
364 }
365 
CheckDeviceOfflineAfterSendFail(const DeviceInfos & devInfo)366 void NetworkAdapter::CheckDeviceOfflineAfterSendFail(const DeviceInfos &devInfo)
367 {
368     // Note: only the identifier field of devInfo is valid, enough to call IsSameProcessLabelStartedOnPeerDevice
369     bool isAlreadyOffline = true;
370     {
371         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
372         if (onlineRemoteDev_.count(devInfo.identifier) != 0) {
373             isAlreadyOffline = false;
374         }
375     }
376 
377     // Seem online but send fail, we have to check whether still online
378     if (!isAlreadyOffline) {
379         if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
380             LOGW("[NAdapt][CheckAfterSend] ######## Missed Offline Detected ########.");
381             {
382                 // Mark this device not online immediately to avoid repeatedly miss-offline detect when send continually
383                 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
384                 onlineRemoteDev_.erase(devInfo.identifier);
385             }
386             pendingAsyncTaskCount_.fetch_add(1);
387             // Note: devInfo should be captured by value (must not by reference)
388             TaskAction callbackTask = [devInfo, this]() {
389                 LOGI("[NAdapt][CheckAfterSend] In Async Task, devInfo=%s{private}.", devInfo.identifier.c_str());
390                 OnDeviceChangeHandler(devInfo, false);
391                 pendingAsyncTaskCount_.fetch_sub(1);
392                 asyncTaskDoneCv_.notify_all();
393             };
394             // Use ScheduleQueuedTask to keep order
395             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
396             if (errCode != E_OK) {
397                 LOGE("[NAdapt][CheckAfterSend] ScheduleQueuedTask failed, errCode = %d.", errCode);
398                 pendingAsyncTaskCount_.fetch_sub(1);
399                 asyncTaskDoneCv_.notify_all();
400             }
401         }
402     }
403 }
404 
IsDeviceOnline(const std::string & device)405 bool NetworkAdapter::IsDeviceOnline(const std::string &device)
406 {
407     std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
408     return (onlineRemoteDev_.find(device) != onlineRemoteDev_.end());
409 }
410 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)411 std::shared_ptr<ExtendHeaderHandle> NetworkAdapter::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
412 {
413     return processCommunicator_->GetExtendHeaderHandle(paramInfo);
414 }
415 } // namespace DistributedDB
416