• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "adapter_stub.h"
17 #include <memory>
18 #include "db_errno.h"
19 #include "endian_convert.h"
20 #include "frame_header.h"
21 #include "iprocess_communicator.h"
22 #include "log_print.h"
23 #include "distributeddb_communicator_common.h"
24 
25 using namespace DistributedDB;
26 
namespace {
    // Fixed values reported by the stub through GetMtuSize() and GetTimeout().
    constexpr uint32_t STUB_MTU_SIZE = 5 * 1024 * 1024; // 5 M, 1024 is scale
    constexpr uint32_t STUB_TIME_OUT = 5 * 1000; // 5 S, 1000 is scale
}
31 
32 /*
33  * Override Part
34  */
~AdapterStub()35 AdapterStub::~AdapterStub()
36 {
37     // Do nothing
38 }
39 
StartAdapter()40 int AdapterStub::StartAdapter()
41 {
42     return E_OK;
43 }
44 
StopAdapter()45 void AdapterStub::StopAdapter()
46 {
47     // Do nothing
48 }
49 
GetMtuSize()50 uint32_t AdapterStub::GetMtuSize()
51 {
52     return STUB_MTU_SIZE;
53 }
54 
GetMtuSize(const std::string & target)55 uint32_t AdapterStub::GetMtuSize(const std::string &target)
56 {
57     (void)target;
58     return GetMtuSize();
59 }
60 
GetTimeout()61 uint32_t AdapterStub::GetTimeout()
62 {
63     return STUB_TIME_OUT;
64 }
65 
GetTimeout(const std::string & target)66 uint32_t AdapterStub::GetTimeout(const std::string &target)
67 {
68     (void)target;
69     return GetTimeout();
70 }
71 
GetLocalIdentity(std::string & outTarget)72 int AdapterStub::GetLocalIdentity(std::string &outTarget)
73 {
74     outTarget = localTarget_;
75     return E_OK;
76 }
77 
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length)78 int AdapterStub::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length)
79 {
80     LOGI("[UT][Stub][Send] Send length=%" PRIu32 " to dstTarget=%s begin.", length, dstTarget.c_str());
81     ApplySendBlock();
82 
83     if (QuerySendRetry(dstTarget)) {
84         LOGI("[UT][Stub][Send] Retry for %s true.", dstTarget.c_str());
85         return -E_WAIT_RETRY;
86     }
87 
88     if (QuerySendTotalLoss()) {
89         LOGI("[UT][Stub][Send] Total loss for %s true.", dstTarget.c_str());
90         return E_OK;
91     }
92 
93     if (QuerySendPartialLoss()) {
94         LOGI("[UT][Stub][Send] Partial loss for %s true.", dstTarget.c_str());
95         return E_OK;
96     }
97 
98     std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
99     if (targetMapAdapter_.count(dstTarget) == 0) {
100         LOGI("[UT][Stub][Send] dstTarget=%s not found.", dstTarget.c_str());
101         return -E_NOT_FOUND;
102     }
103 
104     ApplySendBitError(bytes, length);
105 
106     AdapterStub *toAdapter = targetMapAdapter_[dstTarget];
107     toAdapter->DeliverBytes(localTarget_, bytes, length);
108     LOGI("[UT][Stub][Send] Send to dstTarget=%s end.", dstTarget.c_str());
109     return E_OK;
110 }
111 
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)112 int AdapterStub::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
113 {
114     std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
115     return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
116 }
117 
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)118 int AdapterStub::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
119 {
120     std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
121     return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
122 }
123 
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)124 int AdapterStub::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
125 {
126     std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
127     return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
128 }
129 
130 
IsDeviceOnline(const std::string & device)131 bool AdapterStub::IsDeviceOnline(const std::string &device)
132 {
133     (void)device;
134     return true;
135 }
136 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)137 std::shared_ptr<ExtendHeaderHandle> AdapterStub::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
138 {
139     std::shared_ptr<ExtendHeaderHandle> handle = std::make_shared<ExtendHeaderHandleTest>(paramInfo);
140     return handle;
141 }
142 /*
143  * Extended Part
144  */
ConnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)145 void AdapterStub::ConnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
146 {
147     LOGI("[UT][Stub][ConnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
148         thatStub->GetLocalTarget().c_str());
149     thisStub->Connect(thatStub);
150     thatStub->Connect(thisStub);
151 }
152 
DisconnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)153 void AdapterStub::DisconnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
154 {
155     LOGI("[UT][Stub][DisconnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
156         thatStub->GetLocalTarget().c_str());
157     thisStub->Disconnect(thatStub);
158     thatStub->Disconnect(thisStub);
159 }
160 
AdapterStub(const std::string & inLocalTarget)161 AdapterStub::AdapterStub(const std::string &inLocalTarget)
162     : localTarget_(inLocalTarget)
163 {
164 }
165 
GetLocalTarget()166 const std::string &AdapterStub::GetLocalTarget()
167 {
168     return localTarget_;
169 }
170 
Connect(AdapterStub * inStub)171 void AdapterStub::Connect(AdapterStub *inStub)
172 {
173     LOGI("[UT][Stub][Connect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
174     std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
175     targetMapAdapter_[inStub->GetLocalTarget()] = inStub;
176     if (onChangeHandle_) {
177         onChangeHandle_(inStub->GetLocalTarget(), true);
178     }
179 }
180 
Disconnect(AdapterStub * inStub)181 void AdapterStub::Disconnect(AdapterStub *inStub)
182 {
183     LOGI("[UT][Stub][Disconnect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
184     std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
185     targetMapAdapter_.erase(inStub->GetLocalTarget());
186     if (onChangeHandle_) {
187         onChangeHandle_(inStub->GetLocalTarget(), false);
188     }
189 }
190 
DeliverBytes(const std::string & srcTarget,const uint8_t * bytes,uint32_t length)191 void AdapterStub::DeliverBytes(const std::string &srcTarget, const uint8_t *bytes, uint32_t length)
192 {
193     std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
194     if (onReceiveHandle_) {
195         uint32_t headLength = 0;
196         std::string userId;
197         CheckAndGetDataHeadInfo(bytes, length, headLength, userId);
198         onReceiveHandle_(srcTarget, bytes + headLength, length - headLength, userId);
199     }
200 }
201 
CheckAndGetDataHeadInfo(const uint8_t * data,uint32_t totalLen,uint32_t & headLength,std::string & userId)202 void AdapterStub::CheckAndGetDataHeadInfo(const uint8_t *data, uint32_t totalLen, uint32_t &headLength,
203     std::string &userId)
204 {
205     auto info = reinterpret_cast<const ExtendHeadInfo *>(data);
206     NetToHost(info->magic);
207     if (info->magic == ExtendHeaderHandleTest::MAGIC_NUM) {
208         NetToHost(info->length);
209         NetToHost(info->version);
210         headLength = info->length;
211         std::string tmpUserId(BUFF_LEN, 0);
212         for (uint8_t i = 0; i < BUFF_LEN; i++) {
213             tmpUserId[i] = info->userId[i];
214         }
215         userId = tmpUserId;
216     } else {
217         headLength = 0;
218     }
219 }
220 
221 /*
222  * Simulate Part
223  */
SimulateSendBlock()224 void AdapterStub::SimulateSendBlock()
225 {
226     LOGI("[UT][Stub][Block] Before Lock.");
227     block_.lock();
228     LOGI("[UT][Stub][Block] After Lock.");
229 }
230 
SimulateSendBlockClear()231 void AdapterStub::SimulateSendBlockClear()
232 {
233     LOGI("[UT][Stub][UnBlock] Before UnLock.");
234     block_.unlock();
235     LOGI("[UT][Stub][UnBlock] After UnLock.");
236 }
237 
SimulateSendRetry(const std::string & dstTarget)238 void AdapterStub::SimulateSendRetry(const std::string &dstTarget)
239 {
240     std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
241     targetRetrySet_.insert(dstTarget);
242 }
243 
SimulateSendRetryClear(const std::string & dstTarget)244 void AdapterStub::SimulateSendRetryClear(const std::string &dstTarget)
245 {
246     std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
247     if (targetRetrySet_.count(dstTarget) == 0) {
248         return;
249     }
250     targetRetrySet_.erase(dstTarget);
251     std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
252     if (onSendableHandle_) {
253         onSendableHandle_(dstTarget);
254     }
255 }
256 
SimulateSendPartialLoss()257 void AdapterStub::SimulateSendPartialLoss()
258 {
259     isPartialLossSimulated_ = true;
260 }
261 
SimulateSendPartialLossClear()262 void AdapterStub::SimulateSendPartialLossClear()
263 {
264     isPartialLossSimulated_ = false;
265 }
266 
SimulateSendTotalLoss()267 void AdapterStub::SimulateSendTotalLoss()
268 {
269     isTotalLossSimulated_ = true;
270 }
271 
SimulateSendTotalLossClear()272 void AdapterStub::SimulateSendTotalLossClear()
273 {
274     isTotalLossSimulated_ = false;
275 }
276 
SimulateSendBitErrorInMagicField(bool doFlag,uint16_t inMagic)277 void AdapterStub::SimulateSendBitErrorInMagicField(bool doFlag, uint16_t inMagic)
278 {
279     doChangeMagicFlag_ = doFlag;
280     magicField_ = inMagic;
281 }
282 
SimulateSendBitErrorInVersionField(bool doFlag,uint16_t inVersion)283 void AdapterStub::SimulateSendBitErrorInVersionField(bool doFlag, uint16_t inVersion)
284 {
285     doChangeVersionFlag_ = doFlag;
286     versionField_ = inVersion;
287 }
288 
SimulateSendBitErrorInCheckSumField(bool doFlag,uint64_t inCheckSum)289 void AdapterStub::SimulateSendBitErrorInCheckSumField(bool doFlag, uint64_t inCheckSum)
290 {
291     doChangeCheckSumFlag_ = doFlag;
292     checkSumField_ = inCheckSum;
293 }
294 
SimulateSendBitErrorInPacketLenField(bool doFlag,uint32_t inPacketLen)295 void AdapterStub::SimulateSendBitErrorInPacketLenField(bool doFlag, uint32_t inPacketLen)
296 {
297     doChangePacketLenFlag_ = doFlag;
298     packetLenField_ = inPacketLen;
299 }
300 
SimulateSendBitErrorInPacketTypeField(bool doFlag,uint8_t inPacketType)301 void AdapterStub::SimulateSendBitErrorInPacketTypeField(bool doFlag, uint8_t inPacketType)
302 {
303     doChangePacketTypeFlag_ = doFlag;
304     packetTypeField_ = inPacketType;
305 }
306 
SimulateSendBitErrorInPaddingLenField(bool doFlag,uint8_t inPaddingLen)307 void AdapterStub::SimulateSendBitErrorInPaddingLenField(bool doFlag, uint8_t inPaddingLen)
308 {
309     doChangePaddingLenFlag_ = doFlag;
310     paddingLenField_ = inPaddingLen;
311 }
312 
SimulateSendBitErrorInMessageIdField(bool doFlag,uint32_t inMessageId)313 void AdapterStub::SimulateSendBitErrorInMessageIdField(bool doFlag, uint32_t inMessageId)
314 {
315     doChangeMessageIdFlag_ = doFlag;
316     messageIdField_ = inMessageId;
317 }
318 
ApplySendBlock()319 void AdapterStub::ApplySendBlock()
320 {
321     LOGI("[UT][Stub][ApplyBlock] Before Lock&UnLock.");
322     block_.lock();
323     block_.unlock();
324     LOGI("[UT][Stub][ApplyBlock] After Lock&UnLock.");
325 }
326 
QuerySendRetry(const std::string & dstTarget)327 bool AdapterStub::QuerySendRetry(const std::string &dstTarget)
328 {
329     std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
330     if (targetRetrySet_.count(dstTarget) == 0) {
331         return false;
332     } else {
333         return true;
334     }
335 }
336 
QuerySendPartialLoss()337 bool AdapterStub::QuerySendPartialLoss()
338 {
339     if (isPartialLossSimulated_) {
340         uint64_t count = countForPartialLoss_.fetch_add(1, std::memory_order_seq_cst);
341         if (count % 2 == 0) { // 2 is half
342             return true;
343         }
344     }
345     return false;
346 }
347 
QuerySendTotalLoss()348 bool AdapterStub::QuerySendTotalLoss()
349 {
350     return isTotalLossSimulated_;
351 }
352 
namespace {
/*
 * XOR together all 64-bit words of a buffer, each word taken in the byte order it has in memory.
 *
 * bytes  - start of the buffer; it does not need to be 8-byte aligned.
 * length - buffer size in bytes.
 *
 * Returns the XOR of the words, or 0 when bytes is null or length is not a multiple of 8
 * (this includes the empty buffer).
 */
uint64_t CalculateXorSum(const uint8_t *bytes, uint32_t length)
{
    if (bytes == nullptr || length % sizeof(uint64_t) != 0) {
        return 0;
    }
    uint64_t outSum = 0;
    // Fold the input into the accumulator byte by byte through its object representation.
    // XOR works on each byte independently, so this yields the same value as XOR-ing whole words
    // read from memory, without casting the input to uint64_t * (which is undefined for a
    // misaligned buffer).
    auto sumBytes = reinterpret_cast<uint8_t *>(&outSum);
    for (uint32_t i = 0; i < length; i++) {
        sumBytes[i % sizeof(uint64_t)] ^= bytes[i];
    }
    return outSum;
}
// Number of leading bytes of a frame that ApplySendBitError leaves out of the checksum range.
const uint32_t LENGTH_BEFORE_SUM_RANGE = sizeof(uint64_t) + sizeof(uint64_t);
}
369 
ApplySendBitError(const uint8_t * bytes,uint32_t length)370 void AdapterStub::ApplySendBitError(const uint8_t *bytes, uint32_t length)
371 {
372     // Change field in CommPhyHeader
373     if (length < sizeof(CommPhyHeader)) {
374         return;
375     }
376     auto edibleBytes = const_cast<uint8_t *>(bytes);
377     auto phyHeader = reinterpret_cast<CommPhyHeader *>(edibleBytes);
378     if (doChangeMagicFlag_) {
379         phyHeader->magic = HostToNet(magicField_);
380     }
381     if (doChangeVersionFlag_) {
382         phyHeader->version = HostToNet(versionField_);
383     }
384     if (doChangeCheckSumFlag_) {
385         phyHeader->checkSum = HostToNet(checkSumField_);
386     }
387     if (doChangePacketLenFlag_) {
388         phyHeader->packetLen = HostToNet(packetLenField_);
389     }
390     if (doChangePacketTypeFlag_) {
391         phyHeader->packetType = HostToNet(packetTypeField_);
392     }
393     if (doChangePaddingLenFlag_) {
394         phyHeader->paddingLen = HostToNet(paddingLenField_);
395     }
396     // Change field in MessageHeader. Assumpt that no fragment
397     if (length < sizeof(CommPhyHeader) + sizeof(CommDivergeHeader) + sizeof(MessageHeader)) {
398         return;
399     }
400     edibleBytes += (sizeof(CommPhyHeader) + sizeof(CommDivergeHeader));
401     auto msgHeader = reinterpret_cast<MessageHeader *>(edibleBytes);
402     if (doChangeMessageIdFlag_) {
403         msgHeader->messageId = HostToNet(messageIdField_);
404         phyHeader->checkSum = HostToNet(CalculateXorSum(bytes + LENGTH_BEFORE_SUM_RANGE,
405             length - LENGTH_BEFORE_SUM_RANGE));
406     }
407 }
408