1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "adapter_stub.h"
17 #include <memory>
18 #include "db_errno.h"
19 #include "endian_convert.h"
20 #include "frame_header.h"
21 #include "iprocess_communicator.h"
22 #include "log_print.h"
23 #include "distributeddb_communicator_common.h"
24
25 using namespace DistributedDB;
26
27 namespace {
28 const uint32_t STUB_MTU_SIZE = 5 * 1024 * 1024; // 5 M, 1024 is scale
29 const uint32_t STUB_TIME_OUT = 5 * 1000; // 5 S, 1000 is scale
30 }
31
32 /*
33 * Override Part
34 */
~AdapterStub()35 AdapterStub::~AdapterStub()
36 {
37 // Do nothing
38 }
39
StartAdapter()40 int AdapterStub::StartAdapter()
41 {
42 return E_OK;
43 }
44
StopAdapter()45 void AdapterStub::StopAdapter()
46 {
47 // Do nothing
48 }
49
GetMtuSize()50 uint32_t AdapterStub::GetMtuSize()
51 {
52 return STUB_MTU_SIZE;
53 }
54
GetMtuSize(const std::string & target)55 uint32_t AdapterStub::GetMtuSize(const std::string &target)
56 {
57 (void)target;
58 return GetMtuSize();
59 }
60
GetTimeout()61 uint32_t AdapterStub::GetTimeout()
62 {
63 return STUB_TIME_OUT;
64 }
65
GetTimeout(const std::string & target)66 uint32_t AdapterStub::GetTimeout(const std::string &target)
67 {
68 (void)target;
69 return GetTimeout();
70 }
71
GetLocalIdentity(std::string & outTarget)72 int AdapterStub::GetLocalIdentity(std::string &outTarget)
73 {
74 outTarget = localTarget_;
75 return E_OK;
76 }
77
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length,uint32_t totalLength)78 int AdapterStub::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length, uint32_t totalLength)
79 {
80 LOGI("[UT][Stub][Send] Send length=%" PRIu32 " to dstTarget=%s begin.", length, dstTarget.c_str());
81 ApplySendBlock();
82
83 (void)totalLength;
84 {
85 std::lock_guard<std::mutex> autoLock(sendBytesMutex_);
86 if (onSendBytes_) {
87 int errCode = onSendBytes_();
88 if (errCode != E_OK) {
89 LOGI("[UT][Stub][Send] failed for %s errCode %d.", dstTarget.c_str(), errCode);
90 return errCode;
91 }
92 }
93 }
94
95 if (QuerySendRetry(dstTarget)) {
96 LOGI("[UT][Stub][Send] Retry for %s true.", dstTarget.c_str());
97 return -E_WAIT_RETRY;
98 }
99
100 if (QuerySendTotalLoss()) {
101 LOGI("[UT][Stub][Send] Total loss for %s true.", dstTarget.c_str());
102 return E_OK;
103 }
104
105 if (QuerySendPartialLoss()) {
106 LOGI("[UT][Stub][Send] Partial loss for %s true.", dstTarget.c_str());
107 return E_OK;
108 }
109
110 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
111 if (targetMapAdapter_.count(dstTarget) == 0) {
112 LOGI("[UT][Stub][Send] dstTarget=%s not found.", dstTarget.c_str());
113 return -E_NOT_FOUND;
114 }
115
116 ApplySendBitError(bytes, length);
117
118 AdapterStub *toAdapter = targetMapAdapter_[dstTarget];
119 toAdapter->DeliverBytes(localTarget_, bytes, length);
120 LOGI("[UT][Stub][Send] Send to dstTarget=%s end.", dstTarget.c_str());
121 return E_OK;
122 }
123
SendBytes(const DeviceInfos & deviceInfos,const uint8_t * bytes,uint32_t length,uint32_t totalLength)124 int AdapterStub::SendBytes(const DeviceInfos &deviceInfos, const uint8_t *bytes, uint32_t length,
125 uint32_t totalLength)
126 {
127 return AdapterStub::SendBytes(deviceInfos.identifier, bytes, length, totalLength);
128 }
129
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)130 int AdapterStub::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
131 {
132 std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
133 return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
134 }
135
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)136 int AdapterStub::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
137 {
138 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
139 return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
140 }
141
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)142 int AdapterStub::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
143 {
144 std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
145 return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
146 }
147
148
IsDeviceOnline(const std::string & device)149 bool AdapterStub::IsDeviceOnline(const std::string &device)
150 {
151 (void)device;
152 return true;
153 }
154
GetExtendHeaderHandle(const ExtendInfo & paramInfo)155 std::shared_ptr<ExtendHeaderHandle> AdapterStub::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
156 {
157 std::shared_ptr<ExtendHeaderHandle> handle = std::make_shared<ExtendHeaderHandleTest>(paramInfo);
158 return handle;
159 }
160 /*
161 * Extended Part
162 */
ConnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)163 void AdapterStub::ConnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
164 {
165 LOGI("[UT][Stub][ConnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
166 thatStub->GetLocalTarget().c_str());
167 thisStub->Connect(thatStub);
168 thatStub->Connect(thisStub);
169 }
170
DisconnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)171 void AdapterStub::DisconnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
172 {
173 LOGI("[UT][Stub][DisconnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
174 thatStub->GetLocalTarget().c_str());
175 thisStub->Disconnect(thatStub);
176 thatStub->Disconnect(thisStub);
177 }
178
AdapterStub(const std::string & inLocalTarget)179 AdapterStub::AdapterStub(const std::string &inLocalTarget)
180 : localTarget_(inLocalTarget)
181 {
182 }
183
GetLocalTarget()184 const std::string &AdapterStub::GetLocalTarget()
185 {
186 return localTarget_;
187 }
188
Connect(AdapterStub * inStub)189 void AdapterStub::Connect(AdapterStub *inStub)
190 {
191 LOGI("[UT][Stub][Connect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
192 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
193 bool isOnlineBefore = targetMapAdapter_.find(inStub->GetLocalTarget()) != targetMapAdapter_.end();
194 targetMapAdapter_[inStub->GetLocalTarget()] = inStub;
195 if (!isOnlineBefore && onChangeHandle_) {
196 onChangeHandle_(inStub->GetLocalTarget(), true);
197 }
198 }
199
Disconnect(AdapterStub * inStub)200 void AdapterStub::Disconnect(AdapterStub *inStub)
201 {
202 LOGI("[UT][Stub][Disconnect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
203 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
204 targetMapAdapter_.erase(inStub->GetLocalTarget());
205 if (onChangeHandle_) {
206 onChangeHandle_(inStub->GetLocalTarget(), false);
207 }
208 }
209
DeliverBytes(const std::string & srcTarget,const uint8_t * bytes,uint32_t length)210 void AdapterStub::DeliverBytes(const std::string &srcTarget, const uint8_t *bytes, uint32_t length)
211 {
212 std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
213 if (onReceiveHandle_) {
214 uint32_t headLength = 0;
215 GetDataHeadInfo(bytes, headLength);
216 std::vector<UserInfo> userInfos;
217 if (userInfo_.empty()) {
218 GetDataUserInfo(bytes, userInfos);
219 } else {
220 userInfos = userInfo_;
221 }
222 if (processCommunicator_ == nullptr) {
223 processCommunicator_ = std::make_shared<ProcessCommunicatorTestStub>();
224 }
225 processCommunicator_->SetDataUserInfo(userInfos);
226 DataUserInfoProc userInfoProc = {bytes, length, processCommunicator_};
227 ReceiveBytesInfo receiveBytesInfo = {bytes + headLength, srcTarget, length - headLength, headLength != 0};
228 onReceiveHandle_(receiveBytesInfo, userInfoProc);
229 }
230 }
231
GetDataHeadInfo(const uint8_t * data,uint32_t & headLength)232 void AdapterStub::GetDataHeadInfo(const uint8_t *data, uint32_t &headLength)
233 {
234 auto info = reinterpret_cast<const ExtendHeadInfo *>(data);
235 NetToHost(info->magic);
236 if (info->magic == ExtendHeaderHandleTest::MAGIC_NUM) {
237 NetToHost(info->length);
238 NetToHost(info->version);
239 headLength = info->length;
240 } else {
241 headLength = 0;
242 }
243 }
244
GetDataUserInfo(const uint8_t * data,std::vector<UserInfo> & userInfos)245 void AdapterStub::GetDataUserInfo(const uint8_t *data, std::vector<UserInfo> &userInfos)
246 {
247 auto info = reinterpret_cast<const ExtendHeadInfo *>(data);
248 NetToHost(info->magic);
249 if (info->magic == ExtendHeaderHandleTest::MAGIC_NUM) {
250 UserInfo userInfo;
251 for (uint8_t i = 0; i < BUFF_LEN; i++) {
252 if (info->userId[i] == 0) {
253 break;
254 }
255 userInfo.receiveUser.push_back(info->userId[i]);
256 userInfo.sendUser.push_back(info->userId[i]);
257 }
258 userInfos.push_back(userInfo);
259 }
260 }
261
262 /*
263 * Simulate Part
264 */
SimulateSendBlock()265 void AdapterStub::SimulateSendBlock()
266 {
267 LOGI("[UT][Stub][Block] Before Lock.");
268 block_.lock();
269 LOGI("[UT][Stub][Block] After Lock.");
270 }
271
SimulateSendBlockClear()272 void AdapterStub::SimulateSendBlockClear()
273 {
274 LOGI("[UT][Stub][UnBlock] Before UnLock.");
275 block_.unlock();
276 LOGI("[UT][Stub][UnBlock] After UnLock.");
277 }
278
SimulateSendRetry(const std::string & dstTarget)279 void AdapterStub::SimulateSendRetry(const std::string &dstTarget)
280 {
281 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
282 targetRetrySet_.insert(dstTarget);
283 }
284
SimulateSendRetryClear(const std::string & dstTarget,int deviceCommErrCode)285 void AdapterStub::SimulateSendRetryClear(const std::string &dstTarget, int deviceCommErrCode)
286 {
287 {
288 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
289 if (targetRetrySet_.count(dstTarget) == 0) {
290 return;
291 }
292 targetRetrySet_.erase(dstTarget);
293 }
294 std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
295 if (onSendableHandle_) {
296 onSendableHandle_(dstTarget, deviceCommErrCode);
297 }
298 }
299
SimulateTriggerSendableCallback(const std::string & dstTarget,int deviceCommErrCode)300 void AdapterStub::SimulateTriggerSendableCallback(const std::string &dstTarget, int deviceCommErrCode)
301 {
302 std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
303 if (onSendableHandle_) {
304 onSendableHandle_(dstTarget, deviceCommErrCode);
305 }
306 }
307
SimulateSendPartialLoss()308 void AdapterStub::SimulateSendPartialLoss()
309 {
310 isPartialLossSimulated_ = true;
311 }
312
SimulateSendPartialLossClear()313 void AdapterStub::SimulateSendPartialLossClear()
314 {
315 isPartialLossSimulated_ = false;
316 }
317
SimulateSendTotalLoss()318 void AdapterStub::SimulateSendTotalLoss()
319 {
320 isTotalLossSimulated_ = true;
321 }
322
SimulateSendTotalLossClear()323 void AdapterStub::SimulateSendTotalLossClear()
324 {
325 isTotalLossSimulated_ = false;
326 }
327
SimulateSendBitErrorInMagicField(bool doFlag,uint16_t inMagic)328 void AdapterStub::SimulateSendBitErrorInMagicField(bool doFlag, uint16_t inMagic)
329 {
330 doChangeMagicFlag_ = doFlag;
331 magicField_ = inMagic;
332 }
333
SimulateSendBitErrorInVersionField(bool doFlag,uint16_t inVersion)334 void AdapterStub::SimulateSendBitErrorInVersionField(bool doFlag, uint16_t inVersion)
335 {
336 doChangeVersionFlag_ = doFlag;
337 versionField_ = inVersion;
338 }
339
SimulateSendBitErrorInCheckSumField(bool doFlag,uint64_t inCheckSum)340 void AdapterStub::SimulateSendBitErrorInCheckSumField(bool doFlag, uint64_t inCheckSum)
341 {
342 doChangeCheckSumFlag_ = doFlag;
343 checkSumField_ = inCheckSum;
344 }
345
SimulateSendBitErrorInPacketLenField(bool doFlag,uint32_t inPacketLen)346 void AdapterStub::SimulateSendBitErrorInPacketLenField(bool doFlag, uint32_t inPacketLen)
347 {
348 doChangePacketLenFlag_ = doFlag;
349 packetLenField_ = inPacketLen;
350 }
351
SimulateSendBitErrorInPacketTypeField(bool doFlag,uint8_t inPacketType)352 void AdapterStub::SimulateSendBitErrorInPacketTypeField(bool doFlag, uint8_t inPacketType)
353 {
354 doChangePacketTypeFlag_ = doFlag;
355 packetTypeField_ = inPacketType;
356 }
357
SimulateSendBitErrorInPaddingLenField(bool doFlag,uint8_t inPaddingLen)358 void AdapterStub::SimulateSendBitErrorInPaddingLenField(bool doFlag, uint8_t inPaddingLen)
359 {
360 doChangePaddingLenFlag_ = doFlag;
361 paddingLenField_ = inPaddingLen;
362 }
363
SimulateSendBitErrorInMessageIdField(bool doFlag,uint32_t inMessageId)364 void AdapterStub::SimulateSendBitErrorInMessageIdField(bool doFlag, uint32_t inMessageId)
365 {
366 doChangeMessageIdFlag_ = doFlag;
367 messageIdField_ = inMessageId;
368 }
369
ApplySendBlock()370 void AdapterStub::ApplySendBlock()
371 {
372 LOGI("[UT][Stub][ApplyBlock] Before Lock&UnLock.");
373 block_.lock();
374 block_.unlock();
375 LOGI("[UT][Stub][ApplyBlock] After Lock&UnLock.");
376 }
377
QuerySendRetry(const std::string & dstTarget)378 bool AdapterStub::QuerySendRetry(const std::string &dstTarget)
379 {
380 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
381 if (targetRetrySet_.count(dstTarget) == 0) {
382 return false;
383 } else {
384 return true;
385 }
386 }
387
QuerySendPartialLoss()388 bool AdapterStub::QuerySendPartialLoss()
389 {
390 if (isPartialLossSimulated_) {
391 uint64_t count = countForPartialLoss_.fetch_add(1, std::memory_order_seq_cst);
392 if (count % 2 == 0) { // 2 is half
393 return true;
394 }
395 }
396 return false;
397 }
398
QuerySendTotalLoss()399 bool AdapterStub::QuerySendTotalLoss()
400 {
401 return isTotalLossSimulated_;
402 }
403
ForkSendBytes(const DistributedDB::OnSendBytes & onSendBytes)404 void AdapterStub::ForkSendBytes(const DistributedDB::OnSendBytes &onSendBytes)
405 {
406 std::lock_guard<std::mutex> autoLock(sendBytesMutex_);
407 onSendBytes_ = onSendBytes;
408 }
409
410 namespace {
CalculateXorSum(const uint8_t * bytes,uint32_t length)411 uint64_t CalculateXorSum(const uint8_t *bytes, uint32_t length)
412 {
413 if (length % sizeof(uint64_t) != 0) {
414 return 0;
415 }
416 int count = static_cast<int>(length / static_cast<uint32_t>(sizeof(uint64_t)));
417 auto array = reinterpret_cast<const uint64_t *>(bytes);
418 uint64_t outSum = 0;
419 for (int i = 0; i < count; i++) {
420 outSum ^= array[i];
421 }
422 return outSum;
423 }
424 const uint32_t LENGTH_BEFORE_SUM_RANGE = sizeof(uint64_t) + sizeof(uint64_t);
425 }
426
ApplySendBitError(const uint8_t * bytes,uint32_t length)427 void AdapterStub::ApplySendBitError(const uint8_t *bytes, uint32_t length)
428 {
429 // Change field in CommPhyHeader
430 if (length < sizeof(CommPhyHeader)) {
431 return;
432 }
433 auto edibleBytes = const_cast<uint8_t *>(bytes);
434 auto phyHeader = reinterpret_cast<CommPhyHeader *>(edibleBytes);
435 if (doChangeMagicFlag_) {
436 phyHeader->magic = HostToNet(magicField_);
437 }
438 if (doChangeVersionFlag_) {
439 phyHeader->version = HostToNet(versionField_);
440 }
441 if (doChangeCheckSumFlag_) {
442 phyHeader->checkSum = HostToNet(checkSumField_);
443 }
444 if (doChangePacketLenFlag_) {
445 phyHeader->packetLen = HostToNet(packetLenField_);
446 }
447 if (doChangePacketTypeFlag_) {
448 phyHeader->packetType = HostToNet(packetTypeField_);
449 }
450 if (doChangePaddingLenFlag_) {
451 phyHeader->paddingLen = HostToNet(paddingLenField_);
452 }
453 // Change field in MessageHeader. Assumpt that no fragment
454 if (length < sizeof(CommPhyHeader) + sizeof(CommDivergeHeader) + sizeof(MessageHeader)) {
455 return;
456 }
457 edibleBytes += (sizeof(CommPhyHeader) + sizeof(CommDivergeHeader));
458 auto msgHeader = reinterpret_cast<MessageHeader *>(edibleBytes);
459 if (doChangeMessageIdFlag_) {
460 msgHeader->messageId = HostToNet(messageIdField_);
461 phyHeader->checkSum = HostToNet(CalculateXorSum(bytes + LENGTH_BEFORE_SUM_RANGE,
462 length - LENGTH_BEFORE_SUM_RANGE));
463 }
464 }
465
SetUserInfo(const std::vector<UserInfo> & userInfo)466 void AdapterStub::SetUserInfo(const std::vector<UserInfo> &userInfo)
467 {
468 userInfo_ = userInfo;
469 }
470
GetUserInfo()471 std::vector<UserInfo> AdapterStub::GetUserInfo()
472 {
473 return userInfo_;
474 }
475
SetProcessCommunicator(std::shared_ptr<ProcessCommunicatorTestStub> processCommunicator)476 void AdapterStub::SetProcessCommunicator(std::shared_ptr<ProcessCommunicatorTestStub> processCommunicator)
477 {
478 processCommunicator_ = std::move(processCommunicator);
479 }