• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "adapter_stub.h"
17 #include <memory>
18 #include "db_errno.h"
19 #include "endian_convert.h"
20 #include "frame_header.h"
21 #include "iprocess_communicator.h"
22 #include "log_print.h"
23 #include "distributeddb_communicator_common.h"
24 #include "process_communicator_test_stub.h"
25 
26 using namespace DistributedDB;
27 
namespace {
    // Fixed values reported by the stub adapter's GetMtuSize() and GetTimeout().
    constexpr uint32_t STUB_MTU_SIZE = 5 * 1024 * 1024; // 5 M, 1024 is scale
    constexpr uint32_t STUB_TIME_OUT = 5 * 1000; // 5 S, 1000 is scale
}
32 
33 /*
34  * Override Part
35  */
~AdapterStub()36 AdapterStub::~AdapterStub()
37 {
38     // Do nothing
39 }
40 
StartAdapter()41 int AdapterStub::StartAdapter()
42 {
43     return E_OK;
44 }
45 
StopAdapter()46 void AdapterStub::StopAdapter()
47 {
48     // Do nothing
49 }
50 
GetMtuSize()51 uint32_t AdapterStub::GetMtuSize()
52 {
53     return STUB_MTU_SIZE;
54 }
55 
GetMtuSize(const std::string & target)56 uint32_t AdapterStub::GetMtuSize(const std::string &target)
57 {
58     (void)target;
59     return GetMtuSize();
60 }
61 
GetTimeout()62 uint32_t AdapterStub::GetTimeout()
63 {
64     return STUB_TIME_OUT;
65 }
66 
GetTimeout(const std::string & target)67 uint32_t AdapterStub::GetTimeout(const std::string &target)
68 {
69     (void)target;
70     return GetTimeout();
71 }
72 
GetLocalIdentity(std::string & outTarget)73 int AdapterStub::GetLocalIdentity(std::string &outTarget)
74 {
75     outTarget = localTarget_;
76     return E_OK;
77 }
78 
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length,uint32_t totalLength)79 int AdapterStub::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length, uint32_t totalLength)
80 {
81     LOGI("[UT][Stub][Send] Send length=%" PRIu32 " to dstTarget=%s begin.", length, dstTarget.c_str());
82     ApplySendBlock();
83 
84     (void)totalLength;
85     {
86         std::lock_guard<std::mutex> autoLock(sendBytesMutex_);
87         if (onSendBytes_) {
88             int errCode = onSendBytes_();
89             if (errCode != E_OK) {
90                 LOGI("[UT][Stub][Send] failed for %s errCode %d.", dstTarget.c_str(), errCode);
91                 return errCode;
92             }
93         }
94     }
95 
96     if (QuerySendRetry(dstTarget)) {
97         LOGI("[UT][Stub][Send] Retry for %s true.", dstTarget.c_str());
98         return -E_WAIT_RETRY;
99     }
100 
101     if (QuerySendTotalLoss()) {
102         LOGI("[UT][Stub][Send] Total loss for %s true.", dstTarget.c_str());
103         return E_OK;
104     }
105 
106     if (QuerySendPartialLoss()) {
107         LOGI("[UT][Stub][Send] Partial loss for %s true.", dstTarget.c_str());
108         return E_OK;
109     }
110 
111     std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
112     if (targetMapAdapter_.count(dstTarget) == 0) {
113         LOGI("[UT][Stub][Send] dstTarget=%s not found.", dstTarget.c_str());
114         return -E_NOT_FOUND;
115     }
116 
117     ApplySendBitError(bytes, length);
118 
119     AdapterStub *toAdapter = targetMapAdapter_[dstTarget];
120     toAdapter->DeliverBytes(localTarget_, bytes, length);
121     LOGI("[UT][Stub][Send] Send to dstTarget=%s end.", dstTarget.c_str());
122     return E_OK;
123 }
124 
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)125 int AdapterStub::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
126 {
127     std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
128     return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
129 }
130 
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)131 int AdapterStub::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
132 {
133     std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
134     return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
135 }
136 
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)137 int AdapterStub::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
138 {
139     std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
140     return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
141 }
142 
143 
IsDeviceOnline(const std::string & device)144 bool AdapterStub::IsDeviceOnline(const std::string &device)
145 {
146     (void)device;
147     return true;
148 }
149 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)150 std::shared_ptr<ExtendHeaderHandle> AdapterStub::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
151 {
152     std::shared_ptr<ExtendHeaderHandle> handle = std::make_shared<ExtendHeaderHandleTest>(paramInfo);
153     return handle;
154 }
155 /*
156  * Extended Part
157  */
ConnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)158 void AdapterStub::ConnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
159 {
160     LOGI("[UT][Stub][ConnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
161         thatStub->GetLocalTarget().c_str());
162     thisStub->Connect(thatStub);
163     thatStub->Connect(thisStub);
164 }
165 
DisconnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)166 void AdapterStub::DisconnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
167 {
168     LOGI("[UT][Stub][DisconnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
169         thatStub->GetLocalTarget().c_str());
170     thisStub->Disconnect(thatStub);
171     thatStub->Disconnect(thisStub);
172 }
173 
AdapterStub(const std::string & inLocalTarget)174 AdapterStub::AdapterStub(const std::string &inLocalTarget)
175     : localTarget_(inLocalTarget)
176 {
177 }
178 
GetLocalTarget()179 const std::string &AdapterStub::GetLocalTarget()
180 {
181     return localTarget_;
182 }
183 
Connect(AdapterStub * inStub)184 void AdapterStub::Connect(AdapterStub *inStub)
185 {
186     LOGI("[UT][Stub][Connect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
187     std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
188     bool isOnlineBefore = targetMapAdapter_.find(inStub->GetLocalTarget()) != targetMapAdapter_.end();
189     targetMapAdapter_[inStub->GetLocalTarget()] = inStub;
190     if (!isOnlineBefore && onChangeHandle_) {
191         onChangeHandle_(inStub->GetLocalTarget(), true);
192     }
193 }
194 
Disconnect(AdapterStub * inStub)195 void AdapterStub::Disconnect(AdapterStub *inStub)
196 {
197     LOGI("[UT][Stub][Disconnect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
198     std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
199     targetMapAdapter_.erase(inStub->GetLocalTarget());
200     if (onChangeHandle_) {
201         onChangeHandle_(inStub->GetLocalTarget(), false);
202     }
203 }
204 
DeliverBytes(const std::string & srcTarget,const uint8_t * bytes,uint32_t length)205 void AdapterStub::DeliverBytes(const std::string &srcTarget, const uint8_t *bytes, uint32_t length)
206 {
207     std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
208     if (onReceiveHandle_) {
209         uint32_t headLength = 0;
210         GetDataHeadInfo(bytes, headLength);
211         std::vector<UserInfo> userInfos;
212         GetDataUserInfo(bytes, userInfos);
213         std::shared_ptr<ProcessCommunicatorTestStub> processCommunicator =
214             std::make_shared<ProcessCommunicatorTestStub>();
215         processCommunicator->SetDataUserInfo(userInfos);
216         DataUserInfoProc userInfoProc = {bytes, length, processCommunicator};
217         onReceiveHandle_(srcTarget, bytes + headLength, length - headLength, userInfoProc);
218     }
219 }
220 
GetDataHeadInfo(const uint8_t * data,uint32_t & headLength)221 void AdapterStub::GetDataHeadInfo(const uint8_t *data, uint32_t &headLength)
222 {
223     auto info = reinterpret_cast<const ExtendHeadInfo *>(data);
224     NetToHost(info->magic);
225     if (info->magic == ExtendHeaderHandleTest::MAGIC_NUM) {
226         NetToHost(info->length);
227         NetToHost(info->version);
228         headLength = info->length;
229     } else {
230         headLength = 0;
231     }
232 }
233 
GetDataUserInfo(const uint8_t * data,std::vector<UserInfo> & userInfos)234 void AdapterStub::GetDataUserInfo(const uint8_t *data, std::vector<UserInfo> &userInfos)
235 {
236     auto info = reinterpret_cast<const ExtendHeadInfo *>(data);
237     NetToHost(info->magic);
238     if (info->magic == ExtendHeaderHandleTest::MAGIC_NUM) {
239         UserInfo userInfo;
240         for (uint8_t i = 0; i < BUFF_LEN; i++) {
241             if (info->userId[i] == 0) {
242                 break;
243             }
244             userInfo.receiveUser.push_back(info->userId[i]);
245         }
246         userInfos.push_back(userInfo);
247     }
248 }
249 
250 /*
251  * Simulate Part
252  */
SimulateSendBlock()253 void AdapterStub::SimulateSendBlock()
254 {
255     LOGI("[UT][Stub][Block] Before Lock.");
256     block_.lock();
257     LOGI("[UT][Stub][Block] After Lock.");
258 }
259 
SimulateSendBlockClear()260 void AdapterStub::SimulateSendBlockClear()
261 {
262     LOGI("[UT][Stub][UnBlock] Before UnLock.");
263     block_.unlock();
264     LOGI("[UT][Stub][UnBlock] After UnLock.");
265 }
266 
SimulateSendRetry(const std::string & dstTarget)267 void AdapterStub::SimulateSendRetry(const std::string &dstTarget)
268 {
269     std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
270     targetRetrySet_.insert(dstTarget);
271 }
272 
SimulateSendRetryClear(const std::string & dstTarget,int deviceCommErrCode)273 void AdapterStub::SimulateSendRetryClear(const std::string &dstTarget, int deviceCommErrCode)
274 {
275     {
276         std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
277         if (targetRetrySet_.count(dstTarget) == 0) {
278             return;
279         }
280         targetRetrySet_.erase(dstTarget);
281     }
282     std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
283     if (onSendableHandle_) {
284         onSendableHandle_(dstTarget, deviceCommErrCode);
285     }
286 }
287 
SimulateTriggerSendableCallback(const std::string & dstTarget,int deviceCommErrCode)288 void AdapterStub::SimulateTriggerSendableCallback(const std::string &dstTarget, int deviceCommErrCode)
289 {
290     std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
291     if (onSendableHandle_) {
292         onSendableHandle_(dstTarget, deviceCommErrCode);
293     }
294 }
295 
SimulateSendPartialLoss()296 void AdapterStub::SimulateSendPartialLoss()
297 {
298     isPartialLossSimulated_ = true;
299 }
300 
SimulateSendPartialLossClear()301 void AdapterStub::SimulateSendPartialLossClear()
302 {
303     isPartialLossSimulated_ = false;
304 }
305 
SimulateSendTotalLoss()306 void AdapterStub::SimulateSendTotalLoss()
307 {
308     isTotalLossSimulated_ = true;
309 }
310 
SimulateSendTotalLossClear()311 void AdapterStub::SimulateSendTotalLossClear()
312 {
313     isTotalLossSimulated_ = false;
314 }
315 
SimulateSendBitErrorInMagicField(bool doFlag,uint16_t inMagic)316 void AdapterStub::SimulateSendBitErrorInMagicField(bool doFlag, uint16_t inMagic)
317 {
318     doChangeMagicFlag_ = doFlag;
319     magicField_ = inMagic;
320 }
321 
SimulateSendBitErrorInVersionField(bool doFlag,uint16_t inVersion)322 void AdapterStub::SimulateSendBitErrorInVersionField(bool doFlag, uint16_t inVersion)
323 {
324     doChangeVersionFlag_ = doFlag;
325     versionField_ = inVersion;
326 }
327 
SimulateSendBitErrorInCheckSumField(bool doFlag,uint64_t inCheckSum)328 void AdapterStub::SimulateSendBitErrorInCheckSumField(bool doFlag, uint64_t inCheckSum)
329 {
330     doChangeCheckSumFlag_ = doFlag;
331     checkSumField_ = inCheckSum;
332 }
333 
SimulateSendBitErrorInPacketLenField(bool doFlag,uint32_t inPacketLen)334 void AdapterStub::SimulateSendBitErrorInPacketLenField(bool doFlag, uint32_t inPacketLen)
335 {
336     doChangePacketLenFlag_ = doFlag;
337     packetLenField_ = inPacketLen;
338 }
339 
SimulateSendBitErrorInPacketTypeField(bool doFlag,uint8_t inPacketType)340 void AdapterStub::SimulateSendBitErrorInPacketTypeField(bool doFlag, uint8_t inPacketType)
341 {
342     doChangePacketTypeFlag_ = doFlag;
343     packetTypeField_ = inPacketType;
344 }
345 
SimulateSendBitErrorInPaddingLenField(bool doFlag,uint8_t inPaddingLen)346 void AdapterStub::SimulateSendBitErrorInPaddingLenField(bool doFlag, uint8_t inPaddingLen)
347 {
348     doChangePaddingLenFlag_ = doFlag;
349     paddingLenField_ = inPaddingLen;
350 }
351 
SimulateSendBitErrorInMessageIdField(bool doFlag,uint32_t inMessageId)352 void AdapterStub::SimulateSendBitErrorInMessageIdField(bool doFlag, uint32_t inMessageId)
353 {
354     doChangeMessageIdFlag_ = doFlag;
355     messageIdField_ = inMessageId;
356 }
357 
ApplySendBlock()358 void AdapterStub::ApplySendBlock()
359 {
360     LOGI("[UT][Stub][ApplyBlock] Before Lock&UnLock.");
361     block_.lock();
362     block_.unlock();
363     LOGI("[UT][Stub][ApplyBlock] After Lock&UnLock.");
364 }
365 
QuerySendRetry(const std::string & dstTarget)366 bool AdapterStub::QuerySendRetry(const std::string &dstTarget)
367 {
368     std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
369     if (targetRetrySet_.count(dstTarget) == 0) {
370         return false;
371     } else {
372         return true;
373     }
374 }
375 
QuerySendPartialLoss()376 bool AdapterStub::QuerySendPartialLoss()
377 {
378     if (isPartialLossSimulated_) {
379         uint64_t count = countForPartialLoss_.fetch_add(1, std::memory_order_seq_cst);
380         if (count % 2 == 0) { // 2 is half
381             return true;
382         }
383     }
384     return false;
385 }
386 
QuerySendTotalLoss()387 bool AdapterStub::QuerySendTotalLoss()
388 {
389     return isTotalLossSimulated_;
390 }
391 
ForkSendBytes(const DistributedDB::OnSendBytes & onSendBytes)392 void AdapterStub::ForkSendBytes(const DistributedDB::OnSendBytes &onSendBytes)
393 {
394     std::lock_guard<std::mutex> autoLock(sendBytesMutex_);
395     onSendBytes_ = onSendBytes;
396 }
397 
namespace {
// XORs the buffer together as a sequence of 8-byte words and returns the result.
// The words are combined exactly as they are laid out in memory, so the result
// equals XOR-ing the buffer as host-order uint64_t values.
//
// bytes: start of the buffer; it does not need any particular alignment.
// length: buffer size in bytes; must be a multiple of sizeof(uint64_t).
// Returns 0 when length is not a multiple of sizeof(uint64_t), or when it is 0.
uint64_t CalculateXorSum(const uint8_t *bytes, uint32_t length)
{
    if (length % sizeof(uint64_t) != 0) {
        return 0;
    }
    uint64_t outSum = 0;
    // Fold byte by byte into the accumulator's own storage instead of reading the
    // buffer through a uint64_t pointer, which may be misaligned for that type.
    auto sumBytes = reinterpret_cast<uint8_t *>(&outSum);
    for (uint32_t i = 0; i < length; i++) {
        sumBytes[i % sizeof(uint64_t)] ^= bytes[i];
    }
    return outSum;
}
// Number of leading bytes of a frame that are excluded from the checksum range.
const uint32_t LENGTH_BEFORE_SUM_RANGE = sizeof(uint64_t) + sizeof(uint64_t);
}
414 
ApplySendBitError(const uint8_t * bytes,uint32_t length)415 void AdapterStub::ApplySendBitError(const uint8_t *bytes, uint32_t length)
416 {
417     // Change field in CommPhyHeader
418     if (length < sizeof(CommPhyHeader)) {
419         return;
420     }
421     auto edibleBytes = const_cast<uint8_t *>(bytes);
422     auto phyHeader = reinterpret_cast<CommPhyHeader *>(edibleBytes);
423     if (doChangeMagicFlag_) {
424         phyHeader->magic = HostToNet(magicField_);
425     }
426     if (doChangeVersionFlag_) {
427         phyHeader->version = HostToNet(versionField_);
428     }
429     if (doChangeCheckSumFlag_) {
430         phyHeader->checkSum = HostToNet(checkSumField_);
431     }
432     if (doChangePacketLenFlag_) {
433         phyHeader->packetLen = HostToNet(packetLenField_);
434     }
435     if (doChangePacketTypeFlag_) {
436         phyHeader->packetType = HostToNet(packetTypeField_);
437     }
438     if (doChangePaddingLenFlag_) {
439         phyHeader->paddingLen = HostToNet(paddingLenField_);
440     }
441     // Change field in MessageHeader. Assumpt that no fragment
442     if (length < sizeof(CommPhyHeader) + sizeof(CommDivergeHeader) + sizeof(MessageHeader)) {
443         return;
444     }
445     edibleBytes += (sizeof(CommPhyHeader) + sizeof(CommDivergeHeader));
446     auto msgHeader = reinterpret_cast<MessageHeader *>(edibleBytes);
447     if (doChangeMessageIdFlag_) {
448         msgHeader->messageId = HostToNet(messageIdField_);
449         phyHeader->checkSum = HostToNet(CalculateXorSum(bytes + LENGTH_BEFORE_SUM_RANGE,
450             length - LENGTH_BEFORE_SUM_RANGE));
451     }
452 }