1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "adapter_stub.h"
#include <cstring>
#include <memory>
#include "db_errno.h"
#include "endian_convert.h"
#include "frame_header.h"
#include "iprocess_communicator.h"
#include "log_print.h"
#include "distributeddb_communicator_common.h"
24
25 using namespace DistributedDB;
26
namespace {
// Value returned by GetMtuSize(): 5 M, where each factor of 1024 is a scale step.
constexpr uint32_t STUB_MTU_SIZE = 5U * 1024U * 1024U;
// Value returned by GetTimeout(): 5 S multiplied by a scale of 1000.
constexpr uint32_t STUB_TIME_OUT = 5U * 1000U;
} // namespace
31
32 /*
33 * Override Part
34 */
~AdapterStub()35 AdapterStub::~AdapterStub()
36 {
37 // Do nothing
38 }
39
StartAdapter()40 int AdapterStub::StartAdapter()
41 {
42 return E_OK;
43 }
44
StopAdapter()45 void AdapterStub::StopAdapter()
46 {
47 // Do nothing
48 }
49
GetMtuSize()50 uint32_t AdapterStub::GetMtuSize()
51 {
52 return STUB_MTU_SIZE;
53 }
54
GetMtuSize(const std::string & target)55 uint32_t AdapterStub::GetMtuSize(const std::string &target)
56 {
57 (void)target;
58 return GetMtuSize();
59 }
60
GetTimeout()61 uint32_t AdapterStub::GetTimeout()
62 {
63 return STUB_TIME_OUT;
64 }
65
GetTimeout(const std::string & target)66 uint32_t AdapterStub::GetTimeout(const std::string &target)
67 {
68 (void)target;
69 return GetTimeout();
70 }
71
GetLocalIdentity(std::string & outTarget)72 int AdapterStub::GetLocalIdentity(std::string &outTarget)
73 {
74 outTarget = localTarget_;
75 return E_OK;
76 }
77
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length,uint32_t totalLength)78 int AdapterStub::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length, uint32_t totalLength)
79 {
80 LOGI("[UT][Stub][Send] Send length=%" PRIu32 " to dstTarget=%s begin.", length, dstTarget.c_str());
81 ApplySendBlock();
82
83 (void)totalLength;
84 {
85 std::lock_guard<std::mutex> autoLock(sendBytesMutex_);
86 if (onSendBytes_) {
87 int errCode = onSendBytes_();
88 if (errCode != E_OK) {
89 LOGI("[UT][Stub][Send] failed for %s errCode %d.", dstTarget.c_str(), errCode);
90 return errCode;
91 }
92 }
93 }
94
95 if (QuerySendRetry(dstTarget)) {
96 LOGI("[UT][Stub][Send] Retry for %s true.", dstTarget.c_str());
97 return -E_WAIT_RETRY;
98 }
99
100 if (QuerySendTotalLoss()) {
101 LOGI("[UT][Stub][Send] Total loss for %s true.", dstTarget.c_str());
102 return E_OK;
103 }
104
105 if (QuerySendPartialLoss()) {
106 LOGI("[UT][Stub][Send] Partial loss for %s true.", dstTarget.c_str());
107 return E_OK;
108 }
109
110 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
111 if (targetMapAdapter_.count(dstTarget) == 0) {
112 LOGI("[UT][Stub][Send] dstTarget=%s not found.", dstTarget.c_str());
113 return -E_NOT_FOUND;
114 }
115
116 ApplySendBitError(bytes, length);
117
118 AdapterStub *toAdapter = targetMapAdapter_[dstTarget];
119 toAdapter->DeliverBytes(localTarget_, bytes, length);
120 LOGI("[UT][Stub][Send] Send to dstTarget=%s end.", dstTarget.c_str());
121 return E_OK;
122 }
123
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)124 int AdapterStub::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
125 {
126 std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
127 return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
128 }
129
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)130 int AdapterStub::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
131 {
132 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
133 return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
134 }
135
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)136 int AdapterStub::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
137 {
138 std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
139 return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
140 }
141
142
IsDeviceOnline(const std::string & device)143 bool AdapterStub::IsDeviceOnline(const std::string &device)
144 {
145 (void)device;
146 return true;
147 }
148
GetExtendHeaderHandle(const ExtendInfo & paramInfo)149 std::shared_ptr<ExtendHeaderHandle> AdapterStub::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
150 {
151 std::shared_ptr<ExtendHeaderHandle> handle = std::make_shared<ExtendHeaderHandleTest>(paramInfo);
152 return handle;
153 }
154 /*
155 * Extended Part
156 */
ConnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)157 void AdapterStub::ConnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
158 {
159 LOGI("[UT][Stub][ConnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
160 thatStub->GetLocalTarget().c_str());
161 thisStub->Connect(thatStub);
162 thatStub->Connect(thisStub);
163 }
164
DisconnectAdapterStub(AdapterStub * thisStub,AdapterStub * thatStub)165 void AdapterStub::DisconnectAdapterStub(AdapterStub *thisStub, AdapterStub *thatStub)
166 {
167 LOGI("[UT][Stub][DisconnectAdapter] thisStub=%s, thatStub=%s.", thisStub->GetLocalTarget().c_str(),
168 thatStub->GetLocalTarget().c_str());
169 thisStub->Disconnect(thatStub);
170 thatStub->Disconnect(thisStub);
171 }
172
AdapterStub(const std::string & inLocalTarget)173 AdapterStub::AdapterStub(const std::string &inLocalTarget)
174 : localTarget_(inLocalTarget)
175 {
176 }
177
GetLocalTarget()178 const std::string &AdapterStub::GetLocalTarget()
179 {
180 return localTarget_;
181 }
182
Connect(AdapterStub * inStub)183 void AdapterStub::Connect(AdapterStub *inStub)
184 {
185 LOGI("[UT][Stub][Connect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
186 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
187 bool isOnlineBefore = targetMapAdapter_.find(inStub->GetLocalTarget()) != targetMapAdapter_.end();
188 targetMapAdapter_[inStub->GetLocalTarget()] = inStub;
189 if (!isOnlineBefore && onChangeHandle_) {
190 onChangeHandle_(inStub->GetLocalTarget(), true);
191 }
192 }
193
Disconnect(AdapterStub * inStub)194 void AdapterStub::Disconnect(AdapterStub *inStub)
195 {
196 LOGI("[UT][Stub][Disconnect] thisStub=%s, thatStub=%s.", localTarget_.c_str(), inStub->GetLocalTarget().c_str());
197 std::lock_guard<std::mutex> onChangeLockGuard(onChangeMutex_);
198 targetMapAdapter_.erase(inStub->GetLocalTarget());
199 if (onChangeHandle_) {
200 onChangeHandle_(inStub->GetLocalTarget(), false);
201 }
202 }
203
DeliverBytes(const std::string & srcTarget,const uint8_t * bytes,uint32_t length)204 void AdapterStub::DeliverBytes(const std::string &srcTarget, const uint8_t *bytes, uint32_t length)
205 {
206 std::lock_guard<std::mutex> onReceiveLockGuard(onReceiveMutex_);
207 if (onReceiveHandle_) {
208 uint32_t headLength = 0;
209 std::string userId;
210 CheckAndGetDataHeadInfo(bytes, length, headLength, userId);
211 onReceiveHandle_(srcTarget, bytes + headLength, length - headLength, userId);
212 }
213 }
214
CheckAndGetDataHeadInfo(const uint8_t * data,uint32_t totalLen,uint32_t & headLength,std::string & userId)215 void AdapterStub::CheckAndGetDataHeadInfo(const uint8_t *data, uint32_t totalLen, uint32_t &headLength,
216 std::string &userId)
217 {
218 auto info = reinterpret_cast<const ExtendHeadInfo *>(data);
219 NetToHost(info->magic);
220 if (info->magic == ExtendHeaderHandleTest::MAGIC_NUM) {
221 NetToHost(info->length);
222 NetToHost(info->version);
223 headLength = info->length;
224 std::string tmpUserId(BUFF_LEN, 0);
225 for (uint8_t i = 0; i < BUFF_LEN; i++) {
226 tmpUserId[i] = info->userId[i];
227 }
228 userId = tmpUserId;
229 } else {
230 headLength = 0;
231 }
232 }
233
234 /*
235 * Simulate Part
236 */
SimulateSendBlock()237 void AdapterStub::SimulateSendBlock()
238 {
239 LOGI("[UT][Stub][Block] Before Lock.");
240 block_.lock();
241 LOGI("[UT][Stub][Block] After Lock.");
242 }
243
SimulateSendBlockClear()244 void AdapterStub::SimulateSendBlockClear()
245 {
246 LOGI("[UT][Stub][UnBlock] Before UnLock.");
247 block_.unlock();
248 LOGI("[UT][Stub][UnBlock] After UnLock.");
249 }
250
SimulateSendRetry(const std::string & dstTarget)251 void AdapterStub::SimulateSendRetry(const std::string &dstTarget)
252 {
253 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
254 targetRetrySet_.insert(dstTarget);
255 }
256
SimulateSendRetryClear(const std::string & dstTarget)257 void AdapterStub::SimulateSendRetryClear(const std::string &dstTarget)
258 {
259 {
260 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
261 if (targetRetrySet_.count(dstTarget) == 0) {
262 return;
263 }
264 targetRetrySet_.erase(dstTarget);
265 }
266 std::lock_guard<std::mutex> onSendableLockGuard(onSendableMutex_);
267 if (onSendableHandle_) {
268 onSendableHandle_(dstTarget);
269 }
270 }
271
SimulateSendPartialLoss()272 void AdapterStub::SimulateSendPartialLoss()
273 {
274 isPartialLossSimulated_ = true;
275 }
276
SimulateSendPartialLossClear()277 void AdapterStub::SimulateSendPartialLossClear()
278 {
279 isPartialLossSimulated_ = false;
280 }
281
SimulateSendTotalLoss()282 void AdapterStub::SimulateSendTotalLoss()
283 {
284 isTotalLossSimulated_ = true;
285 }
286
SimulateSendTotalLossClear()287 void AdapterStub::SimulateSendTotalLossClear()
288 {
289 isTotalLossSimulated_ = false;
290 }
291
SimulateSendBitErrorInMagicField(bool doFlag,uint16_t inMagic)292 void AdapterStub::SimulateSendBitErrorInMagicField(bool doFlag, uint16_t inMagic)
293 {
294 doChangeMagicFlag_ = doFlag;
295 magicField_ = inMagic;
296 }
297
SimulateSendBitErrorInVersionField(bool doFlag,uint16_t inVersion)298 void AdapterStub::SimulateSendBitErrorInVersionField(bool doFlag, uint16_t inVersion)
299 {
300 doChangeVersionFlag_ = doFlag;
301 versionField_ = inVersion;
302 }
303
SimulateSendBitErrorInCheckSumField(bool doFlag,uint64_t inCheckSum)304 void AdapterStub::SimulateSendBitErrorInCheckSumField(bool doFlag, uint64_t inCheckSum)
305 {
306 doChangeCheckSumFlag_ = doFlag;
307 checkSumField_ = inCheckSum;
308 }
309
SimulateSendBitErrorInPacketLenField(bool doFlag,uint32_t inPacketLen)310 void AdapterStub::SimulateSendBitErrorInPacketLenField(bool doFlag, uint32_t inPacketLen)
311 {
312 doChangePacketLenFlag_ = doFlag;
313 packetLenField_ = inPacketLen;
314 }
315
SimulateSendBitErrorInPacketTypeField(bool doFlag,uint8_t inPacketType)316 void AdapterStub::SimulateSendBitErrorInPacketTypeField(bool doFlag, uint8_t inPacketType)
317 {
318 doChangePacketTypeFlag_ = doFlag;
319 packetTypeField_ = inPacketType;
320 }
321
SimulateSendBitErrorInPaddingLenField(bool doFlag,uint8_t inPaddingLen)322 void AdapterStub::SimulateSendBitErrorInPaddingLenField(bool doFlag, uint8_t inPaddingLen)
323 {
324 doChangePaddingLenFlag_ = doFlag;
325 paddingLenField_ = inPaddingLen;
326 }
327
SimulateSendBitErrorInMessageIdField(bool doFlag,uint32_t inMessageId)328 void AdapterStub::SimulateSendBitErrorInMessageIdField(bool doFlag, uint32_t inMessageId)
329 {
330 doChangeMessageIdFlag_ = doFlag;
331 messageIdField_ = inMessageId;
332 }
333
ApplySendBlock()334 void AdapterStub::ApplySendBlock()
335 {
336 LOGI("[UT][Stub][ApplyBlock] Before Lock&UnLock.");
337 block_.lock();
338 block_.unlock();
339 LOGI("[UT][Stub][ApplyBlock] After Lock&UnLock.");
340 }
341
QuerySendRetry(const std::string & dstTarget)342 bool AdapterStub::QuerySendRetry(const std::string &dstTarget)
343 {
344 std::lock_guard<std::mutex> retryLockGuard(retryMutex_);
345 if (targetRetrySet_.count(dstTarget) == 0) {
346 return false;
347 } else {
348 return true;
349 }
350 }
351
QuerySendPartialLoss()352 bool AdapterStub::QuerySendPartialLoss()
353 {
354 if (isPartialLossSimulated_) {
355 uint64_t count = countForPartialLoss_.fetch_add(1, std::memory_order_seq_cst);
356 if (count % 2 == 0) { // 2 is half
357 return true;
358 }
359 }
360 return false;
361 }
362
QuerySendTotalLoss()363 bool AdapterStub::QuerySendTotalLoss()
364 {
365 return isTotalLossSimulated_;
366 }
367
ForkSendBytes(const DistributedDB::OnSendBytes & onSendBytes)368 void AdapterStub::ForkSendBytes(const DistributedDB::OnSendBytes &onSendBytes)
369 {
370 std::lock_guard<std::mutex> autoLock(sendBytesMutex_);
371 onSendBytes_ = onSendBytes;
372 }
373
namespace {
// XORs together all 64-bit words of a buffer, each read in host byte order.
//
// bytes  - start of the buffer; it does not need to be 8-byte aligned.
// length - buffer size in bytes.
// Returns the XOR of all words, or 0 when bytes is null, when length is 0, or
// when length is not a multiple of sizeof(uint64_t).
uint64_t CalculateXorSum(const uint8_t *bytes, uint32_t length)
{
    const uint32_t wordSize = sizeof(uint64_t);
    if (bytes == nullptr || length % wordSize != 0) {
        return 0;
    }
    const uint32_t wordCount = length / wordSize;
    uint64_t outSum = 0;
    for (uint32_t index = 0; index < wordCount; ++index) {
        // Copy each word out instead of casting the pointer: the buffer is a
        // plain byte array and carries no alignment guarantee.
        uint64_t word = 0;
        std::memcpy(&word, bytes + static_cast<uint64_t>(index) * wordSize, wordSize);
        outSum ^= word;
    }
    return outSum;
}
// Number of leading packet bytes (two uint64_t-sized units) that callers skip
// before the range passed to CalculateXorSum().
const uint32_t LENGTH_BEFORE_SUM_RANGE = sizeof(uint64_t) + sizeof(uint64_t);
} // namespace
390
ApplySendBitError(const uint8_t * bytes,uint32_t length)391 void AdapterStub::ApplySendBitError(const uint8_t *bytes, uint32_t length)
392 {
393 // Change field in CommPhyHeader
394 if (length < sizeof(CommPhyHeader)) {
395 return;
396 }
397 auto edibleBytes = const_cast<uint8_t *>(bytes);
398 auto phyHeader = reinterpret_cast<CommPhyHeader *>(edibleBytes);
399 if (doChangeMagicFlag_) {
400 phyHeader->magic = HostToNet(magicField_);
401 }
402 if (doChangeVersionFlag_) {
403 phyHeader->version = HostToNet(versionField_);
404 }
405 if (doChangeCheckSumFlag_) {
406 phyHeader->checkSum = HostToNet(checkSumField_);
407 }
408 if (doChangePacketLenFlag_) {
409 phyHeader->packetLen = HostToNet(packetLenField_);
410 }
411 if (doChangePacketTypeFlag_) {
412 phyHeader->packetType = HostToNet(packetTypeField_);
413 }
414 if (doChangePaddingLenFlag_) {
415 phyHeader->paddingLen = HostToNet(paddingLenField_);
416 }
417 // Change field in MessageHeader. Assumpt that no fragment
418 if (length < sizeof(CommPhyHeader) + sizeof(CommDivergeHeader) + sizeof(MessageHeader)) {
419 return;
420 }
421 edibleBytes += (sizeof(CommPhyHeader) + sizeof(CommDivergeHeader));
422 auto msgHeader = reinterpret_cast<MessageHeader *>(edibleBytes);
423 if (doChangeMessageIdFlag_) {
424 msgHeader->messageId = HostToNet(messageIdField_);
425 phyHeader->checkSum = HostToNet(CalculateXorSum(bytes + LENGTH_BEFORE_SUM_RANGE,
426 length - LENGTH_BEFORE_SUM_RANGE));
427 }
428 }