1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "session.h"
16 #ifndef TEST_HASH
17 #include "hdc_hash_gen.h"
18 #endif
19 #include "serial_struct.h"
20
21 namespace Hdc {
HdcSessionBase(bool serverOrDaemonIn,size_t uvThreadSize)22 HdcSessionBase::HdcSessionBase(bool serverOrDaemonIn, size_t uvThreadSize)
23 {
24 // print version pid
25 WRITE_LOG(LOG_INFO, "Program running. %s Pid:%u", Base::GetVersion().c_str(), getpid());
26 // server/daemon common initialization code
27 if (uvThreadSize < SIZE_THREAD_POOL_MIN) {
28 uvThreadSize = SIZE_THREAD_POOL_MIN;
29 } else if (uvThreadSize > SIZE_THREAD_POOL_MAX) {
30 uvThreadSize = SIZE_THREAD_POOL_MAX;
31 }
32 threadPoolCount = uvThreadSize;
33 WRITE_LOG(LOG_INFO, "set UV_THREADPOOL_SIZE:%zu", threadPoolCount);
34 string uvThreadEnv("UV_THREADPOOL_SIZE");
35 string uvThreadVal = std::to_string(threadPoolCount);
36 #ifdef _WIN32
37 uvThreadEnv += "=";
38 uvThreadEnv += uvThreadVal;
39 _putenv(uvThreadEnv.c_str());
40 #else
41 setenv(uvThreadEnv.c_str(), uvThreadVal.c_str(), 1);
42 #endif
43 uv_loop_init(&loopMain);
44 WRITE_LOG(LOG_DEBUG, "loopMain init");
45 uv_rwlock_init(&mainAsync);
46 #ifndef FUZZ_TEST
47 uv_async_init(&loopMain, &asyncMainLoop, MainAsyncCallback);
48 #endif
49 uv_rwlock_init(&lockMapSession);
50 serverOrDaemon = serverOrDaemonIn;
51 ctxUSB = nullptr;
52 wantRestart = false;
53 threadSessionMain = uv_thread_self();
54
55 #ifdef HDC_HOST
56 if (serverOrDaemon) {
57 if (libusb_init((libusb_context **)&ctxUSB) != 0) {
58 ctxUSB = nullptr;
59 WRITE_LOG(LOG_FATAL, "libusb_init failed ctxUSB is nullptr");
60 }
61 }
62 #endif
63 }
64
~HdcSessionBase()65 HdcSessionBase::~HdcSessionBase()
66 {
67 #ifndef FUZZ_TEST
68 Base::TryCloseHandle((uv_handle_t *)&asyncMainLoop);
69 #endif
70 uv_loop_close(&loopMain);
71 // clear base
72 uv_rwlock_destroy(&mainAsync);
73 uv_rwlock_destroy(&lockMapSession);
74 #ifdef HDC_HOST
75 if (serverOrDaemon and ctxUSB != nullptr) {
76 libusb_exit((libusb_context *)ctxUSB);
77 }
78 #endif
79 WRITE_LOG(LOG_WARN, "~HdcSessionBase free sessionRef:%u instance:%s", uint32_t(sessionRef),
80 serverOrDaemon ? "server" : "daemon");
81 }
82
83 // remove step2
TryRemoveTask(HTaskInfo hTask)84 bool HdcSessionBase::TryRemoveTask(HTaskInfo hTask)
85 {
86 if (hTask->taskFree) {
87 WRITE_LOG(LOG_WARN, "TryRemoveTask channelId:%u", hTask->channelId);
88 return true;
89 }
90 bool ret = RemoveInstanceTask(OP_REMOVE, hTask);
91 if (ret) {
92 hTask->taskFree = true;
93 } else {
94 // This is used to check that the memory cannot be cleaned up. If the memory cannot be released, break point
95 // here to see which task has not been released
96 // print task clear
97 }
98 return ret;
99 }
100
101 // remove step1
BeginRemoveTask(HTaskInfo hTask)102 void HdcSessionBase::BeginRemoveTask(HTaskInfo hTask)
103 {
104 StartTraceScope("HdcSessionBase::BeginRemoveTask");
105 if (hTask->taskStop || hTask->taskFree) {
106 WRITE_LOG(LOG_WARN, "BeginRemoveTask channelId:%u taskStop:%d taskFree:%d",
107 hTask->channelId, hTask->taskStop, hTask->taskFree);
108 return;
109 }
110
111 WRITE_LOG(LOG_WARN, "BeginRemoveTask taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
112 auto taskClassDeleteRetry = [](uv_timer_t *handle) -> void {
113 StartTraceScope("HdcSessionBase::BeginRemoveTask taskClassDeleteRetry");
114 HTaskInfo hTask = (HTaskInfo)handle->data;
115 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
116 if (hTask->isCleared == false) {
117 hTask->isCleared = true;
118 WRITE_LOG(LOG_WARN, "taskClassDeleteRetry start clear task, taskType:%d cid:%u sid:%u",
119 hTask->taskType, hTask->channelId, hTask->sessionId);
120 bool ret = thisClass->RemoveInstanceTask(OP_CLEAR, hTask);
121 if (!ret) {
122 WRITE_LOG(LOG_WARN, "taskClassDeleteRetry RemoveInstanceTask return false taskType:%d cid:%u sid:%u",
123 hTask->taskType, hTask->channelId, hTask->sessionId);
124 }
125 }
126
127 constexpr uint32_t count = 1000;
128 if (hTask->closeRetryCount == 0 || hTask->closeRetryCount > count) {
129 WRITE_LOG(LOG_DEBUG, "TaskDelay task remove retry count %d/%d, taskType:%d channelId:%u, sessionId:%u",
130 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->taskType, hTask->channelId, hTask->sessionId);
131 hTask->closeRetryCount = 1;
132 }
133 hTask->closeRetryCount++;
134 if (!thisClass->TryRemoveTask(hTask)) {
135 WRITE_LOG(LOG_WARN, "TaskDelay TryRemoveTask false channelId:%u", hTask->channelId);
136 return;
137 }
138 WRITE_LOG(LOG_WARN, "TaskDelay task remove finish, channelId:%u", hTask->channelId);
139 if (hTask != nullptr) {
140 delete hTask;
141 hTask = nullptr;
142 }
143 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
144 };
145 Base::TimerUvTask(hTask->runLoop, hTask, taskClassDeleteRetry, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
146
147 hTask->taskStop = true;
148 }
149
150 // Clear all Task or a single Task, the regular situation is stopped first, and the specific class memory is cleaned up
151 // after the end of the LOOP.
152 // When ChannelIdinput == 0, at this time, all of the LOOP ends, all runs in the class end, so directly skip STOP,
153 // physical memory deletion class trimming
ClearOwnTasks(HSession hSession,const uint32_t channelIDInput)154 void HdcSessionBase::ClearOwnTasks(HSession hSession, const uint32_t channelIDInput)
155 {
156 // First case: normal task cleanup process (STOP Remove)
157 // Second: The task is cleaned up, the session ends
158 // Third: The task is cleaned up, and the session is directly over the session.
159 StartTraceScope("HdcSessionBase::ClearOwnTasks");
160 hSession->mapTaskMutex.lock();
161 map<uint32_t, HTaskInfo>::iterator iter;
162 for (iter = hSession->mapTask->begin(); iter != hSession->mapTask->end();) {
163 uint32_t channelId = iter->first;
164 HTaskInfo hTask = iter->second;
165 if (channelIDInput != 0) { // single
166 if (channelIDInput != channelId) {
167 ++iter;
168 continue;
169 }
170 BeginRemoveTask(hTask);
171 WRITE_LOG(LOG_WARN, "ClearOwnTasks OP_CLEAR finish, sessionId:%u channelIDInput:%u",
172 hSession->sessionId, channelIDInput);
173 iter = hSession->mapTask->erase(iter);
174 break;
175 }
176 // multi
177 BeginRemoveTask(hTask);
178 iter = hSession->mapTask->erase(iter);
179 }
180 hSession->mapTaskMutex.unlock();
181 }
182
ClearSessions()183 void HdcSessionBase::ClearSessions()
184 {
185 // no need to lock mapSession
186 // broadcast free signal
187 for (auto v : mapSession) {
188 HSession hSession = (HSession)v.second;
189 if (!hSession->isDead) {
190 FreeSession(hSession->sessionId);
191 }
192 }
193 }
194
ReMainLoopForInstanceClear()195 void HdcSessionBase::ReMainLoopForInstanceClear()
196 { // reloop
197 StartTraceScope("HdcSessionBase::ReMainLoopForInstanceClear");
198 auto clearSessionsForFinish = [](uv_idle_t *handle) -> void {
199 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
200 if (thisClass->sessionRef > 0) {
201 return;
202 }
203 // all task has been free
204 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
205 uv_stop(&thisClass->loopMain);
206 };
207 Base::IdleUvTask(&loopMain, this, clearSessionsForFinish);
208 uv_run(&loopMain, UV_RUN_DEFAULT);
209 };
210
211 #ifdef HDC_SUPPORT_UART
EnumUARTDeviceRegister(UartKickoutZombie kickOut)212 void HdcSessionBase::EnumUARTDeviceRegister(UartKickoutZombie kickOut)
213 {
214 uv_rwlock_rdlock(&lockMapSession);
215 map<uint32_t, HSession>::iterator i;
216 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
217 HSession hs = i->second;
218 if ((hs->connType != CONN_SERIAL) or (hs->hUART == nullptr)) {
219 continue;
220 }
221 kickOut(hs);
222 break;
223 }
224 uv_rwlock_rdunlock(&lockMapSession);
225 }
226 #endif
227
EnumUSBDeviceRegister(void (* pCallBack)(HSession hSession))228 void HdcSessionBase::EnumUSBDeviceRegister(void (*pCallBack)(HSession hSession))
229 {
230 if (!pCallBack) {
231 return;
232 }
233 uv_rwlock_rdlock(&lockMapSession);
234 map<uint32_t, HSession>::iterator i;
235 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
236 HSession hs = i->second;
237 if (hs->connType != CONN_USB) {
238 continue;
239 }
240 if (hs->hUSB == nullptr) {
241 continue;
242 }
243 if (pCallBack) {
244 pCallBack(hs);
245 }
246 break;
247 }
248 uv_rwlock_rdunlock(&lockMapSession);
249 }
250
251 // The PC side gives the device information, determines if the USB device is registered
252 // PDEV and Busid Devid two choices
QueryUSBDeviceRegister(void * pDev,uint8_t busIDIn,uint8_t devIDIn)253 HSession HdcSessionBase::QueryUSBDeviceRegister(void *pDev, uint8_t busIDIn, uint8_t devIDIn)
254 {
255 #ifdef HDC_HOST
256 libusb_device *dev = (libusb_device *)pDev;
257 HSession hResult = nullptr;
258 if (!mapSession.size()) {
259 return nullptr;
260 }
261 uint8_t busId = 0;
262 uint8_t devId = 0;
263 if (pDev) {
264 busId = libusb_get_bus_number(dev);
265 devId = libusb_get_device_address(dev);
266 } else {
267 busId = busIDIn;
268 devId = devIDIn;
269 }
270 uv_rwlock_rdlock(&lockMapSession);
271 map<uint32_t, HSession>::iterator i;
272 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
273 HSession hs = i->second;
274 if (hs->connType == CONN_USB) {
275 continue;
276 }
277 if (hs->hUSB == nullptr) {
278 continue;
279 }
280 if (hs->hUSB->devId != devId || hs->hUSB->busId != busId) {
281 continue;
282 }
283 hResult = hs;
284 break;
285 }
286 uv_rwlock_rdunlock(&lockMapSession);
287 return hResult;
288 #else
289 return nullptr;
290 #endif
291 }
292
AsyncMainLoopTask(uv_idle_t * handle)293 void HdcSessionBase::AsyncMainLoopTask(uv_idle_t *handle)
294 {
295 AsyncParam *param = (AsyncParam *)handle->data;
296 HdcSessionBase *thisClass = (HdcSessionBase *)param->thisClass;
297 switch (param->method) {
298 case ASYNC_FREE_SESSION:
299 // Destruction is unified in the main thread
300 thisClass->FreeSession(param->sid);
301 break;
302 case ASYNC_STOP_MAINLOOP:
303 uv_stop(&thisClass->loopMain);
304 break;
305 default:
306 break;
307 }
308 if (param->data) {
309 delete[]((uint8_t *)param->data);
310 }
311 delete param;
312 param = nullptr;
313 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseIdleCallback);
314 }
315
MainAsyncCallback(uv_async_t * handle)316 void HdcSessionBase::MainAsyncCallback(uv_async_t *handle)
317 {
318 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
319 list<void *>::iterator i;
320 list<void *> &lst = thisClass->lstMainThreadOP;
321 uv_rwlock_wrlock(&thisClass->mainAsync);
322 for (i = lst.begin(); i != lst.end();) {
323 AsyncParam *param = (AsyncParam *)*i;
324 Base::IdleUvTask(&thisClass->loopMain, param, AsyncMainLoopTask);
325 i = lst.erase(i);
326 }
327 uv_rwlock_wrunlock(&thisClass->mainAsync);
328 }
329
PushAsyncMessage(const uint32_t sessionId,const uint8_t method,const void * data,const int dataSize)330 void HdcSessionBase::PushAsyncMessage(const uint32_t sessionId, const uint8_t method, const void *data,
331 const int dataSize)
332 {
333 AsyncParam *param = new AsyncParam();
334 if (!param) {
335 return;
336 }
337 param->sid = sessionId;
338 param->thisClass = this;
339 param->method = method;
340 if (dataSize > 0) {
341 param->dataSize = dataSize;
342 param->data = new uint8_t[param->dataSize]();
343 if (!param->data) {
344 delete param;
345 return;
346 }
347 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
348 delete[]((uint8_t *)param->data);
349 delete param;
350 return;
351 }
352 }
353
354 asyncMainLoop.data = this;
355 uv_rwlock_wrlock(&mainAsync);
356 lstMainThreadOP.push_back(param);
357 uv_rwlock_wrunlock(&mainAsync);
358 uv_async_send(&asyncMainLoop);
359 }
360
WorkerPendding()361 void HdcSessionBase::WorkerPendding()
362 {
363 uv_run(&loopMain, UV_RUN_DEFAULT);
364 ClearInstanceResource();
365 }
366
MallocSessionByConnectType(HSession hSession)367 int HdcSessionBase::MallocSessionByConnectType(HSession hSession)
368 {
369 int ret = 0;
370 switch (hSession->connType) {
371 case CONN_TCP: {
372 uv_tcp_init(&loopMain, &hSession->hWorkTCP);
373 ++hSession->uvHandleRef;
374 hSession->hWorkTCP.data = hSession;
375 break;
376 }
377 case CONN_USB: {
378 // Some members need to be placed at the primary thread
379 HUSB hUSB = new HdcUSB();
380 if (!hUSB) {
381 ret = -1;
382 break;
383 }
384 hSession->hUSB = hUSB;
385 hSession->hUSB->wMaxPacketSizeSend = MAX_PACKET_SIZE_HISPEED;
386 break;
387 }
388 #ifdef HDC_SUPPORT_UART
389 case CONN_SERIAL: {
390 HUART hUART = new HdcUART();
391 if (!hUART) {
392 ret = -1;
393 break;
394 }
395 hSession->hUART = hUART;
396 break;
397 }
398 #endif // HDC_SUPPORT_UART
399 default:
400 ret = -1;
401 break;
402 }
403 return ret;
404 }
405
406 // Avoid unit test when client\server\daemon on the same host, maybe get the same ID value
GetSessionPseudoUid()407 uint32_t HdcSessionBase::GetSessionPseudoUid()
408 {
409 uint32_t uid = 0;
410 do {
411 uid = Base::GetSecureRandom();
412 } while (AdminSession(OP_QUERY, uid, nullptr) != nullptr);
413 return uid;
414 }
415
416 // when client 0 to automatic generated,when daemon First place 1 followed by
MallocSession(bool serverOrDaemon,const ConnType connType,void * classModule,uint32_t sessionId)417 HSession HdcSessionBase::MallocSession(bool serverOrDaemon, const ConnType connType, void *classModule,
418 uint32_t sessionId)
419 {
420 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
421 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
422 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
423 #endif
424 HSession hSession = new(std::nothrow) HdcSession();
425 if (!hSession) {
426 WRITE_LOG(LOG_FATAL, "MallocSession new hSession failed");
427 return nullptr;
428 }
429 int ret = 0;
430 ++sessionRef;
431 hSession->classInstance = this;
432 hSession->connType = connType;
433 hSession->classModule = classModule;
434 hSession->isDead = false;
435 hSession->sessionId = ((sessionId == 0) ? GetSessionPseudoUid() : sessionId);
436 hSession->serverOrDaemon = serverOrDaemon;
437 hSession->hWorkThread = uv_thread_self();
438 hSession->mapTask = new(std::nothrow) map<uint32_t, HTaskInfo>();
439 if (hSession->mapTask == nullptr) {
440 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->mapTask failed");
441 delete hSession;
442 hSession = nullptr;
443 return nullptr;
444 }
445 hSession->listKey = new(std::nothrow) list<void *>;
446 if (hSession->listKey == nullptr) {
447 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->listKey failed");
448 delete hSession;
449 hSession = nullptr;
450 return nullptr;
451 }
452 uv_loop_init(&hSession->childLoop);
453 hSession->uvHandleRef = 0;
454 // pullup child
455 WRITE_LOG(LOG_INFO, "HdcSessionBase NewSession, sessionId:%u, connType:%d.",
456 hSession->sessionId, hSession->connType);
457 ++hSession->uvHandleRef;
458 Base::CreateSocketPair(hSession->ctrlFd);
459 size_t handleSize = sizeof(uv_poll_t);
460 hSession->pollHandle[STREAM_WORK] = (uv_poll_t *)malloc(handleSize);
461 hSession->pollHandle[STREAM_MAIN] = (uv_poll_t *)malloc(handleSize);
462 uv_poll_t *pollHandleMain = hSession->pollHandle[STREAM_MAIN];
463 if (pollHandleMain == nullptr || hSession->pollHandle[STREAM_WORK] == nullptr) {
464 WRITE_LOG(LOG_FATAL, "MallocSession malloc hSession->pollHandle failed");
465 delete hSession;
466 hSession = nullptr;
467 return nullptr;
468 }
469 uv_poll_init_socket(&loopMain, pollHandleMain, hSession->ctrlFd[STREAM_MAIN]);
470 uv_poll_start(pollHandleMain, UV_READABLE, ReadCtrlFromSession);
471 hSession->pollHandle[STREAM_MAIN]->data = hSession;
472 hSession->pollHandle[STREAM_WORK]->data = hSession;
473 // Activate USB DAEMON's data channel, may not for use
474 uv_tcp_init(&loopMain, &hSession->dataPipe[STREAM_MAIN]);
475 (void)memset_s(&hSession->dataPipe[STREAM_WORK], sizeof(hSession->dataPipe[STREAM_WORK]),
476 0, sizeof(uv_tcp_t));
477 ++hSession->uvHandleRef;
478 Base::CreateSocketPair(hSession->dataFd);
479 uv_tcp_open(&hSession->dataPipe[STREAM_MAIN], hSession->dataFd[STREAM_MAIN]);
480 hSession->dataPipe[STREAM_MAIN].data = hSession;
481 hSession->dataPipe[STREAM_WORK].data = hSession;
482 #ifdef HDC_HOST
483 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN], HOST_SOCKETPAIR_SIZE);
484 #else
485 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN]);
486 #endif
487 ret = MallocSessionByConnectType(hSession);
488 if (ret) {
489 delete hSession;
490 hSession = nullptr;
491 } else {
492 AdminSession(OP_ADD, hSession->sessionId, hSession);
493 }
494 return hSession;
495 }
496
FreeSessionByConnectType(HSession hSession)497 void HdcSessionBase::FreeSessionByConnectType(HSession hSession)
498 {
499 WRITE_LOG(LOG_DEBUG, "FreeSessionByConnectType %s", hSession->ToDebugString().c_str());
500
501 if (hSession->connType == CONN_USB) {
502 // ibusb All context is applied for sub-threaded, so it needs to be destroyed in the subline
503 if (!hSession->hUSB) {
504 return;
505 }
506 HUSB hUSB = hSession->hUSB;
507 if (!hUSB) {
508 return;
509 }
510 #ifdef HDC_HOST
511 if (hUSB->devHandle) {
512 libusb_release_interface(hUSB->devHandle, hUSB->interfaceNumber);
513 libusb_close(hUSB->devHandle);
514 hUSB->devHandle = nullptr;
515 }
516 #else
517 Base::CloseFd(hUSB->bulkIn);
518 Base::CloseFd(hUSB->bulkOut);
519 #endif
520 delete hSession->hUSB;
521 hSession->hUSB = nullptr;
522 }
523 #ifdef HDC_SUPPORT_UART
524 if (CONN_SERIAL == hSession->connType) {
525 if (!hSession->hUART) {
526 return;
527 }
528 HUART hUART = hSession->hUART;
529 if (!hUART) {
530 return;
531 }
532 HdcUARTBase *uartBase = (HdcUARTBase *)hSession->classModule;
533 // tell uart session will be free
534 uartBase->StopSession(hSession);
535 #ifdef HDC_HOST
536 #ifdef HOST_MINGW
537 if (hUART->devUartHandle != INVALID_HANDLE_VALUE) {
538 CloseHandle(hUART->devUartHandle);
539 hUART->devUartHandle = INVALID_HANDLE_VALUE;
540 }
541 #elif defined(HOST_LINUX)
542 Base::CloseFd(hUART->devUartHandle);
543 #endif // _WIN32
544 #endif
545 delete hSession->hUART;
546 hSession->hUART = nullptr;
547 }
548 #endif
549 }
550
551 // work when libuv-handle at struct of HdcSession has all callback finished
FreeSessionFinally(uv_idle_t * handle)552 void HdcSessionBase::FreeSessionFinally(uv_idle_t *handle)
553 {
554 HSession hSession = (HSession)handle->data;
555 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
556 if (hSession->uvHandleRef > 0) {
557 WRITE_LOG(LOG_INFO, "FreeSessionFinally uvHandleRef:%d sessionId:%u",
558 hSession->uvHandleRef, hSession->sessionId);
559 return;
560 }
561 // Notify Server or Daemon, just UI or display commandline
562 thisClass->NotifyInstanceSessionFree(hSession, true);
563 // all hsession uv handle has been clear
564 thisClass->AdminSession(OP_REMOVE, hSession->sessionId, nullptr);
565 WRITE_LOG(LOG_INFO, "!!!FreeSessionFinally sessionId:%u finish", hSession->sessionId);
566 HdcAuth::FreeKey(!hSession->serverOrDaemon, hSession->listKey);
567 delete hSession;
568 hSession = nullptr; // fix CodeMars SetNullAfterFree issue
569 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
570 --thisClass->sessionRef;
571 }
572
573 // work when child-work thread finish
FreeSessionContinue(HSession hSession)574 void HdcSessionBase::FreeSessionContinue(HSession hSession)
575 {
576 auto closeSessionTCPHandle = [](uv_handle_t *handle) -> void {
577 HSession hSession = (HSession)handle->data;
578 --hSession->uvHandleRef;
579 Base::TryCloseHandle((uv_handle_t *)handle);
580 if (handle == reinterpret_cast<uv_handle_t *>(hSession->pollHandle[STREAM_MAIN])) {
581 Base::CloseFd(hSession->ctrlFd[STREAM_MAIN]);
582 Base::CloseFd(hSession->ctrlFd[STREAM_WORK]);
583 free(hSession->pollHandle[STREAM_MAIN]);
584 }
585 };
586 if (hSession->connType == CONN_TCP) {
587 // Turn off TCP to prevent continuing writing
588 Base::TryCloseHandle((uv_handle_t *)&hSession->hWorkTCP, true, closeSessionTCPHandle);
589 Base::CloseFd(hSession->dataFd[STREAM_WORK]);
590 }
591 hSession->availTailIndex = 0;
592 if (hSession->ioBuf) {
593 delete[] hSession->ioBuf;
594 hSession->ioBuf = nullptr;
595 }
596 Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_MAIN], true, closeSessionTCPHandle);
597 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_MAIN], true, closeSessionTCPHandle);
598 FreeSessionByConnectType(hSession);
599 // finish
600 Base::IdleUvTask(&loopMain, hSession, FreeSessionFinally);
601 }
602
FreeSessionOpeate(uv_timer_t * handle)603 void HdcSessionBase::FreeSessionOpeate(uv_timer_t *handle)
604 {
605 StartTraceScope("HdcSessionBase::FreeSessionOpeate");
606 HSession hSession = (HSession)handle->data;
607 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
608 if (hSession->ref > 0) {
609 WRITE_LOG(LOG_WARN, "FreeSessionOpeate sid:%u ref:%u > 0", hSession->sessionId, uint32_t(hSession->ref));
610 return;
611 }
612 WRITE_LOG(LOG_INFO, "FreeSessionOpeate sid:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
613 #ifdef HDC_HOST
614 if (hSession->hUSB != nullptr
615 && (!hSession->hUSB->hostBulkIn.isShutdown || !hSession->hUSB->hostBulkOut.isShutdown)) {
616 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
617 pUSB->CancelUsbIo(hSession);
618 return;
619 }
620 #endif
621 // wait workthread to free
622 if (hSession->pollHandle[STREAM_WORK]->loop) {
623 auto ctrl = BuildCtrlString(SP_STOP_SESSION, 0, nullptr, 0);
624 Base::SendToPollFd(hSession->ctrlFd[STREAM_MAIN], ctrl.data(), ctrl.size());
625 WRITE_LOG(LOG_INFO, "FreeSessionOpeate, send workthread for free. sessionId:%u", hSession->sessionId);
626 auto callbackCheckFreeSessionContinue = [](uv_timer_t *handle) -> void {
627 HSession hSession = (HSession)handle->data;
628 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
629 if (!hSession->childCleared) {
630 WRITE_LOG(LOG_INFO, "FreeSessionOpeate childCleared:%d sessionId:%u",
631 hSession->childCleared, hSession->sessionId);
632 return;
633 }
634 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
635 thisClass->FreeSessionContinue(hSession);
636 };
637 Base::TimerUvTask(&thisClass->loopMain, hSession, callbackCheckFreeSessionContinue);
638 } else {
639 thisClass->FreeSessionContinue(hSession);
640 }
641 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
642 }
643
FreeSession(const uint32_t sessionId)644 void HdcSessionBase::FreeSession(const uint32_t sessionId)
645 {
646 StartTraceScope("HdcSessionBase::FreeSession");
647 AddDeletedSessionId(sessionId);
648 if (threadSessionMain != uv_thread_self()) {
649 PushAsyncMessage(sessionId, ASYNC_FREE_SESSION, nullptr, 0);
650 return;
651 }
652 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
653 WRITE_LOG(LOG_INFO, "Begin to free session, sessionid:%u", sessionId);
654 do {
655 if (!hSession || hSession->isDead) {
656 WRITE_LOG(LOG_WARN, "FreeSession hSession nullptr or isDead sessionId:%u", sessionId);
657 break;
658 }
659 WRITE_LOG(LOG_INFO, "dataFdSend:%llu, dataFdRecv:%llu",
660 uint64_t(hSession->stat.dataSendBytes),
661 uint64_t(hSession->stat.dataRecvBytes));
662 hSession->isDead = true;
663 Base::TimerUvTask(&loopMain, hSession, FreeSessionOpeate);
664 NotifyInstanceSessionFree(hSession, false);
665 WRITE_LOG(LOG_INFO, "FreeSession sessionId:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
666 } while (false);
667 }
668
AdminSession(const uint8_t op,const uint32_t sessionId,HSession hInput)669 HSession HdcSessionBase::AdminSession(const uint8_t op, const uint32_t sessionId, HSession hInput)
670 {
671 HSession hRet = nullptr;
672 switch (op) {
673 case OP_ADD:
674 uv_rwlock_wrlock(&lockMapSession);
675 mapSession[sessionId] = hInput;
676 uv_rwlock_wrunlock(&lockMapSession);
677 break;
678 case OP_REMOVE:
679 uv_rwlock_wrlock(&lockMapSession);
680 mapSession.erase(sessionId);
681 uv_rwlock_wrunlock(&lockMapSession);
682 break;
683 case OP_QUERY:
684 uv_rwlock_rdlock(&lockMapSession);
685 if (mapSession.count(sessionId)) {
686 hRet = mapSession[sessionId];
687 }
688 uv_rwlock_rdunlock(&lockMapSession);
689 break;
690 case OP_QUERY_REF:
691 uv_rwlock_wrlock(&lockMapSession);
692 if (mapSession.count(sessionId)) {
693 hRet = mapSession[sessionId];
694 ++hRet->ref;
695 }
696 uv_rwlock_wrunlock(&lockMapSession);
697 break;
698 case OP_UPDATE:
699 uv_rwlock_wrlock(&lockMapSession);
700 // remove old
701 mapSession.erase(sessionId);
702 mapSession[hInput->sessionId] = hInput;
703 uv_rwlock_wrunlock(&lockMapSession);
704 break;
705 case OP_VOTE_RESET:
706 if (mapSession.count(sessionId) == 0) {
707 break;
708 }
709 bool needReset;
710 if (serverOrDaemon) {
711 uv_rwlock_wrlock(&lockMapSession);
712 hRet = mapSession[sessionId];
713 hRet->voteReset = true;
714 needReset = true;
715 for (auto &kv : mapSession) {
716 if (sessionId == kv.first) {
717 continue;
718 }
719 WRITE_LOG(LOG_DEBUG, "session:%u vote reset, session %u is %s",
720 sessionId, kv.first, kv.second->voteReset ? "YES" : "NO");
721 if (!kv.second->voteReset) {
722 needReset = false;
723 }
724 }
725 uv_rwlock_wrunlock(&lockMapSession);
726 } else {
727 needReset = true;
728 }
729 if (needReset) {
730 WRITE_LOG(LOG_FATAL, "!! session:%u vote reset, passed unanimously !!", sessionId);
731 abort();
732 }
733 break;
734 default:
735 break;
736 }
737 return hRet;
738 }
739
AddDeletedSessionId(uint32_t sessionId)740 void HdcSessionBase::AddDeletedSessionId(uint32_t sessionId)
741 {
742 std::unique_lock<std::shared_mutex> lock(deletedSessionIdRecordMutex);
743 if (deletedSessionIdSet.find(sessionId) != deletedSessionIdSet.end()) {
744 WRITE_LOG(LOG_INFO, "SessionId:%u is already in the cache", sessionId);
745 return;
746 }
747 WRITE_LOG(LOG_INFO, "AddDeletedSessionId:%u", sessionId);
748 deletedSessionIdSet.insert(sessionId);
749 deletedSessionIdQueue.push(sessionId);
750
751 // Delete old records and only save MAX_DELETED_SESSION_ID_RECORD_COUNT records
752 if (deletedSessionIdQueue.size() > MAX_DELETED_SESSION_ID_RECORD_COUNT) {
753 uint32_t id = deletedSessionIdQueue.front();
754 WRITE_LOG(LOG_INFO, "deletedSessionIdQueue size:%u, deletedSessionIdSet size:%u, pop session id:%u",
755 deletedSessionIdQueue.size(), deletedSessionIdSet.size(), id);
756 deletedSessionIdQueue.pop();
757 deletedSessionIdSet.erase(id);
758 }
759 }
760
IsSessionDeleted(uint32_t sessionId) const761 bool HdcSessionBase::IsSessionDeleted(uint32_t sessionId) const
762 {
763 std::shared_lock<std::shared_mutex> lock(deletedSessionIdRecordMutex);
764 if (deletedSessionIdSet.find(sessionId) != deletedSessionIdSet.end()) {
765 return true;
766 }
767 return false;
768 }
769
DumpTasksInfo(map<uint32_t,HTaskInfo> & mapTask)770 void HdcSessionBase::DumpTasksInfo(map<uint32_t, HTaskInfo> &mapTask)
771 {
772 int idx = 1;
773 for (auto t : mapTask) {
774 HTaskInfo ti = t.second;
775 WRITE_LOG(LOG_WARN, "%d: channelId: %lu, type: %d, closeRetry: %d\n",
776 idx++, ti->channelId, ti->taskType, ti->closeRetryCount);
777 }
778 }
779
780 // All in the corresponding sub-thread, no need locks
AdminTask(const uint8_t op,HSession hSession,const uint32_t channelId,HTaskInfo hInput)781 HTaskInfo HdcSessionBase::AdminTask(const uint8_t op, HSession hSession, const uint32_t channelId, HTaskInfo hInput)
782 {
783 HTaskInfo hRet = nullptr;
784 map<uint32_t, HTaskInfo> &mapTask = *hSession->mapTask;
785
786 switch (op) {
787 case OP_ADD:
788 hRet = mapTask[channelId];
789 if (hRet != nullptr) {
790 delete hRet;
791 }
792 mapTask[channelId] = hInput;
793 hRet = hInput;
794
795 WRITE_LOG(LOG_WARN, "AdminTask add session %u, channelId %u, mapTask size: %zu",
796 hSession->sessionId, channelId, mapTask.size());
797
798 break;
799 case OP_REMOVE:
800 mapTask.erase(channelId);
801 WRITE_LOG(LOG_DEBUG, "AdminTask rm session %u, channelId %u, mapTask size: %zu",
802 hSession->sessionId, channelId, mapTask.size());
803 break;
804 case OP_QUERY:
805 if (mapTask.count(channelId)) {
806 hRet = mapTask[channelId];
807 }
808 break;
809 case OP_VOTE_RESET:
810 AdminSession(op, hSession->sessionId, nullptr);
811 break;
812 default:
813 break;
814 }
815 return hRet;
816 }
817
SendByProtocol(HSession hSession,uint8_t * bufPtr,const int bufLen,bool echo)818 int HdcSessionBase::SendByProtocol(HSession hSession, uint8_t *bufPtr, const int bufLen, bool echo)
819 {
820 StartTraceScope("HdcSessionBase::SendByProtocol");
821 if (hSession->isDead) {
822 delete[] bufPtr;
823 WRITE_LOG(LOG_WARN, "SendByProtocol session dead error");
824 return ERR_SESSION_NOFOUND;
825 }
826 int ret = 0;
827 switch (hSession->connType) {
828 case CONN_TCP: {
829 HdcTCPBase *pTCP = ((HdcTCPBase *)hSession->classModule);
830 if (echo && !hSession->serverOrDaemon) {
831 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
832 } else {
833 if (hSession->hWorkThread == uv_thread_self()) {
834 ret = pTCP->WriteUvTcpFd(&hSession->hWorkTCP, bufPtr, bufLen);
835 } else {
836 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
837 }
838 }
839 break;
840 }
841 case CONN_USB: {
842 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
843 ret = pUSB->SendUSBBlock(hSession, bufPtr, bufLen);
844 delete[] bufPtr;
845 break;
846 }
847 #ifdef HDC_SUPPORT_UART
848 case CONN_SERIAL: {
849 HdcUARTBase *pUART = ((HdcUARTBase *)hSession->classModule);
850 ret = pUART->SendUARTData(hSession, bufPtr, bufLen);
851 delete[] bufPtr;
852 break;
853 }
854 #endif
855 default:
856 break;
857 }
858 return ret;
859 }
860
Send(const uint32_t sessionId,const uint32_t channelId,const uint16_t commandFlag,const uint8_t * data,const int dataSize)861 int HdcSessionBase::Send(const uint32_t sessionId, const uint32_t channelId, const uint16_t commandFlag,
862 const uint8_t *data, const int dataSize)
863 {
864 StartTraceScope("HdcSessionBase::Send");
865 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
866 if (!hSession) {
867 WRITE_LOG(LOG_WARN, "Send to offline device, drop it, sessionId:%u", sessionId);
868 return ERR_SESSION_NOFOUND;
869 }
870 PayloadProtect protectBuf; // noneed convert to big-endian
871 protectBuf.channelId = channelId;
872 protectBuf.commandFlag = commandFlag;
873 protectBuf.checkSum = (ENABLE_IO_CHECKSUM && dataSize > 0) ? Base::CalcCheckSum(data, dataSize) : 0;
874 protectBuf.vCode = payloadProtectStaticVcode;
875 string s = SerialStruct::SerializeToString(protectBuf);
876 // reserve for encrypt here
877 // xx-encrypt
878
879 PayloadHead payloadHead = {}; // need convert to big-endian
880 payloadHead.flag[0] = PACKET_FLAG.at(0);
881 payloadHead.flag[1] = PACKET_FLAG.at(1);
882 payloadHead.protocolVer = VER_PROTOCOL;
883 payloadHead.headSize = htons(s.size());
884 payloadHead.dataSize = htonl(dataSize);
885 int finalBufSize = sizeof(PayloadHead) + s.size() + dataSize;
886 uint8_t *finayBuf = new(std::nothrow) uint8_t[finalBufSize]();
887 if (finayBuf == nullptr) {
888 WRITE_LOG(LOG_WARN, "send allocmem err");
889 return ERR_BUF_ALLOC;
890 }
891 bool bufRet = false;
892 do {
893 if (memcpy_s(finayBuf, sizeof(PayloadHead), reinterpret_cast<uint8_t *>(&payloadHead), sizeof(PayloadHead))) {
894 WRITE_LOG(LOG_WARN, "send copyhead err for dataSize:%d", dataSize);
895 break;
896 }
897 if (memcpy_s(finayBuf + sizeof(PayloadHead), s.size(),
898 reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size())) {
899 WRITE_LOG(LOG_WARN, "send copyProtbuf err for dataSize:%d", dataSize);
900 break;
901 }
902 if (dataSize > 0 && memcpy_s(finayBuf + sizeof(PayloadHead) + s.size(), dataSize, data, dataSize)) {
903 WRITE_LOG(LOG_WARN, "send copyDatabuf err for dataSize:%d", dataSize);
904 break;
905 }
906 bufRet = true;
907 } while (false);
908 if (!bufRet) {
909 delete[] finayBuf;
910 WRITE_LOG(LOG_WARN, "send copywholedata err for dataSize:%d", dataSize);
911 return ERR_BUF_COPY;
912 }
913 if (CMD_KERNEL_ECHO == commandFlag) {
914 return SendByProtocol(hSession, finayBuf, finalBufSize, true);
915 } else {
916 return SendByProtocol(hSession, finayBuf, finalBufSize);
917 }
918 }
919
DecryptPayload(HSession hSession,PayloadHead * payloadHeadBe,uint8_t * encBuf)920 int HdcSessionBase::DecryptPayload(HSession hSession, PayloadHead *payloadHeadBe, uint8_t *encBuf)
921 {
922 StartTraceScope("HdcSessionBase::DecryptPayload");
923 PayloadProtect protectBuf = {};
924 uint16_t headSize = ntohs(payloadHeadBe->headSize);
925 int dataSize = ntohl(payloadHeadBe->dataSize);
926 string encString(reinterpret_cast<char *>(encBuf), headSize);
927 SerialStruct::ParseFromString(protectBuf, encString);
928 if (protectBuf.vCode != payloadProtectStaticVcode) {
929 WRITE_LOG(LOG_FATAL, "Session recv static vcode failed");
930 return ERR_BUF_CHECK;
931 }
932 uint8_t *data = encBuf + headSize;
933 if (ENABLE_IO_CHECKSUM && protectBuf.checkSum != 0 && (protectBuf.checkSum != Base::CalcCheckSum(data, dataSize))) {
934 WRITE_LOG(LOG_FATAL, "Session recv CalcCheckSum failed");
935 return ERR_BUF_CHECK;
936 }
937 if (!FetchCommand(hSession, protectBuf.channelId, protectBuf.commandFlag, data, dataSize)) {
938 WRITE_LOG(LOG_WARN, "FetchCommand failed: channelId %x commandFlag %x",
939 protectBuf.channelId, protectBuf.commandFlag);
940 return ERR_GENERIC;
941 }
942 return RET_SUCCESS;
943 }
944
OnRead(HSession hSession,uint8_t * bufPtr,const int bufLen)945 int HdcSessionBase::OnRead(HSession hSession, uint8_t *bufPtr, const int bufLen)
946 {
947 int ret = ERR_GENERIC;
948 StartTraceScope("HdcSessionBase::OnRead");
949 if (memcmp(bufPtr, PACKET_FLAG.c_str(), PACKET_FLAG.size())) {
950 WRITE_LOG(LOG_FATAL, "PACKET_FLAG incorrect %x %x", bufPtr[0], bufPtr[1]);
951 return ERR_BUF_CHECK;
952 }
953 struct PayloadHead *payloadHead = reinterpret_cast<struct PayloadHead *>(bufPtr);
954 // to prevent integer overflow caused by the add operation of two input num
955 uint64_t payloadHeadSize = static_cast<uint64_t>(ntohl(payloadHead->dataSize)) +
956 static_cast<uint64_t>(ntohs(payloadHead->headSize));
957 int packetHeadSize = sizeof(struct PayloadHead);
958 if (payloadHeadSize == 0 || payloadHeadSize > static_cast<uint64_t>(HDC_BUF_MAX_BYTES)) {
959 WRITE_LOG(LOG_FATAL, "Packet size incorrect");
960 return ERR_BUF_CHECK;
961 }
962
963 // 0 < payloadHeadSize < HDC_BUF_MAX_BYTES
964 int tobeReadLen = static_cast<int>(payloadHeadSize);
965 if (bufLen - packetHeadSize < tobeReadLen) {
966 return 0;
967 }
968 if (DecryptPayload(hSession, payloadHead, bufPtr + packetHeadSize)) {
969 WRITE_LOG(LOG_WARN, "decrypt plhead error");
970 return ERR_BUF_CHECK;
971 }
972 ret = packetHeadSize + tobeReadLen;
973 return ret;
974 }
975
976 // Returns <0 error;> 0 receives the number of bytes; 0 untreated
FetchIOBuf(HSession hSession,uint8_t * ioBuf,int read)977 int HdcSessionBase::FetchIOBuf(HSession hSession, uint8_t *ioBuf, int read)
978 {
979 HdcSessionBase *ptrConnect = (HdcSessionBase *)hSession->classInstance;
980 int indexBuf = 0;
981 int childRet = 0;
982 StartTraceScope("HdcSessionBase::FetchIOBuf");
983 if (read < 0) {
984 constexpr int bufSize = 1024;
985 char buf[bufSize] = { 0 };
986 uv_strerror_r(read, buf, bufSize);
987 WRITE_LOG(LOG_FATAL, "FetchIOBuf read io failed,%s", buf);
988 return ERR_IO_FAIL;
989 }
990 hSession->stat.dataRecvBytes += read;
991 hSession->availTailIndex += read;
992 while (!hSession->isDead && hSession->availTailIndex > static_cast<int>(sizeof(PayloadHead))) {
993 childRet = ptrConnect->OnRead(hSession, ioBuf + indexBuf, hSession->availTailIndex);
994 if (childRet > 0) {
995 hSession->availTailIndex -= childRet;
996 indexBuf += childRet;
997 } else if (childRet == 0) {
998 // Not enough a IO
999 break;
1000 } else { // <0
1001 WRITE_LOG(LOG_FATAL, "FetchIOBuf error childRet:%d sessionId:%u", childRet, hSession->sessionId);
1002 hSession->availTailIndex = 0; // Preventing malicious data packages
1003 indexBuf = ERR_BUF_SIZE;
1004 break;
1005 }
1006 // It may be multi-time IO to merge in a BUF, need to loop processing
1007 }
1008 if (indexBuf > 0 && hSession->availTailIndex > 0) {
1009 if (memmove_s(hSession->ioBuf, hSession->bufSize, hSession->ioBuf + indexBuf, hSession->availTailIndex)
1010 != EOK) {
1011 return ERR_BUF_COPY;
1012 };
1013 uint8_t *bufToZero = reinterpret_cast<uint8_t *>(hSession->ioBuf + hSession->availTailIndex);
1014 Base::ZeroBuf(bufToZero, hSession->bufSize - hSession->availTailIndex);
1015 }
1016 return indexBuf;
1017 }
1018
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)1019 void HdcSessionBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
1020 {
1021 HSession context = (HSession)handle->data;
1022 Base::ReallocBuf(&context->ioBuf, &context->bufSize, HDC_SOCKETPAIR_SIZE);
1023 buf->base = (char *)context->ioBuf + context->availTailIndex;
1024 int size = context->bufSize - context->availTailIndex;
1025 buf->len = std::min(size, static_cast<int>(sizeWanted));
1026 }
1027
FinishWriteSessionTCP(uv_write_t * req,int status)1028 void HdcSessionBase::FinishWriteSessionTCP(uv_write_t *req, int status)
1029 {
1030 HSession hSession = (HSession)req->handle->data;
1031 --hSession->ref;
1032 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1033 if (status < 0) {
1034 WRITE_LOG(LOG_WARN, "FinishWriteSessionTCP status:%d sessionId:%u isDead:%d ref:%u",
1035 status, hSession->sessionId, hSession->isDead, uint32_t(hSession->ref));
1036 Base::TryCloseHandle((uv_handle_t *)req->handle);
1037 if (!hSession->isDead && !hSession->ref) {
1038 WRITE_LOG(LOG_DEBUG, "FinishWriteSessionTCP freesession :%u", hSession->sessionId);
1039 thisClass->FreeSession(hSession->sessionId);
1040 }
1041 }
1042 delete[]((uint8_t *)req->data);
1043 delete req;
1044 }
1045
DispatchSessionThreadCommand(HSession hSession,const uint8_t * baseBuf,const int bytesIO)1046 bool HdcSessionBase::DispatchSessionThreadCommand(HSession hSession, const uint8_t *baseBuf,
1047 const int bytesIO)
1048 {
1049 bool ret = true;
1050 uint8_t flag = *const_cast<uint8_t *>(baseBuf);
1051
1052 switch (flag) {
1053 case SP_JDWP_NEWFD:
1054 case SP_ARK_NEWFD: {
1055 JdwpNewFileDescriptor(baseBuf, bytesIO);
1056 break;
1057 }
1058 default:
1059 WRITE_LOG(LOG_WARN, "Not support session command");
1060 break;
1061 }
1062 return ret;
1063 }
1064
ReadCtrlFromSession(uv_poll_t * poll,int status,int events)1065 void HdcSessionBase::ReadCtrlFromSession(uv_poll_t *poll, int status, int events)
1066 {
1067 HSession hSession = (HSession)poll->data;
1068 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1069 const int size = Base::GetMaxBufSizeStable();
1070 char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1071 ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_MAIN], buf, size);
1072 while (true) {
1073 if (nread < 0) {
1074 constexpr int bufSize = 1024;
1075 char buffer[bufSize] = { 0 };
1076 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1077 WRITE_LOG(LOG_WARN, "ReadCtrlFromSession failed,%s", buffer);
1078 uv_poll_stop(poll);
1079 break;
1080 }
1081 if (nread == 0) {
1082 WRITE_LOG(LOG_FATAL, "ReadCtrlFromSession read data zero byte");
1083 break;
1084 }
1085 // only one command, no need to split command from stream
1086 // if add more commands, consider the format command
1087 hSessionBase->DispatchSessionThreadCommand(hSession, reinterpret_cast<uint8_t *>(buf), nread);
1088 break;
1089 }
1090 delete[] buf;
1091 }
1092
WorkThreadInitSession(HSession hSession,SessionHandShake & handshake)1093 void HdcSessionBase::WorkThreadInitSession(HSession hSession, SessionHandShake &handshake)
1094 {
1095 handshake.banner = HANDSHAKE_MESSAGE;
1096 handshake.sessionId = hSession->sessionId;
1097 handshake.connectKey = hSession->connectKey;
1098 if (!hSession->isCheck) {
1099 handshake.version = Base::GetVersion() + HDC_MSG_HASH;
1100 WRITE_LOG(LOG_INFO, "set version = %s", handshake.version.c_str());
1101 }
1102 handshake.authType = AUTH_NONE;
1103 // told daemon, we support RSA_3072_SHA512 auth
1104 Base::TlvAppend(handshake.buf, TAG_AUTH_TYPE, std::to_string(AuthVerifyType::RSA_3072_SHA512));
1105 }
1106
WorkThreadStartSession(HSession hSession)1107 bool HdcSessionBase::WorkThreadStartSession(HSession hSession)
1108 {
1109 bool regOK = false;
1110 int childRet = 0;
1111 if (hSession->connType == CONN_TCP) {
1112 HdcTCPBase *pTCPBase = (HdcTCPBase *)hSession->classModule;
1113 hSession->hChildWorkTCP.data = hSession;
1114 if (uv_tcp_init(&hSession->childLoop, &hSession->hChildWorkTCP) < 0) {
1115 WRITE_LOG(LOG_WARN, "HdcSessionBase SessionCtrl failed 1");
1116 return false;
1117 }
1118 if ((childRet = uv_tcp_open(&hSession->hChildWorkTCP, hSession->fdChildWorkTCP)) < 0) {
1119 constexpr int bufSize = 1024;
1120 char buf[bufSize] = { 0 };
1121 uv_strerror_r(childRet, buf, bufSize);
1122 WRITE_LOG(LOG_WARN, "SessionCtrl failed 2,fd:%d,str:%s", hSession->fdChildWorkTCP, buf);
1123 return false;
1124 }
1125 Base::SetTcpOptions((uv_tcp_t *)&hSession->hChildWorkTCP);
1126 uv_read_start((uv_stream_t *)&hSession->hChildWorkTCP, AllocCallback, pTCPBase->ReadStream);
1127 regOK = true;
1128 #ifdef HDC_SUPPORT_UART
1129 } else if (hSession->connType == CONN_SERIAL) { // UART
1130 HdcUARTBase *pUARTBase = (HdcUARTBase *)hSession->classModule;
1131 WRITE_LOG(LOG_DEBUG, "UART ReadyForWorkThread");
1132 regOK = pUARTBase->ReadyForWorkThread(hSession);
1133 #endif
1134 } else { // USB
1135 HdcUSBBase *pUSBBase = (HdcUSBBase *)hSession->classModule;
1136 WRITE_LOG(LOG_DEBUG, "USB ReadyForWorkThread");
1137 regOK = pUSBBase->ReadyForWorkThread(hSession);
1138 }
1139
1140 if (regOK && hSession->serverOrDaemon) {
1141 // session handshake step1
1142 SessionHandShake handshake = {};
1143 WorkThreadInitSession(hSession, handshake);
1144 string hs = SerialStruct::SerializeToString(handshake);
1145 #ifdef HDC_SUPPORT_UART
1146 WRITE_LOG(LOG_DEBUG, "WorkThreadStartSession session %u auth %u send handshake hs: %s",
1147 hSession->sessionId, handshake.authType, hs.c_str());
1148 #endif
1149 Send(hSession->sessionId, 0, CMD_KERNEL_HANDSHAKE,
1150 reinterpret_cast<uint8_t *>(const_cast<char *>(hs.c_str())), hs.size());
1151 }
1152 return regOK;
1153 }
1154
BuildCtrlString(InnerCtrlCommand command,uint32_t channelId,uint8_t * data,int dataSize)1155 vector<uint8_t> HdcSessionBase::BuildCtrlString(InnerCtrlCommand command, uint32_t channelId, uint8_t *data,
1156 int dataSize)
1157 {
1158 vector<uint8_t> ret;
1159 while (true) {
1160 if (dataSize > BUF_SIZE_MICRO) {
1161 WRITE_LOG(LOG_WARN, "BuildCtrlString dataSize:%d", dataSize);
1162 break;
1163 }
1164 CtrlStruct ctrl = {};
1165 ctrl.command = command;
1166 ctrl.channelId = channelId;
1167 ctrl.dataSize = dataSize;
1168 if (dataSize > 0 && data != nullptr && memcpy_s(ctrl.data, sizeof(ctrl.data), data, dataSize) != EOK) {
1169 break;
1170 }
1171 uint8_t *buf = reinterpret_cast<uint8_t *>(&ctrl);
1172 ret.insert(ret.end(), buf, buf + sizeof(CtrlStruct));
1173 break;
1174 }
1175 return ret;
1176 }
1177
DispatchMainThreadCommand(HSession hSession,const CtrlStruct * ctrl)1178 bool HdcSessionBase::DispatchMainThreadCommand(HSession hSession, const CtrlStruct *ctrl)
1179 {
1180 bool ret = true;
1181 uint32_t channelId = ctrl->channelId; // if send not set, it is zero
1182 switch (ctrl->command) {
1183 case SP_START_SESSION: {
1184 WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand START_SESSION sessionId:%u instance:%s",
1185 hSession->sessionId, hSession->serverOrDaemon ? "server" : "daemon");
1186 ret = WorkThreadStartSession(hSession);
1187 break;
1188 }
1189 case SP_STOP_SESSION: {
1190 WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand STOP_SESSION sessionId:%u", hSession->sessionId);
1191 auto closeSessionChildThreadTCPHandle = [](uv_handle_t *handle) -> void {
1192 HSession hSession = (HSession)handle->data;
1193 Base::TryCloseHandle((uv_handle_t *)handle);
1194 if (handle == (uv_handle_t *)hSession->pollHandle[STREAM_WORK]) {
1195 free(hSession->pollHandle[STREAM_WORK]);
1196 }
1197 if (--hSession->uvChildRef == 0) {
1198 uv_stop(&hSession->childLoop);
1199 };
1200 };
1201 constexpr int uvChildRefOffset = 2;
1202 hSession->uvChildRef += uvChildRefOffset;
1203 if (hSession->connType == CONN_TCP && hSession->hChildWorkTCP.loop) { // maybe not use it
1204 ++hSession->uvChildRef;
1205 Base::TryCloseHandle((uv_handle_t *)&hSession->hChildWorkTCP, true, closeSessionChildThreadTCPHandle);
1206 }
1207 Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_WORK], true,
1208 closeSessionChildThreadTCPHandle);
1209 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_WORK], true,
1210 closeSessionChildThreadTCPHandle);
1211 break;
1212 }
1213 case SP_ATTACH_CHANNEL: {
1214 if (!serverOrDaemon) {
1215 break; // Only Server has this feature
1216 }
1217 AttachChannel(hSession, channelId);
1218 break;
1219 }
1220 case SP_DEATCH_CHANNEL: {
1221 if (!serverOrDaemon) {
1222 break; // Only Server has this feature
1223 }
1224 DeatchChannel(hSession, channelId);
1225 break;
1226 }
1227 default:
1228 WRITE_LOG(LOG_WARN, "Not support main command");
1229 ret = false;
1230 break;
1231 }
1232 return ret;
1233 }
1234
1235 // Several bytes of control instructions, generally do not stick
ReadCtrlFromMain(uv_poll_t * poll,int status,int events)1236 void HdcSessionBase::ReadCtrlFromMain(uv_poll_t *poll, int status, int events)
1237 {
1238 HSession hSession = (HSession)poll->data;
1239 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1240 int formatCommandSize = sizeof(CtrlStruct);
1241 int index = 0;
1242 const int size = Base::GetMaxBufSizeStable();
1243 char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1244 ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_WORK], buf, size);
1245 while (true) {
1246 if (nread < 0) {
1247 constexpr int bufSize = 1024;
1248 char buffer[bufSize] = { 0 };
1249 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1250 WRITE_LOG(LOG_WARN, "SessionCtrl failed,%s", buffer);
1251 break;
1252 }
1253 if (nread % formatCommandSize != 0) {
1254 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain size failed, nread == %d", nread);
1255 break;
1256 }
1257 CtrlStruct *ctrl = reinterpret_cast<CtrlStruct *>(buf + index);
1258 if (!hSessionBase->DispatchMainThreadCommand(hSession, ctrl)) {
1259 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain failed sessionId:%u channelId:%u command:%u",
1260 hSession->sessionId, ctrl->channelId, ctrl->command);
1261 break;
1262 }
1263 index += sizeof(CtrlStruct);
1264 if (index >= nread) {
1265 break;
1266 }
1267 }
1268 delete[] buf;
1269 }
1270
ReChildLoopForSessionClear(HSession hSession)1271 void HdcSessionBase::ReChildLoopForSessionClear(HSession hSession)
1272 {
1273 // Restart loop close task
1274 ClearOwnTasks(hSession, 0);
1275 WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear sessionId:%u", hSession->sessionId);
1276 auto clearTaskForSessionFinish = [](uv_timer_t *handle) -> void {
1277 HSession hSession = (HSession)handle->data;
1278 for (auto v : *hSession->mapTask) {
1279 HTaskInfo hTask = (HTaskInfo)v.second;
1280 uint8_t level;
1281 if (hTask->closeRetryCount < GLOBAL_TIMEOUT / 2) {
1282 level = LOG_DEBUG;
1283 } else {
1284 level = LOG_WARN;
1285 }
1286 WRITE_LOG(level, "wait task free retry %d/%d, channelId:%u, sessionId:%u",
1287 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->channelId, hTask->sessionId);
1288 if (hTask->closeRetryCount++ >= GLOBAL_TIMEOUT) {
1289 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
1290 hSession = thisClass->AdminSession(OP_QUERY, hTask->sessionId, nullptr);
1291 thisClass->AdminTask(OP_VOTE_RESET, hSession, hTask->channelId, nullptr);
1292 }
1293 if (!hTask->taskFree)
1294 return;
1295 }
1296 // all task has been free
1297 uv_close((uv_handle_t *)handle, Base::CloseTimerCallback);
1298 uv_stop(&hSession->childLoop); // stop ReChildLoopForSessionClear pendding
1299 };
1300 Base::TimerUvTask(
1301 &hSession->childLoop, hSession, clearTaskForSessionFinish, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
1302 uv_run(&hSession->childLoop, UV_RUN_DEFAULT);
1303 // clear
1304 Base::TryCloseChildLoop(&hSession->childLoop, "Session childUV");
1305 }
1306
SessionWorkThread(uv_work_t * arg)1307 void HdcSessionBase::SessionWorkThread(uv_work_t *arg)
1308 {
1309 HSession hSession = (HSession)arg->data;
1310 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1311 hSession->hWorkChildThread = uv_thread_self();
1312
1313 uv_poll_t *pollHandle = hSession->pollHandle[STREAM_WORK];
1314 pollHandle->data = hSession;
1315 uv_poll_init_socket(&hSession->childLoop, pollHandle, hSession->ctrlFd[STREAM_WORK]);
1316 uv_poll_start(pollHandle, UV_READABLE, ReadCtrlFromMain);
1317 WRITE_LOG(LOG_DEBUG, "!!!Workthread run begin, sessionId:%u instance:%s", hSession->sessionId,
1318 thisClass->serverOrDaemon ? "server" : "daemon");
1319 uv_run(&hSession->childLoop, UV_RUN_DEFAULT); // work pendding
1320 WRITE_LOG(LOG_DEBUG, "!!!Workthread run again, sessionId:%u", hSession->sessionId);
1321 // main loop has exit
1322 thisClass->ReChildLoopForSessionClear(hSession); // work pending again
1323 hSession->childCleared = true;
1324 WRITE_LOG(LOG_WARN, "!!!Workthread run finish, sessionId:%u", hSession->sessionId);
1325 }
1326
1327 // clang-format off
LogMsg(const uint32_t sessionId,const uint32_t channelId,MessageLevel level,const char * msg,...)1328 void HdcSessionBase::LogMsg(const uint32_t sessionId, const uint32_t channelId,
1329 MessageLevel level, const char *msg, ...)
1330 // clang-format on
1331 {
1332 va_list vaArgs;
1333 va_start(vaArgs, msg);
1334 string log = Base::StringFormat(msg, vaArgs);
1335 va_end(vaArgs);
1336 vector<uint8_t> buf;
1337 buf.push_back(level);
1338 buf.insert(buf.end(), log.c_str(), log.c_str() + log.size());
1339 ServerCommand(sessionId, channelId, CMD_KERNEL_ECHO, buf.data(), buf.size());
1340 }
1341
NeedNewTaskInfo(const uint16_t command,bool & masterTask)1342 bool HdcSessionBase::NeedNewTaskInfo(const uint16_t command, bool &masterTask)
1343 {
1344 // referer from HdcServerForClient::DoCommandRemote
1345 bool ret = false;
1346 bool taskMasterInit = false;
1347 masterTask = false;
1348 switch (command) {
1349 case CMD_FILE_INIT:
1350 case CMD_FLASHD_FLASH_INIT:
1351 case CMD_FLASHD_UPDATE_INIT:
1352 case CMD_FLASHD_ERASE:
1353 case CMD_FLASHD_FORMAT:
1354 case CMD_FORWARD_INIT:
1355 case CMD_APP_INIT:
1356 case CMD_APP_UNINSTALL:
1357 case CMD_APP_SIDELOAD:
1358 taskMasterInit = true;
1359 break;
1360 default:
1361 break;
1362 }
1363 if (!serverOrDaemon &&
1364 (command == CMD_SHELL_INIT || command == CMD_UNITY_EXECUTE_EX ||
1365 (command > CMD_UNITY_COMMAND_HEAD && command < CMD_UNITY_COMMAND_TAIL))) {
1366 // daemon's single side command
1367 ret = true;
1368 } else if (command == CMD_KERNEL_WAKEUP_SLAVETASK) {
1369 // slave tasks
1370 ret = true;
1371 } else if (command == CMD_UNITY_BUGREPORT_INIT) {
1372 ret = true;
1373 } else if (taskMasterInit) {
1374 // task init command
1375 masterTask = true;
1376 ret = true;
1377 }
1378 return ret;
1379 }
1380 // Heavy and time-consuming work was putted in the new thread to do, and does
1381 // not occupy the main thread
DispatchTaskData(HSession hSession,const uint32_t channelId,const uint16_t command,uint8_t * payload,int payloadSize)1382 bool HdcSessionBase::DispatchTaskData(HSession hSession, const uint32_t channelId, const uint16_t command,
1383 uint8_t *payload, int payloadSize)
1384 {
1385 StartTraceScope("HdcSessionBase::DispatchTaskData");
1386 bool ret = true;
1387 HTaskInfo hTaskInfo = nullptr;
1388 bool masterTask = false;
1389 while (true) {
1390 // Some basic commands do not have a local task constructor. example: Interactive shell, some uinty commands
1391 if (NeedNewTaskInfo(command, masterTask)) {
1392 WRITE_LOG(LOG_DEBUG, "New HTaskInfo channelId:%u command:%u", channelId, command);
1393 hTaskInfo = new(std::nothrow) TaskInformation();
1394 if (hTaskInfo == nullptr) {
1395 WRITE_LOG(LOG_FATAL, "DispatchTaskData new hTaskInfo failed");
1396 break;
1397 }
1398 hTaskInfo->channelId = channelId;
1399 hTaskInfo->sessionId = hSession->sessionId;
1400 hTaskInfo->runLoop = &hSession->childLoop;
1401 hTaskInfo->serverOrDaemon = serverOrDaemon;
1402 hTaskInfo->masterSlave = masterTask;
1403 hTaskInfo->closeRetryCount = 0;
1404 hTaskInfo->channelTask = false;
1405 hTaskInfo->isCleared = false;
1406
1407 int addTaskRetry = 3; // try 3 time
1408 while (addTaskRetry > 0) {
1409 if (AdminTask(OP_ADD, hSession, channelId, hTaskInfo)) {
1410 break;
1411 }
1412 sleep(1);
1413 --addTaskRetry;
1414 }
1415
1416 if (addTaskRetry == 0) {
1417 #ifndef HDC_HOST
1418 LogMsg(hTaskInfo->sessionId, hTaskInfo->channelId,
1419 MSG_FAIL, "hdc thread pool busy, may cause reset later");
1420 #endif
1421 delete hTaskInfo;
1422 hTaskInfo = nullptr;
1423 ret = false;
1424 break;
1425 }
1426 } else {
1427 hTaskInfo = AdminTask(OP_QUERY, hSession, channelId, nullptr);
1428 }
1429 if (!hTaskInfo || hTaskInfo->taskStop || hTaskInfo->taskFree) {
1430 WRITE_LOG(LOG_ALL, "Dead HTaskInfo, ignore, channelId:%u command:%u", channelId, command);
1431 break;
1432 }
1433 ret = RedirectToTask(hTaskInfo, hSession, channelId, command, payload, payloadSize);
1434 break;
1435 }
1436 return ret;
1437 }
1438
PostStopInstanceMessage(bool restart)1439 void HdcSessionBase::PostStopInstanceMessage(bool restart)
1440 {
1441 PushAsyncMessage(0, ASYNC_STOP_MAINLOOP, nullptr, 0);
1442 WRITE_LOG(LOG_DEBUG, "StopDaemon has sended restart %d", restart);
1443 wantRestart = restart;
1444 }
1445 } // namespace Hdc
1446