1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "session.h"
16 #ifndef TEST_HASH
17 #include "hdc_hash_gen.h"
18 #endif
19 #include "serial_struct.h"
20
21 namespace Hdc {
HdcSessionBase(bool serverOrDaemonIn,size_t uvThreadSize)22 HdcSessionBase::HdcSessionBase(bool serverOrDaemonIn, size_t uvThreadSize)
23 {
24 // print version pid
25 WRITE_LOG(LOG_INFO, "Program running. %s Pid:%u", Base::GetVersion().c_str(), getpid());
26 // server/daemon common initialization code
27 if (uvThreadSize < SIZE_THREAD_POOL_MIN) {
28 uvThreadSize = SIZE_THREAD_POOL_MIN;
29 } else if (uvThreadSize > SIZE_THREAD_POOL_MAX) {
30 uvThreadSize = SIZE_THREAD_POOL_MAX;
31 }
32 threadPoolCount = uvThreadSize;
33 WRITE_LOG(LOG_INFO, "set UV_THREADPOOL_SIZE:%zu", threadPoolCount);
34 string uvThreadEnv("UV_THREADPOOL_SIZE");
35 string uvThreadVal = std::to_string(threadPoolCount);
36 #ifdef _WIN32
37 uvThreadEnv += "=";
38 uvThreadEnv += uvThreadVal;
39 _putenv(uvThreadEnv.c_str());
40 #else
41 setenv(uvThreadEnv.c_str(), uvThreadVal.c_str(), 1);
42 #endif
43 uv_loop_init(&loopMain);
44 WRITE_LOG(LOG_DEBUG, "loopMain init");
45 uv_rwlock_init(&mainAsync);
46 #ifndef FUZZ_TEST
47 uv_async_init(&loopMain, &asyncMainLoop, MainAsyncCallback);
48 #endif
49 uv_rwlock_init(&lockMapSession);
50 serverOrDaemon = serverOrDaemonIn;
51 ctxUSB = nullptr;
52 wantRestart = false;
53 threadSessionMain = uv_thread_self();
54
55 #ifdef HDC_HOST
56 if (serverOrDaemon) {
57 if (libusb_init((libusb_context **)&ctxUSB) != 0) {
58 ctxUSB = nullptr;
59 WRITE_LOG(LOG_FATAL, "libusb_init failed ctxUSB is nullptr");
60 }
61 }
62 #endif
63 }
64
~HdcSessionBase()65 HdcSessionBase::~HdcSessionBase()
66 {
67 #ifndef FUZZ_TEST
68 Base::TryCloseHandle((uv_handle_t *)&asyncMainLoop);
69 #endif
70 uv_loop_close(&loopMain);
71 // clear base
72 uv_rwlock_destroy(&mainAsync);
73 uv_rwlock_destroy(&lockMapSession);
74 #ifdef HDC_HOST
75 if (serverOrDaemon and ctxUSB != nullptr) {
76 libusb_exit((libusb_context *)ctxUSB);
77 }
78 #endif
79 WRITE_LOG(LOG_WARN, "~HdcSessionBase free sessionRef:%u instance:%s", uint32_t(sessionRef),
80 serverOrDaemon ? "server" : "daemon");
81 }
82
83 // remove step2
TryRemoveTask(HTaskInfo hTask)84 bool HdcSessionBase::TryRemoveTask(HTaskInfo hTask)
85 {
86 if (hTask->taskFree) {
87 WRITE_LOG(LOG_WARN, "TryRemoveTask channelId:%u", hTask->channelId);
88 return true;
89 }
90 bool ret = RemoveInstanceTask(OP_REMOVE, hTask);
91 if (ret) {
92 hTask->taskFree = true;
93 } else {
94 // This is used to check that the memory cannot be cleaned up. If the memory cannot be released, break point
95 // here to see which task has not been released
96 // print task clear
97 }
98 return ret;
99 }
100
101 // remove step1
BeginRemoveTask(HTaskInfo hTask)102 void HdcSessionBase::BeginRemoveTask(HTaskInfo hTask)
103 {
104 StartTraceScope("HdcSessionBase::BeginRemoveTask");
105 if (hTask->taskStop || hTask->taskFree) {
106 WRITE_LOG(LOG_WARN, "BeginRemoveTask channelId:%u taskStop:%d taskFree:%d",
107 hTask->channelId, hTask->taskStop, hTask->taskFree);
108 return;
109 }
110
111 WRITE_LOG(LOG_WARN, "BeginRemoveTask taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
112 auto taskClassDeleteRetry = [](uv_timer_t *handle) -> void {
113 StartTraceScope("HdcSessionBase::BeginRemoveTask taskClassDeleteRetry");
114 HTaskInfo hTask = (HTaskInfo)handle->data;
115 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
116 if (hTask->isCleared == false) {
117 hTask->isCleared = true;
118 WRITE_LOG(LOG_WARN, "taskClassDeleteRetry start clear task, taskType:%d cid:%u sid:%u",
119 hTask->taskType, hTask->channelId, hTask->sessionId);
120 bool ret = thisClass->RemoveInstanceTask(OP_CLEAR, hTask);
121 if (!ret) {
122 WRITE_LOG(LOG_WARN, "taskClassDeleteRetry RemoveInstanceTask return false taskType:%d cid:%u sid:%u",
123 hTask->taskType, hTask->channelId, hTask->sessionId);
124 }
125 }
126
127 constexpr uint32_t count = 1000;
128 if (hTask->closeRetryCount == 0 || hTask->closeRetryCount > count) {
129 WRITE_LOG(LOG_DEBUG, "TaskDelay task remove retry count %d/%d, taskType:%d channelId:%u, sessionId:%u",
130 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->taskType, hTask->channelId, hTask->sessionId);
131 hTask->closeRetryCount = 1;
132 }
133 hTask->closeRetryCount++;
134 if (!thisClass->TryRemoveTask(hTask)) {
135 WRITE_LOG(LOG_WARN, "TaskDelay TryRemoveTask false channelId:%u", hTask->channelId);
136 return;
137 }
138 WRITE_LOG(LOG_WARN, "TaskDelay task remove finish, channelId:%u", hTask->channelId);
139 if (hTask != nullptr) {
140 delete hTask;
141 hTask = nullptr;
142 }
143 thisClass->taskCount--;
144 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
145 };
146 Base::TimerUvTask(hTask->runLoop, hTask, taskClassDeleteRetry, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
147
148 hTask->taskStop = true;
149 }
150
// Clears either every task of a session or a single task. In the regular case a task is stopped
// first, and its class memory is cleaned up after its loop has ended.
// When channelIDInput == 0, all loops have already ended and nothing is running inside the task
// classes any more, so the STOP phase is skipped and the class memory is released directly.
ClearOwnTasks(HSession hSession,const uint32_t channelIDInput)155 void HdcSessionBase::ClearOwnTasks(HSession hSession, const uint32_t channelIDInput)
156 {
157 // First case: normal task cleanup process (STOP Remove)
158 // Second: The task is cleaned up, the session ends
159 // Third: The task is cleaned up, and the session is directly over the session.
160 StartTraceScope("HdcSessionBase::ClearOwnTasks");
161 hSession->mapTaskMutex.lock();
162 map<uint32_t, HTaskInfo>::iterator iter;
163 taskCount = hSession->mapTask->size();
164 for (iter = hSession->mapTask->begin(); iter != hSession->mapTask->end();) {
165 uint32_t channelId = iter->first;
166 HTaskInfo hTask = iter->second;
167 if (channelIDInput != 0) { // single
168 if (channelIDInput != channelId) {
169 ++iter;
170 continue;
171 }
172 BeginRemoveTask(hTask);
173 WRITE_LOG(LOG_WARN, "ClearOwnTasks OP_CLEAR finish, sessionId:%u channelIDInput:%u",
174 hSession->sessionId, channelIDInput);
175 iter = hSession->mapTask->erase(iter);
176 break;
177 }
178 // multi
179 BeginRemoveTask(hTask);
180 iter = hSession->mapTask->erase(iter);
181 }
182 hSession->mapTaskMutex.unlock();
183 }
184
ClearSessions()185 void HdcSessionBase::ClearSessions()
186 {
187 // no need to lock mapSession
188 // broadcast free signal
189 for (auto v : mapSession) {
190 HSession hSession = (HSession)v.second;
191 if (!hSession->isDead) {
192 FreeSession(hSession->sessionId);
193 }
194 }
195 }
196
ReMainLoopForInstanceClear()197 void HdcSessionBase::ReMainLoopForInstanceClear()
198 { // reloop
199 StartTraceScope("HdcSessionBase::ReMainLoopForInstanceClear");
200 auto clearSessionsForFinish = [](uv_idle_t *handle) -> void {
201 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
202 if (thisClass->sessionRef > 0) {
203 return;
204 }
205 // all task has been free
206 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
207 uv_stop(&thisClass->loopMain);
208 };
209 Base::IdleUvTask(&loopMain, this, clearSessionsForFinish);
210 uv_run(&loopMain, UV_RUN_DEFAULT);
211 };
212
213 #ifdef HDC_SUPPORT_UART
EnumUARTDeviceRegister(UartKickoutZombie kickOut)214 void HdcSessionBase::EnumUARTDeviceRegister(UartKickoutZombie kickOut)
215 {
216 uv_rwlock_rdlock(&lockMapSession);
217 map<uint32_t, HSession>::iterator i;
218 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
219 HSession hs = i->second;
220 if ((hs->connType != CONN_SERIAL) or (hs->hUART == nullptr)) {
221 continue;
222 }
223 kickOut(hs);
224 break;
225 }
226 uv_rwlock_rdunlock(&lockMapSession);
227 }
228 #endif
229
EnumUSBDeviceRegister(void (* pCallBack)(HSession hSession))230 void HdcSessionBase::EnumUSBDeviceRegister(void (*pCallBack)(HSession hSession))
231 {
232 if (!pCallBack) {
233 return;
234 }
235 uv_rwlock_rdlock(&lockMapSession);
236 map<uint32_t, HSession>::iterator i;
237 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
238 HSession hs = i->second;
239 if (hs->connType != CONN_USB) {
240 continue;
241 }
242 if (hs->hUSB == nullptr) {
243 continue;
244 }
245 if (pCallBack) {
246 pCallBack(hs);
247 }
248 break;
249 }
250 uv_rwlock_rdunlock(&lockMapSession);
251 }
252
// Given device information from the PC (host) side, determines whether the USB device is already
// registered. The device is identified either by pDev or by the busIDIn/devIDIn pair.
QueryUSBDeviceRegister(void * pDev,uint8_t busIDIn,uint8_t devIDIn)255 HSession HdcSessionBase::QueryUSBDeviceRegister(void *pDev, uint8_t busIDIn, uint8_t devIDIn)
256 {
257 #ifdef HDC_HOST
258 libusb_device *dev = (libusb_device *)pDev;
259 HSession hResult = nullptr;
260 if (!mapSession.size()) {
261 return nullptr;
262 }
263 uint8_t busId = 0;
264 uint8_t devId = 0;
265 if (pDev) {
266 busId = libusb_get_bus_number(dev);
267 devId = libusb_get_device_address(dev);
268 } else {
269 busId = busIDIn;
270 devId = devIDIn;
271 }
272 uv_rwlock_rdlock(&lockMapSession);
273 map<uint32_t, HSession>::iterator i;
274 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
275 HSession hs = i->second;
276 if (hs->connType == CONN_USB) {
277 continue;
278 }
279 if (hs->hUSB == nullptr) {
280 continue;
281 }
282 if (hs->hUSB->devId != devId || hs->hUSB->busId != busId) {
283 continue;
284 }
285 hResult = hs;
286 break;
287 }
288 uv_rwlock_rdunlock(&lockMapSession);
289 return hResult;
290 #else
291 return nullptr;
292 #endif
293 }
294
AsyncMainLoopTask(uv_idle_t * handle)295 void HdcSessionBase::AsyncMainLoopTask(uv_idle_t *handle)
296 {
297 AsyncParam *param = (AsyncParam *)handle->data;
298 HdcSessionBase *thisClass = (HdcSessionBase *)param->thisClass;
299 switch (param->method) {
300 case ASYNC_FREE_SESSION:
301 // Destruction is unified in the main thread
302 thisClass->FreeSession(param->sid);
303 break;
304 case ASYNC_STOP_MAINLOOP:
305 uv_stop(&thisClass->loopMain);
306 break;
307 default:
308 break;
309 }
310 if (param->data) {
311 delete[]((uint8_t *)param->data);
312 }
313 delete param;
314 param = nullptr;
315 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseIdleCallback);
316 }
317
MainAsyncCallback(uv_async_t * handle)318 void HdcSessionBase::MainAsyncCallback(uv_async_t *handle)
319 {
320 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
321 list<void *>::iterator i;
322 list<void *> &lst = thisClass->lstMainThreadOP;
323 uv_rwlock_wrlock(&thisClass->mainAsync);
324 for (i = lst.begin(); i != lst.end();) {
325 AsyncParam *param = (AsyncParam *)*i;
326 Base::IdleUvTask(&thisClass->loopMain, param, AsyncMainLoopTask);
327 i = lst.erase(i);
328 }
329 uv_rwlock_wrunlock(&thisClass->mainAsync);
330 }
331
PushAsyncMessage(const uint32_t sessionId,const uint8_t method,const void * data,const int dataSize)332 void HdcSessionBase::PushAsyncMessage(const uint32_t sessionId, const uint8_t method, const void *data,
333 const int dataSize)
334 {
335 AsyncParam *param = new AsyncParam();
336 if (!param) {
337 return;
338 }
339 param->sid = sessionId;
340 param->thisClass = this;
341 param->method = method;
342 if (dataSize > 0) {
343 param->dataSize = dataSize;
344 param->data = new uint8_t[param->dataSize]();
345 if (!param->data) {
346 delete param;
347 return;
348 }
349 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
350 delete[]((uint8_t *)param->data);
351 delete param;
352 return;
353 }
354 }
355
356 asyncMainLoop.data = this;
357 uv_rwlock_wrlock(&mainAsync);
358 lstMainThreadOP.push_back(param);
359 uv_rwlock_wrunlock(&mainAsync);
360 uv_async_send(&asyncMainLoop);
361 }
362
WorkerPendding()363 void HdcSessionBase::WorkerPendding()
364 {
365 uv_run(&loopMain, UV_RUN_DEFAULT);
366 ClearInstanceResource();
367 }
368
MallocSessionByConnectType(HSession hSession)369 int HdcSessionBase::MallocSessionByConnectType(HSession hSession)
370 {
371 int ret = 0;
372 switch (hSession->connType) {
373 case CONN_TCP: {
374 uv_tcp_init(&loopMain, &hSession->hWorkTCP);
375 ++hSession->uvHandleRef;
376 hSession->hWorkTCP.data = hSession;
377 break;
378 }
379 case CONN_USB: {
380 // Some members need to be placed at the primary thread
381 HUSB hUSB = new HdcUSB();
382 if (!hUSB) {
383 ret = -1;
384 break;
385 }
386 hSession->hUSB = hUSB;
387 hSession->hUSB->wMaxPacketSizeSend = MAX_PACKET_SIZE_HISPEED;
388 break;
389 }
390 #ifdef HDC_SUPPORT_UART
391 case CONN_SERIAL: {
392 HUART hUART = new HdcUART();
393 if (!hUART) {
394 ret = -1;
395 break;
396 }
397 hSession->hUART = hUART;
398 break;
399 }
400 #endif // HDC_SUPPORT_UART
401 default:
402 ret = -1;
403 break;
404 }
405 return ret;
406 }
407
408 // Avoid unit test when client\server\daemon on the same host, maybe get the same ID value
GetSessionPseudoUid()409 uint32_t HdcSessionBase::GetSessionPseudoUid()
410 {
411 uint32_t uid = 0;
412 do {
413 uid = Base::GetSecureRandom();
414 } while (AdminSession(OP_QUERY, uid, nullptr) != nullptr);
415 return uid;
416 }
417
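// sessionId: pass 0 to have an id generated automatically (see GetSessionPseudoUid); a non-zero
// value is used as given. NOTE(review): the original remark also mentioned a daemon-side scheme
// ("first place 1 followed by ...") but the sentence was truncated — confirm against callers.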
MallocSession(bool serverOrDaemon,const ConnType connType,void * classModule,uint32_t sessionId)419 HSession HdcSessionBase::MallocSession(bool serverOrDaemon, const ConnType connType, void *classModule,
420 uint32_t sessionId)
421 {
422 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
423 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
424 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
425 #endif
426 HSession hSession = new(std::nothrow) HdcSession();
427 if (!hSession) {
428 WRITE_LOG(LOG_FATAL, "MallocSession new hSession failed");
429 return nullptr;
430 }
431 int ret = 0;
432 ++sessionRef;
433 hSession->classInstance = this;
434 hSession->connType = connType;
435 hSession->classModule = classModule;
436 hSession->isDead = false;
437 hSession->sessionId = ((sessionId == 0) ? GetSessionPseudoUid() : sessionId);
438 hSession->serverOrDaemon = serverOrDaemon;
439 hSession->hWorkThread = uv_thread_self();
440 hSession->mapTask = new(std::nothrow) map<uint32_t, HTaskInfo>();
441 if (hSession->mapTask == nullptr) {
442 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->mapTask failed");
443 delete hSession;
444 hSession = nullptr;
445 return nullptr;
446 }
447 hSession->listKey = new(std::nothrow) list<void *>;
448 if (hSession->listKey == nullptr) {
449 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->listKey failed");
450 delete hSession;
451 hSession = nullptr;
452 return nullptr;
453 }
454 uv_loop_init(&hSession->childLoop);
455 hSession->uvHandleRef = 0;
456 // pullup child
457 WRITE_LOG(LOG_INFO, "HdcSessionBase NewSession, sessionId:%u, connType:%d.",
458 hSession->sessionId, hSession->connType);
459 ++hSession->uvHandleRef;
460 Base::CreateSocketPair(hSession->ctrlFd);
461 size_t handleSize = sizeof(uv_poll_t);
462 hSession->pollHandle[STREAM_WORK] = (uv_poll_t *)malloc(handleSize);
463 hSession->pollHandle[STREAM_MAIN] = (uv_poll_t *)malloc(handleSize);
464 uv_poll_t *pollHandleMain = hSession->pollHandle[STREAM_MAIN];
465 if (pollHandleMain == nullptr || hSession->pollHandle[STREAM_WORK] == nullptr) {
466 WRITE_LOG(LOG_FATAL, "MallocSession malloc hSession->pollHandle failed");
467 delete hSession;
468 hSession = nullptr;
469 return nullptr;
470 }
471 (void)memset_s(hSession->pollHandle[STREAM_WORK], handleSize, 0, handleSize);
472 (void)memset_s(hSession->pollHandle[STREAM_MAIN], handleSize, 0, handleSize);
473 int initResult = uv_poll_init_socket(&loopMain, pollHandleMain, hSession->ctrlFd[STREAM_MAIN]);
474 if (initResult != 0) {
475 WRITE_LOG(LOG_FATAL, "MallocSession init pollHandleMain->loop failed");
476 _exit(0);
477 }
478 uv_poll_start(pollHandleMain, UV_READABLE, ReadCtrlFromSession);
479 hSession->pollHandle[STREAM_MAIN]->data = hSession;
480 hSession->pollHandle[STREAM_WORK]->data = hSession;
481 // Activate USB DAEMON's data channel, may not for use
482 uv_tcp_init(&loopMain, &hSession->dataPipe[STREAM_MAIN]);
483 (void)memset_s(&hSession->dataPipe[STREAM_WORK], sizeof(hSession->dataPipe[STREAM_WORK]),
484 0, sizeof(uv_tcp_t));
485 ++hSession->uvHandleRef;
486 int createResult = Base::CreateSocketPair(hSession->dataFd);
487 if (createResult < 0) {
488 WRITE_LOG(LOG_FATAL, "MallocSession init dataFd failed");
489 _exit(0);
490 }
491 uv_tcp_open(&hSession->dataPipe[STREAM_MAIN], hSession->dataFd[STREAM_MAIN]);
492 hSession->dataPipe[STREAM_MAIN].data = hSession;
493 hSession->dataPipe[STREAM_WORK].data = hSession;
494 #ifdef HDC_HOST
495 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN], HOST_SOCKETPAIR_SIZE);
496 #else
497 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN]);
498 #endif
499 ret = MallocSessionByConnectType(hSession);
500 if (ret) {
501 delete hSession;
502 hSession = nullptr;
503 } else {
504 AdminSession(OP_ADD, hSession->sessionId, hSession);
505 }
506 return hSession;
507 }
508
FreeSessionByConnectType(HSession hSession)509 void HdcSessionBase::FreeSessionByConnectType(HSession hSession)
510 {
511 WRITE_LOG(LOG_DEBUG, "FreeSessionByConnectType %s", hSession->ToDebugString().c_str());
512
513 if (hSession->connType == CONN_USB) {
514 // ibusb All context is applied for sub-threaded, so it needs to be destroyed in the subline
515 if (!hSession->hUSB) {
516 return;
517 }
518 HUSB hUSB = hSession->hUSB;
519 if (!hUSB) {
520 return;
521 }
522 #ifdef HDC_HOST
523 if (hUSB->devHandle) {
524 libusb_release_interface(hUSB->devHandle, hUSB->interfaceNumber);
525 libusb_close(hUSB->devHandle);
526 hUSB->devHandle = nullptr;
527 }
528 #else
529 Base::CloseFd(hUSB->bulkIn);
530 Base::CloseFd(hUSB->bulkOut);
531 #endif
532 delete hSession->hUSB;
533 hSession->hUSB = nullptr;
534 }
535 #ifdef HDC_SUPPORT_UART
536 if (CONN_SERIAL == hSession->connType) {
537 if (!hSession->hUART) {
538 return;
539 }
540 HUART hUART = hSession->hUART;
541 if (!hUART) {
542 return;
543 }
544 HdcUARTBase *uartBase = (HdcUARTBase *)hSession->classModule;
545 // tell uart session will be free
546 uartBase->StopSession(hSession);
547 #ifdef HDC_HOST
548 #ifdef HOST_MINGW
549 if (hUART->devUartHandle != INVALID_HANDLE_VALUE) {
550 CloseHandle(hUART->devUartHandle);
551 hUART->devUartHandle = INVALID_HANDLE_VALUE;
552 }
553 #elif defined(HOST_LINUX)
554 Base::CloseFd(hUART->devUartHandle);
555 #endif // _WIN32
556 #endif
557 delete hSession->hUART;
558 hSession->hUART = nullptr;
559 }
560 #endif
561 }
562
563 // work when libuv-handle at struct of HdcSession has all callback finished
FreeSessionFinally(uv_idle_t * handle)564 void HdcSessionBase::FreeSessionFinally(uv_idle_t *handle)
565 {
566 HSession hSession = (HSession)handle->data;
567 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
568 if (hSession->uvHandleRef > 0) {
569 WRITE_LOG(LOG_INFO, "FreeSessionFinally uvHandleRef:%d sessionId:%u",
570 hSession->uvHandleRef, hSession->sessionId);
571 return;
572 }
573 // Notify Server or Daemon, just UI or display commandline
574 thisClass->NotifyInstanceSessionFree(hSession, true);
575 // all hsession uv handle has been clear
576 thisClass->AdminSession(OP_REMOVE, hSession->sessionId, nullptr);
577 WRITE_LOG(LOG_INFO, "!!!FreeSessionFinally sessionId:%u finish", hSession->sessionId);
578 HdcAuth::FreeKey(!hSession->serverOrDaemon, hSession->listKey);
579 delete hSession;
580 hSession = nullptr; // fix CodeMars SetNullAfterFree issue
581 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
582 --thisClass->sessionRef;
583 }
584
585 // work when child-work thread finish
FreeSessionContinue(HSession hSession)586 void HdcSessionBase::FreeSessionContinue(HSession hSession)
587 {
588 auto closeSessionTCPHandle = [](uv_handle_t *handle) -> void {
589 HSession hSession = (HSession)handle->data;
590 --hSession->uvHandleRef;
591 Base::TryCloseHandle((uv_handle_t *)handle);
592 if (handle == reinterpret_cast<uv_handle_t *>(hSession->pollHandle[STREAM_MAIN])) {
593 Base::CloseSocketPair(hSession->ctrlFd);
594 free(hSession->pollHandle[STREAM_MAIN]);
595 }
596 };
597 if (hSession->connType == CONN_TCP) {
598 // Turn off TCP to prevent continuing writing
599 Base::TryCloseHandle((uv_handle_t *)&hSession->hWorkTCP, true, closeSessionTCPHandle);
600 #ifdef _WIN32
601 closesocket(hSession->dataFd[STREAM_WORK]);
602 #else
603 Base::CloseFd(hSession->dataFd[STREAM_WORK]);
604 #endif
605 }
606 hSession->availTailIndex = 0;
607 if (hSession->ioBuf) {
608 delete[] hSession->ioBuf;
609 hSession->ioBuf = nullptr;
610 }
611 Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_MAIN], true, closeSessionTCPHandle);
612 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_MAIN], true, closeSessionTCPHandle);
613 FreeSessionByConnectType(hSession);
614 // finish
615 Base::IdleUvTask(&loopMain, hSession, FreeSessionFinally);
616 }
617
FreeSessionOpeate(uv_timer_t * handle)618 void HdcSessionBase::FreeSessionOpeate(uv_timer_t *handle)
619 {
620 StartTraceScope("HdcSessionBase::FreeSessionOpeate");
621 HSession hSession = (HSession)handle->data;
622 #ifdef HDC_HOST
623 if (hSession->hUSB != nullptr
624 && (!hSession->hUSB->hostBulkIn.isShutdown || !hSession->hUSB->hostBulkOut.isShutdown)) {
625 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
626 pUSB->CancelUsbIo(hSession);
627 return;
628 }
629 #endif
630 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
631 if (hSession->ref > 0) {
632 WRITE_LOG(LOG_WARN, "FreeSessionOpeate sid:%u ref:%u > 0", hSession->sessionId, uint32_t(hSession->ref));
633 return;
634 }
635 WRITE_LOG(LOG_INFO, "FreeSessionOpeate sid:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
636 // wait workthread to free
637 if (hSession->pollHandle[STREAM_WORK]->loop) {
638 auto ctrl = BuildCtrlString(SP_STOP_SESSION, 0, nullptr, 0);
639 Base::SendToPollFd(hSession->ctrlFd[STREAM_MAIN], ctrl.data(), ctrl.size());
640 WRITE_LOG(LOG_INFO, "FreeSessionOpeate, send workthread for free. sessionId:%u", hSession->sessionId);
641 auto callbackCheckFreeSessionContinue = [](uv_timer_t *handle) -> void {
642 HSession hSession = (HSession)handle->data;
643 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
644 if (!hSession->childCleared) {
645 WRITE_LOG(LOG_INFO, "FreeSessionOpeate childCleared:%d sessionId:%u",
646 hSession->childCleared, hSession->sessionId);
647 return;
648 }
649 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
650 thisClass->FreeSessionContinue(hSession);
651 };
652 Base::TimerUvTask(&thisClass->loopMain, hSession, callbackCheckFreeSessionContinue);
653 } else {
654 thisClass->FreeSessionContinue(hSession);
655 }
656 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
657 }
658
FreeSession(const uint32_t sessionId)659 void HdcSessionBase::FreeSession(const uint32_t sessionId)
660 {
661 StartTraceScope("HdcSessionBase::FreeSession");
662 Base::AddDeletedSessionId(sessionId);
663 if (threadSessionMain != uv_thread_self()) {
664 PushAsyncMessage(sessionId, ASYNC_FREE_SESSION, nullptr, 0);
665 return;
666 }
667 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
668 WRITE_LOG(LOG_INFO, "Begin to free session, sessionid:%u", sessionId);
669 do {
670 if (!hSession || hSession->isDead) {
671 WRITE_LOG(LOG_WARN, "FreeSession hSession nullptr or isDead sessionId:%u", sessionId);
672 break;
673 }
674 WRITE_LOG(LOG_INFO, "dataFdSend:%llu, dataFdRecv:%llu",
675 uint64_t(hSession->stat.dataSendBytes),
676 uint64_t(hSession->stat.dataRecvBytes));
677 hSession->isDead = true;
678 Base::TimerUvTask(&loopMain, hSession, FreeSessionOpeate);
679 NotifyInstanceSessionFree(hSession, false);
680 WRITE_LOG(LOG_INFO, "FreeSession sessionId:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
681 } while (false);
682 }
683
AdminSession(const uint8_t op,const uint32_t sessionId,HSession hInput)684 HSession HdcSessionBase::AdminSession(const uint8_t op, const uint32_t sessionId, HSession hInput)
685 {
686 HSession hRet = nullptr;
687 switch (op) {
688 case OP_ADD:
689 uv_rwlock_wrlock(&lockMapSession);
690 mapSession[sessionId] = hInput;
691 uv_rwlock_wrunlock(&lockMapSession);
692 break;
693 case OP_REMOVE:
694 uv_rwlock_wrlock(&lockMapSession);
695 mapSession.erase(sessionId);
696 uv_rwlock_wrunlock(&lockMapSession);
697 break;
698 case OP_QUERY:
699 uv_rwlock_rdlock(&lockMapSession);
700 if (mapSession.count(sessionId)) {
701 hRet = mapSession[sessionId];
702 }
703 uv_rwlock_rdunlock(&lockMapSession);
704 break;
705 case OP_QUERY_REF:
706 uv_rwlock_wrlock(&lockMapSession);
707 if (mapSession.count(sessionId)) {
708 hRet = mapSession[sessionId];
709 ++hRet->ref;
710 }
711 uv_rwlock_wrunlock(&lockMapSession);
712 break;
713 case OP_UPDATE:
714 uv_rwlock_wrlock(&lockMapSession);
715 // remove old
716 mapSession.erase(sessionId);
717 mapSession[hInput->sessionId] = hInput;
718 uv_rwlock_wrunlock(&lockMapSession);
719 break;
720 case OP_VOTE_RESET:
721 if (mapSession.count(sessionId) == 0) {
722 break;
723 }
724 bool needReset;
725 if (serverOrDaemon) {
726 uv_rwlock_wrlock(&lockMapSession);
727 hRet = mapSession[sessionId];
728 hRet->voteReset = true;
729 needReset = true;
730 for (auto &kv : mapSession) {
731 if (sessionId == kv.first) {
732 continue;
733 }
734 WRITE_LOG(LOG_DEBUG, "session:%u vote reset, session %u is %s",
735 sessionId, kv.first, kv.second->voteReset ? "YES" : "NO");
736 if (!kv.second->voteReset) {
737 needReset = false;
738 }
739 }
740 uv_rwlock_wrunlock(&lockMapSession);
741 } else {
742 needReset = true;
743 }
744 if (needReset) {
745 WRITE_LOG(LOG_FATAL, "!! session:%u vote reset, passed unanimously !!", sessionId);
746 abort();
747 }
748 break;
749 default:
750 break;
751 }
752 return hRet;
753 }
754
DumpTasksInfo(map<uint32_t,HTaskInfo> & mapTask)755 void HdcSessionBase::DumpTasksInfo(map<uint32_t, HTaskInfo> &mapTask)
756 {
757 int idx = 1;
758 for (auto t : mapTask) {
759 HTaskInfo ti = t.second;
760 WRITE_LOG(LOG_WARN, "%d: channelId: %lu, type: %d, closeRetry: %d\n",
761 idx++, ti->channelId, ti->taskType, ti->closeRetryCount);
762 }
763 }
764
765 // All in the corresponding sub-thread, no need locks
AdminTask(const uint8_t op,HSession hSession,const uint32_t channelId,HTaskInfo hInput)766 HTaskInfo HdcSessionBase::AdminTask(const uint8_t op, HSession hSession, const uint32_t channelId, HTaskInfo hInput)
767 {
768 HTaskInfo hRet = nullptr;
769 map<uint32_t, HTaskInfo> &mapTask = *hSession->mapTask;
770
771 switch (op) {
772 case OP_ADD:
773 hRet = mapTask[channelId];
774 if (hRet != nullptr) {
775 delete hRet;
776 }
777 mapTask[channelId] = hInput;
778 hRet = hInput;
779
780 WRITE_LOG(LOG_WARN, "AdminTask add session %u, channelId %u, mapTask size: %zu",
781 hSession->sessionId, channelId, mapTask.size());
782
783 break;
784 case OP_REMOVE:
785 mapTask.erase(channelId);
786 WRITE_LOG(LOG_DEBUG, "AdminTask rm session %u, channelId %u, mapTask size: %zu",
787 hSession->sessionId, channelId, mapTask.size());
788 break;
789 case OP_QUERY:
790 if (mapTask.count(channelId)) {
791 hRet = mapTask[channelId];
792 }
793 break;
794 case OP_VOTE_RESET:
795 AdminSession(op, hSession->sessionId, nullptr);
796 break;
797 default:
798 break;
799 }
800 return hRet;
801 }
802
SendByProtocol(HSession hSession,uint8_t * bufPtr,const int bufLen,bool echo)803 int HdcSessionBase::SendByProtocol(HSession hSession, uint8_t *bufPtr, const int bufLen, bool echo)
804 {
805 StartTraceScope("HdcSessionBase::SendByProtocol");
806 if (hSession->isDead) {
807 delete[] bufPtr;
808 WRITE_LOG(LOG_WARN, "SendByProtocol session dead error");
809 return ERR_SESSION_NOFOUND;
810 }
811 int ret = 0;
812 switch (hSession->connType) {
813 case CONN_TCP: {
814 HdcTCPBase *pTCP = ((HdcTCPBase *)hSession->classModule);
815 if (echo && !hSession->serverOrDaemon) {
816 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
817 } else {
818 if (hSession->hWorkThread == uv_thread_self()) {
819 ret = pTCP->WriteUvTcpFd(&hSession->hWorkTCP, bufPtr, bufLen);
820 } else {
821 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
822 }
823 }
824 break;
825 }
826 case CONN_USB: {
827 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
828 ret = pUSB->SendUSBBlock(hSession, bufPtr, bufLen);
829 delete[] bufPtr;
830 break;
831 }
832 #ifdef HDC_SUPPORT_UART
833 case CONN_SERIAL: {
834 HdcUARTBase *pUART = ((HdcUARTBase *)hSession->classModule);
835 ret = pUART->SendUARTData(hSession, bufPtr, bufLen);
836 delete[] bufPtr;
837 break;
838 }
839 #endif
840 default:
841 delete[] bufPtr;
842 break;
843 }
844 return ret;
845 }
846
Send(const uint32_t sessionId,const uint32_t channelId,const uint16_t commandFlag,const uint8_t * data,const int dataSize)847 int HdcSessionBase::Send(const uint32_t sessionId, const uint32_t channelId, const uint16_t commandFlag,
848 const uint8_t *data, const int dataSize)
849 {
850 StartTraceScope("HdcSessionBase::Send");
851 HSession hSession = AdminSession(OP_QUERY_REF, sessionId, nullptr);
852 if (!hSession) {
853 WRITE_LOG(LOG_WARN, "Send to offline device, drop it, sessionId:%u", sessionId);
854 return ERR_SESSION_NOFOUND;
855 }
856 PayloadProtect protectBuf; // noneed convert to big-endian
857 protectBuf.channelId = channelId;
858 protectBuf.commandFlag = commandFlag;
859 protectBuf.checkSum = (ENABLE_IO_CHECKSUM && dataSize > 0) ? Base::CalcCheckSum(data, dataSize) : 0;
860 protectBuf.vCode = payloadProtectStaticVcode;
861 string s = SerialStruct::SerializeToString(protectBuf);
862 // reserve for encrypt here
863 // xx-encrypt
864
865 PayloadHead payloadHead = {}; // need convert to big-endian
866 payloadHead.flag[0] = PACKET_FLAG.at(0);
867 payloadHead.flag[1] = PACKET_FLAG.at(1);
868 payloadHead.protocolVer = VER_PROTOCOL;
869 payloadHead.headSize = htons(s.size());
870 payloadHead.dataSize = htonl(dataSize);
871 int finalBufSize = sizeof(PayloadHead) + s.size() + dataSize;
872 uint8_t *finayBuf = new(std::nothrow) uint8_t[finalBufSize]();
873 if (finayBuf == nullptr) {
874 WRITE_LOG(LOG_WARN, "send allocmem err");
875 --hSession->ref;
876 return ERR_BUF_ALLOC;
877 }
878 bool bufRet = false;
879 do {
880 if (memcpy_s(finayBuf, sizeof(PayloadHead), reinterpret_cast<uint8_t *>(&payloadHead), sizeof(PayloadHead))) {
881 WRITE_LOG(LOG_WARN, "send copyhead err for dataSize:%d", dataSize);
882 break;
883 }
884 if (memcpy_s(finayBuf + sizeof(PayloadHead), s.size(),
885 reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size())) {
886 WRITE_LOG(LOG_WARN, "send copyProtbuf err for dataSize:%d", dataSize);
887 break;
888 }
889 if (dataSize > 0 && memcpy_s(finayBuf + sizeof(PayloadHead) + s.size(), dataSize, data, dataSize)) {
890 WRITE_LOG(LOG_WARN, "send copyDatabuf err for dataSize:%d", dataSize);
891 break;
892 }
893 bufRet = true;
894 } while (false);
895 if (!bufRet) {
896 delete[] finayBuf;
897 WRITE_LOG(LOG_WARN, "send copywholedata err for dataSize:%d", dataSize);
898 --hSession->ref;
899 return ERR_BUF_COPY;
900 }
901 int ret = -1;
902 if (CMD_KERNEL_ECHO == commandFlag) {
903 ret = SendByProtocol(hSession, finayBuf, finalBufSize, true);
904 } else {
905 ret = SendByProtocol(hSession, finayBuf, finalBufSize);
906 }
907 --hSession->ref;
908 return ret;
909 }
910
DecryptPayload(HSession hSession,PayloadHead * payloadHeadBe,uint8_t * encBuf)911 int HdcSessionBase::DecryptPayload(HSession hSession, PayloadHead *payloadHeadBe, uint8_t *encBuf)
912 {
913 StartTraceScope("HdcSessionBase::DecryptPayload");
914 PayloadProtect protectBuf = {};
915 uint16_t headSize = ntohs(payloadHeadBe->headSize);
916 int dataSize = ntohl(payloadHeadBe->dataSize);
917 string encString(reinterpret_cast<char *>(encBuf), headSize);
918 SerialStruct::ParseFromString(protectBuf, encString);
919 if (protectBuf.vCode != payloadProtectStaticVcode) {
920 WRITE_LOG(LOG_FATAL, "Session recv static vcode failed");
921 return ERR_BUF_CHECK;
922 }
923 uint8_t *data = encBuf + headSize;
924 if (ENABLE_IO_CHECKSUM && protectBuf.checkSum != 0 && (protectBuf.checkSum != Base::CalcCheckSum(data, dataSize))) {
925 WRITE_LOG(LOG_FATAL, "Session recv CalcCheckSum failed");
926 return ERR_BUF_CHECK;
927 }
928 if (!FetchCommand(hSession, protectBuf.channelId, protectBuf.commandFlag, data, dataSize)) {
929 WRITE_LOG(LOG_WARN, "FetchCommand failed: channelId %x commandFlag %x",
930 protectBuf.channelId, protectBuf.commandFlag);
931 return ERR_GENERIC;
932 }
933 return RET_SUCCESS;
934 }
935
OnRead(HSession hSession,uint8_t * bufPtr,const int bufLen)936 int HdcSessionBase::OnRead(HSession hSession, uint8_t *bufPtr, const int bufLen)
937 {
938 int ret = ERR_GENERIC;
939 StartTraceScope("HdcSessionBase::OnRead");
940 if (memcmp(bufPtr, PACKET_FLAG.c_str(), PACKET_FLAG.size())) {
941 WRITE_LOG(LOG_FATAL, "PACKET_FLAG incorrect %x %x", bufPtr[0], bufPtr[1]);
942 return ERR_BUF_CHECK;
943 }
944 struct PayloadHead *payloadHead = reinterpret_cast<struct PayloadHead *>(bufPtr);
945 // to prevent integer overflow caused by the add operation of two input num
946 uint64_t payloadHeadSize = static_cast<uint64_t>(ntohl(payloadHead->dataSize)) +
947 static_cast<uint64_t>(ntohs(payloadHead->headSize));
948 int packetHeadSize = sizeof(struct PayloadHead);
949 if (payloadHeadSize == 0 || payloadHeadSize > static_cast<uint64_t>(HDC_BUF_MAX_BYTES)) {
950 WRITE_LOG(LOG_FATAL, "Packet size incorrect");
951 return ERR_BUF_CHECK;
952 }
953
954 // 0 < payloadHeadSize < HDC_BUF_MAX_BYTES
955 int tobeReadLen = static_cast<int>(payloadHeadSize);
956 if (bufLen - packetHeadSize < tobeReadLen) {
957 return 0;
958 }
959 if (DecryptPayload(hSession, payloadHead, bufPtr + packetHeadSize)) {
960 WRITE_LOG(LOG_WARN, "decrypt plhead error");
961 return ERR_BUF_CHECK;
962 }
963 ret = packetHeadSize + tobeReadLen;
964 return ret;
965 }
966
967 // Returns <0 error;> 0 receives the number of bytes; 0 untreated
FetchIOBuf(HSession hSession,uint8_t * ioBuf,int read)968 int HdcSessionBase::FetchIOBuf(HSession hSession, uint8_t *ioBuf, int read)
969 {
970 HdcSessionBase *ptrConnect = (HdcSessionBase *)hSession->classInstance;
971 int indexBuf = 0;
972 int childRet = 0;
973 StartTraceScope("HdcSessionBase::FetchIOBuf");
974 if (read < 0) {
975 constexpr int bufSize = 1024;
976 char buf[bufSize] = { 0 };
977 uv_strerror_r(read, buf, bufSize);
978 WRITE_LOG(LOG_FATAL, "FetchIOBuf read io failed,%s", buf);
979 return ERR_IO_FAIL;
980 }
981 hSession->heartbeat.AddMessageCount();
982 hSession->stat.dataRecvBytes += read;
983 hSession->availTailIndex += read;
984 while (!hSession->isDead && hSession->availTailIndex > static_cast<int>(sizeof(PayloadHead))) {
985 childRet = ptrConnect->OnRead(hSession, ioBuf + indexBuf, hSession->availTailIndex);
986 if (childRet > 0) {
987 hSession->availTailIndex -= childRet;
988 indexBuf += childRet;
989 } else if (childRet == 0) {
990 // Not enough a IO
991 break;
992 } else { // <0
993 WRITE_LOG(LOG_FATAL, "FetchIOBuf error childRet:%d sessionId:%u", childRet, hSession->sessionId);
994 hSession->availTailIndex = 0; // Preventing malicious data packages
995 indexBuf = ERR_BUF_SIZE;
996 break;
997 }
998 // It may be multi-time IO to merge in a BUF, need to loop processing
999 }
1000 if (indexBuf > 0 && hSession->availTailIndex > 0) {
1001 if (memmove_s(hSession->ioBuf, hSession->bufSize, hSession->ioBuf + indexBuf, hSession->availTailIndex)
1002 != EOK) {
1003 return ERR_BUF_COPY;
1004 };
1005 uint8_t *bufToZero = reinterpret_cast<uint8_t *>(hSession->ioBuf + hSession->availTailIndex);
1006 Base::ZeroBuf(bufToZero, hSession->bufSize - hSession->availTailIndex);
1007 }
1008 return indexBuf;
1009 }
1010
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)1011 void HdcSessionBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
1012 {
1013 HSession context = (HSession)handle->data;
1014 Base::ReallocBuf(&context->ioBuf, &context->bufSize, HDC_SOCKETPAIR_SIZE);
1015 buf->base = (char *)context->ioBuf + context->availTailIndex;
1016 int size = context->bufSize - context->availTailIndex;
1017 buf->len = std::min(size, static_cast<int>(sizeWanted));
1018 }
1019
FinishWriteSessionTCP(uv_write_t * req,int status)1020 void HdcSessionBase::FinishWriteSessionTCP(uv_write_t *req, int status)
1021 {
1022 HSession hSession = (HSession)req->handle->data;
1023 --hSession->ref;
1024 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1025 if (status < 0) {
1026 WRITE_LOG(LOG_WARN, "FinishWriteSessionTCP status:%d sessionId:%u isDead:%d ref:%u",
1027 status, hSession->sessionId, hSession->isDead, uint32_t(hSession->ref));
1028 Base::TryCloseHandle((uv_handle_t *)req->handle);
1029 if (!hSession->isDead && !hSession->ref) {
1030 WRITE_LOG(LOG_DEBUG, "FinishWriteSessionTCP freesession :%u", hSession->sessionId);
1031 thisClass->FreeSession(hSession->sessionId);
1032 }
1033 }
1034 delete[]((uint8_t *)req->data);
1035 delete req;
1036 }
1037
DispatchSessionThreadCommand(HSession hSession,const uint8_t * baseBuf,const int bytesIO)1038 bool HdcSessionBase::DispatchSessionThreadCommand(HSession hSession, const uint8_t *baseBuf,
1039 const int bytesIO)
1040 {
1041 bool ret = true;
1042 uint8_t flag = *const_cast<uint8_t *>(baseBuf);
1043
1044 switch (flag) {
1045 case SP_JDWP_NEWFD:
1046 case SP_ARK_NEWFD: {
1047 JdwpNewFileDescriptor(baseBuf, bytesIO);
1048 break;
1049 }
1050 default:
1051 WRITE_LOG(LOG_WARN, "Not support session command");
1052 break;
1053 }
1054 return ret;
1055 }
1056
ReadCtrlFromSession(uv_poll_t * poll,int status,int events)1057 void HdcSessionBase::ReadCtrlFromSession(uv_poll_t *poll, int status, int events)
1058 {
1059 HSession hSession = (HSession)poll->data;
1060 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1061 const int size = Base::GetMaxBufSizeStable();
1062 char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1063 ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_MAIN], buf, size);
1064 while (true) {
1065 if (nread < 0) {
1066 constexpr int bufSize = 1024;
1067 char buffer[bufSize] = { 0 };
1068 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1069 WRITE_LOG(LOG_WARN, "ReadCtrlFromSession failed,%s", buffer);
1070 uv_poll_stop(poll);
1071 break;
1072 }
1073 if (nread == 0) {
1074 WRITE_LOG(LOG_FATAL, "ReadCtrlFromSession read data zero byte");
1075 break;
1076 }
1077 // only one command, no need to split command from stream
1078 // if add more commands, consider the format command
1079 hSessionBase->DispatchSessionThreadCommand(hSession, reinterpret_cast<uint8_t *>(buf), nread);
1080 break;
1081 }
1082 delete[] buf;
1083 }
1084
SetHeartbeatFeature(SessionHandShake & handshake)1085 void HdcSessionBase::SetHeartbeatFeature(SessionHandShake &handshake)
1086 {
1087 if (!Base::GetheartbeatSwitch()) {
1088 return;
1089 }
1090 // told daemon, we support features
1091 Base::TlvAppend(handshake.buf, TAG_SUPPORT_FEATURE, Base::FeatureToString(Base::GetSupportFeature()));
1092 }
1093
WorkThreadInitSession(HSession hSession,SessionHandShake & handshake)1094 void HdcSessionBase::WorkThreadInitSession(HSession hSession, SessionHandShake &handshake)
1095 {
1096 handshake.banner = HANDSHAKE_MESSAGE;
1097 handshake.sessionId = hSession->sessionId;
1098 handshake.connectKey = hSession->connectKey;
1099 if (!hSession->isCheck) {
1100 handshake.version = Base::GetVersion() + HDC_MSG_HASH;
1101 WRITE_LOG(LOG_INFO, "set version = %s", handshake.version.c_str());
1102 }
1103 handshake.authType = AUTH_NONE;
1104 // told daemon, we support RSA_3072_SHA512 auth
1105 Base::TlvAppend(handshake.buf, TAG_AUTH_TYPE, std::to_string(AuthVerifyType::RSA_3072_SHA512));
1106 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1107 hSessionBase->SetHeartbeatFeature(handshake);
1108 }
1109
WorkThreadStartSession(HSession hSession)1110 bool HdcSessionBase::WorkThreadStartSession(HSession hSession)
1111 {
1112 bool regOK = false;
1113 int childRet = 0;
1114 if (hSession->connType == CONN_TCP) {
1115 HdcTCPBase *pTCPBase = (HdcTCPBase *)hSession->classModule;
1116 hSession->hChildWorkTCP.data = hSession;
1117 if (uv_tcp_init(&hSession->childLoop, &hSession->hChildWorkTCP) < 0) {
1118 WRITE_LOG(LOG_WARN, "HdcSessionBase SessionCtrl failed 1");
1119 return false;
1120 }
1121 if ((childRet = uv_tcp_open(&hSession->hChildWorkTCP, hSession->fdChildWorkTCP)) < 0) {
1122 constexpr int bufSize = 1024;
1123 char buf[bufSize] = { 0 };
1124 uv_strerror_r(childRet, buf, bufSize);
1125 WRITE_LOG(LOG_WARN, "SessionCtrl failed 2,fd:%d,str:%s", hSession->fdChildWorkTCP, buf);
1126 return false;
1127 }
1128 Base::SetTcpOptions((uv_tcp_t *)&hSession->hChildWorkTCP);
1129 uv_read_start((uv_stream_t *)&hSession->hChildWorkTCP, AllocCallback, pTCPBase->ReadStream);
1130 regOK = true;
1131 #ifdef HDC_SUPPORT_UART
1132 } else if (hSession->connType == CONN_SERIAL) { // UART
1133 HdcUARTBase *pUARTBase = (HdcUARTBase *)hSession->classModule;
1134 WRITE_LOG(LOG_DEBUG, "UART ReadyForWorkThread");
1135 regOK = pUARTBase->ReadyForWorkThread(hSession);
1136 #endif
1137 } else { // USB
1138 HdcUSBBase *pUSBBase = (HdcUSBBase *)hSession->classModule;
1139 WRITE_LOG(LOG_DEBUG, "USB ReadyForWorkThread");
1140 regOK = pUSBBase->ReadyForWorkThread(hSession);
1141 }
1142
1143 if (regOK && hSession->serverOrDaemon) {
1144 // session handshake step1
1145 SessionHandShake handshake = {};
1146 WorkThreadInitSession(hSession, handshake);
1147 string hs = SerialStruct::SerializeToString(handshake);
1148 #ifdef HDC_SUPPORT_UART
1149 WRITE_LOG(LOG_DEBUG, "WorkThreadStartSession session %u auth %u send handshake hs: %s",
1150 hSession->sessionId, handshake.authType, hs.c_str());
1151 #endif
1152 Send(hSession->sessionId, 0, CMD_KERNEL_HANDSHAKE,
1153 reinterpret_cast<uint8_t *>(const_cast<char *>(hs.c_str())), hs.size());
1154 }
1155 return regOK;
1156 }
1157
BuildCtrlString(InnerCtrlCommand command,uint32_t channelId,uint8_t * data,int dataSize)1158 vector<uint8_t> HdcSessionBase::BuildCtrlString(InnerCtrlCommand command, uint32_t channelId, uint8_t *data,
1159 int dataSize)
1160 {
1161 vector<uint8_t> ret;
1162 while (true) {
1163 if (dataSize > BUF_SIZE_MICRO) {
1164 WRITE_LOG(LOG_WARN, "BuildCtrlString dataSize:%d", dataSize);
1165 break;
1166 }
1167 CtrlStruct ctrl = {};
1168 ctrl.command = command;
1169 ctrl.channelId = channelId;
1170 ctrl.dataSize = dataSize;
1171 if (dataSize > 0 && data != nullptr && memcpy_s(ctrl.data, sizeof(ctrl.data), data, dataSize) != EOK) {
1172 break;
1173 }
1174 uint8_t *buf = reinterpret_cast<uint8_t *>(&ctrl);
1175 ret.insert(ret.end(), buf, buf + sizeof(CtrlStruct));
1176 break;
1177 }
1178 return ret;
1179 }
1180
DispatchMainThreadCommand(HSession hSession,const CtrlStruct * ctrl)1181 bool HdcSessionBase::DispatchMainThreadCommand(HSession hSession, const CtrlStruct *ctrl)
1182 {
1183 bool ret = true;
1184 uint32_t channelId = ctrl->channelId; // if send not set, it is zero
1185 switch (ctrl->command) {
1186 case SP_START_SESSION: {
1187 WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand START_SESSION sessionId:%u instance:%s",
1188 hSession->sessionId, hSession->serverOrDaemon ? "server" : "daemon");
1189 ret = WorkThreadStartSession(hSession);
1190 break;
1191 }
1192 case SP_STOP_SESSION: {
1193 WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand STOP_SESSION sessionId:%u", hSession->sessionId);
1194 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1195 hSessionBase->StopHeartbeatWork(hSession);
1196 auto closeSessionChildThreadTCPHandle = [](uv_handle_t *handle) -> void {
1197 HSession hSession = (HSession)handle->data;
1198 Base::TryCloseHandle((uv_handle_t *)handle);
1199 if (handle == (uv_handle_t *)hSession->pollHandle[STREAM_WORK]) {
1200 free(hSession->pollHandle[STREAM_WORK]);
1201 }
1202 if (--hSession->uvChildRef == 0) {
1203 uv_stop(&hSession->childLoop);
1204 };
1205 };
1206 constexpr int uvChildRefOffset = 2;
1207 hSession->uvChildRef += uvChildRefOffset;
1208 if (hSession->connType == CONN_TCP && hSession->hChildWorkTCP.loop) { // maybe not use it
1209 ++hSession->uvChildRef;
1210 Base::TryCloseHandle((uv_handle_t *)&hSession->hChildWorkTCP, true, closeSessionChildThreadTCPHandle);
1211 }
1212 Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_WORK], true,
1213 closeSessionChildThreadTCPHandle);
1214 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_WORK], true,
1215 closeSessionChildThreadTCPHandle);
1216 break;
1217 }
1218 case SP_ATTACH_CHANNEL: {
1219 if (!serverOrDaemon) {
1220 break; // Only Server has this feature
1221 }
1222 AttachChannel(hSession, channelId);
1223 break;
1224 }
1225 case SP_DEATCH_CHANNEL: {
1226 if (!serverOrDaemon) {
1227 break; // Only Server has this feature
1228 }
1229 DeatchChannel(hSession, channelId);
1230 break;
1231 }
1232 default:
1233 WRITE_LOG(LOG_WARN, "Not support main command");
1234 ret = false;
1235 break;
1236 }
1237 return ret;
1238 }
1239
1240 // Several bytes of control instructions, generally do not stick
ReadCtrlFromMain(uv_poll_t * poll,int status,int events)1241 void HdcSessionBase::ReadCtrlFromMain(uv_poll_t *poll, int status, int events)
1242 {
1243 HSession hSession = (HSession)poll->data;
1244 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1245 int formatCommandSize = sizeof(CtrlStruct);
1246 int index = 0;
1247 const int size = Base::GetMaxBufSizeStable();
1248 char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1249 ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_WORK], buf, size);
1250 while (true) {
1251 if (nread < 0) {
1252 constexpr int bufSize = 1024;
1253 char buffer[bufSize] = { 0 };
1254 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1255 WRITE_LOG(LOG_WARN, "SessionCtrl failed,%s", buffer);
1256 break;
1257 }
1258 if (nread % formatCommandSize != 0) {
1259 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain size failed, nread == %d", nread);
1260 break;
1261 }
1262 CtrlStruct *ctrl = reinterpret_cast<CtrlStruct *>(buf + index);
1263 if (!hSessionBase->DispatchMainThreadCommand(hSession, ctrl)) {
1264 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain failed sessionId:%u channelId:%u command:%u",
1265 hSession->sessionId, ctrl->channelId, ctrl->command);
1266 break;
1267 }
1268 index += sizeof(CtrlStruct);
1269 if (index >= nread) {
1270 break;
1271 }
1272 }
1273 delete[] buf;
1274 }
1275
ReChildLoopForSessionClear(HSession hSession)1276 void HdcSessionBase::ReChildLoopForSessionClear(HSession hSession)
1277 {
1278 // Restart loop close task
1279 ClearOwnTasks(hSession, 0);
1280 WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear sessionId:%u", hSession->sessionId);
1281 auto clearTaskForSessionFinish = [](uv_timer_t *handle) -> void {
1282 HSession hSession = (HSession)handle->data;
1283 hSession->clearTaskTimes++;
1284 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1285 if (thisClass->taskCount && (hSession->clearTaskTimes < 50)) { //50: 50 * 120ms = 6s
1286 return;
1287 }
1288 if (hSession->clearTaskTimes >= 50) { //50: 50 * 120ms = 6s
1289 WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear exited due to timeout of 6s");
1290 }
1291 // all task has been free
1292 uv_close((uv_handle_t *)handle, Base::CloseTimerCallback);
1293 uv_stop(&hSession->childLoop); // stop ReChildLoopForSessionClear pendding
1294 };
1295 Base::TimerUvTask(
1296 &hSession->childLoop, hSession, clearTaskForSessionFinish, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
1297 uv_run(&hSession->childLoop, UV_RUN_DEFAULT);
1298 // clear
1299 Base::TryCloseChildLoop(&hSession->childLoop, "Session childUV");
1300 }
1301
1302 //stop Heartbeat
StopHeartbeatWork(HSession hSession)1303 void HdcSessionBase::StopHeartbeatWork(HSession hSession)
1304 {
1305 hSession->heartbeat.SetSupportHeartbeat(false);
1306 uv_timer_stop(&hSession->heartbeatTimer);
1307 }
1308
1309 // send heartbeat msg
SendHeartbeatMsg(uv_timer_t * handle)1310 void HdcSessionBase::SendHeartbeatMsg(uv_timer_t *handle)
1311 {
1312 HSession hSession = (HSession)handle->data;
1313 if (!hSession->heartbeat.GetSupportHeartbeat()) {
1314 WRITE_LOG(LOG_INFO, "session %u not support heatbeat", hSession->sessionId);
1315 if (hSession->handshakeOK) {
1316 uv_timer_stop(&hSession->heartbeatTimer);
1317 }
1318 return;
1319 }
1320
1321 std::string str = hSession->heartbeat.ToString();
1322 WRITE_LOG(LOG_INFO, "send %s for session %u", str.c_str(), hSession->sessionId);
1323
1324 HeartbeatMsg heartbeat = {};
1325 heartbeat.heartbeatCount = hSession->heartbeat.GetHeartbeatCount();
1326 hSession->heartbeat.AddHeartbeatCount();
1327 string s = SerialStruct::SerializeToString(heartbeat);
1328 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1329 thisClass->Send(hSession->sessionId, 0, CMD_HEARTBEAT_MSG,
1330 reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size());
1331 }
1332
StartHeartbeatWork(HSession hSession)1333 void HdcSessionBase::StartHeartbeatWork(HSession hSession)
1334 {
1335 if (!Base::GetheartbeatSwitch()) {
1336 return;
1337 }
1338 WRITE_LOG(LOG_DEBUG, "StartHeartbeatWork");
1339 hSession->heartbeatTimer.data = hSession;
1340 uv_timer_init(&hSession->childLoop, &hSession->heartbeatTimer);
1341 uv_timer_start(&hSession->heartbeatTimer, SendHeartbeatMsg, 0, HEARTBEAT_INTERVAL);
1342 }
1343
SessionWorkThread(uv_work_t * arg)1344 void HdcSessionBase::SessionWorkThread(uv_work_t *arg)
1345 {
1346 HSession hSession = (HSession)arg->data;
1347 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1348 hSession->hWorkChildThread = uv_thread_self();
1349
1350 uv_poll_t *pollHandle = hSession->pollHandle[STREAM_WORK];
1351 pollHandle->data = hSession;
1352 int initResult = uv_poll_init_socket(&hSession->childLoop, pollHandle, hSession->ctrlFd[STREAM_WORK]);
1353 if (initResult != 0) {
1354 WRITE_LOG(LOG_FATAL, "SessionWorkThread init pollHandle->loop failed");
1355 _exit(0);
1356 }
1357 uv_poll_start(pollHandle, UV_READABLE, ReadCtrlFromMain);
1358 // start heartbeat rimer
1359 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1360 hSessionBase->StartHeartbeatWork(hSession);
1361 WRITE_LOG(LOG_INFO, "!!!Workthread run begin, sid:%u instance:%s", hSession->sessionId,
1362 thisClass->serverOrDaemon ? "server" : "daemon");
1363 uv_run(&hSession->childLoop, UV_RUN_DEFAULT); // work pendding
1364 WRITE_LOG(LOG_DEBUG, "!!!Workthread run again, sessionId:%u", hSession->sessionId);
1365 // main loop has exit
1366 thisClass->ReChildLoopForSessionClear(hSession); // work pending again
1367 hSession->childCleared = true;
1368 WRITE_LOG(LOG_WARN, "!!!Workthread run finish, sessionId:%u", hSession->sessionId);
1369 }
1370
1371 // clang-format off
LogMsg(const uint32_t sessionId,const uint32_t channelId,MessageLevel level,const char * msg,...)1372 void HdcSessionBase::LogMsg(const uint32_t sessionId, const uint32_t channelId,
1373 MessageLevel level, const char *msg, ...)
1374 // clang-format on
1375 {
1376 va_list vaArgs;
1377 va_start(vaArgs, msg);
1378 string log = Base::StringFormat(msg, vaArgs);
1379 va_end(vaArgs);
1380 vector<uint8_t> buf;
1381 buf.push_back(level);
1382 buf.insert(buf.end(), log.c_str(), log.c_str() + log.size());
1383 ServerCommand(sessionId, channelId, CMD_KERNEL_ECHO, buf.data(), buf.size());
1384 }
1385
NeedNewTaskInfo(const uint16_t command,bool & masterTask)1386 bool HdcSessionBase::NeedNewTaskInfo(const uint16_t command, bool &masterTask)
1387 {
1388 // referer from HdcServerForClient::DoCommandRemote
1389 bool ret = false;
1390 bool taskMasterInit = false;
1391 masterTask = false;
1392 switch (command) {
1393 case CMD_FILE_INIT:
1394 case CMD_FLASHD_FLASH_INIT:
1395 case CMD_FLASHD_UPDATE_INIT:
1396 case CMD_FLASHD_ERASE:
1397 case CMD_FLASHD_FORMAT:
1398 case CMD_FORWARD_INIT:
1399 case CMD_APP_INIT:
1400 case CMD_APP_UNINSTALL:
1401 case CMD_APP_SIDELOAD:
1402 taskMasterInit = true;
1403 break;
1404 default:
1405 break;
1406 }
1407 if (!serverOrDaemon &&
1408 (command == CMD_SHELL_INIT || command == CMD_UNITY_EXECUTE_EX ||
1409 (command > CMD_UNITY_COMMAND_HEAD && command < CMD_UNITY_COMMAND_TAIL))) {
1410 // daemon's single side command
1411 ret = true;
1412 } else if (command == CMD_KERNEL_WAKEUP_SLAVETASK) {
1413 // slave tasks
1414 ret = true;
1415 } else if (command == CMD_UNITY_BUGREPORT_INIT) {
1416 ret = true;
1417 } else if (taskMasterInit) {
1418 // task init command
1419 masterTask = true;
1420 ret = true;
1421 }
1422 return ret;
1423 }
1424 // Heavy and time-consuming work was putted in the new thread to do, and does
1425 // not occupy the main thread
DispatchTaskData(HSession hSession,const uint32_t channelId,const uint16_t command,uint8_t * payload,int payloadSize)1426 bool HdcSessionBase::DispatchTaskData(HSession hSession, const uint32_t channelId, const uint16_t command,
1427 uint8_t *payload, int payloadSize)
1428 {
1429 StartTraceScope("HdcSessionBase::DispatchTaskData");
1430 bool ret = true;
1431 HTaskInfo hTaskInfo = nullptr;
1432 bool masterTask = false;
1433 while (true) {
1434 // Some basic commands do not have a local task constructor. example: Interactive shell, some uinty commands
1435 if (NeedNewTaskInfo(command, masterTask)) {
1436 WRITE_LOG(LOG_DEBUG, "New HTaskInfo channelId:%u command:%u", channelId, command);
1437 hTaskInfo = new(std::nothrow) TaskInformation();
1438 if (hTaskInfo == nullptr) {
1439 WRITE_LOG(LOG_FATAL, "DispatchTaskData new hTaskInfo failed");
1440 break;
1441 }
1442 hTaskInfo->channelId = channelId;
1443 hTaskInfo->sessionId = hSession->sessionId;
1444 hTaskInfo->runLoop = &hSession->childLoop;
1445 hTaskInfo->serverOrDaemon = serverOrDaemon;
1446 hTaskInfo->masterSlave = masterTask;
1447 hTaskInfo->closeRetryCount = 0;
1448 hTaskInfo->channelTask = false;
1449 hTaskInfo->isCleared = false;
1450
1451 int addTaskRetry = 3; // try 3 time
1452 while (addTaskRetry > 0) {
1453 if (AdminTask(OP_ADD, hSession, channelId, hTaskInfo)) {
1454 break;
1455 }
1456 sleep(1);
1457 --addTaskRetry;
1458 }
1459
1460 if (addTaskRetry == 0) {
1461 #ifndef HDC_HOST
1462 LogMsg(hTaskInfo->sessionId, hTaskInfo->channelId,
1463 MSG_FAIL, "hdc thread pool busy, may cause reset later");
1464 #endif
1465 delete hTaskInfo;
1466 hTaskInfo = nullptr;
1467 ret = false;
1468 break;
1469 }
1470 } else {
1471 hTaskInfo = AdminTask(OP_QUERY, hSession, channelId, nullptr);
1472 }
1473 if (!hTaskInfo || hTaskInfo->taskStop || hTaskInfo->taskFree) {
1474 WRITE_LOG(LOG_ALL, "Dead HTaskInfo, ignore, channelId:%u command:%u", channelId, command);
1475 break;
1476 }
1477 ret = RedirectToTask(hTaskInfo, hSession, channelId, command, payload, payloadSize);
1478 break;
1479 }
1480 return ret;
1481 }
1482
PostStopInstanceMessage(bool restart)1483 void HdcSessionBase::PostStopInstanceMessage(bool restart)
1484 {
1485 PushAsyncMessage(0, ASYNC_STOP_MAINLOOP, nullptr, 0);
1486 WRITE_LOG(LOG_DEBUG, "StopDaemon has sended restart %d", restart);
1487 wantRestart = restart;
1488 }
1489
ParsePeerSupportFeatures(HSession & hSession,std::map<std::string,std::string> & tlvMap)1490 void HdcSessionBase::ParsePeerSupportFeatures(HSession &hSession, std::map<std::string, std::string> &tlvMap)
1491 {
1492 if (hSession == nullptr) {
1493 WRITE_LOG(LOG_FATAL, "Invalid paramter, hSession is null");
1494 return;
1495 }
1496
1497 if (tlvMap.find(TAG_SUPPORT_FEATURE) != tlvMap.end()) {
1498 std::vector<std::string> features;
1499 WRITE_LOG(LOG_INFO, "peer support features are %s for session %u",
1500 tlvMap[TAG_SUPPORT_FEATURE].c_str(), hSession->sessionId);
1501 Base::SplitString(tlvMap[TAG_SUPPORT_FEATURE], ",", features);
1502 hSession->heartbeat.SetSupportHeartbeat(Base::IsSupportFeature(features, FEATURE_HEARTBEAT));
1503 }
1504 }
1505 } // namespace Hdc
1506