• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "uart.h"
16 
17 using namespace std::chrono;
18 namespace Hdc {
19 ExternInterface HdcUARTBase::defaultInterface;
20 
SetTcpOptions(uv_tcp_t * tcpHandle)21 void ExternInterface::SetTcpOptions(uv_tcp_t *tcpHandle)
22 {
23     return Base::SetTcpOptions(tcpHandle);
24 }
25 
SendToStream(uv_stream_t * handleStream,const uint8_t * buf,const int len)26 int ExternInterface::SendToStream(uv_stream_t *handleStream, const uint8_t *buf, const int len)
27 {
28     return Base::SendToStream(handleStream, buf, len);
29 }
30 
UvTcpInit(uv_loop_t * loop,uv_tcp_t * tcp,int socketFd)31 int ExternInterface::UvTcpInit(uv_loop_t *loop, uv_tcp_t *tcp, int socketFd)
32 {
33     if (uv_tcp_init(loop, tcp) == 0) {
34         return uv_tcp_open(tcp, socketFd);
35     } else {
36         return -1;
37     }
38 }
39 
UvRead(uv_stream_t * stream,uv_alloc_cb allocCallBack,uv_read_cb readCallBack)40 int ExternInterface::UvRead(uv_stream_t *stream, uv_alloc_cb allocCallBack, uv_read_cb readCallBack)
41 {
42     return uv_read_start(stream, allocCallBack, readCallBack);
43 }
44 
StartWorkThread(uv_loop_t * loop,uv_work_cb pFuncWorkThread,uv_after_work_cb pFuncAfterThread,void * pThreadData)45 int ExternInterface::StartWorkThread(uv_loop_t *loop, uv_work_cb pFuncWorkThread,
46                                      uv_after_work_cb pFuncAfterThread, void *pThreadData)
47 {
48     return Base::StartWorkThread(loop, pFuncWorkThread, pFuncAfterThread, pThreadData);
49 }
50 
TryCloseHandle(const uv_handle_t * handle,uv_close_cb closeCallBack)51 void ExternInterface::TryCloseHandle(const uv_handle_t *handle, uv_close_cb closeCallBack)
52 {
53     return Base::TryCloseHandle(handle, closeCallBack);
54 }
55 
TimerUvTask(uv_loop_t * loop,void * data,uv_timer_cb cb)56 bool ExternInterface::TimerUvTask(uv_loop_t *loop, void *data, uv_timer_cb cb)
57 {
58     return Base::TimerUvTask(loop, data, cb);
59 }
UvTimerStart(uv_timer_t * handle,uv_timer_cb cb,uint64_t timeout,uint64_t repeat)60 bool ExternInterface::UvTimerStart(uv_timer_t *handle, uv_timer_cb cb, uint64_t timeout,
61                                    uint64_t repeat)
62 {
63     return uv_timer_start(handle, cb, timeout, repeat);
64 }
65 
DelayDo(uv_loop_t * loop,const int delayMs,const uint8_t flag,string msg,void * data,DelayCB cb)66 bool ExternInterface::DelayDo(uv_loop_t *loop, const int delayMs, const uint8_t flag, string msg,
67                               void *data, DelayCB cb)
68 {
69     return Base::DelayDo(loop, delayMs, flag, msg, data, cb);
70 }
71 
HdcUARTBase(HdcSessionBase & sessionBaseIn,ExternInterface & interfaceIn)72 HdcUARTBase::HdcUARTBase(HdcSessionBase &sessionBaseIn, ExternInterface &interfaceIn)
73     : externInterface(interfaceIn), sessionBase(sessionBaseIn)
74 {
75     uartOpened = false;
76 }
77 
~HdcUARTBase(void)78 HdcUARTBase::~HdcUARTBase(void) {}
79 
80 #ifndef _WIN32
GetUartSpeed(int speed)81 int HdcUARTBase::GetUartSpeed(int speed)
82 {
83     switch (speed) {
84         case UART_SPEED2400:
85             return (B2400);
86             break;
87         case UART_SPEED4800:
88             return (B4800);
89             break;
90         case UART_SPEED9600:
91             return (B9600);
92             break;
93         case UART_SPEED115200:
94             return (B115200);
95             break;
96         case UART_SPEED921600:
97             return (B921600);
98             break;
99         default:
100             return (B921600);
101             break;
102     }
103 }
GetUartBits(int bits)104 int HdcUARTBase::GetUartBits(int bits)
105 {
106     switch (bits) {
107         case UART_BIT1:
108             return (CS7);
109             break;
110         case UART_BIT2:
111             return (CS8);
112             break;
113         default:
114             return (CS8);
115             break;
116     }
117 }
118 
SetSerial(int fd,int nSpeed,int nBits,char nEvent,int nStop)119 int HdcUARTBase::SetSerial(int fd, int nSpeed, int nBits, char nEvent, int nStop)
120 {
121     struct termios newttys1, oldttys1;
122     if (tcgetattr(fd, &oldttys1) != 0) {
123         constexpr int bufSize = 1024;
124         char buf[bufSize] = { 0 };
125         strerror_r(errno, buf, bufSize);
126         WRITE_LOG(LOG_DEBUG, "tcgetattr failed with %s\n", buf);
127         return ERR_GENERIC;
128     }
129     bzero(&newttys1, sizeof(newttys1));
130     newttys1.c_cflag = GetUartSpeed(nSpeed);
131     newttys1.c_cflag |= (CLOCAL | CREAD);
132     newttys1.c_cflag &= ~CSIZE;
133     newttys1.c_lflag &= ~ICANON;
134     newttys1.c_cflag |= GetUartBits(nBits);
135     switch (nEvent) {
136         case '0':
137             newttys1.c_cflag |= PARENB;
138             newttys1.c_iflag |= (INPCK | ISTRIP);
139             newttys1.c_cflag |= PARODD;
140             break;
141         case 'E':
142             newttys1.c_cflag |= PARENB;
143             newttys1.c_iflag |= (INPCK | ISTRIP);
144             newttys1.c_cflag &= ~PARODD;
145             break;
146         case 'N':
147             newttys1.c_cflag &= ~PARENB;
148             break;
149         default:
150             break;
151     }
152     if (nStop == UART_STOP1) {
153         newttys1.c_cflag &= ~CSTOPB;
154     } else if (nStop == UART_STOP2) {
155         newttys1.c_cflag |= CSTOPB;
156     }
157     newttys1.c_cc[VTIME] = 0;
158     newttys1.c_cc[VMIN] = 0;
159     if (tcflush(fd, TCIOFLUSH)) {
160         WRITE_LOG(LOG_DEBUG, " tcflush error.");
161         return ERR_GENERIC;
162     }
163     if ((tcsetattr(fd, TCSANOW, &newttys1)) != 0) {
164         WRITE_LOG(LOG_DEBUG, " com set error");
165         return ERR_GENERIC;
166     }
167     WRITE_LOG(LOG_DEBUG, " SetSerial OK");
168     return RET_SUCCESS;
169 }
170 #endif // _WIN32
171 
ReadUartDev(std::vector<uint8_t> & readBuf,size_t expectedSize,HdcUART & uart)172 ssize_t HdcUARTBase::ReadUartDev(std::vector<uint8_t> &readBuf, size_t expectedSize, HdcUART &uart)
173 {
174     ssize_t totalBytesRead = 0;
175     uint8_t uartReadBuffer[MAX_UART_SIZE_IOBUF];
176 #ifdef _WIN32
177     DWORD bytesRead = 0;
178 #else
179     ssize_t bytesRead = 0;
180 #endif
181     do {
182         bytesRead = 0;
183 #ifdef _WIN32
184         BOOL bReadStatus = ReadFile(uart.devUartHandle, uartReadBuffer, sizeof(uartReadBuffer),
185                                     &bytesRead, &uart.ovRead);
186         if (!bReadStatus) {
187             if (GetLastError() == ERROR_IO_PENDING) {
188                 bytesRead = 0;
189                 DWORD dwMilliseconds = ReadGiveUpTimeOutTimeMs;
190                 if (expectedSize == 0) {
191                     dwMilliseconds = INFINITE;
192                 }
193                 if (!GetOverlappedResultEx(uart.devUartHandle, &uart.ovRead, &bytesRead,
194                                            dwMilliseconds, FALSE)) {
195                     // wait io failed
196                     DWORD error = GetLastError();
197                     if (error == ERROR_OPERATION_ABORTED) {
198                         totalBytesRead += bytesRead;
199                         WRITE_LOG(LOG_DEBUG, "%s error cancel read. %u %zd", __FUNCTION__,
200                                   bytesRead, totalBytesRead);
201                         // Generally speaking, this is the cacnel caused by freesession
202                         // Returning allows the outer read loop to run again. This checks the exit
203                         // condition.
204                         return totalBytesRead;
205                     } else if (error == WAIT_TIMEOUT) {
206                         totalBytesRead += bytesRead;
207                         WRITE_LOG(LOG_DEBUG, "%s error timeout. %u %zd", __FUNCTION__, bytesRead,
208                                   totalBytesRead);
209                         return totalBytesRead;
210                     } else {
211                         WRITE_LOG(LOG_DEBUG, "%s error wait io:%d.", __FUNCTION__, GetLastError());
212                     }
213                     return -1;
214                 }
215             } else {
216                 // not ERROR_IO_PENDING
217                 WRITE_LOG(LOG_DEBUG, "%s  err:%d. ", __FUNCTION__, GetLastError());
218                 return -1;
219             }
220         }
221 #else
222         int ret = 0;
223         fd_set readFds;
224         FD_ZERO(&readFds);
225         FD_SET(uart.devUartHandle, &readFds);
226         const constexpr int msTous = 1000;
227         struct timeval tv;
228         tv.tv_sec = 0;
229 
230         if (expectedSize == 0) {
231             tv.tv_usec = WaitResponseTimeOutMs * msTous;
232 #ifdef HDC_HOST
233             // only host side need this
234             // in this caes
235             // We need a way to exit from the select for the destruction and recovery of the
236             // serial port read thread.
237             ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, &tv);
238 #else
239             ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, nullptr);
240 #endif
241         } else {
242             // when we have expect size , we need timeout for link data drop issue
243             tv.tv_usec = ReadGiveUpTimeOutTimeMs * msTous;
244             ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, &tv);
245         }
246         if (ret == 0 and expectedSize == 0) {
247             // no expect but timeout
248             if (uart.ioCancel) {
249                 WRITE_LOG(LOG_DEBUG, "%s:uart select time out and io cancel", __FUNCTION__);
250                 uart.ioCancel = true;
251                 return totalBytesRead;
252             } else {
253                 continue;
254             }
255         } else if (ret == 0) {
256             WRITE_LOG(LOG_DEBUG, "%s:uart select time out!", __FUNCTION__);
257             // we expected some byte , but not arrive before timeout
258             return totalBytesRead;
259         } else if (ret < 0) {
260             WRITE_LOG(LOG_DEBUG, "%s:uart select error! %d", __FUNCTION__, errno);
261             return -1; // wait failed.
262         } else {
263             // select > 0
264             bytesRead = read(uart.devUartHandle, uartReadBuffer, sizeof(uartReadBuffer));
265             if (bytesRead <= 0) {
266                 // read failed !
267                 WRITE_LOG(LOG_WARN, "%s:read failed! %zd:%d", __FUNCTION__, bytesRead, errno);
268                 return -1;
269             }
270         }
271 #endif
272         if (bytesRead > 0) {
273             readBuf.insert(readBuf.end(), uartReadBuffer, uartReadBuffer + bytesRead);
274             totalBytesRead += bytesRead;
275         }
276     } while (readBuf.size() < expectedSize or
277              bytesRead == 0); // if caller know how many bytes it want
278     return totalBytesRead;
279 }
280 
WriteUartDev(uint8_t * data,const size_t length,HdcUART & uart)281 ssize_t HdcUARTBase::WriteUartDev(uint8_t *data, const size_t length, HdcUART &uart)
282 {
283     ssize_t totalBytesWrite = 0;
284     WRITE_LOG(LOG_ALL, "%s %d data %x %x", __FUNCTION__, length, *(data + sizeof(UartHead)),
285               *(data + sizeof(UartHead) + 1));
286     do {
287 #ifdef _WIN32
288         DWORD bytesWrite = 0;
289         BOOL bWriteStat = WriteFile(uart.devUartHandle, data + totalBytesWrite,
290                                     length - totalBytesWrite, &bytesWrite, &uart.ovWrite);
291         if (!bWriteStat) {
292             if (GetLastError() == ERROR_IO_PENDING) {
293                 if (!GetOverlappedResult(uart.devUartHandle, &uart.ovWrite, &bytesWrite, TRUE)) {
294                     WRITE_LOG(LOG_DEBUG, "%s error wait io:%d. bytesWrite %zu", __FUNCTION__,
295                               GetLastError(), bytesWrite);
296                     return -1;
297                 }
298             } else {
299                 WRITE_LOG(LOG_DEBUG, "%s err:%d. bytesWrite %zu", __FUNCTION__, GetLastError(),
300                           bytesWrite);
301                 return -1;
302             }
303         }
304 #else // not win32
305         ssize_t bytesWrite = 0;
306         bytesWrite = write(uart.devUartHandle, data + totalBytesWrite, length - totalBytesWrite);
307         if (bytesWrite < 0) {
308             if (errno == EINTR or errno == EAGAIN) {
309                 WRITE_LOG(LOG_WARN, "EINTR/EAGAIN, try again");
310                 continue;
311             } else {
312                 // we don't know how to recory in this function
313                 // need reopen device ?
314                 constexpr int bufSize = 1024;
315                 char buf[bufSize] = { 0 };
316                 strerror_r(errno, buf, bufSize);
317                 WRITE_LOG(LOG_FATAL, "write fatal errno %d:%s", errno, buf);
318                 return -1;
319             }
320         } else {
321             // waits until all output written to the object referred to by fd has been transmitted.
322             tcdrain(uart.devUartHandle);
323         }
324 #endif
325         totalBytesWrite += bytesWrite;
326     } while (totalBytesWrite < signed(length));
327 
328     return totalBytesWrite;
329 }
330 
UartToHdcProtocol(uv_stream_t * stream,uint8_t * data,int dataSize)331 int HdcUARTBase::UartToHdcProtocol(uv_stream_t *stream, uint8_t *data, int dataSize)
332 {
333     HSession hSession = (HSession)stream->data;
334     unsigned int fd = hSession->dataFd[STREAM_MAIN];
335     fd_set fdSet;
336     struct timeval timeout = {3, 0};
337     FD_ZERO(&fdSet);
338     FD_SET(fd, &fdSet);
339     int index = 0;
340     int childRet = 0;
341 
342     while (index < dataSize) {
343         childRet = select(fd + 1, NULL, &fdSet, NULL, &timeout);
344         if (childRet <= 0) {
345             constexpr int bufSize = 1024;
346             char buf[bufSize] = { 0 };
347 #ifdef _WIN32
348             strerror_s(buf, bufSize, errno);
349 #else
350             strerror_r(errno, buf, bufSize);
351 #endif
352             WRITE_LOG(LOG_FATAL, "%s select error:%d [%s][%d]", __FUNCTION__, errno,
353                       buf, childRet);
354             break;
355         }
356         childRet = send(fd, (const char *)data + index, dataSize - index, 0);
357         if (childRet < 0) {
358             constexpr int bufSize = 1024;
359             char buf[bufSize] = { 0 };
360 #ifdef _WIN32
361             strerror_s(buf, bufSize, errno);
362 #else
363             strerror_r(errno, buf, bufSize);
364 #endif
365             WRITE_LOG(LOG_FATAL, "%s senddata err:%d [%s]", __FUNCTION__, errno, buf);
366             break;
367         }
368         index += childRet;
369     }
370     if (index != dataSize) {
371         WRITE_LOG(LOG_FATAL, "%s partialsenddata err:%d [%d]", __FUNCTION__, index, dataSize);
372         return ERR_IO_FAIL;
373     }
374     return index;
375 }
376 
DispatchToWorkThread(HSession hSession,uint8_t * readBuf,int readBytes)377 RetErrCode HdcUARTBase::DispatchToWorkThread(HSession hSession, uint8_t *readBuf, int readBytes)
378 {
379     if (hSession == nullptr) {
380         return ERR_SESSION_NOFOUND;
381     }
382     if (!UartSendToHdcStream(hSession, readBuf, readBytes)) {
383         return ERR_IO_FAIL;
384     }
385     return RET_SUCCESS;
386 }
387 
PackageProcess(vector<uint8_t> & data,HSession hSession)388 size_t HdcUARTBase::PackageProcess(vector<uint8_t> &data, HSession hSession)
389 {
390     while (data.size() >= sizeof(UartHead)) {
391         // is size more than one head
392         size_t packetSize = 0;
393         uint32_t sessionId = 0;
394         uint32_t packageIndex = 0;
395         // we erase all buffer. wait next read.
396         if (ValidateUartPacket(data, sessionId, packageIndex, packetSize) != RET_SUCCESS) {
397             WRITE_LOG(LOG_WARN, "%s package error. clean the read buffer.", __FUNCTION__);
398             data.clear();
399         } else if (packetSize == sizeof(UartHead)) {
400             // nothing need to send, this is a head only package
401             // only used in link layer
402             WRITE_LOG(LOG_ALL, "%s headonly Package(%zu). dont send to session, erase it",
403                       __FUNCTION__, packetSize);
404         } else {
405             // at least we got one package
406             // if the size of packge have all received ?
407             if (data.size() >= packetSize) {
408                 // send the data to logic level (link to logic)
409                 if (hSession == nullptr) {
410 #ifdef HDC_HOST
411                     hSession = GetSession(sessionId);
412 #else
413                     // for daemon side we can make a new session for it
414                     hSession = GetSession(sessionId, true);
415 #endif
416                 }
417                 if (hSession == nullptr) {
418                     WRITE_LOG(LOG_WARN, "%s have not found seesion (%u). skip it", __FUNCTION__, sessionId);
419                 } else {
420                     if (hSession->hUART->dispatchedPackageIndex == packageIndex) {
421                         // we need check if the duplication pacakge we have already send
422                         WRITE_LOG(LOG_WARN, "%s dup package %u, skip send to session logic",
423                                   __FUNCTION__, packageIndex);
424                     } else {
425                         // update the last package we will send to hdc
426                         hSession->hUART->dispatchedPackageIndex = packageIndex;
427                         RetErrCode ret = DispatchToWorkThread(hSession, data.data(), packetSize);
428                         if (ret == RET_SUCCESS) {
429                             WRITE_LOG(LOG_DEBUG, "%s DispatchToWorkThread successful",
430                                       __FUNCTION__);
431                         } else {
432                             // send to logic failed.
433                             // this kind of issue unable handle in link layer
434                             WRITE_LOG(LOG_FATAL,
435                                       "%s DispatchToWorkThread fail %d. requeset free session in "
436                                       "other side",
437                                       __FUNCTION__, ret);
438                             ResponseUartTrans(hSession->sessionId, ++hSession->hUART->packageIndex,
439                                               PKG_OPTION_FREE);
440                         }
441                     }
442                 }
443             } else {
444                 WRITE_LOG(LOG_DEBUG, "%s valid package, however size not enough. expect %zu",
445                           __FUNCTION__, packetSize);
446                 return packetSize;
447             }
448         }
449 
450         if (data.size() >= packetSize) {
451             data.erase(data.begin(), data.begin() + packetSize);
452         } else {
453             // dont clean , should merge with next package
454         }
455         WRITE_LOG(LOG_DEBUG, "PackageProcess data.size():%d left", data.size());
456     }
457     // if we have at least one byte, we think there should be a head
458     return data.size() > 1 ? sizeof(UartHead) : 0;
459 }
460 
SendUARTRaw(HSession hSession,uint8_t * data,const size_t length)461 bool HdcUARTBase::SendUARTRaw(HSession hSession, uint8_t *data, const size_t length)
462 {
463     struct UartHead *uartHeader = (struct UartHead *)data;
464 #ifndef HDC_HOST
465     // review nobody can plug out the daemon uart , if we still need split write in daemon side?
466     HdcUART deamonUart;
467     deamonUart.devUartHandle = uartHandle;
468     if (uartHeader->IsResponsePackage()) {
469         // for the response package and in daemon side,
470         // we dont need seesion info
471         ssize_t sendBytes = WriteUartDev(data, length, deamonUart);
472         return sendBytes > 0;
473     }
474 #endif
475 
476     // for normal package
477     if (hSession == nullptr) {
478         hSession = GetSession(uartHeader->sessionId);
479         if (hSession == nullptr) {
480             // session is not found
481             WRITE_LOG(LOG_WARN, "%s hSession not found:%zu", __FUNCTION__, uartHeader->sessionId);
482             return false;
483         }
484     }
485     hSession->ref++;
486     WRITE_LOG(LOG_DEBUG, "%s length:%d", __FUNCTION__, length);
487 #ifdef HDC_HOST
488     ssize_t sendBytes = WriteUartDev(data, length, *hSession->hUART);
489 #else
490     ssize_t sendBytes = WriteUartDev(data, length, deamonUart);
491 #endif
492     WRITE_LOG(LOG_DEBUG, "%s sendBytes %zu", __FUNCTION__, sendBytes);
493     if (sendBytes < 0) {
494         WRITE_LOG(LOG_DEBUG, "%s send fail. try to freesession", __FUNCTION__);
495         OnTransferError(hSession);
496     }
497     hSession->ref--;
498     return sendBytes > 0;
499 }
500 
501 // this function will not check the data correct again
502 // just send the data to hdc session side
UartSendToHdcStream(HSession hSession,uint8_t * data,size_t size)503 bool HdcUARTBase::UartSendToHdcStream(HSession hSession, uint8_t *data, size_t size)
504 {
505     WRITE_LOG(LOG_DEBUG, "%s send to session %s package size %zu", __FUNCTION__,
506               hSession->ToDebugString().c_str(), size);
507 
508     int ret = RET_SUCCESS;
509 
510     if (size < sizeof(UartHead)) {
511         WRITE_LOG(LOG_FATAL, "%s buf size too small %zu", __FUNCTION__, size);
512         return ERR_BUF_SIZE;
513     }
514 
515     UartHead *head = reinterpret_cast<UartHead *>(data);
516     WRITE_LOG(LOG_DEBUG, "%s uartHeader:%s data: %x %x", __FUNCTION__,
517               head->ToDebugString().c_str(), *(data + sizeof(UartHead)),
518               *(data + sizeof(UartHead) + 1));
519 
520     // review need check logic again here or err process
521     if (head->sessionId != hSession->sessionId) {
522         if (hSession->serverOrDaemon && !hSession->hUART->resetIO) {
523             WRITE_LOG(LOG_FATAL, "%s sessionId not matched, reset sessionId:%d.", __FUNCTION__,
524                       head->sessionId);
525             SendUartSoftReset(hSession, head->sessionId);
526             hSession->hUART->resetIO = true;
527             ret = ERR_IO_SOFT_RESET;
528             // dont break ,we need rease these data in recv buffer
529         }
530     } else {
531         //  data to session
532         hSession->hUART->streamSize += head->dataSize; // this is only for debug,
533         WRITE_LOG(LOG_ALL, "%s stream wait session read size: %zu", __FUNCTION__,
534                   hSession->hUART->streamSize.load());
535         if (UartToHdcProtocol(reinterpret_cast<uv_stream_t *>(&hSession->dataPipe[STREAM_MAIN]),
536                               data + sizeof(UartHead), head->dataSize) < 0) {
537             ret = ERR_IO_FAIL;
538             WRITE_LOG(LOG_FATAL, "%s Error uart send to stream", __FUNCTION__);
539         }
540     }
541 
542     return ret == RET_SUCCESS;
543 }
544 
NotifyTransfer()545 void HdcUARTBase::NotifyTransfer()
546 {
547     WRITE_LOG(LOG_DEBUG, "%s", __FUNCTION__);
548     transfer.Request();
549 }
550 
/*
Here we have a HandleOutputPkg vector.
It is used to maintain the data reliability of the link layer.
It consists of the following parts:
Record the data to send (caller thread)                 --> RequestSendPackage
Send the recorded data (loop sending thread)            --> SendPkgInUARTOutMap
Process the returned reply data (loop reading thread)   --> ProcessResponsePackage
Send a reply packet (loop reading thread)               --> ResponseUartTrans

The key scenarios are as follows:
A package is sent from side A to side B.
Here "package" means the complete data package.
A package is divided into head and data.
The response information is in the head.
The data part contains binary data.

case 1: Normal process
    package
A   -->   B
    ACK
A   <--   B

case 2: The packet is incorrect
At least one head must be received.
For this the B side needs to have a receive timeout:
no new data within a certain period of time is treated as the end of the packet.
(This mechanism is not handled in the HandleOutputPkg retransmission.)

    incorrect
A   -->   B
B sends NAK and A resends the packet.
    NAK
A   <--   B
    package resend
A   -->   B

case 3: The packet is completely lost
    package (completely lost)
A   -x->   B
The A side needs to resend the package after a certain timeout
A   -->   B
until the B side answers (ACK or NAK), or the number of retransmissions reaches the upper
limit.
*/
RequestSendPackage(uint8_t * data,const size_t length,bool queue)595 void HdcUARTBase::RequestSendPackage(uint8_t *data, const size_t length, bool queue)
596 {
597     UartHead *head = reinterpret_cast<UartHead *>(data);
598     bool response = head->IsResponsePackage();
599 
600     if (queue) {
601         slots.Wait(head->sessionId);
602     }
603 
604     std::lock_guard<std::recursive_mutex> lock(mapOutPkgsMutex);
605 
606     std::string pkgId = head->ToPkgIdentityString(response);
607     auto it = std::find_if(outPkgs.begin(), outPkgs.end(), HandleOutputPkgKeyFinder(pkgId));
608     if (it == outPkgs.end()) {
609         // update che checksum , both head and data
610         head->UpdateCheckSum();
611         outPkgs.emplace_back(pkgId, head->sessionId, data, length, response,
612                              head->option & PKG_OPTION_ACK);
613         WRITE_LOG(LOG_DEBUG, "UartPackageManager: add pkg %s (pkgs size %zu)",
614                   head->ToDebugString().c_str(), outPkgs.size());
615     } else {
616         WRITE_LOG(LOG_FATAL, "UartPackageManager: add pkg %s fail, %s has already been exist.",
617                   head->ToDebugString().c_str(), pkgId.c_str());
618     }
619     NotifyTransfer();
620 }
621 
ProcessResponsePackage(const UartHead & head)622 void HdcUARTBase::ProcessResponsePackage(const UartHead &head)
623 {
624     std::lock_guard<std::recursive_mutex> lock(mapOutPkgsMutex);
625     bool ack = head.option & PKG_OPTION_ACK;
626     // response package
627     std::string pkgId = head.ToPkgIdentityString();
628     WRITE_LOG(LOG_ALL, "UartPackageManager: got response pkgId:%s ack:%d.", pkgId.c_str(), ack);
629 
630     auto it = std::find_if(outPkgs.begin(), outPkgs.end(), HandleOutputPkgKeyFinder(pkgId));
631     if (it != outPkgs.end()) {
632         if (ack) { // response ACK.
633             slots.Free(it->sessionId);
634             outPkgs.erase(it);
635             WRITE_LOG(LOG_DEBUG, "UartPackageManager: erase pkgId:%s.", pkgId.c_str());
636         } else {                           // response NAK
637             it->pkgStatus = PKG_WAIT_SEND; // Re send the pkg
638             WRITE_LOG(LOG_WARN, "UartPackageManager: resend pkgId:%s.", pkgId.c_str());
639         }
640     } else {
641         WRITE_LOG(LOG_FATAL, "UartPackageManager: hasn't found pkg for pkgId:%s.", pkgId.c_str());
642         for (auto pkg : outPkgs) {
643             WRITE_LOG(LOG_ALL, "UartPackageManager:  pkgId:%s.", pkg.key.c_str());
644         }
645     }
646     NotifyTransfer();
647     return;
648 }
649 
SendPkgInUARTOutMap()650 void HdcUARTBase::SendPkgInUARTOutMap()
651 {
652     std::lock_guard<std::recursive_mutex> lock(mapOutPkgsMutex);
653     if (outPkgs.empty()) {
654         WRITE_LOG(LOG_ALL, "UartPackageManager: No pkgs needs to be sent.");
655         return;
656     }
657     WRITE_LOG(LOG_DEBUG, "UartPackageManager: send pkgs, have:%zu pkgs", outPkgs.size());
658     // we have maybe more than one session
659     // each session has it owner serial port
660     std::unordered_set<uint32_t> hasWaitPkg;
661     auto it = outPkgs.begin();
662     while (it != outPkgs.end()) {
663         if (it->pkgStatus == PKG_WAIT_SEND) {
664             // we found a pkg wait for send
665             // if a response package
666             // response package always send nowait noorder
667             if (!it->response and hasWaitPkg.find(it->sessionId) != hasWaitPkg.end()) {
668                 // this is not a response package
669                 // and this session is wait response
670                 // so we can send nothing
671                 // process next
672                 it++;
673                 continue;
674             }
675             // we will ready to send the package
676             WRITE_LOG(LOG_DEBUG, "UartPackageManager: send pkg %s", it->ToDebugString().c_str());
677             SendUARTRaw(nullptr, it->msgSendBuf.data(), it->msgSendBuf.size());
678             if (it->response) {
679                 // response pkg dont need wait response again.
680                 WRITE_LOG(LOG_DEBUG, "UartPackageManager: erase pkg %s",
681                           it->ToDebugString().c_str());
682                 it = outPkgs.erase(it);
683                 continue;
684             } else {
685                 // normal send package
686                 it->pkgStatus = PKG_WAIT_RESPONSE;
687                 it->sendTimePoint = steady_clock::now();
688                 hasWaitPkg.emplace(it->sessionId);
689                 transfer.Sent(); // something is sendout, transfer will timeout for next wait.
690             }
691         } else if (it->pkgStatus == PKG_WAIT_RESPONSE) {
692             // we found a pkg wiat for response
693             auto elapsedTime = duration_cast<milliseconds>(steady_clock::now() - it->sendTimePoint);
694             WRITE_LOG(LOG_DEBUG, "UartPackageManager: pkg:%s is wait ACK. elapsedTime %lld",
695                       it->ToDebugString().c_str(), (long long)elapsedTime.count());
696             if (elapsedTime.count() >= WaitResponseTimeOutMs) {
697                 // check the response timeout
698                 if (it->retryChance > 0) {
699                     // if it send timeout, resend it again.
700                     WRITE_LOG(LOG_WARN, "UartPackageManager: pkg:%s try resend it.",
701                               it->ToDebugString().c_str());
702                     it->pkgStatus = PKG_WAIT_SEND;
703                     it->retryChance--;
704                     NotifyTransfer(); // make transfer reschedule
705                     break;            // dont process anything now.
706                 } else {
707                     // the response it timeout and retry counx is 0
708                     // the link maybe not stable
709                     // let's free this session
710                     WRITE_LOG(LOG_WARN, "UartPackageManager: reach max retry ,free the seesion %u",
711                               it->sessionId);
712                     OnTransferError(GetSession(it->sessionId));
713                     // dont reschedule here
714                     // wait next schedule from this path
715                     // OnTransferError -> FreeSession -> ClearUARTOutMap -> NotifyTransfer
716                     break;
717                 }
718             }
719             hasWaitPkg.emplace(it->sessionId);
720         }
721         it++; // next package
722     }
723     WRITE_LOG(LOG_DEBUG, "UartPackageManager: send finish, have %zu pkgs", outPkgs.size());
724 }
725 
ClearUARTOutMap(uint32_t sessionId)726 void HdcUARTBase::ClearUARTOutMap(uint32_t sessionId)
727 {
728     WRITE_LOG(LOG_DEBUG, "%s UartPackageManager clean for sessionId %u", __FUNCTION__, sessionId);
729     size_t erased = 0;
730     std::lock_guard<std::recursive_mutex> lock(mapOutPkgsMutex);
731     auto it = outPkgs.begin();
732     while (it != outPkgs.end()) {
733         if (it->sessionId == sessionId) {
734             if (!it->response) {
735                 slots.Free(it->sessionId);
736             }
737             it = outPkgs.erase(it);
738             erased++;
739         } else {
740             it++;
741         }
742     }
743     WRITE_LOG(LOG_DEBUG, "%s erased %zu", __FUNCTION__, erased);
744 
745     NotifyTransfer(); // tell transfer we maybe have some change
746 }
747 
EnsureAllPkgsSent()748 void HdcUARTBase::EnsureAllPkgsSent()
749 {
750     WRITE_LOG(LOG_DEBUG, "%s", __FUNCTION__);
751     slots.WaitFree();
752     if (!outPkgs.empty()) {
753         std::this_thread::sleep_for(1000ms);
754     }
755     WRITE_LOG(LOG_DEBUG, "%s done.", __FUNCTION__);
756 }
757 
ValidateUartPacket(vector<uint8_t> & data,uint32_t & sessionId,uint32_t & packageIndex,size_t & packetSize)758 RetErrCode HdcUARTBase::ValidateUartPacket(vector<uint8_t> &data, uint32_t &sessionId,
759                                            uint32_t &packageIndex, size_t &packetSize)
760 {
761     constexpr auto maxBufFactor = 1;
762     struct UartHead *head = (struct UartHead *)data.data();
763     WRITE_LOG(LOG_DEBUG, "%s %s", __FUNCTION__, head->ToDebugString().c_str());
764 
765     if (memcmp(head->flag, PACKET_FLAG.c_str(), PACKET_FLAG.size()) != 0) {
766         WRITE_LOG(LOG_FATAL, "%s,PACKET_FLAG not correct %x %x", __FUNCTION__, head->flag[0],
767                   head->flag[1]);
768         return ERR_BUF_CHECK;
769     }
770 
771     if (!head->ValidateHead()) {
772         WRITE_LOG(LOG_FATAL, "%s head checksum not correct", __FUNCTION__);
773         return ERR_BUF_CHECK;
774     }
775     // after validate , id and fullPackageLength is correct
776     sessionId = head->sessionId;
777     packetSize = head->dataSize + sizeof(UartHead);
778     packageIndex = head->packageIndex;
779 
780     if ((head->dataSize + sizeof(UartHead)) > MAX_UART_SIZE_IOBUF * maxBufFactor) {
781         WRITE_LOG(LOG_FATAL, "%s dataSize too larger:%d", __FUNCTION__, head->dataSize);
782         return ERR_BUF_OVERFLOW;
783     }
784 
785     if ((head->option & PKG_OPTION_RESET)) {
786         // The Host end program is restarted, but the UART cable is still connected
787         WRITE_LOG(LOG_WARN, "%s host side want restart daemon, restart old sessionId:%u",
788                   __FUNCTION__, head->sessionId);
789         ResetOldSession(head->sessionId);
790         return ERR_IO_SOFT_RESET;
791     }
792 
793     if ((head->option & PKG_OPTION_FREE)) {
794         // other side tell us the session need reset
795         // we should free it
796         WRITE_LOG(LOG_WARN, "%s other side tell us the session need free:%u", __FUNCTION__,
797                   head->sessionId);
798         Restartession(GetSession(head->sessionId));
799     }
800 
801     // check data
802     if (data.size() >= packetSize) {
803         // if we have full package now ?
804         if (!head->ValidateData()) {
805             WRITE_LOG(LOG_FATAL, "%s data checksum not correct", __FUNCTION__);
806             return ERR_BUF_CHECK;
807         }
808         if (head->IsResponsePackage()) {
809             // response package
810             ProcessResponsePackage(*head);
811         } else {
812             // link layer response for no response package
813             ResponseUartTrans(head->sessionId, head->packageIndex, PKG_OPTION_ACK);
814         }
815     }
816 
817     return RET_SUCCESS;
818 }
819 
ResponseUartTrans(uint32_t sessionId,uint32_t packageIndex,UartProtocolOption option)820 void HdcUARTBase::ResponseUartTrans(uint32_t sessionId, uint32_t packageIndex,
821                                     UartProtocolOption option)
822 {
823     UartHead uartHeader(sessionId, option, 0, packageIndex);
824     WRITE_LOG(LOG_DEBUG, "%s option:%u", __FUNCTION__, option);
825     RequestSendPackage(reinterpret_cast<uint8_t *>(&uartHeader), sizeof(UartHead), false);
826 }
827 
SendUARTData(HSession hSession,uint8_t * data,const size_t length)828 int HdcUARTBase::SendUARTData(HSession hSession, uint8_t *data, const size_t length)
829 {
830     constexpr int maxIOSize = MAX_UART_SIZE_IOBUF;
831     WRITE_LOG(LOG_DEBUG, "SendUARTData hSession:%u, total length:%d", hSession->sessionId, length);
832     const int packageDataMaxSize = maxIOSize - sizeof(UartHead);
833     size_t offset = 0;
834     uint8_t sendDataBuf[MAX_UART_SIZE_IOBUF];
835 
836     WRITE_LOG(LOG_ALL, "SendUARTData data length :%d", length);
837 
838     do {
839         UartHead *head = (UartHead *)sendDataBuf;
840         if (memset_s(head, sizeof(UartHead), 0, sizeof(UartHead)) != EOK) {
841             return ERR_BUF_RESET;
842         }
843         if (memcpy_s(head->flag, sizeof(head->flag), PACKET_FLAG.c_str(), PACKET_FLAG.size()) !=
844             EOK) {
845             return ERR_BUF_COPY;
846         }
847         head->sessionId = hSession->sessionId;
848         head->packageIndex = ++hSession->hUART->packageIndex;
849 
850         int RemainingDataSize = length - offset;
851         if (RemainingDataSize > packageDataMaxSize) {
852             // more than one package max data size
853             head->dataSize = static_cast<uint16_t>(packageDataMaxSize);
854         } else {
855             // less then the max size
856             head->dataSize = static_cast<uint16_t>(RemainingDataSize);
857             // this is the last package . all the data will send after this time
858             head->option = head->option | PKG_OPTION_TAIL;
859         }
860 #ifdef UART_FULL_LOG
861         WRITE_LOG(LOG_FULL, "offset %d length %d", offset, length);
862 #endif
863         uint8_t *payload = sendDataBuf + sizeof(UartHead);
864         if (EOK !=
865             memcpy_s(payload, packageDataMaxSize, (uint8_t *)data + offset, head->dataSize)) {
866             WRITE_LOG(LOG_FATAL, "memcpy_s failed max %zu , need %zu",
867                       packageDataMaxSize, head->dataSize);
868             return ERR_BUF_COPY;
869         }
870         offset += head->dataSize;
871         int packageFullSize = sizeof(UartHead) + head->dataSize;
872         WRITE_LOG(LOG_ALL, "SendUARTData =============> %s", head->ToDebugString().c_str());
873         RequestSendPackage(sendDataBuf, packageFullSize);
874     } while (offset != length);
875 
876     return offset;
877 }
878 
ReadDataFromUARTStream(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)879 void HdcUARTBase::ReadDataFromUARTStream(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
880 {
881     HSession hSession = (HSession)stream->data;
882     HdcUARTBase *hUARTBase = (HdcUARTBase *)hSession->classModule;
883     std::lock_guard<std::mutex> lock(hUARTBase->workThreadProcessingData);
884 
885     constexpr int bufSize = 1024;
886     char buffer[bufSize] = { 0 };
887     if (nread < 0) {
888         uv_err_name_r(nread, buffer, bufSize);
889     }
890     WRITE_LOG(LOG_DEBUG, "%s sessionId:%u, nread:%zd %s streamSize %zu", __FUNCTION__,
891               hSession->sessionId, nread, buffer,
892               hSession->hUART->streamSize.load());
893     HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
894     if (nread <= 0 or nread > signed(hSession->hUART->streamSize)) {
895         WRITE_LOG(LOG_FATAL, "%s nothing need to do ! because no data here", __FUNCTION__);
896         return;
897     }
898     if (hSessionBase->FetchIOBuf(hSession, hSession->ioBuf, nread) < 0) {
899         WRITE_LOG(LOG_FATAL, "%s FetchIOBuf failed , free the other side session", __FUNCTION__);
900         // seesion side said the dont understand this seesion data
901         // so we also need tell other side to free it session.
902         hUARTBase->ResponseUartTrans(hSession->sessionId, ++hSession->hUART->packageIndex,
903                                      PKG_OPTION_FREE);
904 
905         WRITE_LOG(LOG_FATAL, "%s FetchIOBuf failed , free the session", __FUNCTION__);
906         hSessionBase->FreeSession(hSession->sessionId);
907     }
908     hSession->hUART->streamSize -= nread;
909     WRITE_LOG(LOG_DEBUG, "%s sessionId:%u, nread:%d", __FUNCTION__, hSession->sessionId, nread);
910 }
911 
ReadyForWorkThread(HSession hSession)912 bool HdcUARTBase::ReadyForWorkThread(HSession hSession)
913 {
914     if (externInterface.UvTcpInit(&hSession->childLoop, &hSession->dataPipe[STREAM_WORK],
915                                   hSession->dataFd[STREAM_WORK])) {
916         WRITE_LOG(LOG_FATAL, "%s init child TCP failed", __FUNCTION__);
917         return false;
918     }
919     hSession->dataPipe[STREAM_WORK].data = hSession;
920     HdcSessionBase *pSession = (HdcSessionBase *)hSession->classInstance;
921     externInterface.SetTcpOptions(&hSession->dataPipe[STREAM_WORK]);
922     if (externInterface.UvRead((uv_stream_t *)&hSession->dataPipe[STREAM_WORK],
923                                pSession->AllocCallback, &HdcUARTBase::ReadDataFromUARTStream)) {
924         WRITE_LOG(LOG_FATAL, "%s child TCP read failed", __FUNCTION__);
925         return false;
926     }
927     WRITE_LOG(LOG_DEBUG, "%s finish", __FUNCTION__);
928     return true;
929 }
930 
Restartession(const HSession session)931 void HdcUARTBase::Restartession(const HSession session)
932 {
933     if (session != nullptr) {
934         WRITE_LOG(LOG_FATAL, "%s:%s", __FUNCTION__, session->ToDebugString().c_str());
935         ClearUARTOutMap(session->sessionId);
936         sessionBase.FreeSession(session->sessionId);
937     }
938 }
939 
StopSession(HSession hSession)940 void HdcUARTBase::StopSession(HSession hSession)
941 {
942     if (hSession != nullptr) {
943         WRITE_LOG(LOG_WARN, "%s:%s", __FUNCTION__, hSession->ToDebugString().c_str());
944         ClearUARTOutMap(hSession->sessionId);
945     } else {
946         WRITE_LOG(LOG_FATAL, "%s: clean null session", __FUNCTION__);
947     }
948 }
949 
Wait()950 void HdcUARTBase::TransferStateMachine::Wait()
951 {
952     std::unique_lock<std::mutex> lock(mutex);
953     WRITE_LOG(LOG_ALL, "%s", __FUNCTION__);
954     if (timeout) {
955         auto waitTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
956             timeoutPoint - std::chrono::steady_clock::now());
957         WRITE_LOG(LOG_ALL, "wait timeout %lld", waitTimeout.count());
958         if (cv.wait_for(lock, waitTimeout, [=] { return requested; }) == false) {
959             // must wait one timeout
960             // because sometime maybe not timeout but we got a request first.
961             timeout = false;
962             WRITE_LOG(LOG_ALL, "timeout");
963         }
964     } else {
965         cv.wait(lock, [=] { return requested; });
966     }
967     requested = false;
968 }
969 
HdcUART()970 HdcUART::HdcUART()
971 {
972 #ifdef _WIN32
973     Base::ZeroStruct(ovWrite);
974     ovWrite.hEvent = CreateEvent(NULL, false, false, NULL);
975     Base::ZeroStruct(ovRead);
976     ovRead.hEvent = CreateEvent(NULL, false, false, NULL);
977 #endif
978 }
979 
~HdcUART()980 HdcUART::~HdcUART()
981 {
982 #ifdef _WIN32
983     CloseHandle(ovWrite.hEvent);
984     ovWrite.hEvent = NULL;
985     CloseHandle(ovRead.hEvent);
986     ovRead.hEvent = NULL;
987 #endif
988 }
989 } // namespace Hdc
990