1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "uart.h"
16
17 using namespace std::chrono;
18 namespace Hdc {
19 ExternInterface HdcUARTBase::defaultInterface;
20
SetTcpOptions(uv_tcp_t * tcpHandle)21 void ExternInterface::SetTcpOptions(uv_tcp_t *tcpHandle)
22 {
23 return Base::SetTcpOptions(tcpHandle);
24 }
25
SendToStream(uv_stream_t * handleStream,const uint8_t * buf,const int len)26 int ExternInterface::SendToStream(uv_stream_t *handleStream, const uint8_t *buf, const int len)
27 {
28 return Base::SendToStream(handleStream, buf, len);
29 }
30
UvTcpInit(uv_loop_t * loop,uv_tcp_t * tcp,int socketFd)31 int ExternInterface::UvTcpInit(uv_loop_t *loop, uv_tcp_t *tcp, int socketFd)
32 {
33 if (uv_tcp_init(loop, tcp) == 0) {
34 return uv_tcp_open(tcp, socketFd);
35 } else {
36 return -1;
37 }
38 }
39
UvRead(uv_stream_t * stream,uv_alloc_cb allocCallBack,uv_read_cb readCallBack)40 int ExternInterface::UvRead(uv_stream_t *stream, uv_alloc_cb allocCallBack, uv_read_cb readCallBack)
41 {
42 return uv_read_start(stream, allocCallBack, readCallBack);
43 }
44
StartWorkThread(uv_loop_t * loop,uv_work_cb pFuncWorkThread,uv_after_work_cb pFuncAfterThread,void * pThreadData)45 int ExternInterface::StartWorkThread(uv_loop_t *loop, uv_work_cb pFuncWorkThread,
46 uv_after_work_cb pFuncAfterThread, void *pThreadData)
47 {
48 return Base::StartWorkThread(loop, pFuncWorkThread, pFuncAfterThread, pThreadData);
49 }
50
TryCloseHandle(const uv_handle_t * handle,uv_close_cb closeCallBack)51 void ExternInterface::TryCloseHandle(const uv_handle_t *handle, uv_close_cb closeCallBack)
52 {
53 return Base::TryCloseHandle(handle, closeCallBack);
54 }
55
TimerUvTask(uv_loop_t * loop,void * data,uv_timer_cb cb)56 bool ExternInterface::TimerUvTask(uv_loop_t *loop, void *data, uv_timer_cb cb)
57 {
58 return Base::TimerUvTask(loop, data, cb);
59 }
UvTimerStart(uv_timer_t * handle,uv_timer_cb cb,uint64_t timeout,uint64_t repeat)60 bool ExternInterface::UvTimerStart(uv_timer_t *handle, uv_timer_cb cb, uint64_t timeout,
61 uint64_t repeat)
62 {
63 return uv_timer_start(handle, cb, timeout, repeat);
64 }
65
DelayDo(uv_loop_t * loop,const int delayMs,const uint8_t flag,string msg,void * data,DelayCB cb)66 bool ExternInterface::DelayDo(uv_loop_t *loop, const int delayMs, const uint8_t flag, string msg,
67 void *data, DelayCB cb)
68 {
69 return Base::DelayDo(loop, delayMs, flag, msg, data, cb);
70 }
71
HdcUARTBase(HdcSessionBase & sessionBaseIn,ExternInterface & interfaceIn)72 HdcUARTBase::HdcUARTBase(HdcSessionBase &sessionBaseIn, ExternInterface &interfaceIn)
73 : externInterface(interfaceIn), sessionBase(sessionBaseIn)
74 {
75 uartOpened = false;
76 }
77
~HdcUARTBase(void)78 HdcUARTBase::~HdcUARTBase(void) {}
79
80 #ifndef _WIN32
GetUartSpeed(int speed)81 int HdcUARTBase::GetUartSpeed(int speed)
82 {
83 switch (speed) {
84 case UART_SPEED2400:
85 return (B2400);
86 break;
87 case UART_SPEED4800:
88 return (B4800);
89 break;
90 case UART_SPEED9600:
91 return (B9600);
92 break;
93 case UART_SPEED115200:
94 return (B115200);
95 break;
96 case UART_SPEED921600:
97 return (B921600);
98 break;
99 default:
100 return (B921600);
101 break;
102 }
103 }
GetUartBits(int bits)104 int HdcUARTBase::GetUartBits(int bits)
105 {
106 switch (bits) {
107 case UART_BIT1:
108 return (CS7);
109 break;
110 case UART_BIT2:
111 return (CS8);
112 break;
113 default:
114 return (CS8);
115 break;
116 }
117 }
118
SetSerial(int fd,int nSpeed,int nBits,char nEvent,int nStop)119 int HdcUARTBase::SetSerial(int fd, int nSpeed, int nBits, char nEvent, int nStop)
120 {
121 struct termios newttys1, oldttys1;
122 if (tcgetattr(fd, &oldttys1) != 0) {
123 constexpr int bufSize = 1024;
124 char buf[bufSize] = { 0 };
125 strerror_r(errno, buf, bufSize);
126 WRITE_LOG(LOG_DEBUG, "tcgetattr failed with %s\n", buf);
127 return ERR_GENERIC;
128 }
129 bzero(&newttys1, sizeof(newttys1));
130 newttys1.c_cflag = GetUartSpeed(nSpeed);
131 newttys1.c_cflag |= (CLOCAL | CREAD);
132 newttys1.c_cflag &= ~CSIZE;
133 newttys1.c_lflag &= ~ICANON;
134 newttys1.c_cflag |= GetUartBits(nBits);
135 switch (nEvent) {
136 case '0':
137 newttys1.c_cflag |= PARENB;
138 newttys1.c_iflag |= (INPCK | ISTRIP);
139 newttys1.c_cflag |= PARODD;
140 break;
141 case 'E':
142 newttys1.c_cflag |= PARENB;
143 newttys1.c_iflag |= (INPCK | ISTRIP);
144 newttys1.c_cflag &= ~PARODD;
145 break;
146 case 'N':
147 newttys1.c_cflag &= ~PARENB;
148 break;
149 default:
150 break;
151 }
152 if (nStop == UART_STOP1) {
153 newttys1.c_cflag &= ~CSTOPB;
154 } else if (nStop == UART_STOP2) {
155 newttys1.c_cflag |= CSTOPB;
156 }
157 newttys1.c_cc[VTIME] = 0;
158 newttys1.c_cc[VMIN] = 0;
159 if (tcflush(fd, TCIOFLUSH)) {
160 WRITE_LOG(LOG_DEBUG, " tcflush error.");
161 return ERR_GENERIC;
162 }
163 if ((tcsetattr(fd, TCSANOW, &newttys1)) != 0) {
164 WRITE_LOG(LOG_DEBUG, " com set error");
165 return ERR_GENERIC;
166 }
167 WRITE_LOG(LOG_DEBUG, " SetSerial OK");
168 return RET_SUCCESS;
169 }
170 #endif // _WIN32
171
ReadUartDev(std::vector<uint8_t> & readBuf,size_t expectedSize,HdcUART & uart)172 ssize_t HdcUARTBase::ReadUartDev(std::vector<uint8_t> &readBuf, size_t expectedSize, HdcUART &uart)
173 {
174 ssize_t totalBytesRead = 0;
175 uint8_t uartReadBuffer[MAX_UART_SIZE_IOBUF];
176 #ifdef _WIN32
177 DWORD bytesRead = 0;
178 #else
179 ssize_t bytesRead = 0;
180 #endif
181 do {
182 bytesRead = 0;
183 #ifdef _WIN32
184 BOOL bReadStatus = ReadFile(uart.devUartHandle, uartReadBuffer, sizeof(uartReadBuffer),
185 &bytesRead, &uart.ovRead);
186 if (!bReadStatus) {
187 if (GetLastError() == ERROR_IO_PENDING) {
188 bytesRead = 0;
189 DWORD dwMilliseconds = ReadGiveUpTimeOutTimeMs;
190 if (expectedSize == 0) {
191 dwMilliseconds = INFINITE;
192 }
193 if (!GetOverlappedResultEx(uart.devUartHandle, &uart.ovRead, &bytesRead,
194 dwMilliseconds, FALSE)) {
195 // wait io failed
196 DWORD error = GetLastError();
197 if (error == ERROR_OPERATION_ABORTED) {
198 totalBytesRead += bytesRead;
199 WRITE_LOG(LOG_DEBUG, "%s error cancel read. %u %zd", __FUNCTION__,
200 bytesRead, totalBytesRead);
201 // Generally speaking, this is the cacnel caused by freesession
202 // Returning allows the outer read loop to run again. This checks the exit
203 // condition.
204 return totalBytesRead;
205 } else if (error == WAIT_TIMEOUT) {
206 totalBytesRead += bytesRead;
207 WRITE_LOG(LOG_DEBUG, "%s error timeout. %u %zd", __FUNCTION__, bytesRead,
208 totalBytesRead);
209 return totalBytesRead;
210 } else {
211 WRITE_LOG(LOG_DEBUG, "%s error wait io:%d.", __FUNCTION__, GetLastError());
212 }
213 return -1;
214 }
215 } else {
216 // not ERROR_IO_PENDING
217 WRITE_LOG(LOG_DEBUG, "%s err:%d. ", __FUNCTION__, GetLastError());
218 return -1;
219 }
220 }
221 #else
222 int ret = 0;
223 fd_set readFds;
224 FD_ZERO(&readFds);
225 FD_SET(uart.devUartHandle, &readFds);
226 const constexpr int msTous = 1000;
227 struct timeval tv;
228 tv.tv_sec = 0;
229
230 if (expectedSize == 0) {
231 tv.tv_usec = WaitResponseTimeOutMs * msTous;
232 #ifdef HDC_HOST
233 // only host side need this
234 // in this caes
235 // We need a way to exit from the select for the destruction and recovery of the
236 // serial port read thread.
237 ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, &tv);
238 #else
239 ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, nullptr);
240 #endif
241 } else {
242 // when we have expect size , we need timeout for link data drop issue
243 tv.tv_usec = ReadGiveUpTimeOutTimeMs * msTous;
244 ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, &tv);
245 }
246 if (ret == 0 and expectedSize == 0) {
247 // no expect but timeout
248 if (uart.ioCancel) {
249 WRITE_LOG(LOG_DEBUG, "%s:uart select time out and io cancel", __FUNCTION__);
250 uart.ioCancel = true;
251 return totalBytesRead;
252 } else {
253 continue;
254 }
255 } else if (ret == 0) {
256 WRITE_LOG(LOG_DEBUG, "%s:uart select time out!", __FUNCTION__);
257 // we expected some byte , but not arrive before timeout
258 return totalBytesRead;
259 } else if (ret < 0) {
260 WRITE_LOG(LOG_DEBUG, "%s:uart select error! %d", __FUNCTION__, errno);
261 return -1; // wait failed.
262 } else {
263 // select > 0
264 bytesRead = read(uart.devUartHandle, uartReadBuffer, sizeof(uartReadBuffer));
265 if (bytesRead <= 0) {
266 // read failed !
267 WRITE_LOG(LOG_WARN, "%s:read failed! %zd:%d", __FUNCTION__, bytesRead, errno);
268 return -1;
269 }
270 }
271 #endif
272 if (bytesRead > 0) {
273 readBuf.insert(readBuf.end(), uartReadBuffer, uartReadBuffer + bytesRead);
274 totalBytesRead += bytesRead;
275 }
276 } while (readBuf.size() < expectedSize or
277 bytesRead == 0); // if caller know how many bytes it want
278 return totalBytesRead;
279 }
280
WriteUartDev(uint8_t * data,const size_t length,HdcUART & uart)281 ssize_t HdcUARTBase::WriteUartDev(uint8_t *data, const size_t length, HdcUART &uart)
282 {
283 ssize_t totalBytesWrite = 0;
284 WRITE_LOG(LOG_ALL, "%s %d data %x %x", __FUNCTION__, length, *(data + sizeof(UartHead)),
285 *(data + sizeof(UartHead) + 1));
286 do {
287 #ifdef _WIN32
288 DWORD bytesWrite = 0;
289 BOOL bWriteStat = WriteFile(uart.devUartHandle, data + totalBytesWrite,
290 length - totalBytesWrite, &bytesWrite, &uart.ovWrite);
291 if (!bWriteStat) {
292 if (GetLastError() == ERROR_IO_PENDING) {
293 if (!GetOverlappedResult(uart.devUartHandle, &uart.ovWrite, &bytesWrite, TRUE)) {
294 WRITE_LOG(LOG_DEBUG, "%s error wait io:%d. bytesWrite %zu", __FUNCTION__,
295 GetLastError(), bytesWrite);
296 return -1;
297 }
298 } else {
299 WRITE_LOG(LOG_DEBUG, "%s err:%d. bytesWrite %zu", __FUNCTION__, GetLastError(),
300 bytesWrite);
301 return -1;
302 }
303 }
304 #else // not win32
305 ssize_t bytesWrite = 0;
306 bytesWrite = write(uart.devUartHandle, data + totalBytesWrite, length - totalBytesWrite);
307 if (bytesWrite < 0) {
308 if (errno == EINTR or errno == EAGAIN) {
309 WRITE_LOG(LOG_WARN, "EINTR/EAGAIN, try again");
310 continue;
311 } else {
312 // we don't know how to recory in this function
313 // need reopen device ?
314 constexpr int bufSize = 1024;
315 char buf[bufSize] = { 0 };
316 strerror_r(errno, buf, bufSize);
317 WRITE_LOG(LOG_FATAL, "write fatal errno %d:%s", errno, buf);
318 return -1;
319 }
320 } else {
321 // waits until all output written to the object referred to by fd has been transmitted.
322 tcdrain(uart.devUartHandle);
323 }
324 #endif
325 totalBytesWrite += bytesWrite;
326 } while (totalBytesWrite < signed(length));
327
328 return totalBytesWrite;
329 }
330
UartToHdcProtocol(uv_stream_t * stream,uint8_t * data,int dataSize)331 int HdcUARTBase::UartToHdcProtocol(uv_stream_t *stream, uint8_t *data, int dataSize)
332 {
333 HSession hSession = (HSession)stream->data;
334 unsigned int fd = hSession->dataFd[STREAM_MAIN];
335 fd_set fdSet;
336 struct timeval timeout = {3, 0};
337 FD_ZERO(&fdSet);
338 FD_SET(fd, &fdSet);
339 int index = 0;
340 int childRet = 0;
341
342 while (index < dataSize) {
343 childRet = select(fd + 1, NULL, &fdSet, NULL, &timeout);
344 if (childRet <= 0) {
345 constexpr int bufSize = 1024;
346 char buf[bufSize] = { 0 };
347 #ifdef _WIN32
348 strerror_s(buf, bufSize, errno);
349 #else
350 strerror_r(errno, buf, bufSize);
351 #endif
352 WRITE_LOG(LOG_FATAL, "%s select error:%d [%s][%d]", __FUNCTION__, errno,
353 buf, childRet);
354 break;
355 }
356 childRet = send(fd, (const char *)data + index, dataSize - index, 0);
357 if (childRet < 0) {
358 constexpr int bufSize = 1024;
359 char buf[bufSize] = { 0 };
360 #ifdef _WIN32
361 strerror_s(buf, bufSize, errno);
362 #else
363 strerror_r(errno, buf, bufSize);
364 #endif
365 WRITE_LOG(LOG_FATAL, "%s senddata err:%d [%s]", __FUNCTION__, errno, buf);
366 break;
367 }
368 index += childRet;
369 }
370 if (index != dataSize) {
371 WRITE_LOG(LOG_FATAL, "%s partialsenddata err:%d [%d]", __FUNCTION__, index, dataSize);
372 return ERR_IO_FAIL;
373 }
374 return index;
375 }
376
DispatchToWorkThread(HSession hSession,uint8_t * readBuf,int readBytes)377 RetErrCode HdcUARTBase::DispatchToWorkThread(HSession hSession, uint8_t *readBuf, int readBytes)
378 {
379 if (hSession == nullptr) {
380 return ERR_SESSION_NOFOUND;
381 }
382 if (!UartSendToHdcStream(hSession, readBuf, readBytes)) {
383 return ERR_IO_FAIL;
384 }
385 return RET_SUCCESS;
386 }
387
PackageProcess(vector<uint8_t> & data,HSession hSession)388 size_t HdcUARTBase::PackageProcess(vector<uint8_t> &data, HSession hSession)
389 {
390 while (data.size() >= sizeof(UartHead)) {
391 // is size more than one head
392 size_t packetSize = 0;
393 uint32_t sessionId = 0;
394 uint32_t packageIndex = 0;
395 // we erase all buffer. wait next read.
396 if (ValidateUartPacket(data, sessionId, packageIndex, packetSize) != RET_SUCCESS) {
397 WRITE_LOG(LOG_WARN, "%s package error. clean the read buffer.", __FUNCTION__);
398 data.clear();
399 } else if (packetSize == sizeof(UartHead)) {
400 // nothing need to send, this is a head only package
401 // only used in link layer
402 WRITE_LOG(LOG_ALL, "%s headonly Package(%zu). dont send to session, erase it",
403 __FUNCTION__, packetSize);
404 } else {
405 // at least we got one package
406 // if the size of packge have all received ?
407 if (data.size() >= packetSize) {
408 // send the data to logic level (link to logic)
409 if (hSession == nullptr) {
410 #ifdef HDC_HOST
411 hSession = GetSession(sessionId);
412 #else
413 // for daemon side we can make a new session for it
414 hSession = GetSession(sessionId, true);
415 #endif
416 }
417 if (hSession == nullptr) {
418 WRITE_LOG(LOG_WARN, "%s have not found seesion (%u). skip it", __FUNCTION__, sessionId);
419 } else {
420 if (hSession->hUART->dispatchedPackageIndex == packageIndex) {
421 // we need check if the duplication pacakge we have already send
422 WRITE_LOG(LOG_WARN, "%s dup package %u, skip send to session logic",
423 __FUNCTION__, packageIndex);
424 } else {
425 // update the last package we will send to hdc
426 hSession->hUART->dispatchedPackageIndex = packageIndex;
427 RetErrCode ret = DispatchToWorkThread(hSession, data.data(), packetSize);
428 if (ret == RET_SUCCESS) {
429 WRITE_LOG(LOG_DEBUG, "%s DispatchToWorkThread successful",
430 __FUNCTION__);
431 } else {
432 // send to logic failed.
433 // this kind of issue unable handle in link layer
434 WRITE_LOG(LOG_FATAL,
435 "%s DispatchToWorkThread fail %d. requeset free session in "
436 "other side",
437 __FUNCTION__, ret);
438 ResponseUartTrans(hSession->sessionId, ++hSession->hUART->packageIndex,
439 PKG_OPTION_FREE);
440 }
441 }
442 }
443 } else {
444 WRITE_LOG(LOG_DEBUG, "%s valid package, however size not enough. expect %zu",
445 __FUNCTION__, packetSize);
446 return packetSize;
447 }
448 }
449
450 if (data.size() >= packetSize) {
451 data.erase(data.begin(), data.begin() + packetSize);
452 } else {
453 // dont clean , should merge with next package
454 }
455 WRITE_LOG(LOG_DEBUG, "PackageProcess data.size():%d left", data.size());
456 }
457 // if we have at least one byte, we think there should be a head
458 return data.size() > 1 ? sizeof(UartHead) : 0;
459 }
460
SendUARTRaw(HSession hSession,uint8_t * data,const size_t length)461 bool HdcUARTBase::SendUARTRaw(HSession hSession, uint8_t *data, const size_t length)
462 {
463 struct UartHead *uartHeader = (struct UartHead *)data;
464 #ifndef HDC_HOST
465 // review nobody can plug out the daemon uart , if we still need split write in daemon side?
466 HdcUART deamonUart;
467 deamonUart.devUartHandle = uartHandle;
468 if (uartHeader->IsResponsePackage()) {
469 // for the response package and in daemon side,
470 // we dont need seesion info
471 ssize_t sendBytes = WriteUartDev(data, length, deamonUart);
472 return sendBytes > 0;
473 }
474 #endif
475
476 // for normal package
477 if (hSession == nullptr) {
478 hSession = GetSession(uartHeader->sessionId);
479 if (hSession == nullptr) {
480 // session is not found
481 WRITE_LOG(LOG_WARN, "%s hSession not found:%zu", __FUNCTION__, uartHeader->sessionId);
482 return false;
483 }
484 }
485 hSession->ref++;
486 WRITE_LOG(LOG_DEBUG, "%s length:%d", __FUNCTION__, length);
487 #ifdef HDC_HOST
488 ssize_t sendBytes = WriteUartDev(data, length, *hSession->hUART);
489 #else
490 ssize_t sendBytes = WriteUartDev(data, length, deamonUart);
491 #endif
492 WRITE_LOG(LOG_DEBUG, "%s sendBytes %zu", __FUNCTION__, sendBytes);
493 if (sendBytes < 0) {
494 WRITE_LOG(LOG_DEBUG, "%s send fail. try to freesession", __FUNCTION__);
495 OnTransferError(hSession);
496 }
497 hSession->ref--;
498 return sendBytes > 0;
499 }
500
501 // this function will not check the data correct again
502 // just send the data to hdc session side
UartSendToHdcStream(HSession hSession,uint8_t * data,size_t size)503 bool HdcUARTBase::UartSendToHdcStream(HSession hSession, uint8_t *data, size_t size)
504 {
505 WRITE_LOG(LOG_DEBUG, "%s send to session %s package size %zu", __FUNCTION__,
506 hSession->ToDebugString().c_str(), size);
507
508 int ret = RET_SUCCESS;
509
510 if (size < sizeof(UartHead)) {
511 WRITE_LOG(LOG_FATAL, "%s buf size too small %zu", __FUNCTION__, size);
512 return ERR_BUF_SIZE;
513 }
514
515 UartHead *head = reinterpret_cast<UartHead *>(data);
516 WRITE_LOG(LOG_DEBUG, "%s uartHeader:%s data: %x %x", __FUNCTION__,
517 head->ToDebugString().c_str(), *(data + sizeof(UartHead)),
518 *(data + sizeof(UartHead) + 1));
519
520 // review need check logic again here or err process
521 if (head->sessionId != hSession->sessionId) {
522 if (hSession->serverOrDaemon && !hSession->hUART->resetIO) {
523 WRITE_LOG(LOG_FATAL, "%s sessionId not matched, reset sessionId:%d.", __FUNCTION__,
524 head->sessionId);
525 SendUartSoftReset(hSession, head->sessionId);
526 hSession->hUART->resetIO = true;
527 ret = ERR_IO_SOFT_RESET;
528 // dont break ,we need rease these data in recv buffer
529 }
530 } else {
531 // data to session
532 hSession->hUART->streamSize += head->dataSize; // this is only for debug,
533 WRITE_LOG(LOG_ALL, "%s stream wait session read size: %zu", __FUNCTION__,
534 hSession->hUART->streamSize.load());
535 if (UartToHdcProtocol(reinterpret_cast<uv_stream_t *>(&hSession->dataPipe[STREAM_MAIN]),
536 data + sizeof(UartHead), head->dataSize) < 0) {
537 ret = ERR_IO_FAIL;
538 WRITE_LOG(LOG_FATAL, "%s Error uart send to stream", __FUNCTION__);
539 }
540 }
541
542 return ret == RET_SUCCESS;
543 }
544
NotifyTransfer()545 void HdcUARTBase::NotifyTransfer()
546 {
547 WRITE_LOG(LOG_DEBUG, "%s", __FUNCTION__);
548 transfer.Request();
549 }
550
551 /*
552 here we have a HandleOutputPkg vector
553 It is used to maintain the data reliability of the link layer
554 It consists of the following part
555 Log data to send (caller thread) --> RequestSendPackage
556 Send recorded data (loop sending thread) --> SendPkgInUARTOutMap
557 Process the returned reply data (loop reading thread) --> ProcessResponsePackage
558 Send reply packet (loop reading thread) --> ResponseUartTrans
559
560 The key scenarios are as follows:
561 Package is sent from side A to side B
562 Here we call the complete data package
563 package is divided into head and data
564 The response information is in the header.
565 data contains binary data.
566
567 case 1: Normal Process
568 package
569 A --> B
570 ACK
571 A <-- B
572
573 case 2: packet is incorrect
574 At least one header must be received
575 For this the B side needs to have an accept timeout.
576 There is no new data within a certain period of time as the end of the packet.
577 (This mechanism is not handled in HandleOutputPkg retransmission)
578
579 incorrect
580 A --> B
581 B sends NAK and A resends the packet.
582 NAK
583 A <-- B
584 package resend
585 A --> B
586
587 case 3: packet is complete lost()
588 package(complete lost)
589 A -x-> B
590 The A side needs to resend the Package after a certain timeout
591 A --> B
592 Until the B side has a data report (ACK or NAK), or the number of retransmissions reaches the upper
593 limit.
594 */
RequestSendPackage(uint8_t * data,const size_t length,bool queue)595 void HdcUARTBase::RequestSendPackage(uint8_t *data, const size_t length, bool queue)
596 {
597 UartHead *head = reinterpret_cast<UartHead *>(data);
598 bool response = head->IsResponsePackage();
599
600 if (queue) {
601 slots.Wait(head->sessionId);
602 }
603
604 std::lock_guard<std::recursive_mutex> lock(mapOutPkgsMutex);
605
606 std::string pkgId = head->ToPkgIdentityString(response);
607 auto it = std::find_if(outPkgs.begin(), outPkgs.end(), HandleOutputPkgKeyFinder(pkgId));
608 if (it == outPkgs.end()) {
609 // update che checksum , both head and data
610 head->UpdateCheckSum();
611 outPkgs.emplace_back(pkgId, head->sessionId, data, length, response,
612 head->option & PKG_OPTION_ACK);
613 WRITE_LOG(LOG_DEBUG, "UartPackageManager: add pkg %s (pkgs size %zu)",
614 head->ToDebugString().c_str(), outPkgs.size());
615 } else {
616 WRITE_LOG(LOG_FATAL, "UartPackageManager: add pkg %s fail, %s has already been exist.",
617 head->ToDebugString().c_str(), pkgId.c_str());
618 }
619 NotifyTransfer();
620 }
621
ProcessResponsePackage(const UartHead & head)622 void HdcUARTBase::ProcessResponsePackage(const UartHead &head)
623 {
624 std::lock_guard<std::recursive_mutex> lock(mapOutPkgsMutex);
625 bool ack = head.option & PKG_OPTION_ACK;
626 // response package
627 std::string pkgId = head.ToPkgIdentityString();
628 WRITE_LOG(LOG_ALL, "UartPackageManager: got response pkgId:%s ack:%d.", pkgId.c_str(), ack);
629
630 auto it = std::find_if(outPkgs.begin(), outPkgs.end(), HandleOutputPkgKeyFinder(pkgId));
631 if (it != outPkgs.end()) {
632 if (ack) { // response ACK.
633 slots.Free(it->sessionId);
634 outPkgs.erase(it);
635 WRITE_LOG(LOG_DEBUG, "UartPackageManager: erase pkgId:%s.", pkgId.c_str());
636 } else { // response NAK
637 it->pkgStatus = PKG_WAIT_SEND; // Re send the pkg
638 WRITE_LOG(LOG_WARN, "UartPackageManager: resend pkgId:%s.", pkgId.c_str());
639 }
640 } else {
641 WRITE_LOG(LOG_FATAL, "UartPackageManager: hasn't found pkg for pkgId:%s.", pkgId.c_str());
642 for (auto pkg : outPkgs) {
643 WRITE_LOG(LOG_ALL, "UartPackageManager: pkgId:%s.", pkg.key.c_str());
644 }
645 }
646 NotifyTransfer();
647 return;
648 }
649
SendPkgInUARTOutMap()650 void HdcUARTBase::SendPkgInUARTOutMap()
651 {
652 std::lock_guard<std::recursive_mutex> lock(mapOutPkgsMutex);
653 if (outPkgs.empty()) {
654 WRITE_LOG(LOG_ALL, "UartPackageManager: No pkgs needs to be sent.");
655 return;
656 }
657 WRITE_LOG(LOG_DEBUG, "UartPackageManager: send pkgs, have:%zu pkgs", outPkgs.size());
658 // we have maybe more than one session
659 // each session has it owner serial port
660 std::unordered_set<uint32_t> hasWaitPkg;
661 auto it = outPkgs.begin();
662 while (it != outPkgs.end()) {
663 if (it->pkgStatus == PKG_WAIT_SEND) {
664 // we found a pkg wait for send
665 // if a response package
666 // response package always send nowait noorder
667 if (!it->response and hasWaitPkg.find(it->sessionId) != hasWaitPkg.end()) {
668 // this is not a response package
669 // and this session is wait response
670 // so we can send nothing
671 // process next
672 it++;
673 continue;
674 }
675 // we will ready to send the package
676 WRITE_LOG(LOG_DEBUG, "UartPackageManager: send pkg %s", it->ToDebugString().c_str());
677 SendUARTRaw(nullptr, it->msgSendBuf.data(), it->msgSendBuf.size());
678 if (it->response) {
679 // response pkg dont need wait response again.
680 WRITE_LOG(LOG_DEBUG, "UartPackageManager: erase pkg %s",
681 it->ToDebugString().c_str());
682 it = outPkgs.erase(it);
683 continue;
684 } else {
685 // normal send package
686 it->pkgStatus = PKG_WAIT_RESPONSE;
687 it->sendTimePoint = steady_clock::now();
688 hasWaitPkg.emplace(it->sessionId);
689 transfer.Sent(); // something is sendout, transfer will timeout for next wait.
690 }
691 } else if (it->pkgStatus == PKG_WAIT_RESPONSE) {
692 // we found a pkg wiat for response
693 auto elapsedTime = duration_cast<milliseconds>(steady_clock::now() - it->sendTimePoint);
694 WRITE_LOG(LOG_DEBUG, "UartPackageManager: pkg:%s is wait ACK. elapsedTime %lld",
695 it->ToDebugString().c_str(), (long long)elapsedTime.count());
696 if (elapsedTime.count() >= WaitResponseTimeOutMs) {
697 // check the response timeout
698 if (it->retryChance > 0) {
699 // if it send timeout, resend it again.
700 WRITE_LOG(LOG_WARN, "UartPackageManager: pkg:%s try resend it.",
701 it->ToDebugString().c_str());
702 it->pkgStatus = PKG_WAIT_SEND;
703 it->retryChance--;
704 NotifyTransfer(); // make transfer reschedule
705 break; // dont process anything now.
706 } else {
707 // the response it timeout and retry counx is 0
708 // the link maybe not stable
709 // let's free this session
710 WRITE_LOG(LOG_WARN, "UartPackageManager: reach max retry ,free the seesion %u",
711 it->sessionId);
712 OnTransferError(GetSession(it->sessionId));
713 // dont reschedule here
714 // wait next schedule from this path
715 // OnTransferError -> FreeSession -> ClearUARTOutMap -> NotifyTransfer
716 break;
717 }
718 }
719 hasWaitPkg.emplace(it->sessionId);
720 }
721 it++; // next package
722 }
723 WRITE_LOG(LOG_DEBUG, "UartPackageManager: send finish, have %zu pkgs", outPkgs.size());
724 }
725
ClearUARTOutMap(uint32_t sessionId)726 void HdcUARTBase::ClearUARTOutMap(uint32_t sessionId)
727 {
728 WRITE_LOG(LOG_DEBUG, "%s UartPackageManager clean for sessionId %u", __FUNCTION__, sessionId);
729 size_t erased = 0;
730 std::lock_guard<std::recursive_mutex> lock(mapOutPkgsMutex);
731 auto it = outPkgs.begin();
732 while (it != outPkgs.end()) {
733 if (it->sessionId == sessionId) {
734 if (!it->response) {
735 slots.Free(it->sessionId);
736 }
737 it = outPkgs.erase(it);
738 erased++;
739 } else {
740 it++;
741 }
742 }
743 WRITE_LOG(LOG_DEBUG, "%s erased %zu", __FUNCTION__, erased);
744
745 NotifyTransfer(); // tell transfer we maybe have some change
746 }
747
EnsureAllPkgsSent()748 void HdcUARTBase::EnsureAllPkgsSent()
749 {
750 WRITE_LOG(LOG_DEBUG, "%s", __FUNCTION__);
751 slots.WaitFree();
752 if (!outPkgs.empty()) {
753 std::this_thread::sleep_for(1000ms);
754 }
755 WRITE_LOG(LOG_DEBUG, "%s done.", __FUNCTION__);
756 }
757
ValidateUartPacket(vector<uint8_t> & data,uint32_t & sessionId,uint32_t & packageIndex,size_t & packetSize)758 RetErrCode HdcUARTBase::ValidateUartPacket(vector<uint8_t> &data, uint32_t &sessionId,
759 uint32_t &packageIndex, size_t &packetSize)
760 {
761 constexpr auto maxBufFactor = 1;
762 struct UartHead *head = (struct UartHead *)data.data();
763 WRITE_LOG(LOG_DEBUG, "%s %s", __FUNCTION__, head->ToDebugString().c_str());
764
765 if (memcmp(head->flag, PACKET_FLAG.c_str(), PACKET_FLAG.size()) != 0) {
766 WRITE_LOG(LOG_FATAL, "%s,PACKET_FLAG not correct %x %x", __FUNCTION__, head->flag[0],
767 head->flag[1]);
768 return ERR_BUF_CHECK;
769 }
770
771 if (!head->ValidateHead()) {
772 WRITE_LOG(LOG_FATAL, "%s head checksum not correct", __FUNCTION__);
773 return ERR_BUF_CHECK;
774 }
775 // after validate , id and fullPackageLength is correct
776 sessionId = head->sessionId;
777 packetSize = head->dataSize + sizeof(UartHead);
778 packageIndex = head->packageIndex;
779
780 if ((head->dataSize + sizeof(UartHead)) > MAX_UART_SIZE_IOBUF * maxBufFactor) {
781 WRITE_LOG(LOG_FATAL, "%s dataSize too larger:%d", __FUNCTION__, head->dataSize);
782 return ERR_BUF_OVERFLOW;
783 }
784
785 if ((head->option & PKG_OPTION_RESET)) {
786 // The Host end program is restarted, but the UART cable is still connected
787 WRITE_LOG(LOG_WARN, "%s host side want restart daemon, restart old sessionId:%u",
788 __FUNCTION__, head->sessionId);
789 ResetOldSession(head->sessionId);
790 return ERR_IO_SOFT_RESET;
791 }
792
793 if ((head->option & PKG_OPTION_FREE)) {
794 // other side tell us the session need reset
795 // we should free it
796 WRITE_LOG(LOG_WARN, "%s other side tell us the session need free:%u", __FUNCTION__,
797 head->sessionId);
798 Restartession(GetSession(head->sessionId));
799 }
800
801 // check data
802 if (data.size() >= packetSize) {
803 // if we have full package now ?
804 if (!head->ValidateData()) {
805 WRITE_LOG(LOG_FATAL, "%s data checksum not correct", __FUNCTION__);
806 return ERR_BUF_CHECK;
807 }
808 if (head->IsResponsePackage()) {
809 // response package
810 ProcessResponsePackage(*head);
811 } else {
812 // link layer response for no response package
813 ResponseUartTrans(head->sessionId, head->packageIndex, PKG_OPTION_ACK);
814 }
815 }
816
817 return RET_SUCCESS;
818 }
819
ResponseUartTrans(uint32_t sessionId,uint32_t packageIndex,UartProtocolOption option)820 void HdcUARTBase::ResponseUartTrans(uint32_t sessionId, uint32_t packageIndex,
821 UartProtocolOption option)
822 {
823 UartHead uartHeader(sessionId, option, 0, packageIndex);
824 WRITE_LOG(LOG_DEBUG, "%s option:%u", __FUNCTION__, option);
825 RequestSendPackage(reinterpret_cast<uint8_t *>(&uartHeader), sizeof(UartHead), false);
826 }
827
SendUARTData(HSession hSession,uint8_t * data,const size_t length)828 int HdcUARTBase::SendUARTData(HSession hSession, uint8_t *data, const size_t length)
829 {
830 constexpr int maxIOSize = MAX_UART_SIZE_IOBUF;
831 WRITE_LOG(LOG_DEBUG, "SendUARTData hSession:%u, total length:%d", hSession->sessionId, length);
832 const int packageDataMaxSize = maxIOSize - sizeof(UartHead);
833 size_t offset = 0;
834 uint8_t sendDataBuf[MAX_UART_SIZE_IOBUF];
835
836 WRITE_LOG(LOG_ALL, "SendUARTData data length :%d", length);
837
838 do {
839 UartHead *head = (UartHead *)sendDataBuf;
840 if (memset_s(head, sizeof(UartHead), 0, sizeof(UartHead)) != EOK) {
841 return ERR_BUF_RESET;
842 }
843 if (memcpy_s(head->flag, sizeof(head->flag), PACKET_FLAG.c_str(), PACKET_FLAG.size()) !=
844 EOK) {
845 return ERR_BUF_COPY;
846 }
847 head->sessionId = hSession->sessionId;
848 head->packageIndex = ++hSession->hUART->packageIndex;
849
850 int RemainingDataSize = length - offset;
851 if (RemainingDataSize > packageDataMaxSize) {
852 // more than one package max data size
853 head->dataSize = static_cast<uint16_t>(packageDataMaxSize);
854 } else {
855 // less then the max size
856 head->dataSize = static_cast<uint16_t>(RemainingDataSize);
857 // this is the last package . all the data will send after this time
858 head->option = head->option | PKG_OPTION_TAIL;
859 }
860 #ifdef UART_FULL_LOG
861 WRITE_LOG(LOG_FULL, "offset %d length %d", offset, length);
862 #endif
863 uint8_t *payload = sendDataBuf + sizeof(UartHead);
864 if (EOK !=
865 memcpy_s(payload, packageDataMaxSize, (uint8_t *)data + offset, head->dataSize)) {
866 WRITE_LOG(LOG_FATAL, "memcpy_s failed max %zu , need %zu",
867 packageDataMaxSize, head->dataSize);
868 return ERR_BUF_COPY;
869 }
870 offset += head->dataSize;
871 int packageFullSize = sizeof(UartHead) + head->dataSize;
872 WRITE_LOG(LOG_ALL, "SendUARTData =============> %s", head->ToDebugString().c_str());
873 RequestSendPackage(sendDataBuf, packageFullSize);
874 } while (offset != length);
875
876 return offset;
877 }
878
ReadDataFromUARTStream(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)879 void HdcUARTBase::ReadDataFromUARTStream(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
880 {
881 HSession hSession = (HSession)stream->data;
882 HdcUARTBase *hUARTBase = (HdcUARTBase *)hSession->classModule;
883 std::lock_guard<std::mutex> lock(hUARTBase->workThreadProcessingData);
884
885 constexpr int bufSize = 1024;
886 char buffer[bufSize] = { 0 };
887 if (nread < 0) {
888 uv_err_name_r(nread, buffer, bufSize);
889 }
890 WRITE_LOG(LOG_DEBUG, "%s sessionId:%u, nread:%zd %s streamSize %zu", __FUNCTION__,
891 hSession->sessionId, nread, buffer,
892 hSession->hUART->streamSize.load());
893 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
894 if (nread <= 0 or nread > signed(hSession->hUART->streamSize)) {
895 WRITE_LOG(LOG_FATAL, "%s nothing need to do ! because no data here", __FUNCTION__);
896 return;
897 }
898 if (hSessionBase->FetchIOBuf(hSession, hSession->ioBuf, nread) < 0) {
899 WRITE_LOG(LOG_FATAL, "%s FetchIOBuf failed , free the other side session", __FUNCTION__);
900 // seesion side said the dont understand this seesion data
901 // so we also need tell other side to free it session.
902 hUARTBase->ResponseUartTrans(hSession->sessionId, ++hSession->hUART->packageIndex,
903 PKG_OPTION_FREE);
904
905 WRITE_LOG(LOG_FATAL, "%s FetchIOBuf failed , free the session", __FUNCTION__);
906 hSessionBase->FreeSession(hSession->sessionId);
907 }
908 hSession->hUART->streamSize -= nread;
909 WRITE_LOG(LOG_DEBUG, "%s sessionId:%u, nread:%d", __FUNCTION__, hSession->sessionId, nread);
910 }
911
ReadyForWorkThread(HSession hSession)912 bool HdcUARTBase::ReadyForWorkThread(HSession hSession)
913 {
914 if (externInterface.UvTcpInit(&hSession->childLoop, &hSession->dataPipe[STREAM_WORK],
915 hSession->dataFd[STREAM_WORK])) {
916 WRITE_LOG(LOG_FATAL, "%s init child TCP failed", __FUNCTION__);
917 return false;
918 }
919 hSession->dataPipe[STREAM_WORK].data = hSession;
920 HdcSessionBase *pSession = (HdcSessionBase *)hSession->classInstance;
921 externInterface.SetTcpOptions(&hSession->dataPipe[STREAM_WORK]);
922 if (externInterface.UvRead((uv_stream_t *)&hSession->dataPipe[STREAM_WORK],
923 pSession->AllocCallback, &HdcUARTBase::ReadDataFromUARTStream)) {
924 WRITE_LOG(LOG_FATAL, "%s child TCP read failed", __FUNCTION__);
925 return false;
926 }
927 WRITE_LOG(LOG_DEBUG, "%s finish", __FUNCTION__);
928 return true;
929 }
930
Restartession(const HSession session)931 void HdcUARTBase::Restartession(const HSession session)
932 {
933 if (session != nullptr) {
934 WRITE_LOG(LOG_FATAL, "%s:%s", __FUNCTION__, session->ToDebugString().c_str());
935 ClearUARTOutMap(session->sessionId);
936 sessionBase.FreeSession(session->sessionId);
937 }
938 }
939
StopSession(HSession hSession)940 void HdcUARTBase::StopSession(HSession hSession)
941 {
942 if (hSession != nullptr) {
943 WRITE_LOG(LOG_WARN, "%s:%s", __FUNCTION__, hSession->ToDebugString().c_str());
944 ClearUARTOutMap(hSession->sessionId);
945 } else {
946 WRITE_LOG(LOG_FATAL, "%s: clean null session", __FUNCTION__);
947 }
948 }
949
Wait()950 void HdcUARTBase::TransferStateMachine::Wait()
951 {
952 std::unique_lock<std::mutex> lock(mutex);
953 WRITE_LOG(LOG_ALL, "%s", __FUNCTION__);
954 if (timeout) {
955 auto waitTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
956 timeoutPoint - std::chrono::steady_clock::now());
957 WRITE_LOG(LOG_ALL, "wait timeout %lld", waitTimeout.count());
958 if (cv.wait_for(lock, waitTimeout, [=] { return requested; }) == false) {
959 // must wait one timeout
960 // because sometime maybe not timeout but we got a request first.
961 timeout = false;
962 WRITE_LOG(LOG_ALL, "timeout");
963 }
964 } else {
965 cv.wait(lock, [=] { return requested; });
966 }
967 requested = false;
968 }
969
HdcUART()970 HdcUART::HdcUART()
971 {
972 #ifdef _WIN32
973 Base::ZeroStruct(ovWrite);
974 ovWrite.hEvent = CreateEvent(NULL, false, false, NULL);
975 Base::ZeroStruct(ovRead);
976 ovRead.hEvent = CreateEvent(NULL, false, false, NULL);
977 #endif
978 }
979
~HdcUART()980 HdcUART::~HdcUART()
981 {
982 #ifdef _WIN32
983 CloseHandle(ovWrite.hEvent);
984 ovWrite.hEvent = NULL;
985 CloseHandle(ovRead.hEvent);
986 ovRead.hEvent = NULL;
987 #endif
988 }
989 } // namespace Hdc
990