• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "session.h"
16 #ifndef TEST_HASH
17 #include "hdc_hash_gen.h"
18 #endif
19 #include "serial_struct.h"
20 
21 namespace Hdc {
HdcSessionBase(bool serverOrDaemonIn,size_t uvThreadSize)22 HdcSessionBase::HdcSessionBase(bool serverOrDaemonIn, size_t uvThreadSize)
23 {
24     // print version pid
25     WRITE_LOG(LOG_INFO, "Program running. %s Pid:%u", Base::GetVersion().c_str(), getpid());
26     // server/daemon common initialization code
27     if (uvThreadSize < SIZE_THREAD_POOL_MIN) {
28         uvThreadSize = SIZE_THREAD_POOL_MIN;
29     } else if (uvThreadSize > SIZE_THREAD_POOL_MAX) {
30         uvThreadSize = SIZE_THREAD_POOL_MAX;
31     }
32     threadPoolCount = uvThreadSize;
33     WRITE_LOG(LOG_INFO, "set UV_THREADPOOL_SIZE:%zu", threadPoolCount);
34     string uvThreadEnv("UV_THREADPOOL_SIZE");
35     string uvThreadVal = std::to_string(threadPoolCount);
36 #ifdef _WIN32
37     uvThreadEnv += "=";
38     uvThreadEnv += uvThreadVal;
39     _putenv(uvThreadEnv.c_str());
40 #else
41     setenv(uvThreadEnv.c_str(), uvThreadVal.c_str(), 1);
42 #endif
43     uv_loop_init(&loopMain);
44     WRITE_LOG(LOG_DEBUG, "loopMain init");
45     uv_rwlock_init(&mainAsync);
46 #ifndef FUZZ_TEST
47     uv_async_init(&loopMain, &asyncMainLoop, MainAsyncCallback);
48 #endif
49     uv_rwlock_init(&lockMapSession);
50     serverOrDaemon = serverOrDaemonIn;
51     ctxUSB = nullptr;
52     wantRestart = false;
53     threadSessionMain = uv_thread_self();
54 
55 #ifdef HDC_HOST
56     if (serverOrDaemon) {
57         if (libusb_init((libusb_context **)&ctxUSB) != 0) {
58             ctxUSB = nullptr;
59             WRITE_LOG(LOG_FATAL, "libusb_init failed ctxUSB is nullptr");
60         }
61     }
62 #endif
63 }
64 
~HdcSessionBase()65 HdcSessionBase::~HdcSessionBase()
66 {
67 #ifndef FUZZ_TEST
68     Base::TryCloseHandle((uv_handle_t *)&asyncMainLoop);
69 #endif
70     uv_loop_close(&loopMain);
71     // clear base
72     uv_rwlock_destroy(&mainAsync);
73     uv_rwlock_destroy(&lockMapSession);
74 #ifdef HDC_HOST
75     if (serverOrDaemon and ctxUSB != nullptr) {
76         libusb_exit((libusb_context *)ctxUSB);
77     }
78 #endif
79     WRITE_LOG(LOG_WARN, "~HdcSessionBase free sessionRef:%u instance:%s", uint32_t(sessionRef),
80               serverOrDaemon ? "server" : "daemon");
81 }
82 
83 // remove step2
TryRemoveTask(HTaskInfo hTask)84 bool HdcSessionBase::TryRemoveTask(HTaskInfo hTask)
85 {
86     if (hTask->taskFree) {
87         WRITE_LOG(LOG_WARN, "TryRemoveTask channelId:%u", hTask->channelId);
88         return true;
89     }
90     bool ret = RemoveInstanceTask(OP_REMOVE, hTask);
91     if (ret) {
92         hTask->taskFree = true;
93     } else {
94         // This is used to check that the memory cannot be cleaned up. If the memory cannot be released, break point
95         // here to see which task has not been released
96         // print task clear
97     }
98     return ret;
99 }
100 
101 // remove step1
BeginRemoveTask(HTaskInfo hTask)102 void HdcSessionBase::BeginRemoveTask(HTaskInfo hTask)
103 {
104     StartTraceScope("HdcSessionBase::BeginRemoveTask");
105     if (hTask->taskStop || hTask->taskFree) {
106         WRITE_LOG(LOG_WARN, "BeginRemoveTask channelId:%u taskStop:%d taskFree:%d",
107             hTask->channelId, hTask->taskStop, hTask->taskFree);
108         return;
109     }
110 
111     WRITE_LOG(LOG_WARN, "BeginRemoveTask taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
112     auto taskClassDeleteRetry = [](uv_timer_t *handle) -> void {
113         StartTraceScope("HdcSessionBase::BeginRemoveTask taskClassDeleteRetry");
114         HTaskInfo hTask = (HTaskInfo)handle->data;
115         HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
116         if (hTask->isCleared == false) {
117             hTask->isCleared = true;
118             WRITE_LOG(LOG_WARN, "taskClassDeleteRetry start clear task, taskType:%d cid:%u sid:%u",
119                 hTask->taskType, hTask->channelId, hTask->sessionId);
120             bool ret = thisClass->RemoveInstanceTask(OP_CLEAR, hTask);
121             if (!ret) {
122                 WRITE_LOG(LOG_WARN, "taskClassDeleteRetry RemoveInstanceTask return false taskType:%d cid:%u sid:%u",
123                     hTask->taskType, hTask->channelId, hTask->sessionId);
124             }
125         }
126 
127         constexpr uint32_t count = 1000;
128         if (hTask->closeRetryCount == 0 || hTask->closeRetryCount > count) {
129             WRITE_LOG(LOG_DEBUG, "TaskDelay task remove retry count %d/%d, taskType:%d channelId:%u, sessionId:%u",
130                 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->taskType, hTask->channelId, hTask->sessionId);
131             hTask->closeRetryCount = 1;
132         }
133         hTask->closeRetryCount++;
134         if (!thisClass->TryRemoveTask(hTask)) {
135             WRITE_LOG(LOG_WARN, "TaskDelay TryRemoveTask false channelId:%u", hTask->channelId);
136             return;
137         }
138         WRITE_LOG(LOG_WARN, "TaskDelay task remove finish, channelId:%u", hTask->channelId);
139         if (hTask != nullptr) {
140             delete hTask;
141             hTask = nullptr;
142         }
143         thisClass->taskCount--;
144         Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
145     };
146     Base::TimerUvTask(hTask->runLoop, hTask, taskClassDeleteRetry, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
147 
148     hTask->taskStop = true;
149 }
150 
151 // Clear all Task or a single Task, the regular situation is stopped first, and the specific class memory is cleaned up
152 // after the end of the LOOP.
153 // When ChannelIdinput == 0, at this time, all of the LOOP ends, all runs in the class end, so directly skip STOP,
154 // physical memory deletion class trimming
ClearOwnTasks(HSession hSession,const uint32_t channelIDInput)155 void HdcSessionBase::ClearOwnTasks(HSession hSession, const uint32_t channelIDInput)
156 {
157     // First case: normal task cleanup process (STOP Remove)
158     // Second: The task is cleaned up, the session ends
159     // Third: The task is cleaned up, and the session is directly over the session.
160     StartTraceScope("HdcSessionBase::ClearOwnTasks");
161     hSession->mapTaskMutex.lock();
162     map<uint32_t, HTaskInfo>::iterator iter;
163     taskCount = hSession->mapTask->size();
164     for (iter = hSession->mapTask->begin(); iter != hSession->mapTask->end();) {
165         uint32_t channelId = iter->first;
166         HTaskInfo hTask = iter->second;
167         if (channelIDInput != 0) {  // single
168             if (channelIDInput != channelId) {
169                 ++iter;
170                 continue;
171             }
172             BeginRemoveTask(hTask);
173             WRITE_LOG(LOG_WARN, "ClearOwnTasks OP_CLEAR finish, sessionId:%u channelIDInput:%u",
174                 hSession->sessionId, channelIDInput);
175             iter = hSession->mapTask->erase(iter);
176             break;
177         }
178         // multi
179         BeginRemoveTask(hTask);
180         iter = hSession->mapTask->erase(iter);
181     }
182     hSession->mapTaskMutex.unlock();
183 }
184 
ClearSessions()185 void HdcSessionBase::ClearSessions()
186 {
187     // no need to lock mapSession
188     // broadcast free signal
189     for (auto v : mapSession) {
190         HSession hSession = (HSession)v.second;
191         if (!hSession->isDead) {
192             FreeSession(hSession->sessionId);
193         }
194     }
195 }
196 
ReMainLoopForInstanceClear()197 void HdcSessionBase::ReMainLoopForInstanceClear()
198 {  // reloop
199     StartTraceScope("HdcSessionBase::ReMainLoopForInstanceClear");
200     auto clearSessionsForFinish = [](uv_idle_t *handle) -> void {
201         HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
202         if (thisClass->sessionRef > 0) {
203             return;
204         }
205         // all task has been free
206         uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
207         uv_stop(&thisClass->loopMain);
208     };
209     Base::IdleUvTask(&loopMain, this, clearSessionsForFinish);
210     uv_run(&loopMain, UV_RUN_DEFAULT);
211 };
212 
213 #ifdef HDC_SUPPORT_UART
EnumUARTDeviceRegister(UartKickoutZombie kickOut)214 void HdcSessionBase::EnumUARTDeviceRegister(UartKickoutZombie kickOut)
215 {
216     uv_rwlock_rdlock(&lockMapSession);
217     map<uint32_t, HSession>::iterator i;
218     for (i = mapSession.begin(); i != mapSession.end(); ++i) {
219         HSession hs = i->second;
220         if ((hs->connType != CONN_SERIAL) or (hs->hUART == nullptr)) {
221             continue;
222         }
223         kickOut(hs);
224         break;
225     }
226     uv_rwlock_rdunlock(&lockMapSession);
227 }
228 #endif
229 
EnumUSBDeviceRegister(void (* pCallBack)(HSession hSession))230 void HdcSessionBase::EnumUSBDeviceRegister(void (*pCallBack)(HSession hSession))
231 {
232     if (!pCallBack) {
233         return;
234     }
235     uv_rwlock_rdlock(&lockMapSession);
236     map<uint32_t, HSession>::iterator i;
237     for (i = mapSession.begin(); i != mapSession.end(); ++i) {
238         HSession hs = i->second;
239         if (hs->connType != CONN_USB) {
240             continue;
241         }
242         if (hs->hUSB == nullptr) {
243             continue;
244         }
245         if (pCallBack) {
246             pCallBack(hs);
247         }
248         break;
249     }
250     uv_rwlock_rdunlock(&lockMapSession);
251 }
252 
253 // The PC side gives the device information, determines if the USB device is registered
254 // PDEV and Busid Devid two choices
QueryUSBDeviceRegister(void * pDev,uint8_t busIDIn,uint8_t devIDIn)255 HSession HdcSessionBase::QueryUSBDeviceRegister(void *pDev, uint8_t busIDIn, uint8_t devIDIn)
256 {
257 #ifdef HDC_HOST
258     libusb_device *dev = (libusb_device *)pDev;
259     HSession hResult = nullptr;
260     if (!mapSession.size()) {
261         return nullptr;
262     }
263     uint8_t busId = 0;
264     uint8_t devId = 0;
265     if (pDev) {
266         busId = libusb_get_bus_number(dev);
267         devId = libusb_get_device_address(dev);
268     } else {
269         busId = busIDIn;
270         devId = devIDIn;
271     }
272     uv_rwlock_rdlock(&lockMapSession);
273     map<uint32_t, HSession>::iterator i;
274     for (i = mapSession.begin(); i != mapSession.end(); ++i) {
275         HSession hs = i->second;
276         if (hs->connType == CONN_USB) {
277             continue;
278         }
279         if (hs->hUSB == nullptr) {
280             continue;
281         }
282         if (hs->hUSB->devId != devId || hs->hUSB->busId != busId) {
283             continue;
284         }
285         hResult = hs;
286         break;
287     }
288     uv_rwlock_rdunlock(&lockMapSession);
289     return hResult;
290 #else
291     return nullptr;
292 #endif
293 }
294 
AsyncMainLoopTask(uv_idle_t * handle)295 void HdcSessionBase::AsyncMainLoopTask(uv_idle_t *handle)
296 {
297     AsyncParam *param = (AsyncParam *)handle->data;
298     HdcSessionBase *thisClass = (HdcSessionBase *)param->thisClass;
299     switch (param->method) {
300         case ASYNC_FREE_SESSION:
301             // Destruction is unified in the main thread
302             thisClass->FreeSession(param->sid);
303             break;
304         case ASYNC_STOP_MAINLOOP:
305             uv_stop(&thisClass->loopMain);
306             break;
307         default:
308             break;
309     }
310     if (param->data) {
311         delete[]((uint8_t *)param->data);
312     }
313     delete param;
314     param = nullptr;
315     Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseIdleCallback);
316 }
317 
MainAsyncCallback(uv_async_t * handle)318 void HdcSessionBase::MainAsyncCallback(uv_async_t *handle)
319 {
320     HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
321     list<void *>::iterator i;
322     list<void *> &lst = thisClass->lstMainThreadOP;
323     uv_rwlock_wrlock(&thisClass->mainAsync);
324     for (i = lst.begin(); i != lst.end();) {
325         AsyncParam *param = (AsyncParam *)*i;
326         Base::IdleUvTask(&thisClass->loopMain, param, AsyncMainLoopTask);
327         i = lst.erase(i);
328     }
329     uv_rwlock_wrunlock(&thisClass->mainAsync);
330 }
331 
PushAsyncMessage(const uint32_t sessionId,const uint8_t method,const void * data,const int dataSize)332 void HdcSessionBase::PushAsyncMessage(const uint32_t sessionId, const uint8_t method, const void *data,
333                                       const int dataSize)
334 {
335     AsyncParam *param = new AsyncParam();
336     if (!param) {
337         return;
338     }
339     param->sid = sessionId;
340     param->thisClass = this;
341     param->method = method;
342     if (dataSize > 0) {
343         param->dataSize = dataSize;
344         param->data = new uint8_t[param->dataSize]();
345         if (!param->data) {
346             delete param;
347             return;
348         }
349         if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
350             delete[]((uint8_t *)param->data);
351             delete param;
352             return;
353         }
354     }
355 
356     asyncMainLoop.data = this;
357     uv_rwlock_wrlock(&mainAsync);
358     lstMainThreadOP.push_back(param);
359     uv_rwlock_wrunlock(&mainAsync);
360     uv_async_send(&asyncMainLoop);
361 }
362 
WorkerPendding()363 void HdcSessionBase::WorkerPendding()
364 {
365     uv_run(&loopMain, UV_RUN_DEFAULT);
366     ClearInstanceResource();
367 }
368 
MallocSessionByConnectType(HSession hSession)369 int HdcSessionBase::MallocSessionByConnectType(HSession hSession)
370 {
371     int ret = 0;
372     switch (hSession->connType) {
373         case CONN_TCP: {
374             uv_tcp_init(&loopMain, &hSession->hWorkTCP);
375             ++hSession->uvHandleRef;
376             hSession->hWorkTCP.data = hSession;
377             break;
378         }
379         case CONN_USB: {
380             // Some members need to be placed at the primary thread
381             HUSB hUSB = new HdcUSB();
382             if (!hUSB) {
383                 ret = -1;
384                 break;
385             }
386             hSession->hUSB = hUSB;
387             hSession->hUSB->wMaxPacketSizeSend = MAX_PACKET_SIZE_HISPEED;
388             break;
389         }
390 #ifdef HDC_SUPPORT_UART
391         case CONN_SERIAL: {
392             HUART hUART = new HdcUART();
393             if (!hUART) {
394                 ret = -1;
395                 break;
396             }
397             hSession->hUART = hUART;
398             break;
399         }
400 #endif // HDC_SUPPORT_UART
401         default:
402             ret = -1;
403             break;
404     }
405     return ret;
406 }
407 
408 // Avoid unit test when client\server\daemon on the same host, maybe get the same ID value
GetSessionPseudoUid()409 uint32_t HdcSessionBase::GetSessionPseudoUid()
410 {
411     uint32_t uid = 0;
412     do {
413         uid = Base::GetSecureRandom();
414     } while (AdminSession(OP_QUERY, uid, nullptr) != nullptr);
415     return uid;
416 }
417 
418 // when client 0 to automatic generated,when daemon First place 1 followed by
MallocSession(bool serverOrDaemon,const ConnType connType,void * classModule,uint32_t sessionId)419 HSession HdcSessionBase::MallocSession(bool serverOrDaemon, const ConnType connType, void *classModule,
420                                        uint32_t sessionId)
421 {
422 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
423     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
424     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
425 #endif
426     HSession hSession = new(std::nothrow) HdcSession();
427     if (!hSession) {
428         WRITE_LOG(LOG_FATAL, "MallocSession new hSession failed");
429         return nullptr;
430     }
431     int ret = 0;
432     ++sessionRef;
433     hSession->classInstance = this;
434     hSession->connType = connType;
435     hSession->classModule = classModule;
436     hSession->isDead = false;
437     hSession->sessionId = ((sessionId == 0) ? GetSessionPseudoUid() : sessionId);
438     hSession->serverOrDaemon = serverOrDaemon;
439     hSession->hWorkThread = uv_thread_self();
440     hSession->mapTask = new(std::nothrow) map<uint32_t, HTaskInfo>();
441     if (hSession->mapTask == nullptr) {
442         WRITE_LOG(LOG_FATAL, "MallocSession new hSession->mapTask failed");
443         delete hSession;
444         hSession = nullptr;
445         return nullptr;
446     }
447     hSession->listKey = new(std::nothrow) list<void *>;
448     if (hSession->listKey == nullptr) {
449         WRITE_LOG(LOG_FATAL, "MallocSession new hSession->listKey failed");
450         delete hSession;
451         hSession = nullptr;
452         return nullptr;
453     }
454     uv_loop_init(&hSession->childLoop);
455     hSession->uvHandleRef = 0;
456     // pullup child
457     WRITE_LOG(LOG_INFO, "HdcSessionBase NewSession, sessionId:%u, connType:%d.",
458               hSession->sessionId, hSession->connType);
459     ++hSession->uvHandleRef;
460     Base::CreateSocketPair(hSession->ctrlFd);
461     size_t handleSize = sizeof(uv_poll_t);
462     hSession->pollHandle[STREAM_WORK] = (uv_poll_t *)malloc(handleSize);
463     hSession->pollHandle[STREAM_MAIN] = (uv_poll_t *)malloc(handleSize);
464     uv_poll_t *pollHandleMain = hSession->pollHandle[STREAM_MAIN];
465     if (pollHandleMain == nullptr || hSession->pollHandle[STREAM_WORK] == nullptr) {
466         WRITE_LOG(LOG_FATAL, "MallocSession malloc hSession->pollHandle failed");
467         delete hSession;
468         hSession = nullptr;
469         return nullptr;
470     }
471     (void)memset_s(hSession->pollHandle[STREAM_WORK], handleSize, 0, handleSize);
472     (void)memset_s(hSession->pollHandle[STREAM_MAIN], handleSize, 0, handleSize);
473     int initResult = uv_poll_init_socket(&loopMain, pollHandleMain, hSession->ctrlFd[STREAM_MAIN]);
474     if (initResult != 0) {
475         WRITE_LOG(LOG_FATAL, "MallocSession init pollHandleMain->loop failed");
476         _exit(0);
477     }
478     uv_poll_start(pollHandleMain, UV_READABLE, ReadCtrlFromSession);
479     hSession->pollHandle[STREAM_MAIN]->data = hSession;
480     hSession->pollHandle[STREAM_WORK]->data = hSession;
481     // Activate USB DAEMON's data channel, may not for use
482     uv_tcp_init(&loopMain, &hSession->dataPipe[STREAM_MAIN]);
483     (void)memset_s(&hSession->dataPipe[STREAM_WORK], sizeof(hSession->dataPipe[STREAM_WORK]),
484                    0, sizeof(uv_tcp_t));
485     ++hSession->uvHandleRef;
486     int createResult = Base::CreateSocketPair(hSession->dataFd);
487     if (createResult < 0) {
488         WRITE_LOG(LOG_FATAL, "MallocSession init dataFd failed");
489         _exit(0);
490     }
491     uv_tcp_open(&hSession->dataPipe[STREAM_MAIN], hSession->dataFd[STREAM_MAIN]);
492     hSession->dataPipe[STREAM_MAIN].data = hSession;
493     hSession->dataPipe[STREAM_WORK].data = hSession;
494 #ifdef HDC_HOST
495     Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN], HOST_SOCKETPAIR_SIZE);
496 #else
497     Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN]);
498 #endif
499     ret = MallocSessionByConnectType(hSession);
500     if (ret) {
501         delete hSession;
502         hSession = nullptr;
503     } else {
504         AdminSession(OP_ADD, hSession->sessionId, hSession);
505     }
506     return hSession;
507 }
508 
FreeSessionByConnectType(HSession hSession)509 void HdcSessionBase::FreeSessionByConnectType(HSession hSession)
510 {
511     WRITE_LOG(LOG_DEBUG, "FreeSessionByConnectType %s", hSession->ToDebugString().c_str());
512 
513     if (hSession->connType == CONN_USB) {
514         // ibusb All context is applied for sub-threaded, so it needs to be destroyed in the subline
515         if (!hSession->hUSB) {
516             return;
517         }
518         HUSB hUSB = hSession->hUSB;
519         if (!hUSB) {
520             return;
521         }
522 #ifdef HDC_HOST
523         if (hUSB->devHandle) {
524             libusb_release_interface(hUSB->devHandle, hUSB->interfaceNumber);
525             libusb_close(hUSB->devHandle);
526             hUSB->devHandle = nullptr;
527         }
528 #else
529         Base::CloseFd(hUSB->bulkIn);
530         Base::CloseFd(hUSB->bulkOut);
531 #endif
532         delete hSession->hUSB;
533         hSession->hUSB = nullptr;
534     }
535 #ifdef HDC_SUPPORT_UART
536     if (CONN_SERIAL == hSession->connType) {
537         if (!hSession->hUART) {
538             return;
539         }
540         HUART hUART = hSession->hUART;
541         if (!hUART) {
542             return;
543         }
544         HdcUARTBase *uartBase = (HdcUARTBase *)hSession->classModule;
545         // tell uart session will be free
546         uartBase->StopSession(hSession);
547 #ifdef HDC_HOST
548 #ifdef HOST_MINGW
549         if (hUART->devUartHandle != INVALID_HANDLE_VALUE) {
550             CloseHandle(hUART->devUartHandle);
551             hUART->devUartHandle = INVALID_HANDLE_VALUE;
552         }
553 #elif defined(HOST_LINUX)
554         Base::CloseFd(hUART->devUartHandle);
555 #endif // _WIN32
556 #endif
557         delete hSession->hUART;
558         hSession->hUART = nullptr;
559     }
560 #endif
561 }
562 
563 // work when libuv-handle at struct of HdcSession has all callback finished
FreeSessionFinally(uv_idle_t * handle)564 void HdcSessionBase::FreeSessionFinally(uv_idle_t *handle)
565 {
566     HSession hSession = (HSession)handle->data;
567     HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
568     if (hSession->uvHandleRef > 0) {
569         WRITE_LOG(LOG_INFO, "FreeSessionFinally uvHandleRef:%d sessionId:%u",
570             hSession->uvHandleRef, hSession->sessionId);
571         return;
572     }
573     // Notify Server or Daemon, just UI or display commandline
574     thisClass->NotifyInstanceSessionFree(hSession, true);
575     // all hsession uv handle has been clear
576     thisClass->AdminSession(OP_REMOVE, hSession->sessionId, nullptr);
577     WRITE_LOG(LOG_INFO, "!!!FreeSessionFinally sessionId:%u finish", hSession->sessionId);
578     HdcAuth::FreeKey(!hSession->serverOrDaemon, hSession->listKey);
579     delete hSession;
580     hSession = nullptr;  // fix CodeMars SetNullAfterFree issue
581     Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
582     --thisClass->sessionRef;
583 }
584 
585 // work when child-work thread finish
FreeSessionContinue(HSession hSession)586 void HdcSessionBase::FreeSessionContinue(HSession hSession)
587 {
588     auto closeSessionTCPHandle = [](uv_handle_t *handle) -> void {
589         HSession hSession = (HSession)handle->data;
590         --hSession->uvHandleRef;
591         Base::TryCloseHandle((uv_handle_t *)handle);
592         if (handle == reinterpret_cast<uv_handle_t *>(hSession->pollHandle[STREAM_MAIN])) {
593             Base::CloseSocketPair(hSession->ctrlFd);
594             free(hSession->pollHandle[STREAM_MAIN]);
595         }
596     };
597     if (hSession->connType == CONN_TCP) {
598         // Turn off TCP to prevent continuing writing
599         Base::TryCloseHandle((uv_handle_t *)&hSession->hWorkTCP, true, closeSessionTCPHandle);
600 #ifdef _WIN32
601         closesocket(hSession->dataFd[STREAM_WORK]);
602 #else
603         Base::CloseFd(hSession->dataFd[STREAM_WORK]);
604 #endif
605     }
606     hSession->availTailIndex = 0;
607     if (hSession->ioBuf) {
608         delete[] hSession->ioBuf;
609         hSession->ioBuf = nullptr;
610     }
611     Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_MAIN], true, closeSessionTCPHandle);
612     Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_MAIN], true, closeSessionTCPHandle);
613     FreeSessionByConnectType(hSession);
614     // finish
615     Base::IdleUvTask(&loopMain, hSession, FreeSessionFinally);
616 }
617 
FreeSessionOpeate(uv_timer_t * handle)618 void HdcSessionBase::FreeSessionOpeate(uv_timer_t *handle)
619 {
620     StartTraceScope("HdcSessionBase::FreeSessionOpeate");
621     HSession hSession = (HSession)handle->data;
622 #ifdef HDC_HOST
623     if (hSession->hUSB != nullptr
624         && (!hSession->hUSB->hostBulkIn.isShutdown || !hSession->hUSB->hostBulkOut.isShutdown)) {
625         HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
626         pUSB->CancelUsbIo(hSession);
627         return;
628     }
629 #endif
630     HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
631     if (hSession->ref > 0) {
632         WRITE_LOG(LOG_WARN, "FreeSessionOpeate sid:%u ref:%u > 0", hSession->sessionId, uint32_t(hSession->ref));
633         return;
634     }
635     WRITE_LOG(LOG_INFO, "FreeSessionOpeate sid:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
636     // wait workthread to free
637     if (hSession->pollHandle[STREAM_WORK]->loop) {
638         auto ctrl = BuildCtrlString(SP_STOP_SESSION, 0, nullptr, 0);
639         Base::SendToPollFd(hSession->ctrlFd[STREAM_MAIN], ctrl.data(), ctrl.size());
640         WRITE_LOG(LOG_INFO, "FreeSessionOpeate, send workthread for free. sessionId:%u", hSession->sessionId);
641         auto callbackCheckFreeSessionContinue = [](uv_timer_t *handle) -> void {
642             HSession hSession = (HSession)handle->data;
643             HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
644             if (!hSession->childCleared) {
645                 WRITE_LOG(LOG_INFO, "FreeSessionOpeate childCleared:%d sessionId:%u",
646                     hSession->childCleared, hSession->sessionId);
647                 return;
648             }
649             Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
650             thisClass->FreeSessionContinue(hSession);
651         };
652         Base::TimerUvTask(&thisClass->loopMain, hSession, callbackCheckFreeSessionContinue);
653     } else {
654         thisClass->FreeSessionContinue(hSession);
655     }
656     Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
657 }
658 
FreeSession(const uint32_t sessionId)659 void HdcSessionBase::FreeSession(const uint32_t sessionId)
660 {
661     StartTraceScope("HdcSessionBase::FreeSession");
662     Base::AddDeletedSessionId(sessionId);
663     if (threadSessionMain != uv_thread_self()) {
664         PushAsyncMessage(sessionId, ASYNC_FREE_SESSION, nullptr, 0);
665         return;
666     }
667     HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
668     WRITE_LOG(LOG_INFO, "Begin to free session, sessionid:%u", sessionId);
669     do {
670         if (!hSession || hSession->isDead) {
671             WRITE_LOG(LOG_WARN, "FreeSession hSession nullptr or isDead sessionId:%u", sessionId);
672             break;
673         }
674         WRITE_LOG(LOG_INFO, "dataFdSend:%llu, dataFdRecv:%llu",
675             uint64_t(hSession->stat.dataSendBytes),
676             uint64_t(hSession->stat.dataRecvBytes));
677         hSession->isDead = true;
678         Base::TimerUvTask(&loopMain, hSession, FreeSessionOpeate);
679         NotifyInstanceSessionFree(hSession, false);
680         WRITE_LOG(LOG_INFO, "FreeSession sessionId:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
681     } while (false);
682 }
683 
AdminSession(const uint8_t op,const uint32_t sessionId,HSession hInput)684 HSession HdcSessionBase::AdminSession(const uint8_t op, const uint32_t sessionId, HSession hInput)
685 {
686     HSession hRet = nullptr;
687     switch (op) {
688         case OP_ADD:
689             uv_rwlock_wrlock(&lockMapSession);
690             mapSession[sessionId] = hInput;
691             uv_rwlock_wrunlock(&lockMapSession);
692             break;
693         case OP_REMOVE:
694             uv_rwlock_wrlock(&lockMapSession);
695             mapSession.erase(sessionId);
696             uv_rwlock_wrunlock(&lockMapSession);
697             break;
698         case OP_QUERY:
699             uv_rwlock_rdlock(&lockMapSession);
700             if (mapSession.count(sessionId)) {
701                 hRet = mapSession[sessionId];
702             }
703             uv_rwlock_rdunlock(&lockMapSession);
704             break;
705         case OP_QUERY_REF:
706             uv_rwlock_wrlock(&lockMapSession);
707             if (mapSession.count(sessionId)) {
708                 hRet = mapSession[sessionId];
709                 ++hRet->ref;
710             }
711             uv_rwlock_wrunlock(&lockMapSession);
712             break;
713         case OP_UPDATE:
714             uv_rwlock_wrlock(&lockMapSession);
715             // remove old
716             mapSession.erase(sessionId);
717             mapSession[hInput->sessionId] = hInput;
718             uv_rwlock_wrunlock(&lockMapSession);
719             break;
720         case OP_VOTE_RESET:
721             if (mapSession.count(sessionId) == 0) {
722                 break;
723             }
724             bool needReset;
725             if (serverOrDaemon) {
726                 uv_rwlock_wrlock(&lockMapSession);
727                 hRet = mapSession[sessionId];
728                 hRet->voteReset = true;
729                 needReset = true;
730                 for (auto &kv : mapSession) {
731                     if (sessionId == kv.first) {
732                         continue;
733                     }
734                     WRITE_LOG(LOG_DEBUG, "session:%u vote reset, session %u is %s",
735                               sessionId, kv.first, kv.second->voteReset ? "YES" : "NO");
736                     if (!kv.second->voteReset) {
737                         needReset = false;
738                     }
739                 }
740                 uv_rwlock_wrunlock(&lockMapSession);
741             } else {
742                 needReset = true;
743             }
744             if (needReset) {
745                 WRITE_LOG(LOG_FATAL, "!! session:%u vote reset, passed unanimously !!", sessionId);
746                 abort();
747             }
748             break;
749         default:
750             break;
751     }
752     return hRet;
753 }
754 
DumpTasksInfo(map<uint32_t,HTaskInfo> & mapTask)755 void HdcSessionBase::DumpTasksInfo(map<uint32_t, HTaskInfo> &mapTask)
756 {
757     int idx = 1;
758     for (auto t : mapTask) {
759         HTaskInfo ti = t.second;
760         WRITE_LOG(LOG_WARN, "%d: channelId: %lu, type: %d, closeRetry: %d\n",
761                   idx++, ti->channelId, ti->taskType, ti->closeRetryCount);
762     }
763 }
764 
765 // All in the corresponding sub-thread, no need locks
AdminTask(const uint8_t op,HSession hSession,const uint32_t channelId,HTaskInfo hInput)766 HTaskInfo HdcSessionBase::AdminTask(const uint8_t op, HSession hSession, const uint32_t channelId, HTaskInfo hInput)
767 {
768     HTaskInfo hRet = nullptr;
769     map<uint32_t, HTaskInfo> &mapTask = *hSession->mapTask;
770 
771     switch (op) {
772         case OP_ADD:
773             hRet = mapTask[channelId];
774             if (hRet != nullptr) {
775                 delete hRet;
776             }
777             mapTask[channelId] = hInput;
778             hRet = hInput;
779 
780             WRITE_LOG(LOG_WARN, "AdminTask add session %u, channelId %u, mapTask size: %zu",
781                       hSession->sessionId, channelId, mapTask.size());
782 
783             break;
784         case OP_REMOVE:
785             mapTask.erase(channelId);
786             WRITE_LOG(LOG_DEBUG, "AdminTask rm session %u, channelId %u, mapTask size: %zu",
787                       hSession->sessionId, channelId, mapTask.size());
788             break;
789         case OP_QUERY:
790             if (mapTask.count(channelId)) {
791                 hRet = mapTask[channelId];
792             }
793             break;
794         case OP_VOTE_RESET:
795             AdminSession(op, hSession->sessionId, nullptr);
796             break;
797         default:
798             break;
799     }
800     return hRet;
801 }
802 
SendByProtocol(HSession hSession,uint8_t * bufPtr,const int bufLen,bool echo)803 int HdcSessionBase::SendByProtocol(HSession hSession, uint8_t *bufPtr, const int bufLen, bool echo)
804 {
805     StartTraceScope("HdcSessionBase::SendByProtocol");
806     if (hSession->isDead) {
807         delete[] bufPtr;
808         WRITE_LOG(LOG_WARN, "SendByProtocol session dead error");
809         return ERR_SESSION_NOFOUND;
810     }
811     int ret = 0;
812     switch (hSession->connType) {
813         case CONN_TCP: {
814             HdcTCPBase *pTCP = ((HdcTCPBase *)hSession->classModule);
815             if (echo && !hSession->serverOrDaemon) {
816                 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
817             } else {
818                 if (hSession->hWorkThread == uv_thread_self()) {
819                     ret = pTCP->WriteUvTcpFd(&hSession->hWorkTCP, bufPtr, bufLen);
820                 } else {
821                     ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
822                 }
823             }
824             break;
825         }
826         case CONN_USB: {
827             HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
828             ret = pUSB->SendUSBBlock(hSession, bufPtr, bufLen);
829             delete[] bufPtr;
830             break;
831         }
832 #ifdef HDC_SUPPORT_UART
833         case CONN_SERIAL: {
834             HdcUARTBase *pUART = ((HdcUARTBase *)hSession->classModule);
835             ret = pUART->SendUARTData(hSession, bufPtr, bufLen);
836             delete[] bufPtr;
837             break;
838         }
839 #endif
840         default:
841             delete[] bufPtr;
842             break;
843     }
844     return ret;
845 }
846 
Send(const uint32_t sessionId,const uint32_t channelId,const uint16_t commandFlag,const uint8_t * data,const int dataSize)847 int HdcSessionBase::Send(const uint32_t sessionId, const uint32_t channelId, const uint16_t commandFlag,
848                          const uint8_t *data, const int dataSize)
849 {
850     StartTraceScope("HdcSessionBase::Send");
851     HSession hSession = AdminSession(OP_QUERY_REF, sessionId, nullptr);
852     if (!hSession) {
853         WRITE_LOG(LOG_WARN, "Send to offline device, drop it, sessionId:%u", sessionId);
854         return ERR_SESSION_NOFOUND;
855     }
856     PayloadProtect protectBuf;  // noneed convert to big-endian
857     protectBuf.channelId = channelId;
858     protectBuf.commandFlag = commandFlag;
859     protectBuf.checkSum = (ENABLE_IO_CHECKSUM && dataSize > 0) ? Base::CalcCheckSum(data, dataSize) : 0;
860     protectBuf.vCode = payloadProtectStaticVcode;
861     string s = SerialStruct::SerializeToString(protectBuf);
862     // reserve for encrypt here
863     // xx-encrypt
864 
865     PayloadHead payloadHead = {};  // need convert to big-endian
866     payloadHead.flag[0] = PACKET_FLAG.at(0);
867     payloadHead.flag[1] = PACKET_FLAG.at(1);
868     payloadHead.protocolVer = VER_PROTOCOL;
869     payloadHead.headSize = htons(s.size());
870     payloadHead.dataSize = htonl(dataSize);
871     int finalBufSize = sizeof(PayloadHead) + s.size() + dataSize;
872     uint8_t *finayBuf = new(std::nothrow) uint8_t[finalBufSize]();
873     if (finayBuf == nullptr) {
874         WRITE_LOG(LOG_WARN, "send allocmem err");
875         --hSession->ref;
876         return ERR_BUF_ALLOC;
877     }
878     bool bufRet = false;
879     do {
880         if (memcpy_s(finayBuf, sizeof(PayloadHead), reinterpret_cast<uint8_t *>(&payloadHead), sizeof(PayloadHead))) {
881             WRITE_LOG(LOG_WARN, "send copyhead err for dataSize:%d", dataSize);
882             break;
883         }
884         if (memcpy_s(finayBuf + sizeof(PayloadHead), s.size(),
885                      reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size())) {
886             WRITE_LOG(LOG_WARN, "send copyProtbuf err for dataSize:%d", dataSize);
887             break;
888         }
889         if (dataSize > 0 && memcpy_s(finayBuf + sizeof(PayloadHead) + s.size(), dataSize, data, dataSize)) {
890             WRITE_LOG(LOG_WARN, "send copyDatabuf err for dataSize:%d", dataSize);
891             break;
892         }
893         bufRet = true;
894     } while (false);
895     if (!bufRet) {
896         delete[] finayBuf;
897         WRITE_LOG(LOG_WARN, "send copywholedata err for dataSize:%d", dataSize);
898         --hSession->ref;
899         return ERR_BUF_COPY;
900     }
901     int ret = -1;
902     if (CMD_KERNEL_ECHO == commandFlag) {
903         ret = SendByProtocol(hSession, finayBuf, finalBufSize, true);
904     } else {
905         ret = SendByProtocol(hSession, finayBuf, finalBufSize);
906     }
907     --hSession->ref;
908     return ret;
909 }
910 
DecryptPayload(HSession hSession,PayloadHead * payloadHeadBe,uint8_t * encBuf)911 int HdcSessionBase::DecryptPayload(HSession hSession, PayloadHead *payloadHeadBe, uint8_t *encBuf)
912 {
913     StartTraceScope("HdcSessionBase::DecryptPayload");
914     PayloadProtect protectBuf = {};
915     uint16_t headSize = ntohs(payloadHeadBe->headSize);
916     int dataSize = ntohl(payloadHeadBe->dataSize);
917     string encString(reinterpret_cast<char *>(encBuf), headSize);
918     SerialStruct::ParseFromString(protectBuf, encString);
919     if (protectBuf.vCode != payloadProtectStaticVcode) {
920         WRITE_LOG(LOG_FATAL, "Session recv static vcode failed");
921         return ERR_BUF_CHECK;
922     }
923     uint8_t *data = encBuf + headSize;
924     if (ENABLE_IO_CHECKSUM && protectBuf.checkSum != 0 && (protectBuf.checkSum != Base::CalcCheckSum(data, dataSize))) {
925         WRITE_LOG(LOG_FATAL, "Session recv CalcCheckSum failed");
926         return ERR_BUF_CHECK;
927     }
928     if (!FetchCommand(hSession, protectBuf.channelId, protectBuf.commandFlag, data, dataSize)) {
929         WRITE_LOG(LOG_WARN, "FetchCommand failed: channelId %x commandFlag %x",
930                   protectBuf.channelId, protectBuf.commandFlag);
931         return ERR_GENERIC;
932     }
933     return RET_SUCCESS;
934 }
935 
OnRead(HSession hSession,uint8_t * bufPtr,const int bufLen)936 int HdcSessionBase::OnRead(HSession hSession, uint8_t *bufPtr, const int bufLen)
937 {
938     int ret = ERR_GENERIC;
939     StartTraceScope("HdcSessionBase::OnRead");
940     if (memcmp(bufPtr, PACKET_FLAG.c_str(), PACKET_FLAG.size())) {
941         WRITE_LOG(LOG_FATAL, "PACKET_FLAG incorrect %x %x", bufPtr[0], bufPtr[1]);
942         return ERR_BUF_CHECK;
943     }
944     struct PayloadHead *payloadHead = reinterpret_cast<struct PayloadHead *>(bufPtr);
945     // to prevent integer overflow caused by the add operation of two input num
946     uint64_t payloadHeadSize = static_cast<uint64_t>(ntohl(payloadHead->dataSize)) +
947         static_cast<uint64_t>(ntohs(payloadHead->headSize));
948     int packetHeadSize = sizeof(struct PayloadHead);
949     if (payloadHeadSize == 0 || payloadHeadSize > static_cast<uint64_t>(HDC_BUF_MAX_BYTES)) {
950         WRITE_LOG(LOG_FATAL, "Packet size incorrect");
951         return ERR_BUF_CHECK;
952     }
953 
954     // 0 < payloadHeadSize < HDC_BUF_MAX_BYTES
955     int tobeReadLen = static_cast<int>(payloadHeadSize);
956     if (bufLen - packetHeadSize < tobeReadLen) {
957         return 0;
958     }
959     if (DecryptPayload(hSession, payloadHead, bufPtr + packetHeadSize)) {
960         WRITE_LOG(LOG_WARN, "decrypt plhead error");
961         return ERR_BUF_CHECK;
962     }
963     ret = packetHeadSize + tobeReadLen;
964     return ret;
965 }
966 
967 // Returns <0 error;> 0 receives the number of bytes; 0 untreated
FetchIOBuf(HSession hSession,uint8_t * ioBuf,int read)968 int HdcSessionBase::FetchIOBuf(HSession hSession, uint8_t *ioBuf, int read)
969 {
970     HdcSessionBase *ptrConnect = (HdcSessionBase *)hSession->classInstance;
971     int indexBuf = 0;
972     int childRet = 0;
973     StartTraceScope("HdcSessionBase::FetchIOBuf");
974     if (read < 0) {
975         constexpr int bufSize = 1024;
976         char buf[bufSize] = { 0 };
977         uv_strerror_r(read, buf, bufSize);
978         WRITE_LOG(LOG_FATAL, "FetchIOBuf read io failed,%s", buf);
979         return ERR_IO_FAIL;
980     }
981     hSession->heartbeat.AddMessageCount();
982     hSession->stat.dataRecvBytes += read;
983     hSession->availTailIndex += read;
984     while (!hSession->isDead && hSession->availTailIndex > static_cast<int>(sizeof(PayloadHead))) {
985         childRet = ptrConnect->OnRead(hSession, ioBuf + indexBuf, hSession->availTailIndex);
986         if (childRet > 0) {
987             hSession->availTailIndex -= childRet;
988             indexBuf += childRet;
989         } else if (childRet == 0) {
990             // Not enough a IO
991             break;
992         } else {                           // <0
993             WRITE_LOG(LOG_FATAL, "FetchIOBuf error childRet:%d sessionId:%u", childRet, hSession->sessionId);
994             hSession->availTailIndex = 0;  // Preventing malicious data packages
995             indexBuf = ERR_BUF_SIZE;
996             break;
997         }
998         // It may be multi-time IO to merge in a BUF, need to loop processing
999     }
1000     if (indexBuf > 0 && hSession->availTailIndex > 0) {
1001         if (memmove_s(hSession->ioBuf, hSession->bufSize, hSession->ioBuf + indexBuf, hSession->availTailIndex)
1002             != EOK) {
1003             return ERR_BUF_COPY;
1004         };
1005         uint8_t *bufToZero = reinterpret_cast<uint8_t *>(hSession->ioBuf + hSession->availTailIndex);
1006         Base::ZeroBuf(bufToZero, hSession->bufSize - hSession->availTailIndex);
1007     }
1008     return indexBuf;
1009 }
1010 
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)1011 void HdcSessionBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
1012 {
1013     HSession context = (HSession)handle->data;
1014     Base::ReallocBuf(&context->ioBuf, &context->bufSize, HDC_SOCKETPAIR_SIZE);
1015     buf->base = (char *)context->ioBuf + context->availTailIndex;
1016     int size = context->bufSize - context->availTailIndex;
1017     buf->len = std::min(size, static_cast<int>(sizeWanted));
1018 }
1019 
FinishWriteSessionTCP(uv_write_t * req,int status)1020 void HdcSessionBase::FinishWriteSessionTCP(uv_write_t *req, int status)
1021 {
1022     HSession hSession = (HSession)req->handle->data;
1023     --hSession->ref;
1024     HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1025     if (status < 0) {
1026         WRITE_LOG(LOG_WARN, "FinishWriteSessionTCP status:%d sessionId:%u isDead:%d ref:%u",
1027             status, hSession->sessionId, hSession->isDead, uint32_t(hSession->ref));
1028         Base::TryCloseHandle((uv_handle_t *)req->handle);
1029         if (!hSession->isDead && !hSession->ref) {
1030             WRITE_LOG(LOG_DEBUG, "FinishWriteSessionTCP freesession :%u", hSession->sessionId);
1031             thisClass->FreeSession(hSession->sessionId);
1032         }
1033     }
1034     delete[]((uint8_t *)req->data);
1035     delete req;
1036 }
1037 
DispatchSessionThreadCommand(HSession hSession,const uint8_t * baseBuf,const int bytesIO)1038 bool HdcSessionBase::DispatchSessionThreadCommand(HSession hSession, const uint8_t *baseBuf,
1039                                                   const int bytesIO)
1040 {
1041     bool ret = true;
1042     uint8_t flag = *const_cast<uint8_t *>(baseBuf);
1043 
1044     switch (flag) {
1045         case SP_JDWP_NEWFD:
1046         case SP_ARK_NEWFD: {
1047             JdwpNewFileDescriptor(baseBuf, bytesIO);
1048             break;
1049         }
1050         default:
1051             WRITE_LOG(LOG_WARN, "Not support session command");
1052             break;
1053     }
1054     return ret;
1055 }
1056 
ReadCtrlFromSession(uv_poll_t * poll,int status,int events)1057 void HdcSessionBase::ReadCtrlFromSession(uv_poll_t *poll, int status, int events)
1058 {
1059     HSession hSession = (HSession)poll->data;
1060     HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1061     const int size = Base::GetMaxBufSizeStable();
1062     char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1063     ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_MAIN], buf, size);
1064     while (true) {
1065         if (nread < 0) {
1066             constexpr int bufSize = 1024;
1067             char buffer[bufSize] = { 0 };
1068             uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1069             WRITE_LOG(LOG_WARN, "ReadCtrlFromSession failed,%s", buffer);
1070             uv_poll_stop(poll);
1071             break;
1072         }
1073         if (nread == 0) {
1074             WRITE_LOG(LOG_FATAL, "ReadCtrlFromSession read data zero byte");
1075             break;
1076         }
1077         // only one command, no need to split command from stream
1078         // if add more commands, consider the format command
1079         hSessionBase->DispatchSessionThreadCommand(hSession, reinterpret_cast<uint8_t *>(buf), nread);
1080         break;
1081     }
1082     delete[] buf;
1083 }
1084 
SetHeartbeatFeature(SessionHandShake & handshake)1085 void HdcSessionBase::SetHeartbeatFeature(SessionHandShake &handshake)
1086 {
1087     if (!Base::GetheartbeatSwitch()) {
1088         return;
1089     }
1090     // told daemon, we support features
1091     Base::TlvAppend(handshake.buf, TAG_SUPPORT_FEATURE, Base::FeatureToString(Base::GetSupportFeature()));
1092 }
1093 
WorkThreadInitSession(HSession hSession,SessionHandShake & handshake)1094 void HdcSessionBase::WorkThreadInitSession(HSession hSession, SessionHandShake &handshake)
1095 {
1096     handshake.banner = HANDSHAKE_MESSAGE;
1097     handshake.sessionId = hSession->sessionId;
1098     handshake.connectKey = hSession->connectKey;
1099     if (!hSession->isCheck) {
1100         handshake.version = Base::GetVersion() + HDC_MSG_HASH;
1101         WRITE_LOG(LOG_INFO, "set version = %s", handshake.version.c_str());
1102     }
1103     handshake.authType = AUTH_NONE;
1104     // told daemon, we support RSA_3072_SHA512 auth
1105     Base::TlvAppend(handshake.buf, TAG_AUTH_TYPE, std::to_string(AuthVerifyType::RSA_3072_SHA512));
1106     HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1107     hSessionBase->SetHeartbeatFeature(handshake);
1108 }
1109 
WorkThreadStartSession(HSession hSession)1110 bool HdcSessionBase::WorkThreadStartSession(HSession hSession)
1111 {
1112     bool regOK = false;
1113     int childRet = 0;
1114     if (hSession->connType == CONN_TCP) {
1115         HdcTCPBase *pTCPBase = (HdcTCPBase *)hSession->classModule;
1116         hSession->hChildWorkTCP.data = hSession;
1117         if (uv_tcp_init(&hSession->childLoop, &hSession->hChildWorkTCP) < 0) {
1118             WRITE_LOG(LOG_WARN, "HdcSessionBase SessionCtrl failed 1");
1119             return false;
1120         }
1121         if ((childRet = uv_tcp_open(&hSession->hChildWorkTCP, hSession->fdChildWorkTCP)) < 0) {
1122             constexpr int bufSize = 1024;
1123             char buf[bufSize] = { 0 };
1124             uv_strerror_r(childRet, buf, bufSize);
1125             WRITE_LOG(LOG_WARN, "SessionCtrl failed 2,fd:%d,str:%s", hSession->fdChildWorkTCP, buf);
1126             return false;
1127         }
1128         Base::SetTcpOptions((uv_tcp_t *)&hSession->hChildWorkTCP);
1129         uv_read_start((uv_stream_t *)&hSession->hChildWorkTCP, AllocCallback, pTCPBase->ReadStream);
1130         regOK = true;
1131 #ifdef HDC_SUPPORT_UART
1132     } else if (hSession->connType == CONN_SERIAL) { // UART
1133         HdcUARTBase *pUARTBase = (HdcUARTBase *)hSession->classModule;
1134         WRITE_LOG(LOG_DEBUG, "UART ReadyForWorkThread");
1135         regOK = pUARTBase->ReadyForWorkThread(hSession);
1136 #endif
1137     } else {  // USB
1138         HdcUSBBase *pUSBBase = (HdcUSBBase *)hSession->classModule;
1139         WRITE_LOG(LOG_DEBUG, "USB ReadyForWorkThread");
1140         regOK = pUSBBase->ReadyForWorkThread(hSession);
1141     }
1142 
1143     if (regOK && hSession->serverOrDaemon) {
1144         // session handshake step1
1145         SessionHandShake handshake = {};
1146         WorkThreadInitSession(hSession, handshake);
1147         string hs = SerialStruct::SerializeToString(handshake);
1148 #ifdef HDC_SUPPORT_UART
1149         WRITE_LOG(LOG_DEBUG, "WorkThreadStartSession session %u auth %u send handshake hs: %s",
1150                   hSession->sessionId, handshake.authType, hs.c_str());
1151 #endif
1152         Send(hSession->sessionId, 0, CMD_KERNEL_HANDSHAKE,
1153              reinterpret_cast<uint8_t *>(const_cast<char *>(hs.c_str())), hs.size());
1154     }
1155     return regOK;
1156 }
1157 
BuildCtrlString(InnerCtrlCommand command,uint32_t channelId,uint8_t * data,int dataSize)1158 vector<uint8_t> HdcSessionBase::BuildCtrlString(InnerCtrlCommand command, uint32_t channelId, uint8_t *data,
1159                                                 int dataSize)
1160 {
1161     vector<uint8_t> ret;
1162     while (true) {
1163         if (dataSize > BUF_SIZE_MICRO) {
1164             WRITE_LOG(LOG_WARN, "BuildCtrlString dataSize:%d", dataSize);
1165             break;
1166         }
1167         CtrlStruct ctrl = {};
1168         ctrl.command = command;
1169         ctrl.channelId = channelId;
1170         ctrl.dataSize = dataSize;
1171         if (dataSize > 0 && data != nullptr && memcpy_s(ctrl.data, sizeof(ctrl.data), data, dataSize) != EOK) {
1172             break;
1173         }
1174         uint8_t *buf = reinterpret_cast<uint8_t *>(&ctrl);
1175         ret.insert(ret.end(), buf, buf + sizeof(CtrlStruct));
1176         break;
1177     }
1178     return ret;
1179 }
1180 
DispatchMainThreadCommand(HSession hSession,const CtrlStruct * ctrl)1181 bool HdcSessionBase::DispatchMainThreadCommand(HSession hSession, const CtrlStruct *ctrl)
1182 {
1183     bool ret = true;
1184     uint32_t channelId = ctrl->channelId;  // if send not set, it is zero
1185     switch (ctrl->command) {
1186         case SP_START_SESSION: {
1187             WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand  START_SESSION sessionId:%u instance:%s",
1188                       hSession->sessionId, hSession->serverOrDaemon ? "server" : "daemon");
1189             ret = WorkThreadStartSession(hSession);
1190             break;
1191         }
1192         case SP_STOP_SESSION: {
1193             WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand STOP_SESSION sessionId:%u", hSession->sessionId);
1194             HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1195             hSessionBase->StopHeartbeatWork(hSession);
1196             auto closeSessionChildThreadTCPHandle = [](uv_handle_t *handle) -> void {
1197                 HSession hSession = (HSession)handle->data;
1198                 Base::TryCloseHandle((uv_handle_t *)handle);
1199                 if (handle == (uv_handle_t *)hSession->pollHandle[STREAM_WORK]) {
1200                     free(hSession->pollHandle[STREAM_WORK]);
1201                 }
1202                 if (--hSession->uvChildRef == 0) {
1203                     uv_stop(&hSession->childLoop);
1204                 };
1205             };
1206             constexpr int uvChildRefOffset = 2;
1207             hSession->uvChildRef += uvChildRefOffset;
1208             if (hSession->connType == CONN_TCP && hSession->hChildWorkTCP.loop) {  // maybe not use it
1209                 ++hSession->uvChildRef;
1210                 Base::TryCloseHandle((uv_handle_t *)&hSession->hChildWorkTCP, true, closeSessionChildThreadTCPHandle);
1211             }
1212             Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_WORK], true,
1213                                  closeSessionChildThreadTCPHandle);
1214             Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_WORK], true,
1215                                  closeSessionChildThreadTCPHandle);
1216             break;
1217         }
1218         case SP_ATTACH_CHANNEL: {
1219             if (!serverOrDaemon) {
1220                 break;  // Only Server has this feature
1221             }
1222             AttachChannel(hSession, channelId);
1223             break;
1224         }
1225         case SP_DEATCH_CHANNEL: {
1226             if (!serverOrDaemon) {
1227                 break;  // Only Server has this feature
1228             }
1229             DeatchChannel(hSession, channelId);
1230             break;
1231         }
1232         default:
1233             WRITE_LOG(LOG_WARN, "Not support main command");
1234             ret = false;
1235             break;
1236     }
1237     return ret;
1238 }
1239 
1240 // Several bytes of control instructions, generally do not stick
ReadCtrlFromMain(uv_poll_t * poll,int status,int events)1241 void HdcSessionBase::ReadCtrlFromMain(uv_poll_t *poll, int status, int events)
1242 {
1243     HSession hSession = (HSession)poll->data;
1244     HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1245     int formatCommandSize = sizeof(CtrlStruct);
1246     int index = 0;
1247     const int size = Base::GetMaxBufSizeStable();
1248     char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1249     ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_WORK], buf, size);
1250     while (true) {
1251         if (nread < 0) {
1252             constexpr int bufSize = 1024;
1253             char buffer[bufSize] = { 0 };
1254             uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1255             WRITE_LOG(LOG_WARN, "SessionCtrl failed,%s", buffer);
1256             break;
1257         }
1258         if (nread % formatCommandSize != 0) {
1259             WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain size failed, nread == %d", nread);
1260             break;
1261         }
1262         CtrlStruct *ctrl = reinterpret_cast<CtrlStruct *>(buf + index);
1263         if (!hSessionBase->DispatchMainThreadCommand(hSession, ctrl)) {
1264             WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain failed sessionId:%u channelId:%u command:%u",
1265                       hSession->sessionId, ctrl->channelId, ctrl->command);
1266             break;
1267         }
1268         index += sizeof(CtrlStruct);
1269         if (index >= nread) {
1270             break;
1271         }
1272     }
1273     delete[] buf;
1274 }
1275 
ReChildLoopForSessionClear(HSession hSession)1276 void HdcSessionBase::ReChildLoopForSessionClear(HSession hSession)
1277 {
1278     // Restart loop close task
1279     ClearOwnTasks(hSession, 0);
1280     WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear sessionId:%u", hSession->sessionId);
1281     auto clearTaskForSessionFinish = [](uv_timer_t *handle) -> void {
1282         HSession hSession = (HSession)handle->data;
1283         hSession->clearTaskTimes++;
1284         HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1285         if (thisClass->taskCount && (hSession->clearTaskTimes < 50)) {    //50: 50 * 120ms = 6s
1286             return;
1287         }
1288         if (hSession->clearTaskTimes >= 50) {   //50: 50 * 120ms = 6s
1289             WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear exited due to timeout of 6s");
1290         }
1291         // all task has been free
1292         uv_close((uv_handle_t *)handle, Base::CloseTimerCallback);
1293         uv_stop(&hSession->childLoop);  // stop ReChildLoopForSessionClear pendding
1294     };
1295     Base::TimerUvTask(
1296         &hSession->childLoop, hSession, clearTaskForSessionFinish, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
1297     uv_run(&hSession->childLoop, UV_RUN_DEFAULT);
1298     // clear
1299     Base::TryCloseChildLoop(&hSession->childLoop, "Session childUV");
1300 }
1301 
1302 //stop Heartbeat
StopHeartbeatWork(HSession hSession)1303 void HdcSessionBase::StopHeartbeatWork(HSession hSession)
1304 {
1305     hSession->heartbeat.SetSupportHeartbeat(false);
1306     uv_timer_stop(&hSession->heartbeatTimer);
1307 }
1308 
1309 // send heartbeat msg
SendHeartbeatMsg(uv_timer_t * handle)1310 void HdcSessionBase::SendHeartbeatMsg(uv_timer_t *handle)
1311 {
1312     HSession hSession = (HSession)handle->data;
1313     if (!hSession->heartbeat.GetSupportHeartbeat()) {
1314         WRITE_LOG(LOG_INFO, "session %u not support heatbeat", hSession->sessionId);
1315         if (hSession->handshakeOK) {
1316             uv_timer_stop(&hSession->heartbeatTimer);
1317         }
1318         return;
1319     }
1320 
1321     std::string str = hSession->heartbeat.ToString();
1322     WRITE_LOG(LOG_INFO, "send %s for session %u", str.c_str(), hSession->sessionId);
1323 
1324     HeartbeatMsg heartbeat = {};
1325     heartbeat.heartbeatCount = hSession->heartbeat.GetHeartbeatCount();
1326     hSession->heartbeat.AddHeartbeatCount();
1327     string s = SerialStruct::SerializeToString(heartbeat);
1328     HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1329     thisClass->Send(hSession->sessionId, 0, CMD_HEARTBEAT_MSG,
1330         reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size());
1331 }
1332 
StartHeartbeatWork(HSession hSession)1333 void HdcSessionBase::StartHeartbeatWork(HSession hSession)
1334 {
1335     if (!Base::GetheartbeatSwitch()) {
1336         return;
1337     }
1338     WRITE_LOG(LOG_DEBUG, "StartHeartbeatWork");
1339     hSession->heartbeatTimer.data = hSession;
1340     uv_timer_init(&hSession->childLoop, &hSession->heartbeatTimer);
1341     uv_timer_start(&hSession->heartbeatTimer, SendHeartbeatMsg, 0, HEARTBEAT_INTERVAL);
1342 }
1343 
SessionWorkThread(uv_work_t * arg)1344 void HdcSessionBase::SessionWorkThread(uv_work_t *arg)
1345 {
1346     HSession hSession = (HSession)arg->data;
1347     HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1348     hSession->hWorkChildThread = uv_thread_self();
1349 
1350     uv_poll_t *pollHandle = hSession->pollHandle[STREAM_WORK];
1351     pollHandle->data = hSession;
1352     int initResult = uv_poll_init_socket(&hSession->childLoop, pollHandle, hSession->ctrlFd[STREAM_WORK]);
1353     if (initResult != 0) {
1354         WRITE_LOG(LOG_FATAL, "SessionWorkThread init pollHandle->loop failed");
1355         _exit(0);
1356     }
1357     uv_poll_start(pollHandle, UV_READABLE, ReadCtrlFromMain);
1358     // start heartbeat rimer
1359     HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1360     hSessionBase->StartHeartbeatWork(hSession);
1361     WRITE_LOG(LOG_INFO, "!!!Workthread run begin, sid:%u instance:%s", hSession->sessionId,
1362               thisClass->serverOrDaemon ? "server" : "daemon");
1363     uv_run(&hSession->childLoop, UV_RUN_DEFAULT);  // work pendding
1364     WRITE_LOG(LOG_DEBUG, "!!!Workthread run again, sessionId:%u", hSession->sessionId);
1365     // main loop has exit
1366     thisClass->ReChildLoopForSessionClear(hSession);  // work pending again
1367     hSession->childCleared = true;
1368     WRITE_LOG(LOG_WARN, "!!!Workthread run finish, sessionId:%u", hSession->sessionId);
1369 }
1370 
1371 // clang-format off
LogMsg(const uint32_t sessionId,const uint32_t channelId,MessageLevel level,const char * msg,...)1372 void HdcSessionBase::LogMsg(const uint32_t sessionId, const uint32_t channelId,
1373                             MessageLevel level, const char *msg, ...)
1374 // clang-format on
1375 {
1376     va_list vaArgs;
1377     va_start(vaArgs, msg);
1378     string log = Base::StringFormat(msg, vaArgs);
1379     va_end(vaArgs);
1380     vector<uint8_t> buf;
1381     buf.push_back(level);
1382     buf.insert(buf.end(), log.c_str(), log.c_str() + log.size());
1383     ServerCommand(sessionId, channelId, CMD_KERNEL_ECHO, buf.data(), buf.size());
1384 }
1385 
NeedNewTaskInfo(const uint16_t command,bool & masterTask)1386 bool HdcSessionBase::NeedNewTaskInfo(const uint16_t command, bool &masterTask)
1387 {
1388     // referer from HdcServerForClient::DoCommandRemote
1389     bool ret = false;
1390     bool taskMasterInit = false;
1391     masterTask = false;
1392     switch (command) {
1393         case CMD_FILE_INIT:
1394         case CMD_FLASHD_FLASH_INIT:
1395         case CMD_FLASHD_UPDATE_INIT:
1396         case CMD_FLASHD_ERASE:
1397         case CMD_FLASHD_FORMAT:
1398         case CMD_FORWARD_INIT:
1399         case CMD_APP_INIT:
1400         case CMD_APP_UNINSTALL:
1401         case CMD_APP_SIDELOAD:
1402             taskMasterInit = true;
1403             break;
1404         default:
1405             break;
1406     }
1407     if (!serverOrDaemon &&
1408             (command == CMD_SHELL_INIT || command == CMD_UNITY_EXECUTE_EX ||
1409                 (command > CMD_UNITY_COMMAND_HEAD && command < CMD_UNITY_COMMAND_TAIL))) {
1410         // daemon's single side command
1411         ret = true;
1412     } else if (command == CMD_KERNEL_WAKEUP_SLAVETASK) {
1413         // slave tasks
1414         ret = true;
1415     } else if (command == CMD_UNITY_BUGREPORT_INIT) {
1416         ret = true;
1417     } else if (taskMasterInit) {
1418         // task init command
1419         masterTask = true;
1420         ret = true;
1421     }
1422     return ret;
1423 }
1424 // Heavy and time-consuming work was putted in the new thread to do, and does
1425 // not occupy the main thread
DispatchTaskData(HSession hSession,const uint32_t channelId,const uint16_t command,uint8_t * payload,int payloadSize)1426 bool HdcSessionBase::DispatchTaskData(HSession hSession, const uint32_t channelId, const uint16_t command,
1427                                       uint8_t *payload, int payloadSize)
1428 {
1429     StartTraceScope("HdcSessionBase::DispatchTaskData");
1430     bool ret = true;
1431     HTaskInfo hTaskInfo = nullptr;
1432     bool masterTask = false;
1433     while (true) {
1434         // Some basic commands do not have a local task constructor. example: Interactive shell, some uinty commands
1435         if (NeedNewTaskInfo(command, masterTask)) {
1436             WRITE_LOG(LOG_DEBUG, "New HTaskInfo channelId:%u command:%u", channelId, command);
1437             hTaskInfo = new(std::nothrow) TaskInformation();
1438             if (hTaskInfo == nullptr) {
1439                 WRITE_LOG(LOG_FATAL, "DispatchTaskData new hTaskInfo failed");
1440                 break;
1441             }
1442             hTaskInfo->channelId = channelId;
1443             hTaskInfo->sessionId = hSession->sessionId;
1444             hTaskInfo->runLoop = &hSession->childLoop;
1445             hTaskInfo->serverOrDaemon = serverOrDaemon;
1446             hTaskInfo->masterSlave = masterTask;
1447             hTaskInfo->closeRetryCount = 0;
1448             hTaskInfo->channelTask = false;
1449             hTaskInfo->isCleared = false;
1450 
1451             int addTaskRetry = 3; // try 3 time
1452             while (addTaskRetry > 0) {
1453                 if (AdminTask(OP_ADD, hSession, channelId, hTaskInfo)) {
1454                     break;
1455                 }
1456                 sleep(1);
1457                 --addTaskRetry;
1458             }
1459 
1460             if (addTaskRetry == 0) {
1461 #ifndef HDC_HOST
1462                 LogMsg(hTaskInfo->sessionId, hTaskInfo->channelId,
1463                        MSG_FAIL, "hdc thread pool busy, may cause reset later");
1464 #endif
1465                 delete hTaskInfo;
1466                 hTaskInfo = nullptr;
1467                 ret = false;
1468                 break;
1469             }
1470         } else {
1471             hTaskInfo = AdminTask(OP_QUERY, hSession, channelId, nullptr);
1472         }
1473         if (!hTaskInfo || hTaskInfo->taskStop || hTaskInfo->taskFree) {
1474             WRITE_LOG(LOG_ALL, "Dead HTaskInfo, ignore, channelId:%u command:%u", channelId, command);
1475             break;
1476         }
1477         ret = RedirectToTask(hTaskInfo, hSession, channelId, command, payload, payloadSize);
1478         break;
1479     }
1480     return ret;
1481 }
1482 
PostStopInstanceMessage(bool restart)1483 void HdcSessionBase::PostStopInstanceMessage(bool restart)
1484 {
1485     PushAsyncMessage(0, ASYNC_STOP_MAINLOOP, nullptr, 0);
1486     WRITE_LOG(LOG_DEBUG, "StopDaemon has sended restart %d", restart);
1487     wantRestart = restart;
1488 }
1489 
ParsePeerSupportFeatures(HSession & hSession,std::map<std::string,std::string> & tlvMap)1490 void HdcSessionBase::ParsePeerSupportFeatures(HSession &hSession, std::map<std::string, std::string> &tlvMap)
1491 {
1492     if (hSession == nullptr) {
1493         WRITE_LOG(LOG_FATAL, "Invalid paramter, hSession is null");
1494         return;
1495     }
1496 
1497     if (tlvMap.find(TAG_SUPPORT_FEATURE) != tlvMap.end()) {
1498         std::vector<std::string> features;
1499         WRITE_LOG(LOG_INFO, "peer support features are %s for session %u",
1500             tlvMap[TAG_SUPPORT_FEATURE].c_str(), hSession->sessionId);
1501         Base::SplitString(tlvMap[TAG_SUPPORT_FEATURE], ",", features);
1502         hSession->heartbeat.SetSupportHeartbeat(Base::IsSupportFeature(features, FEATURE_HEARTBEAT));
1503     }
1504 }
1505 }  // namespace Hdc
1506