1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "adapter_stub.h"
17 #include <memory>
18 #include "db_errno.h"
19 #include "endian_convert.h"
20 #include "frame_header.h"
21 #include "iprocess_communicator.h"
22 #include "log_print.h"
23 #include "distributeddb_communicator_common.h"
24 #include "process_communicator_test_stub.h"
25
26 using namespace DistributedDB;
27
namespace {
// MTU reported by the stub: 5 M, built from a scale of 1024.
constexpr uint32_t STUB_MTU_SIZE = 5 * 1024 * 1024;
// Timeout reported by the stub: 5 S, built from a scale of 1000.
constexpr uint32_t STUB_TIME_OUT = 5 * 1000;
}
32
33 /*
34 * Override Part
35 */
~AdapterStub()36 AdapterStub::~AdapterStub()
37 {
38 // Do nothing
39 }
40
StartAdapter()41 int AdapterStub::StartAdapter()
42 {
43 return E_OK;
44 }
45
StopAdapter()46 void AdapterStub::StopAdapter()
47 {
48 // Do nothing
49 }
50
GetMtuSize()51 uint32_t AdapterStub::GetMtuSize()
52 {
53 return STUB_MTU_SIZE;
54 }
55
GetMtuSize(const std::string & target)56 uint32_t AdapterStub::GetMtuSize(const std::string &target)
57 {
58 (void)target;
59 return GetMtuSize();
60 }
61
GetTimeout()62 uint32_t AdapterStub::GetTimeout()
63 {
64 return STUB_TIME_OUT;
65 }
66
GetTimeout(const std::string & target)67 uint32_t AdapterStub::GetTimeout(const std::string &target)
68 {
69 (void)target;
70 return GetTimeout();
71 }
72
GetLocalIdentity(std::string & outTarget)73 int AdapterStub::GetLocalIdentity(std::string &outTarget)
74 {
75 outTarget = localTarget_;
76 return E_OK;
77 }
78
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length,uint32_t totalLength)79 int AdapterStub::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length, uint32_t totalLength)
80 {
81 LOGI("[UT][Stub][Send] Send length=%" PRIu32 " to dstTarget=%s begin.", length, dstTarget.c_str());
82 ApplySendBlock();
83
84 (void)totalLength;
85 {
86 std::lock_guard<std::mutex> autoLock(sendBytesMutex_);
87 if (onSendBytes_) {
88 int errCode = onSendBytes_();
89 if (errCode != E_OK) {
90 LOGI("[UT][Stub][Send] failed for %s errCode %d.", dstTarget.c_str(), errCode);
91 return errCode;
92 }
93 }
94 }
95
96 if (QuerySendRetry(dstTarget)) {
97 LOGI("[UT][Stub][Send] Retry for %s true.", dstTarget.c_str());
98 return -E_WAIT_RETRY;
99 }
100
101 if (QuerySendTotalLoss()) {
102 LOGI("[UT][Stub][Send] Total loss for %s true.", dstTarget.c_str());
103 return E_OK;
104 }
105
106 if (QuerySendPartialLoss()) {
107 LOGI("[UT][Stub][Send] Partial loss for %s true.", dstTarget.c_str());
108 return E_OK;
109 }
110
111 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
112 if (targetMapAdapter_.count(dstTarget) == 0) {
113 LOGI("[UT][Stub][Send] dstTarget=%s not found.", dstTarget.c_str());
114 return -E_NOT_FOUND;
115 }
116
117 ApplySendBitError(bytes, length);
118
119 AdapterStub *toAdapter = targetMapAdapter_[dstTarget];
120 toAdapter->DeliverBytes(localTarget_, bytes, length);
121 LOGI("[UT][Stub][Send] Send to dstTarget=%s end.", dstTarget.c_str());
122 return E_OK;
123 }
124
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)125 int AdapterStub::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
126 {
127 std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
128 return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
129 }
130
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)131 int AdapterStub::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
132 {
133 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
134 return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
135 }
136
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)137 int AdapterStub::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
138 {
139 std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
140 return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
141 }
142
143
IsDeviceOnline(const std::string & device)144 bool AdapterStub::IsDeviceOnline(const std::string &device)
145 {
146 (void)device;
147 return true;
148 }
149
GetExtendHeaderHandle(const ExtendInfo & paramInfo)150 std::shared_ptr<ExtendHeaderHandle> AdapterStub::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
151 {
152 std::shared_ptr<ExtendHeaderHandle> handle = std::make_shared<ExtendHeaderHandleTest>(paramInfo);
153 return handle;
154 }
155 /*
156 * Extended Part
157 */
ConnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)158 void AdapterStub::ConnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
159 {
160 LOGI("[UT][Stub][ConnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
161 thatStub->GetLocalTarget().c_str());
162 thisStub->Connect(thatStub);
163 thatStub->Connect(thisStub);
164 }
165
DisconnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)166 void AdapterStub::DisconnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
167 {
168 LOGI("[UT][Stub][DisconnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
169 thatStub->GetLocalTarget().c_str());
170 thisStub->Disconnect(thatStub);
171 thatStub->Disconnect(thisStub);
172 }
173
AdapterStub(const std::string & inLocalTarget)174 AdapterStub::AdapterStub(const std::string &inLocalTarget)
175 : localTarget_(inLocalTarget)
176 {
177 }
178
GetLocalTarget()179 const std::string &AdapterStub::GetLocalTarget()
180 {
181 return localTarget_;
182 }
183
Connect(AdapterStub * inStub)184 void AdapterStub::Connect(AdapterStub *inStub)
185 {
186 LOGI("[UT][Stub][Connect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
187 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
188 bool isOnlineBefore = targetMapAdapter_.find(inStub->GetLocalTarget()) != targetMapAdapter_.end();
189 targetMapAdapter_[inStub->GetLocalTarget()] = inStub;
190 if (!isOnlineBefore && onChangeHandle_) {
191 onChangeHandle_(inStub->GetLocalTarget(), true);
192 }
193 }
194
Disconnect(AdapterStub * inStub)195 void AdapterStub::Disconnect(AdapterStub *inStub)
196 {
197 LOGI("[UT][Stub][Disconnect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
198 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
199 targetMapAdapter_.erase(inStub->GetLocalTarget());
200 if (onChangeHandle_) {
201 onChangeHandle_(inStub->GetLocalTarget(), false);
202 }
203 }
204
DeliverBytes(const std::string & srcTarget,const uint8_t * bytes,uint32_t length)205 void AdapterStub::DeliverBytes(const std::string &srcTarget, const uint8_t *bytes, uint32_t length)
206 {
207 std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
208 if (onReceiveHandle_) {
209 uint32_t headLength = 0;
210 GetDataHeadInfo(bytes, headLength);
211 std::vector<UserInfo> userInfos;
212 GetDataUserInfo(bytes, userInfos);
213 std::shared_ptr<ProcessCommunicatorTestStub> processCommunicator =
214 std::make_shared<ProcessCommunicatorTestStub>();
215 processCommunicator->SetDataUserInfo(userInfos);
216 DataUserInfoProc userInfoProc = {bytes, length, processCommunicator};
217 onReceiveHandle_(srcTarget, bytes + headLength, length - headLength, userInfoProc);
218 }
219 }
220
GetDataHeadInfo(const uint8_t * data,uint32_t & headLength)221 void AdapterStub::GetDataHeadInfo(const uint8_t *data, uint32_t &headLength)
222 {
223 auto info = reinterpret_cast<const ExtendHeadInfo *>(data);
224 NetToHost(info->magic);
225 if (info->magic == ExtendHeaderHandleTest::MAGIC_NUM) {
226 NetToHost(info->length);
227 NetToHost(info->version);
228 headLength = info->length;
229 } else {
230 headLength = 0;
231 }
232 }
233
GetDataUserInfo(const uint8_t * data,std::vector<UserInfo> & userInfos)234 void AdapterStub::GetDataUserInfo(const uint8_t *data, std::vector<UserInfo> &userInfos)
235 {
236 auto info = reinterpret_cast<const ExtendHeadInfo *>(data);
237 NetToHost(info->magic);
238 if (info->magic == ExtendHeaderHandleTest::MAGIC_NUM) {
239 UserInfo userInfo;
240 for (uint8_t i = 0; i < BUFF_LEN; i++) {
241 if (info->userId[i] == 0) {
242 break;
243 }
244 userInfo.receiveUser.push_back(info->userId[i]);
245 }
246 userInfos.push_back(userInfo);
247 }
248 }
249
250 /*
251 * Simulate Part
252 */
SimulateSendBlock()253 void AdapterStub::SimulateSendBlock()
254 {
255 LOGI("[UT][Stub][Block] Before Lock.");
256 block_.lock();
257 LOGI("[UT][Stub][Block] After Lock.");
258 }
259
SimulateSendBlockClear()260 void AdapterStub::SimulateSendBlockClear()
261 {
262 LOGI("[UT][Stub][UnBlock] Before UnLock.");
263 block_.unlock();
264 LOGI("[UT][Stub][UnBlock] After UnLock.");
265 }
266
SimulateSendRetry(const std::string & dstTarget)267 void AdapterStub::SimulateSendRetry(const std::string &dstTarget)
268 {
269 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
270 targetRetrySet_.insert(dstTarget);
271 }
272
SimulateSendRetryClear(const std::string & dstTarget,int deviceCommErrCode)273 void AdapterStub::SimulateSendRetryClear(const std::string &dstTarget, int deviceCommErrCode)
274 {
275 {
276 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
277 if (targetRetrySet_.count(dstTarget) == 0) {
278 return;
279 }
280 targetRetrySet_.erase(dstTarget);
281 }
282 std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
283 if (onSendableHandle_) {
284 onSendableHandle_(dstTarget, deviceCommErrCode);
285 }
286 }
287
SimulateTriggerSendableCallback(const std::string & dstTarget,int deviceCommErrCode)288 void AdapterStub::SimulateTriggerSendableCallback(const std::string &dstTarget, int deviceCommErrCode)
289 {
290 std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
291 if (onSendableHandle_) {
292 onSendableHandle_(dstTarget, deviceCommErrCode);
293 }
294 }
295
SimulateSendPartialLoss()296 void AdapterStub::SimulateSendPartialLoss()
297 {
298 isPartialLossSimulated_ = true;
299 }
300
SimulateSendPartialLossClear()301 void AdapterStub::SimulateSendPartialLossClear()
302 {
303 isPartialLossSimulated_ = false;
304 }
305
SimulateSendTotalLoss()306 void AdapterStub::SimulateSendTotalLoss()
307 {
308 isTotalLossSimulated_ = true;
309 }
310
SimulateSendTotalLossClear()311 void AdapterStub::SimulateSendTotalLossClear()
312 {
313 isTotalLossSimulated_ = false;
314 }
315
SimulateSendBitErrorInMagicField(bool doFlag,uint16_t inMagic)316 void AdapterStub::SimulateSendBitErrorInMagicField(bool doFlag, uint16_t inMagic)
317 {
318 doChangeMagicFlag_ = doFlag;
319 magicField_ = inMagic;
320 }
321
SimulateSendBitErrorInVersionField(bool doFlag,uint16_t inVersion)322 void AdapterStub::SimulateSendBitErrorInVersionField(bool doFlag, uint16_t inVersion)
323 {
324 doChangeVersionFlag_ = doFlag;
325 versionField_ = inVersion;
326 }
327
SimulateSendBitErrorInCheckSumField(bool doFlag,uint64_t inCheckSum)328 void AdapterStub::SimulateSendBitErrorInCheckSumField(bool doFlag, uint64_t inCheckSum)
329 {
330 doChangeCheckSumFlag_ = doFlag;
331 checkSumField_ = inCheckSum;
332 }
333
SimulateSendBitErrorInPacketLenField(bool doFlag,uint32_t inPacketLen)334 void AdapterStub::SimulateSendBitErrorInPacketLenField(bool doFlag, uint32_t inPacketLen)
335 {
336 doChangePacketLenFlag_ = doFlag;
337 packetLenField_ = inPacketLen;
338 }
339
SimulateSendBitErrorInPacketTypeField(bool doFlag,uint8_t inPacketType)340 void AdapterStub::SimulateSendBitErrorInPacketTypeField(bool doFlag, uint8_t inPacketType)
341 {
342 doChangePacketTypeFlag_ = doFlag;
343 packetTypeField_ = inPacketType;
344 }
345
SimulateSendBitErrorInPaddingLenField(bool doFlag,uint8_t inPaddingLen)346 void AdapterStub::SimulateSendBitErrorInPaddingLenField(bool doFlag, uint8_t inPaddingLen)
347 {
348 doChangePaddingLenFlag_ = doFlag;
349 paddingLenField_ = inPaddingLen;
350 }
351
SimulateSendBitErrorInMessageIdField(bool doFlag,uint32_t inMessageId)352 void AdapterStub::SimulateSendBitErrorInMessageIdField(bool doFlag, uint32_t inMessageId)
353 {
354 doChangeMessageIdFlag_ = doFlag;
355 messageIdField_ = inMessageId;
356 }
357
ApplySendBlock()358 void AdapterStub::ApplySendBlock()
359 {
360 LOGI("[UT][Stub][ApplyBlock] Before Lock&UnLock.");
361 block_.lock();
362 block_.unlock();
363 LOGI("[UT][Stub][ApplyBlock] After Lock&UnLock.");
364 }
365
QuerySendRetry(const std::string & dstTarget)366 bool AdapterStub::QuerySendRetry(const std::string &dstTarget)
367 {
368 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
369 if (targetRetrySet_.count(dstTarget) == 0) {
370 return false;
371 } else {
372 return true;
373 }
374 }
375
QuerySendPartialLoss()376 bool AdapterStub::QuerySendPartialLoss()
377 {
378 if (isPartialLossSimulated_) {
379 uint64_t count = countForPartialLoss_.fetch_add(1, std::memory_order_seq_cst);
380 if (count % 2 == 0) { // 2 is half
381 return true;
382 }
383 }
384 return false;
385 }
386
QuerySendTotalLoss()387 bool AdapterStub::QuerySendTotalLoss()
388 {
389 return isTotalLossSimulated_;
390 }
391
ForkSendBytes(const DistributedDB::OnSendBytes & onSendBytes)392 void AdapterStub::ForkSendBytes(const DistributedDB::OnSendBytes &onSendBytes)
393 {
394 std::lock_guard<std::mutex> autoLock(sendBytesMutex_);
395 onSendBytes_ = onSendBytes;
396 }
397
namespace {
// XORs a buffer together as consecutive 64-bit words in native byte order.
// Returns 0 when bytes is null or length is not a multiple of sizeof(uint64_t).
uint64_t CalculateXorSum(const uint8_t *bytes, uint32_t length)
{
    if (bytes == nullptr || length % sizeof(uint64_t) != 0) {
        return 0;
    }
    uint64_t outSum = 0;
    // XOR works byte by byte, so folding each input byte into the matching byte lane of outSum
    // gives the same value as XOR-ing whole words. It also avoids reading uint64_t through a cast
    // uint8_t pointer, which may be misaligned (callers pass an offset into a frame buffer).
    auto lanes = reinterpret_cast<uint8_t *>(&outSum);
    for (uint32_t i = 0; i < length; i++) {
        lanes[i % sizeof(uint64_t)] ^= bytes[i];
    }
    return outSum;
}
// Number of leading frame bytes skipped before the checksum range starts: two uint64_t-sized fields.
const uint32_t LENGTH_BEFORE_SUM_RANGE = sizeof(uint64_t) + sizeof(uint64_t);
}
414
ApplySendBitError(const uint8_t * bytes,uint32_t length)415 void AdapterStub::ApplySendBitError(const uint8_t *bytes, uint32_t length)
416 {
417 // Change field in CommPhyHeader
418 if (length < sizeof(CommPhyHeader)) {
419 return;
420 }
421 auto edibleBytes = const_cast<uint8_t *>(bytes);
422 auto phyHeader = reinterpret_cast<CommPhyHeader *>(edibleBytes);
423 if (doChangeMagicFlag_) {
424 phyHeader->magic = HostToNet(magicField_);
425 }
426 if (doChangeVersionFlag_) {
427 phyHeader->version = HostToNet(versionField_);
428 }
429 if (doChangeCheckSumFlag_) {
430 phyHeader->checkSum = HostToNet(checkSumField_);
431 }
432 if (doChangePacketLenFlag_) {
433 phyHeader->packetLen = HostToNet(packetLenField_);
434 }
435 if (doChangePacketTypeFlag_) {
436 phyHeader->packetType = HostToNet(packetTypeField_);
437 }
438 if (doChangePaddingLenFlag_) {
439 phyHeader->paddingLen = HostToNet(paddingLenField_);
440 }
441 // Change field in MessageHeader. Assumpt that no fragment
442 if (length < sizeof(CommPhyHeader) + sizeof(CommDivergeHeader) + sizeof(MessageHeader)) {
443 return;
444 }
445 edibleBytes += (sizeof(CommPhyHeader) + sizeof(CommDivergeHeader));
446 auto msgHeader = reinterpret_cast<MessageHeader *>(edibleBytes);
447 if (doChangeMessageIdFlag_) {
448 msgHeader->messageId = HostToNet(messageIdField_);
449 phyHeader->checkSum = HostToNet(CalculateXorSum(bytes + LENGTH_BEFORE_SUM_RANGE,
450 length - LENGTH_BEFORE_SUM_RANGE));
451 }
452 }