1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "session.h"
16 #ifndef TEST_HASH
17 #include "hdc_hash_gen.h"
18 #endif
19 #include "serial_struct.h"
20
21 namespace Hdc {
HdcSessionBase(bool serverOrDaemonIn,size_t uvThreadSize)22 HdcSessionBase::HdcSessionBase(bool serverOrDaemonIn, size_t uvThreadSize)
23 {
24 // print version pid
25 WRITE_LOG(LOG_INFO, "Program running. %s Pid:%u", Base::GetVersion().c_str(), getpid());
26 // server/daemon common initialization code
27 if (uvThreadSize < SIZE_THREAD_POOL_MIN) {
28 uvThreadSize = SIZE_THREAD_POOL_MIN;
29 } else if (uvThreadSize > SIZE_THREAD_POOL_MAX) {
30 uvThreadSize = SIZE_THREAD_POOL_MAX;
31 }
32 threadPoolCount = uvThreadSize;
33 WRITE_LOG(LOG_INFO, "set UV_THREADPOOL_SIZE:%zu", threadPoolCount);
34 string uvThreadEnv("UV_THREADPOOL_SIZE");
35 string uvThreadVal = std::to_string(threadPoolCount);
36 #ifdef _WIN32
37 uvThreadEnv += "=";
38 uvThreadEnv += uvThreadVal;
39 _putenv(uvThreadEnv.c_str());
40 #else
41 setenv(uvThreadEnv.c_str(), uvThreadVal.c_str(), 1);
42 #endif
43 uv_loop_init(&loopMain);
44 WRITE_LOG(LOG_DEBUG, "loopMain init");
45 uv_rwlock_init(&mainAsync);
46 uv_async_init(&loopMain, &asyncMainLoop, MainAsyncCallback);
47 uv_rwlock_init(&lockMapSession);
48 serverOrDaemon = serverOrDaemonIn;
49 ctxUSB = nullptr;
50 wantRestart = false;
51 threadSessionMain = uv_thread_self();
52
53 #ifdef HDC_HOST
54 if (serverOrDaemon) {
55 if (libusb_init((libusb_context **)&ctxUSB) != 0) {
56 ctxUSB = nullptr;
57 WRITE_LOG(LOG_FATAL, "libusb_init failed ctxUSB is nullptr");
58 }
59 }
60 #endif
61 }
62
~HdcSessionBase()63 HdcSessionBase::~HdcSessionBase()
64 {
65 Base::TryCloseHandle((uv_handle_t *)&asyncMainLoop);
66 uv_loop_close(&loopMain);
67 // clear base
68 uv_rwlock_destroy(&mainAsync);
69 uv_rwlock_destroy(&lockMapSession);
70 #ifdef HDC_HOST
71 if (serverOrDaemon and ctxUSB != nullptr) {
72 libusb_exit((libusb_context *)ctxUSB);
73 }
74 #endif
75 WRITE_LOG(LOG_WARN, "~HdcSessionBase free sessionRef:%u instance:%s", uint32_t(sessionRef),
76 serverOrDaemon ? "server" : "daemon");
77 }
78
79 // remove step2
TryRemoveTask(HTaskInfo hTask)80 bool HdcSessionBase::TryRemoveTask(HTaskInfo hTask)
81 {
82 if (hTask->taskFree) {
83 WRITE_LOG(LOG_WARN, "TryRemoveTask channelId:%u", hTask->channelId);
84 return true;
85 }
86 bool ret = RemoveInstanceTask(OP_REMOVE, hTask);
87 if (ret) {
88 hTask->taskFree = true;
89 } else {
90 // This is used to check that the memory cannot be cleaned up. If the memory cannot be released, break point
91 // here to see which task has not been released
92 // print task clear
93 }
94 return ret;
95 }
96
97 // remove step1
BeginRemoveTask(HTaskInfo hTask)98 void HdcSessionBase::BeginRemoveTask(HTaskInfo hTask)
99 {
100 StartTraceScope("HdcSessionBase::BeginRemoveTask");
101 if (hTask->taskStop || hTask->taskFree) {
102 WRITE_LOG(LOG_WARN, "BeginRemoveTask channelId:%u taskStop:%d taskFree:%d",
103 hTask->channelId, hTask->taskStop, hTask->taskFree);
104 return;
105 }
106
107 WRITE_LOG(LOG_WARN, "BeginRemoveTask taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
108 bool ret = RemoveInstanceTask(OP_CLEAR, hTask);
109 if (!ret) {
110 WRITE_LOG(LOG_INFO, "RemoveInstanceTask false taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
111 }
112 auto taskClassDeleteRetry = [](uv_timer_t *handle) -> void {
113 StartTraceScope("HdcSessionBase::BeginRemoveTask taskClassDeleteRetry");
114 HTaskInfo hTask = (HTaskInfo)handle->data;
115 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
116 constexpr uint32_t count = 1000;
117 if (hTask->closeRetryCount == 0 || hTask->closeRetryCount > count) {
118 WRITE_LOG(LOG_DEBUG, "TaskDelay task remove retry count %d/%d, taskType:%d channelId:%u, sessionId:%u",
119 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->taskType, hTask->channelId, hTask->sessionId);
120 hTask->closeRetryCount = 1;
121 }
122 hTask->closeRetryCount++;
123 if (!thisClass->TryRemoveTask(hTask)) {
124 WRITE_LOG(LOG_WARN, "TaskDelay TryRemoveTask false channelId:%u", hTask->channelId);
125 return;
126 }
127 WRITE_LOG(LOG_WARN, "TaskDelay task remove finish, channelId:%u", hTask->channelId);
128 if (hTask != nullptr) {
129 delete hTask;
130 hTask = nullptr;
131 }
132 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
133 };
134 Base::TimerUvTask(hTask->runLoop, hTask, taskClassDeleteRetry, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
135
136 hTask->taskStop = true;
137 }
138
139 // Clear all Task or a single Task, the regular situation is stopped first, and the specific class memory is cleaned up
140 // after the end of the LOOP.
141 // When ChannelIdinput == 0, at this time, all of the LOOP ends, all runs in the class end, so directly skip STOP,
142 // physical memory deletion class trimming
ClearOwnTasks(HSession hSession,const uint32_t channelIDInput)143 void HdcSessionBase::ClearOwnTasks(HSession hSession, const uint32_t channelIDInput)
144 {
145 // First case: normal task cleanup process (STOP Remove)
146 // Second: The task is cleaned up, the session ends
147 // Third: The task is cleaned up, and the session is directly over the session.
148 StartTraceScope("HdcSessionBase::ClearOwnTasks");
149 hSession->mapTaskMutex.lock();
150 map<uint32_t, HTaskInfo>::iterator iter;
151 for (iter = hSession->mapTask->begin(); iter != hSession->mapTask->end();) {
152 uint32_t channelId = iter->first;
153 HTaskInfo hTask = iter->second;
154 if (channelIDInput != 0) { // single
155 if (channelIDInput != channelId) {
156 ++iter;
157 continue;
158 }
159 BeginRemoveTask(hTask);
160 WRITE_LOG(LOG_WARN, "ClearOwnTasks OP_CLEAR finish, sessionId:%u channelIDInput:%u",
161 hSession->sessionId, channelIDInput);
162 iter = hSession->mapTask->erase(iter);
163 break;
164 }
165 // multi
166 BeginRemoveTask(hTask);
167 iter = hSession->mapTask->erase(iter);
168 }
169 hSession->mapTaskMutex.unlock();
170 }
171
ClearSessions()172 void HdcSessionBase::ClearSessions()
173 {
174 // no need to lock mapSession
175 // broadcast free signal
176 for (auto v : mapSession) {
177 HSession hSession = (HSession)v.second;
178 if (!hSession->isDead) {
179 FreeSession(hSession->sessionId);
180 }
181 }
182 }
183
ReMainLoopForInstanceClear()184 void HdcSessionBase::ReMainLoopForInstanceClear()
185 { // reloop
186 StartTraceScope("HdcSessionBase::ReMainLoopForInstanceClear");
187 auto clearSessionsForFinish = [](uv_idle_t *handle) -> void {
188 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
189 if (thisClass->sessionRef > 0) {
190 return;
191 }
192 // all task has been free
193 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
194 uv_stop(&thisClass->loopMain);
195 };
196 Base::IdleUvTask(&loopMain, this, clearSessionsForFinish);
197 uv_run(&loopMain, UV_RUN_DEFAULT);
198 };
199
200 #ifdef HDC_SUPPORT_UART
EnumUARTDeviceRegister(UartKickoutZombie kickOut)201 void HdcSessionBase::EnumUARTDeviceRegister(UartKickoutZombie kickOut)
202 {
203 uv_rwlock_rdlock(&lockMapSession);
204 map<uint32_t, HSession>::iterator i;
205 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
206 HSession hs = i->second;
207 if ((hs->connType != CONN_SERIAL) or (hs->hUART == nullptr)) {
208 continue;
209 }
210 kickOut(hs);
211 break;
212 }
213 uv_rwlock_rdunlock(&lockMapSession);
214 }
215 #endif
216
EnumUSBDeviceRegister(void (* pCallBack)(HSession hSession))217 void HdcSessionBase::EnumUSBDeviceRegister(void (*pCallBack)(HSession hSession))
218 {
219 if (!pCallBack) {
220 return;
221 }
222 uv_rwlock_rdlock(&lockMapSession);
223 map<uint32_t, HSession>::iterator i;
224 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
225 HSession hs = i->second;
226 if (hs->connType != CONN_USB) {
227 continue;
228 }
229 if (hs->hUSB == nullptr) {
230 continue;
231 }
232 if (pCallBack) {
233 pCallBack(hs);
234 }
235 break;
236 }
237 uv_rwlock_rdunlock(&lockMapSession);
238 }
239
// Given device information from the PC side, determine whether the USB device is already registered.
// The device is identified either by pDev or, when pDev is null, by the busIDIn/devIDIn pair.
QueryUSBDeviceRegister(void * pDev,uint8_t busIDIn,uint8_t devIDIn)242 HSession HdcSessionBase::QueryUSBDeviceRegister(void *pDev, uint8_t busIDIn, uint8_t devIDIn)
243 {
244 #ifdef HDC_HOST
245 libusb_device *dev = (libusb_device *)pDev;
246 HSession hResult = nullptr;
247 if (!mapSession.size()) {
248 return nullptr;
249 }
250 uint8_t busId = 0;
251 uint8_t devId = 0;
252 if (pDev) {
253 busId = libusb_get_bus_number(dev);
254 devId = libusb_get_device_address(dev);
255 } else {
256 busId = busIDIn;
257 devId = devIDIn;
258 }
259 uv_rwlock_rdlock(&lockMapSession);
260 map<uint32_t, HSession>::iterator i;
261 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
262 HSession hs = i->second;
263 if (hs->connType == CONN_USB) {
264 continue;
265 }
266 if (hs->hUSB == nullptr) {
267 continue;
268 }
269 if (hs->hUSB->devId != devId || hs->hUSB->busId != busId) {
270 continue;
271 }
272 hResult = hs;
273 break;
274 }
275 uv_rwlock_rdunlock(&lockMapSession);
276 return hResult;
277 #else
278 return nullptr;
279 #endif
280 }
281
AsyncMainLoopTask(uv_idle_t * handle)282 void HdcSessionBase::AsyncMainLoopTask(uv_idle_t *handle)
283 {
284 AsyncParam *param = (AsyncParam *)handle->data;
285 HdcSessionBase *thisClass = (HdcSessionBase *)param->thisClass;
286 switch (param->method) {
287 case ASYNC_FREE_SESSION:
288 // Destruction is unified in the main thread
289 thisClass->FreeSession(param->sid);
290 break;
291 case ASYNC_STOP_MAINLOOP:
292 uv_stop(&thisClass->loopMain);
293 break;
294 default:
295 break;
296 }
297 if (param->data) {
298 delete[]((uint8_t *)param->data);
299 }
300 delete param;
301 param = nullptr;
302 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseIdleCallback);
303 }
304
MainAsyncCallback(uv_async_t * handle)305 void HdcSessionBase::MainAsyncCallback(uv_async_t *handle)
306 {
307 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
308 list<void *>::iterator i;
309 list<void *> &lst = thisClass->lstMainThreadOP;
310 uv_rwlock_wrlock(&thisClass->mainAsync);
311 for (i = lst.begin(); i != lst.end();) {
312 AsyncParam *param = (AsyncParam *)*i;
313 Base::IdleUvTask(&thisClass->loopMain, param, AsyncMainLoopTask);
314 i = lst.erase(i);
315 }
316 uv_rwlock_wrunlock(&thisClass->mainAsync);
317 }
318
PushAsyncMessage(const uint32_t sessionId,const uint8_t method,const void * data,const int dataSize)319 void HdcSessionBase::PushAsyncMessage(const uint32_t sessionId, const uint8_t method, const void *data,
320 const int dataSize)
321 {
322 AsyncParam *param = new AsyncParam();
323 if (!param) {
324 return;
325 }
326 param->sid = sessionId;
327 param->thisClass = this;
328 param->method = method;
329 if (dataSize > 0) {
330 param->dataSize = dataSize;
331 param->data = new uint8_t[param->dataSize]();
332 if (!param->data) {
333 delete param;
334 return;
335 }
336 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
337 delete[]((uint8_t *)param->data);
338 delete param;
339 return;
340 }
341 }
342
343 asyncMainLoop.data = this;
344 uv_rwlock_wrlock(&mainAsync);
345 lstMainThreadOP.push_back(param);
346 uv_rwlock_wrunlock(&mainAsync);
347 uv_async_send(&asyncMainLoop);
348 }
349
WorkerPendding()350 void HdcSessionBase::WorkerPendding()
351 {
352 uv_run(&loopMain, UV_RUN_DEFAULT);
353 ClearInstanceResource();
354 }
355
MallocSessionByConnectType(HSession hSession)356 int HdcSessionBase::MallocSessionByConnectType(HSession hSession)
357 {
358 int ret = 0;
359 switch (hSession->connType) {
360 case CONN_TCP: {
361 uv_tcp_init(&loopMain, &hSession->hWorkTCP);
362 ++hSession->uvHandleRef;
363 hSession->hWorkTCP.data = hSession;
364 break;
365 }
366 case CONN_USB: {
367 // Some members need to be placed at the primary thread
368 HUSB hUSB = new HdcUSB();
369 if (!hUSB) {
370 ret = -1;
371 break;
372 }
373 hSession->hUSB = hUSB;
374 hSession->hUSB->wMaxPacketSizeSend = MAX_PACKET_SIZE_HISPEED;
375 break;
376 }
377 #ifdef HDC_SUPPORT_UART
378 case CONN_SERIAL: {
379 HUART hUART = new HdcUART();
380 if (!hUART) {
381 ret = -1;
382 break;
383 }
384 hSession->hUART = hUART;
385 break;
386 }
387 #endif // HDC_SUPPORT_UART
388 default:
389 ret = -1;
390 break;
391 }
392 return ret;
393 }
394
// Avoid ID collisions, e.g. in unit tests where client, server and daemon run on the same host
// and might otherwise get the same ID value.
GetSessionPseudoUid()396 uint32_t HdcSessionBase::GetSessionPseudoUid()
397 {
398 uint32_t uid = 0;
399 do {
400 uid = Base::GetSecureRandom();
401 } while (AdminSession(OP_QUERY, uid, nullptr) != nullptr);
402 return uid;
403 }
404
405 // when client 0 to automatic generated,when daemon First place 1 followed by
MallocSession(bool serverOrDaemon,const ConnType connType,void * classModule,uint32_t sessionId)406 HSession HdcSessionBase::MallocSession(bool serverOrDaemon, const ConnType connType, void *classModule,
407 uint32_t sessionId)
408 {
409 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
410 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
411 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
412 #endif
413 HSession hSession = new(std::nothrow) HdcSession();
414 if (!hSession) {
415 WRITE_LOG(LOG_FATAL, "MallocSession new hSession failed");
416 return nullptr;
417 }
418 int ret = 0;
419 ++sessionRef;
420 hSession->classInstance = this;
421 hSession->connType = connType;
422 hSession->classModule = classModule;
423 hSession->isDead = false;
424 hSession->sessionId = ((sessionId == 0) ? GetSessionPseudoUid() : sessionId);
425 hSession->serverOrDaemon = serverOrDaemon;
426 hSession->hWorkThread = uv_thread_self();
427 hSession->mapTask = new(std::nothrow) map<uint32_t, HTaskInfo>();
428 if (hSession->mapTask == nullptr) {
429 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->mapTask failed");
430 delete hSession;
431 hSession = nullptr;
432 return nullptr;
433 }
434 hSession->listKey = new(std::nothrow) list<void *>;
435 if (hSession->listKey == nullptr) {
436 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->listKey failed");
437 delete hSession;
438 hSession = nullptr;
439 return nullptr;
440 }
441 uv_loop_init(&hSession->childLoop);
442 hSession->uvHandleRef = 0;
443 // pullup child
444 WRITE_LOG(LOG_INFO, "HdcSessionBase NewSession, sessionId:%u, connType:%d.",
445 hSession->sessionId, hSession->connType);
446 ++hSession->uvHandleRef;
447 Base::CreateSocketPair(hSession->ctrlFd);
448 size_t handleSize = sizeof(uv_poll_t);
449 hSession->pollHandle[STREAM_WORK] = (uv_poll_t *)malloc(handleSize);
450 hSession->pollHandle[STREAM_MAIN] = (uv_poll_t *)malloc(handleSize);
451 uv_poll_t *pollHandleMain = hSession->pollHandle[STREAM_MAIN];
452 if (pollHandleMain == nullptr || hSession->pollHandle[STREAM_WORK] == nullptr) {
453 WRITE_LOG(LOG_FATAL, "MallocSession malloc hSession->pollHandle failed");
454 delete hSession;
455 hSession = nullptr;
456 return nullptr;
457 }
458 uv_poll_init_socket(&loopMain, pollHandleMain, hSession->ctrlFd[STREAM_MAIN]);
459 uv_poll_start(pollHandleMain, UV_READABLE, ReadCtrlFromSession);
460 hSession->pollHandle[STREAM_MAIN]->data = hSession;
461 hSession->pollHandle[STREAM_WORK]->data = hSession;
462 // Activate USB DAEMON's data channel, may not for use
463 uv_tcp_init(&loopMain, &hSession->dataPipe[STREAM_MAIN]);
464 (void)memset_s(&hSession->dataPipe[STREAM_WORK], sizeof(hSession->dataPipe[STREAM_WORK]),
465 0, sizeof(uv_tcp_t));
466 ++hSession->uvHandleRef;
467 Base::CreateSocketPair(hSession->dataFd);
468 uv_tcp_open(&hSession->dataPipe[STREAM_MAIN], hSession->dataFd[STREAM_MAIN]);
469 hSession->dataPipe[STREAM_MAIN].data = hSession;
470 hSession->dataPipe[STREAM_WORK].data = hSession;
471 #ifdef HDC_HOST
472 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN], HOST_SOCKETPAIR_SIZE);
473 #else
474 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN]);
475 #endif
476 ret = MallocSessionByConnectType(hSession);
477 if (ret) {
478 delete hSession;
479 hSession = nullptr;
480 } else {
481 AdminSession(OP_ADD, hSession->sessionId, hSession);
482 }
483 return hSession;
484 }
485
FreeSessionByConnectType(HSession hSession)486 void HdcSessionBase::FreeSessionByConnectType(HSession hSession)
487 {
488 WRITE_LOG(LOG_DEBUG, "FreeSessionByConnectType %s", hSession->ToDebugString().c_str());
489
490 if (hSession->connType == CONN_USB) {
491 // ibusb All context is applied for sub-threaded, so it needs to be destroyed in the subline
492 if (!hSession->hUSB) {
493 return;
494 }
495 HUSB hUSB = hSession->hUSB;
496 if (!hUSB) {
497 return;
498 }
499 #ifdef HDC_HOST
500 if (hUSB->devHandle) {
501 libusb_release_interface(hUSB->devHandle, hUSB->interfaceNumber);
502 libusb_close(hUSB->devHandle);
503 hUSB->devHandle = nullptr;
504 }
505 #else
506 Base::CloseFd(hUSB->bulkIn);
507 Base::CloseFd(hUSB->bulkOut);
508 #endif
509 delete hSession->hUSB;
510 hSession->hUSB = nullptr;
511 }
512 #ifdef HDC_SUPPORT_UART
513 if (CONN_SERIAL == hSession->connType) {
514 if (!hSession->hUART) {
515 return;
516 }
517 HUART hUART = hSession->hUART;
518 if (!hUART) {
519 return;
520 }
521 HdcUARTBase *uartBase = (HdcUARTBase *)hSession->classModule;
522 // tell uart session will be free
523 uartBase->StopSession(hSession);
524 #ifdef HDC_HOST
525 #ifdef HOST_MINGW
526 if (hUART->devUartHandle != INVALID_HANDLE_VALUE) {
527 CloseHandle(hUART->devUartHandle);
528 hUART->devUartHandle = INVALID_HANDLE_VALUE;
529 }
530 #elif defined(HOST_LINUX)
531 Base::CloseFd(hUART->devUartHandle);
532 #endif // _WIN32
533 #endif
534 delete hSession->hUART;
535 hSession->hUART = nullptr;
536 }
537 #endif
538 }
539
540 // work when libuv-handle at struct of HdcSession has all callback finished
FreeSessionFinally(uv_idle_t * handle)541 void HdcSessionBase::FreeSessionFinally(uv_idle_t *handle)
542 {
543 HSession hSession = (HSession)handle->data;
544 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
545 if (hSession->uvHandleRef > 0) {
546 WRITE_LOG(LOG_INFO, "FreeSessionFinally uvHandleRef:%d sessionId:%u",
547 hSession->uvHandleRef, hSession->sessionId);
548 return;
549 }
550 // Notify Server or Daemon, just UI or display commandline
551 thisClass->NotifyInstanceSessionFree(hSession, true);
552 // all hsession uv handle has been clear
553 thisClass->AdminSession(OP_REMOVE, hSession->sessionId, nullptr);
554 WRITE_LOG(LOG_INFO, "!!!FreeSessionFinally sessionId:%u finish", hSession->sessionId);
555 HdcAuth::FreeKey(!hSession->serverOrDaemon, hSession->listKey);
556 delete hSession;
557 hSession = nullptr; // fix CodeMars SetNullAfterFree issue
558 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
559 --thisClass->sessionRef;
560 }
561
562 // work when child-work thread finish
FreeSessionContinue(HSession hSession)563 void HdcSessionBase::FreeSessionContinue(HSession hSession)
564 {
565 auto closeSessionTCPHandle = [](uv_handle_t *handle) -> void {
566 HSession hSession = (HSession)handle->data;
567 --hSession->uvHandleRef;
568 Base::TryCloseHandle((uv_handle_t *)handle);
569 if (handle == reinterpret_cast<uv_handle_t *>(hSession->pollHandle[STREAM_MAIN])) {
570 Base::CloseFd(hSession->ctrlFd[STREAM_MAIN]);
571 Base::CloseFd(hSession->ctrlFd[STREAM_WORK]);
572 free(hSession->pollHandle[STREAM_MAIN]);
573 }
574 };
575 if (hSession->connType == CONN_TCP) {
576 // Turn off TCP to prevent continuing writing
577 Base::TryCloseHandle((uv_handle_t *)&hSession->hWorkTCP, true, closeSessionTCPHandle);
578 Base::CloseFd(hSession->dataFd[STREAM_WORK]);
579 }
580 hSession->availTailIndex = 0;
581 if (hSession->ioBuf) {
582 delete[] hSession->ioBuf;
583 hSession->ioBuf = nullptr;
584 }
585 Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_MAIN], true, closeSessionTCPHandle);
586 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_MAIN], true, closeSessionTCPHandle);
587 FreeSessionByConnectType(hSession);
588 // finish
589 Base::IdleUvTask(&loopMain, hSession, FreeSessionFinally);
590 }
591
FreeSessionOpeate(uv_timer_t * handle)592 void HdcSessionBase::FreeSessionOpeate(uv_timer_t *handle)
593 {
594 StartTraceScope("HdcSessionBase::FreeSessionOpeate");
595 HSession hSession = (HSession)handle->data;
596 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
597 if (hSession->ref > 0) {
598 WRITE_LOG(LOG_WARN, "FreeSessionOpeate sid:%u ref:%u > 0", hSession->sessionId, uint32_t(hSession->ref));
599 return;
600 }
601 WRITE_LOG(LOG_INFO, "FreeSessionOpeate sid:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
602 #ifdef HDC_HOST
603 if (hSession->hUSB != nullptr
604 && (!hSession->hUSB->hostBulkIn.isShutdown || !hSession->hUSB->hostBulkOut.isShutdown)) {
605 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
606 pUSB->CancelUsbIo(hSession);
607 return;
608 }
609 #endif
610 // wait workthread to free
611 if (hSession->pollHandle[STREAM_WORK]->loop) {
612 auto ctrl = BuildCtrlString(SP_STOP_SESSION, 0, nullptr, 0);
613 Base::SendToPollFd(hSession->ctrlFd[STREAM_MAIN], ctrl.data(), ctrl.size());
614 WRITE_LOG(LOG_INFO, "FreeSessionOpeate, send workthread for free. sessionId:%u", hSession->sessionId);
615 auto callbackCheckFreeSessionContinue = [](uv_timer_t *handle) -> void {
616 HSession hSession = (HSession)handle->data;
617 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
618 if (!hSession->childCleared) {
619 WRITE_LOG(LOG_INFO, "FreeSessionOpeate childCleared:%d sessionId:%u",
620 hSession->childCleared, hSession->sessionId);
621 return;
622 }
623 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
624 thisClass->FreeSessionContinue(hSession);
625 };
626 Base::TimerUvTask(&thisClass->loopMain, hSession, callbackCheckFreeSessionContinue);
627 } else {
628 thisClass->FreeSessionContinue(hSession);
629 }
630 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
631 }
632
FreeSession(const uint32_t sessionId)633 void HdcSessionBase::FreeSession(const uint32_t sessionId)
634 {
635 StartTraceScope("HdcSessionBase::FreeSession");
636 AddDeletedSessionId(sessionId);
637 if (threadSessionMain != uv_thread_self()) {
638 PushAsyncMessage(sessionId, ASYNC_FREE_SESSION, nullptr, 0);
639 return;
640 }
641 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
642 WRITE_LOG(LOG_INFO, "Begin to free session, sessionid:%u", sessionId);
643 do {
644 if (!hSession || hSession->isDead) {
645 WRITE_LOG(LOG_WARN, "FreeSession hSession nullptr or isDead sessionId:%u", sessionId);
646 break;
647 }
648 WRITE_LOG(LOG_INFO, "dataFdSend:%llu, dataFdRecv:%llu",
649 uint64_t(hSession->stat.dataSendBytes),
650 uint64_t(hSession->stat.dataRecvBytes));
651 hSession->isDead = true;
652 Base::TimerUvTask(&loopMain, hSession, FreeSessionOpeate);
653 NotifyInstanceSessionFree(hSession, false);
654 WRITE_LOG(LOG_INFO, "FreeSession sessionId:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
655 } while (false);
656 }
657
AdminSession(const uint8_t op,const uint32_t sessionId,HSession hInput)658 HSession HdcSessionBase::AdminSession(const uint8_t op, const uint32_t sessionId, HSession hInput)
659 {
660 HSession hRet = nullptr;
661 switch (op) {
662 case OP_ADD:
663 uv_rwlock_wrlock(&lockMapSession);
664 mapSession[sessionId] = hInput;
665 uv_rwlock_wrunlock(&lockMapSession);
666 break;
667 case OP_REMOVE:
668 uv_rwlock_wrlock(&lockMapSession);
669 mapSession.erase(sessionId);
670 uv_rwlock_wrunlock(&lockMapSession);
671 break;
672 case OP_QUERY:
673 uv_rwlock_rdlock(&lockMapSession);
674 if (mapSession.count(sessionId)) {
675 hRet = mapSession[sessionId];
676 }
677 uv_rwlock_rdunlock(&lockMapSession);
678 break;
679 case OP_QUERY_REF:
680 uv_rwlock_wrlock(&lockMapSession);
681 if (mapSession.count(sessionId)) {
682 hRet = mapSession[sessionId];
683 ++hRet->ref;
684 }
685 uv_rwlock_wrunlock(&lockMapSession);
686 break;
687 case OP_UPDATE:
688 uv_rwlock_wrlock(&lockMapSession);
689 // remove old
690 mapSession.erase(sessionId);
691 mapSession[hInput->sessionId] = hInput;
692 uv_rwlock_wrunlock(&lockMapSession);
693 break;
694 case OP_VOTE_RESET:
695 if (mapSession.count(sessionId) == 0) {
696 break;
697 }
698 bool needReset;
699 if (serverOrDaemon) {
700 uv_rwlock_wrlock(&lockMapSession);
701 hRet = mapSession[sessionId];
702 hRet->voteReset = true;
703 needReset = true;
704 for (auto &kv : mapSession) {
705 if (sessionId == kv.first) {
706 continue;
707 }
708 WRITE_LOG(LOG_DEBUG, "session:%u vote reset, session %u is %s",
709 sessionId, kv.first, kv.second->voteReset ? "YES" : "NO");
710 if (!kv.second->voteReset) {
711 needReset = false;
712 }
713 }
714 uv_rwlock_wrunlock(&lockMapSession);
715 } else {
716 needReset = true;
717 }
718 if (needReset) {
719 WRITE_LOG(LOG_FATAL, "!! session:%u vote reset, passed unanimously !!", sessionId);
720 abort();
721 }
722 break;
723 default:
724 break;
725 }
726 return hRet;
727 }
728
AddDeletedSessionId(uint32_t sessionId)729 void HdcSessionBase::AddDeletedSessionId(uint32_t sessionId)
730 {
731 std::unique_lock<std::shared_mutex> lock(deletedSessionIdRecordMutex);
732 if (deletedSessionIdSet.find(sessionId) != deletedSessionIdSet.end()) {
733 WRITE_LOG(LOG_INFO, "SessionId:%u is already in the cache", sessionId);
734 return;
735 }
736 WRITE_LOG(LOG_INFO, "AddDeletedSessionId:%u", sessionId);
737 deletedSessionIdSet.insert(sessionId);
738 deletedSessionIdQueue.push(sessionId);
739
740 // Delete old records and only save MAX_DELETED_SESSION_ID_RECORD_COUNT records
741 if (deletedSessionIdQueue.size() > MAX_DELETED_SESSION_ID_RECORD_COUNT) {
742 uint32_t id = deletedSessionIdQueue.front();
743 WRITE_LOG(LOG_INFO, "deletedSessionIdQueue size:%u, deletedSessionIdSet size:%u, pop session id:%u",
744 deletedSessionIdQueue.size(), deletedSessionIdSet.size(), id);
745 deletedSessionIdQueue.pop();
746 deletedSessionIdSet.erase(id);
747 }
748 }
749
IsSessionDeleted(uint32_t sessionId) const750 bool HdcSessionBase::IsSessionDeleted(uint32_t sessionId) const
751 {
752 std::shared_lock<std::shared_mutex> lock(deletedSessionIdRecordMutex);
753 if (deletedSessionIdSet.find(sessionId) != deletedSessionIdSet.end()) {
754 return true;
755 }
756 return false;
757 }
758
DumpTasksInfo(map<uint32_t,HTaskInfo> & mapTask)759 void HdcSessionBase::DumpTasksInfo(map<uint32_t, HTaskInfo> &mapTask)
760 {
761 int idx = 1;
762 for (auto t : mapTask) {
763 HTaskInfo ti = t.second;
764 WRITE_LOG(LOG_WARN, "%d: channelId: %lu, type: %d, closeRetry: %d\n",
765 idx++, ti->channelId, ti->taskType, ti->closeRetryCount);
766 }
767 }
768
769 // All in the corresponding sub-thread, no need locks
AdminTask(const uint8_t op,HSession hSession,const uint32_t channelId,HTaskInfo hInput)770 HTaskInfo HdcSessionBase::AdminTask(const uint8_t op, HSession hSession, const uint32_t channelId, HTaskInfo hInput)
771 {
772 HTaskInfo hRet = nullptr;
773 map<uint32_t, HTaskInfo> &mapTask = *hSession->mapTask;
774
775 switch (op) {
776 case OP_ADD:
777 hRet = mapTask[channelId];
778 if (hRet != nullptr) {
779 delete hRet;
780 }
781 mapTask[channelId] = hInput;
782 hRet = hInput;
783
784 WRITE_LOG(LOG_WARN, "AdminTask add session %u, channelId %u, mapTask size: %zu",
785 hSession->sessionId, channelId, mapTask.size());
786
787 break;
788 case OP_REMOVE:
789 mapTask.erase(channelId);
790 WRITE_LOG(LOG_DEBUG, "AdminTask rm session %u, channelId %u, mapTask size: %zu",
791 hSession->sessionId, channelId, mapTask.size());
792 break;
793 case OP_QUERY:
794 if (mapTask.count(channelId)) {
795 hRet = mapTask[channelId];
796 }
797 break;
798 case OP_VOTE_RESET:
799 AdminSession(op, hSession->sessionId, nullptr);
800 break;
801 default:
802 break;
803 }
804 return hRet;
805 }
806
SendByProtocol(HSession hSession,uint8_t * bufPtr,const int bufLen,bool echo)807 int HdcSessionBase::SendByProtocol(HSession hSession, uint8_t *bufPtr, const int bufLen, bool echo)
808 {
809 StartTraceScope("HdcSessionBase::SendByProtocol");
810 if (hSession->isDead) {
811 delete[] bufPtr;
812 WRITE_LOG(LOG_WARN, "SendByProtocol session dead error");
813 return ERR_SESSION_NOFOUND;
814 }
815 int ret = 0;
816 switch (hSession->connType) {
817 case CONN_TCP: {
818 HdcTCPBase *pTCP = ((HdcTCPBase *)hSession->classModule);
819 if (echo && !hSession->serverOrDaemon) {
820 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
821 } else {
822 if (hSession->hWorkThread == uv_thread_self()) {
823 ret = pTCP->WriteUvTcpFd(&hSession->hWorkTCP, bufPtr, bufLen);
824 } else {
825 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
826 }
827 }
828 break;
829 }
830 case CONN_USB: {
831 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
832 ret = pUSB->SendUSBBlock(hSession, bufPtr, bufLen);
833 delete[] bufPtr;
834 break;
835 }
836 #ifdef HDC_SUPPORT_UART
837 case CONN_SERIAL: {
838 HdcUARTBase *pUART = ((HdcUARTBase *)hSession->classModule);
839 ret = pUART->SendUARTData(hSession, bufPtr, bufLen);
840 delete[] bufPtr;
841 break;
842 }
843 #endif
844 default:
845 break;
846 }
847 return ret;
848 }
849
Send(const uint32_t sessionId,const uint32_t channelId,const uint16_t commandFlag,const uint8_t * data,const int dataSize)850 int HdcSessionBase::Send(const uint32_t sessionId, const uint32_t channelId, const uint16_t commandFlag,
851 const uint8_t *data, const int dataSize)
852 {
853 StartTraceScope("HdcSessionBase::Send");
854 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
855 if (!hSession) {
856 WRITE_LOG(LOG_WARN, "Send to offline device, drop it, sessionId:%u", sessionId);
857 return ERR_SESSION_NOFOUND;
858 }
859 PayloadProtect protectBuf; // noneed convert to big-endian
860 protectBuf.channelId = channelId;
861 protectBuf.commandFlag = commandFlag;
862 protectBuf.checkSum = (ENABLE_IO_CHECKSUM && dataSize > 0) ? Base::CalcCheckSum(data, dataSize) : 0;
863 protectBuf.vCode = payloadProtectStaticVcode;
864 string s = SerialStruct::SerializeToString(protectBuf);
865 // reserve for encrypt here
866 // xx-encrypt
867
868 PayloadHead payloadHead = {}; // need convert to big-endian
869 payloadHead.flag[0] = PACKET_FLAG.at(0);
870 payloadHead.flag[1] = PACKET_FLAG.at(1);
871 payloadHead.protocolVer = VER_PROTOCOL;
872 payloadHead.headSize = htons(s.size());
873 payloadHead.dataSize = htonl(dataSize);
874 int finalBufSize = sizeof(PayloadHead) + s.size() + dataSize;
875 uint8_t *finayBuf = new(std::nothrow) uint8_t[finalBufSize]();
876 if (finayBuf == nullptr) {
877 WRITE_LOG(LOG_WARN, "send allocmem err");
878 return ERR_BUF_ALLOC;
879 }
880 bool bufRet = false;
881 do {
882 if (memcpy_s(finayBuf, sizeof(PayloadHead), reinterpret_cast<uint8_t *>(&payloadHead), sizeof(PayloadHead))) {
883 WRITE_LOG(LOG_WARN, "send copyhead err for dataSize:%d", dataSize);
884 break;
885 }
886 if (memcpy_s(finayBuf + sizeof(PayloadHead), s.size(),
887 reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size())) {
888 WRITE_LOG(LOG_WARN, "send copyProtbuf err for dataSize:%d", dataSize);
889 break;
890 }
891 if (dataSize > 0 && memcpy_s(finayBuf + sizeof(PayloadHead) + s.size(), dataSize, data, dataSize)) {
892 WRITE_LOG(LOG_WARN, "send copyDatabuf err for dataSize:%d", dataSize);
893 break;
894 }
895 bufRet = true;
896 } while (false);
897 if (!bufRet) {
898 delete[] finayBuf;
899 WRITE_LOG(LOG_WARN, "send copywholedata err for dataSize:%d", dataSize);
900 return ERR_BUF_COPY;
901 }
902 if (CMD_KERNEL_ECHO == commandFlag) {
903 return SendByProtocol(hSession, finayBuf, finalBufSize, true);
904 } else {
905 return SendByProtocol(hSession, finayBuf, finalBufSize);
906 }
907 }
908
DecryptPayload(HSession hSession,PayloadHead * payloadHeadBe,uint8_t * encBuf)909 int HdcSessionBase::DecryptPayload(HSession hSession, PayloadHead *payloadHeadBe, uint8_t *encBuf)
910 {
911 StartTraceScope("HdcSessionBase::DecryptPayload");
912 PayloadProtect protectBuf = {};
913 uint16_t headSize = ntohs(payloadHeadBe->headSize);
914 int dataSize = ntohl(payloadHeadBe->dataSize);
915 string encString(reinterpret_cast<char *>(encBuf), headSize);
916 SerialStruct::ParseFromString(protectBuf, encString);
917 if (protectBuf.vCode != payloadProtectStaticVcode) {
918 WRITE_LOG(LOG_FATAL, "Session recv static vcode failed");
919 return ERR_BUF_CHECK;
920 }
921 uint8_t *data = encBuf + headSize;
922 if (ENABLE_IO_CHECKSUM && protectBuf.checkSum != 0 && (protectBuf.checkSum != Base::CalcCheckSum(data, dataSize))) {
923 WRITE_LOG(LOG_FATAL, "Session recv CalcCheckSum failed");
924 return ERR_BUF_CHECK;
925 }
926 if (!FetchCommand(hSession, protectBuf.channelId, protectBuf.commandFlag, data, dataSize)) {
927 WRITE_LOG(LOG_WARN, "FetchCommand failed: channelId %x commandFlag %x",
928 protectBuf.channelId, protectBuf.commandFlag);
929 return ERR_GENERIC;
930 }
931 return RET_SUCCESS;
932 }
933
OnRead(HSession hSession,uint8_t * bufPtr,const int bufLen)934 int HdcSessionBase::OnRead(HSession hSession, uint8_t *bufPtr, const int bufLen)
935 {
936 int ret = ERR_GENERIC;
937 StartTraceScope("HdcSessionBase::OnRead");
938 if (memcmp(bufPtr, PACKET_FLAG.c_str(), PACKET_FLAG.size())) {
939 WRITE_LOG(LOG_FATAL, "PACKET_FLAG incorrect %x %x", bufPtr[0], bufPtr[1]);
940 return ERR_BUF_CHECK;
941 }
942 struct PayloadHead *payloadHead = reinterpret_cast<struct PayloadHead *>(bufPtr);
943 // to prevent integer overflow caused by the add operation of two input num
944 uint64_t payloadHeadSize = static_cast<uint64_t>(ntohl(payloadHead->dataSize)) +
945 static_cast<uint64_t>(ntohs(payloadHead->headSize));
946 int packetHeadSize = sizeof(struct PayloadHead);
947 if (payloadHeadSize == 0 || payloadHeadSize > static_cast<uint64_t>(HDC_BUF_MAX_BYTES)) {
948 WRITE_LOG(LOG_FATAL, "Packet size incorrect");
949 return ERR_BUF_CHECK;
950 }
951
952 // 0 < payloadHeadSize < HDC_BUF_MAX_BYTES
953 int tobeReadLen = static_cast<int>(payloadHeadSize);
954 if (bufLen - packetHeadSize < tobeReadLen) {
955 return 0;
956 }
957 if (DecryptPayload(hSession, payloadHead, bufPtr + packetHeadSize)) {
958 WRITE_LOG(LOG_WARN, "decrypt plhead error");
959 return ERR_BUF_CHECK;
960 }
961 ret = packetHeadSize + tobeReadLen;
962 return ret;
963 }
964
965 // Returns <0 error;> 0 receives the number of bytes; 0 untreated
FetchIOBuf(HSession hSession,uint8_t * ioBuf,int read)966 int HdcSessionBase::FetchIOBuf(HSession hSession, uint8_t *ioBuf, int read)
967 {
968 HdcSessionBase *ptrConnect = (HdcSessionBase *)hSession->classInstance;
969 int indexBuf = 0;
970 int childRet = 0;
971 StartTraceScope("HdcSessionBase::FetchIOBuf");
972 if (read < 0) {
973 constexpr int bufSize = 1024;
974 char buf[bufSize] = { 0 };
975 uv_strerror_r(read, buf, bufSize);
976 WRITE_LOG(LOG_FATAL, "FetchIOBuf read io failed,%s", buf);
977 return ERR_IO_FAIL;
978 }
979 hSession->stat.dataRecvBytes += read;
980 hSession->availTailIndex += read;
981 while (!hSession->isDead && hSession->availTailIndex > static_cast<int>(sizeof(PayloadHead))) {
982 childRet = ptrConnect->OnRead(hSession, ioBuf + indexBuf, hSession->availTailIndex);
983 if (childRet > 0) {
984 hSession->availTailIndex -= childRet;
985 indexBuf += childRet;
986 } else if (childRet == 0) {
987 // Not enough a IO
988 break;
989 } else { // <0
990 WRITE_LOG(LOG_FATAL, "FetchIOBuf error childRet:%d sessionId:%u", childRet, hSession->sessionId);
991 hSession->availTailIndex = 0; // Preventing malicious data packages
992 indexBuf = ERR_BUF_SIZE;
993 break;
994 }
995 // It may be multi-time IO to merge in a BUF, need to loop processing
996 }
997 if (indexBuf > 0 && hSession->availTailIndex > 0) {
998 if (memmove_s(hSession->ioBuf, hSession->bufSize, hSession->ioBuf + indexBuf, hSession->availTailIndex)
999 != EOK) {
1000 return ERR_BUF_COPY;
1001 };
1002 uint8_t *bufToZero = reinterpret_cast<uint8_t *>(hSession->ioBuf + hSession->availTailIndex);
1003 Base::ZeroBuf(bufToZero, hSession->bufSize - hSession->availTailIndex);
1004 }
1005 return indexBuf;
1006 }
1007
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)1008 void HdcSessionBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
1009 {
1010 HSession context = (HSession)handle->data;
1011 Base::ReallocBuf(&context->ioBuf, &context->bufSize, HDC_SOCKETPAIR_SIZE);
1012 buf->base = (char *)context->ioBuf + context->availTailIndex;
1013 int size = context->bufSize - context->availTailIndex;
1014 buf->len = std::min(size, static_cast<int>(sizeWanted));
1015 }
1016
FinishWriteSessionTCP(uv_write_t * req,int status)1017 void HdcSessionBase::FinishWriteSessionTCP(uv_write_t *req, int status)
1018 {
1019 HSession hSession = (HSession)req->handle->data;
1020 --hSession->ref;
1021 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1022 if (status < 0) {
1023 WRITE_LOG(LOG_WARN, "FinishWriteSessionTCP status:%d sessionId:%u isDead:%d ref:%u",
1024 status, hSession->sessionId, hSession->isDead, uint32_t(hSession->ref));
1025 Base::TryCloseHandle((uv_handle_t *)req->handle);
1026 if (!hSession->isDead && !hSession->ref) {
1027 WRITE_LOG(LOG_DEBUG, "FinishWriteSessionTCP freesession :%u", hSession->sessionId);
1028 thisClass->FreeSession(hSession->sessionId);
1029 }
1030 }
1031 delete[]((uint8_t *)req->data);
1032 delete req;
1033 }
1034
DispatchSessionThreadCommand(HSession hSession,const uint8_t * baseBuf,const int bytesIO)1035 bool HdcSessionBase::DispatchSessionThreadCommand(HSession hSession, const uint8_t *baseBuf,
1036 const int bytesIO)
1037 {
1038 bool ret = true;
1039 uint8_t flag = *const_cast<uint8_t *>(baseBuf);
1040
1041 switch (flag) {
1042 case SP_JDWP_NEWFD:
1043 case SP_ARK_NEWFD: {
1044 JdwpNewFileDescriptor(baseBuf, bytesIO);
1045 break;
1046 }
1047 default:
1048 WRITE_LOG(LOG_WARN, "Not support session command");
1049 break;
1050 }
1051 return ret;
1052 }
1053
ReadCtrlFromSession(uv_poll_t * poll,int status,int events)1054 void HdcSessionBase::ReadCtrlFromSession(uv_poll_t *poll, int status, int events)
1055 {
1056 HSession hSession = (HSession)poll->data;
1057 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1058 const int size = Base::GetMaxBufSizeStable();
1059 char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1060 ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_MAIN], buf, size);
1061 while (true) {
1062 if (nread < 0) {
1063 constexpr int bufSize = 1024;
1064 char buffer[bufSize] = { 0 };
1065 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1066 WRITE_LOG(LOG_WARN, "ReadCtrlFromSession failed,%s", buffer);
1067 uv_poll_stop(poll);
1068 break;
1069 }
1070 if (nread == 0) {
1071 WRITE_LOG(LOG_FATAL, "ReadCtrlFromSession read data zero byte");
1072 break;
1073 }
1074 // only one command, no need to split command from stream
1075 // if add more commands, consider the format command
1076 hSessionBase->DispatchSessionThreadCommand(hSession, reinterpret_cast<uint8_t *>(buf), nread);
1077 break;
1078 }
1079 delete[] buf;
1080 }
1081
WorkThreadInitSession(HSession hSession,SessionHandShake & handshake)1082 void HdcSessionBase::WorkThreadInitSession(HSession hSession, SessionHandShake &handshake)
1083 {
1084 handshake.banner = HANDSHAKE_MESSAGE;
1085 handshake.sessionId = hSession->sessionId;
1086 handshake.connectKey = hSession->connectKey;
1087 if (!hSession->isCheck) {
1088 handshake.version = Base::GetVersion() + HDC_MSG_HASH;
1089 WRITE_LOG(LOG_INFO, "set version = %s", handshake.version.c_str());
1090 }
1091 handshake.authType = AUTH_NONE;
1092 // told daemon, we support RSA_3072_SHA512 auth
1093 Base::TlvAppend(handshake.buf, TAG_AUTH_TYPE, std::to_string(AuthVerifyType::RSA_3072_SHA512));
1094 }
1095
WorkThreadStartSession(HSession hSession)1096 bool HdcSessionBase::WorkThreadStartSession(HSession hSession)
1097 {
1098 bool regOK = false;
1099 int childRet = 0;
1100 if (hSession->connType == CONN_TCP) {
1101 HdcTCPBase *pTCPBase = (HdcTCPBase *)hSession->classModule;
1102 hSession->hChildWorkTCP.data = hSession;
1103 if (uv_tcp_init(&hSession->childLoop, &hSession->hChildWorkTCP) < 0) {
1104 WRITE_LOG(LOG_WARN, "HdcSessionBase SessionCtrl failed 1");
1105 return false;
1106 }
1107 if ((childRet = uv_tcp_open(&hSession->hChildWorkTCP, hSession->fdChildWorkTCP)) < 0) {
1108 constexpr int bufSize = 1024;
1109 char buf[bufSize] = { 0 };
1110 uv_strerror_r(childRet, buf, bufSize);
1111 WRITE_LOG(LOG_WARN, "SessionCtrl failed 2,fd:%d,str:%s", hSession->fdChildWorkTCP, buf);
1112 return false;
1113 }
1114 Base::SetTcpOptions((uv_tcp_t *)&hSession->hChildWorkTCP);
1115 uv_read_start((uv_stream_t *)&hSession->hChildWorkTCP, AllocCallback, pTCPBase->ReadStream);
1116 regOK = true;
1117 #ifdef HDC_SUPPORT_UART
1118 } else if (hSession->connType == CONN_SERIAL) { // UART
1119 HdcUARTBase *pUARTBase = (HdcUARTBase *)hSession->classModule;
1120 WRITE_LOG(LOG_DEBUG, "UART ReadyForWorkThread");
1121 regOK = pUARTBase->ReadyForWorkThread(hSession);
1122 #endif
1123 } else { // USB
1124 HdcUSBBase *pUSBBase = (HdcUSBBase *)hSession->classModule;
1125 WRITE_LOG(LOG_DEBUG, "USB ReadyForWorkThread");
1126 regOK = pUSBBase->ReadyForWorkThread(hSession);
1127 }
1128
1129 if (regOK && hSession->serverOrDaemon) {
1130 // session handshake step1
1131 SessionHandShake handshake = {};
1132 WorkThreadInitSession(hSession, handshake);
1133 string hs = SerialStruct::SerializeToString(handshake);
1134 #ifdef HDC_SUPPORT_UART
1135 WRITE_LOG(LOG_DEBUG, "WorkThreadStartSession session %u auth %u send handshake hs: %s",
1136 hSession->sessionId, handshake.authType, hs.c_str());
1137 #endif
1138 Send(hSession->sessionId, 0, CMD_KERNEL_HANDSHAKE,
1139 reinterpret_cast<uint8_t *>(const_cast<char *>(hs.c_str())), hs.size());
1140 }
1141 return regOK;
1142 }
1143
BuildCtrlString(InnerCtrlCommand command,uint32_t channelId,uint8_t * data,int dataSize)1144 vector<uint8_t> HdcSessionBase::BuildCtrlString(InnerCtrlCommand command, uint32_t channelId, uint8_t *data,
1145 int dataSize)
1146 {
1147 vector<uint8_t> ret;
1148 while (true) {
1149 if (dataSize > BUF_SIZE_MICRO) {
1150 WRITE_LOG(LOG_WARN, "BuildCtrlString dataSize:%d", dataSize);
1151 break;
1152 }
1153 CtrlStruct ctrl = {};
1154 ctrl.command = command;
1155 ctrl.channelId = channelId;
1156 ctrl.dataSize = dataSize;
1157 if (dataSize > 0 && data != nullptr && memcpy_s(ctrl.data, sizeof(ctrl.data), data, dataSize) != EOK) {
1158 break;
1159 }
1160 uint8_t *buf = reinterpret_cast<uint8_t *>(&ctrl);
1161 ret.insert(ret.end(), buf, buf + sizeof(CtrlStruct));
1162 break;
1163 }
1164 return ret;
1165 }
1166
DispatchMainThreadCommand(HSession hSession,const CtrlStruct * ctrl)1167 bool HdcSessionBase::DispatchMainThreadCommand(HSession hSession, const CtrlStruct *ctrl)
1168 {
1169 bool ret = true;
1170 uint32_t channelId = ctrl->channelId; // if send not set, it is zero
1171 switch (ctrl->command) {
1172 case SP_START_SESSION: {
1173 WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand START_SESSION sessionId:%u instance:%s",
1174 hSession->sessionId, hSession->serverOrDaemon ? "server" : "daemon");
1175 ret = WorkThreadStartSession(hSession);
1176 break;
1177 }
1178 case SP_STOP_SESSION: {
1179 WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand STOP_SESSION sessionId:%u", hSession->sessionId);
1180 auto closeSessionChildThreadTCPHandle = [](uv_handle_t *handle) -> void {
1181 HSession hSession = (HSession)handle->data;
1182 Base::TryCloseHandle((uv_handle_t *)handle);
1183 if (handle == (uv_handle_t *)hSession->pollHandle[STREAM_WORK]) {
1184 free(hSession->pollHandle[STREAM_WORK]);
1185 }
1186 if (--hSession->uvChildRef == 0) {
1187 uv_stop(&hSession->childLoop);
1188 };
1189 };
1190 constexpr int uvChildRefOffset = 2;
1191 hSession->uvChildRef += uvChildRefOffset;
1192 if (hSession->connType == CONN_TCP && hSession->hChildWorkTCP.loop) { // maybe not use it
1193 ++hSession->uvChildRef;
1194 Base::TryCloseHandle((uv_handle_t *)&hSession->hChildWorkTCP, true, closeSessionChildThreadTCPHandle);
1195 }
1196 Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_WORK], true,
1197 closeSessionChildThreadTCPHandle);
1198 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_WORK], true,
1199 closeSessionChildThreadTCPHandle);
1200 break;
1201 }
1202 case SP_ATTACH_CHANNEL: {
1203 if (!serverOrDaemon) {
1204 break; // Only Server has this feature
1205 }
1206 AttachChannel(hSession, channelId);
1207 break;
1208 }
1209 case SP_DEATCH_CHANNEL: {
1210 if (!serverOrDaemon) {
1211 break; // Only Server has this feature
1212 }
1213 DeatchChannel(hSession, channelId);
1214 break;
1215 }
1216 default:
1217 WRITE_LOG(LOG_WARN, "Not support main command");
1218 ret = false;
1219 break;
1220 }
1221 return ret;
1222 }
1223
1224 // Several bytes of control instructions, generally do not stick
ReadCtrlFromMain(uv_poll_t * poll,int status,int events)1225 void HdcSessionBase::ReadCtrlFromMain(uv_poll_t *poll, int status, int events)
1226 {
1227 HSession hSession = (HSession)poll->data;
1228 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1229 int formatCommandSize = sizeof(CtrlStruct);
1230 int index = 0;
1231 const int size = Base::GetMaxBufSizeStable();
1232 char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1233 ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_WORK], buf, size);
1234 while (true) {
1235 if (nread < 0) {
1236 constexpr int bufSize = 1024;
1237 char buffer[bufSize] = { 0 };
1238 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1239 WRITE_LOG(LOG_WARN, "SessionCtrl failed,%s", buffer);
1240 break;
1241 }
1242 if (nread % formatCommandSize != 0) {
1243 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain size failed, nread == %d", nread);
1244 break;
1245 }
1246 CtrlStruct *ctrl = reinterpret_cast<CtrlStruct *>(buf + index);
1247 if (!hSessionBase->DispatchMainThreadCommand(hSession, ctrl)) {
1248 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain failed sessionId:%u channelId:%u command:%u",
1249 hSession->sessionId, ctrl->channelId, ctrl->command);
1250 break;
1251 }
1252 index += sizeof(CtrlStruct);
1253 if (index >= nread) {
1254 break;
1255 }
1256 }
1257 delete[] buf;
1258 }
1259
ReChildLoopForSessionClear(HSession hSession)1260 void HdcSessionBase::ReChildLoopForSessionClear(HSession hSession)
1261 {
1262 // Restart loop close task
1263 ClearOwnTasks(hSession, 0);
1264 WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear sessionId:%u", hSession->sessionId);
1265 auto clearTaskForSessionFinish = [](uv_timer_t *handle) -> void {
1266 HSession hSession = (HSession)handle->data;
1267 for (auto v : *hSession->mapTask) {
1268 HTaskInfo hTask = (HTaskInfo)v.second;
1269 uint8_t level;
1270 if (hTask->closeRetryCount < GLOBAL_TIMEOUT / 2) {
1271 level = LOG_DEBUG;
1272 } else {
1273 level = LOG_WARN;
1274 }
1275 WRITE_LOG(level, "wait task free retry %d/%d, channelId:%u, sessionId:%u",
1276 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->channelId, hTask->sessionId);
1277 if (hTask->closeRetryCount++ >= GLOBAL_TIMEOUT) {
1278 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
1279 hSession = thisClass->AdminSession(OP_QUERY, hTask->sessionId, nullptr);
1280 thisClass->AdminTask(OP_VOTE_RESET, hSession, hTask->channelId, nullptr);
1281 }
1282 if (!hTask->taskFree)
1283 return;
1284 }
1285 // all task has been free
1286 uv_close((uv_handle_t *)handle, Base::CloseTimerCallback);
1287 uv_stop(&hSession->childLoop); // stop ReChildLoopForSessionClear pendding
1288 };
1289 Base::TimerUvTask(
1290 &hSession->childLoop, hSession, clearTaskForSessionFinish, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
1291 uv_run(&hSession->childLoop, UV_RUN_DEFAULT);
1292 // clear
1293 Base::TryCloseChildLoop(&hSession->childLoop, "Session childUV");
1294 }
1295
SessionWorkThread(uv_work_t * arg)1296 void HdcSessionBase::SessionWorkThread(uv_work_t *arg)
1297 {
1298 HSession hSession = (HSession)arg->data;
1299 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1300 hSession->hWorkChildThread = uv_thread_self();
1301
1302 uv_poll_t *pollHandle = hSession->pollHandle[STREAM_WORK];
1303 pollHandle->data = hSession;
1304 uv_poll_init_socket(&hSession->childLoop, pollHandle, hSession->ctrlFd[STREAM_WORK]);
1305 uv_poll_start(pollHandle, UV_READABLE, ReadCtrlFromMain);
1306 WRITE_LOG(LOG_DEBUG, "!!!Workthread run begin, sessionId:%u instance:%s", hSession->sessionId,
1307 thisClass->serverOrDaemon ? "server" : "daemon");
1308 uv_run(&hSession->childLoop, UV_RUN_DEFAULT); // work pendding
1309 WRITE_LOG(LOG_DEBUG, "!!!Workthread run again, sessionId:%u", hSession->sessionId);
1310 // main loop has exit
1311 thisClass->ReChildLoopForSessionClear(hSession); // work pending again
1312 hSession->childCleared = true;
1313 WRITE_LOG(LOG_WARN, "!!!Workthread run finish, sessionId:%u", hSession->sessionId);
1314 }
1315
1316 // clang-format off
LogMsg(const uint32_t sessionId,const uint32_t channelId,MessageLevel level,const char * msg,...)1317 void HdcSessionBase::LogMsg(const uint32_t sessionId, const uint32_t channelId,
1318 MessageLevel level, const char *msg, ...)
1319 // clang-format on
1320 {
1321 va_list vaArgs;
1322 va_start(vaArgs, msg);
1323 string log = Base::StringFormat(msg, vaArgs);
1324 va_end(vaArgs);
1325 vector<uint8_t> buf;
1326 buf.push_back(level);
1327 buf.insert(buf.end(), log.c_str(), log.c_str() + log.size());
1328 ServerCommand(sessionId, channelId, CMD_KERNEL_ECHO, buf.data(), buf.size());
1329 }
1330
NeedNewTaskInfo(const uint16_t command,bool & masterTask)1331 bool HdcSessionBase::NeedNewTaskInfo(const uint16_t command, bool &masterTask)
1332 {
1333 // referer from HdcServerForClient::DoCommandRemote
1334 bool ret = false;
1335 bool taskMasterInit = false;
1336 masterTask = false;
1337 switch (command) {
1338 case CMD_FILE_INIT:
1339 case CMD_FLASHD_FLASH_INIT:
1340 case CMD_FLASHD_UPDATE_INIT:
1341 case CMD_FLASHD_ERASE:
1342 case CMD_FLASHD_FORMAT:
1343 case CMD_FORWARD_INIT:
1344 case CMD_APP_INIT:
1345 case CMD_APP_UNINSTALL:
1346 case CMD_APP_SIDELOAD:
1347 taskMasterInit = true;
1348 break;
1349 default:
1350 break;
1351 }
1352 if (!serverOrDaemon
1353 && (command == CMD_SHELL_INIT || (command > CMD_UNITY_COMMAND_HEAD && command < CMD_UNITY_COMMAND_TAIL))) {
1354 // daemon's single side command
1355 ret = true;
1356 } else if (command == CMD_KERNEL_WAKEUP_SLAVETASK) {
1357 // slave tasks
1358 ret = true;
1359 } else if (command == CMD_UNITY_BUGREPORT_INIT) {
1360 ret = true;
1361 } else if (taskMasterInit) {
1362 // task init command
1363 masterTask = true;
1364 ret = true;
1365 }
1366 return ret;
1367 }
1368 // Heavy and time-consuming work was putted in the new thread to do, and does
1369 // not occupy the main thread
DispatchTaskData(HSession hSession,const uint32_t channelId,const uint16_t command,uint8_t * payload,int payloadSize)1370 bool HdcSessionBase::DispatchTaskData(HSession hSession, const uint32_t channelId, const uint16_t command,
1371 uint8_t *payload, int payloadSize)
1372 {
1373 StartTraceScope("HdcSessionBase::DispatchTaskData");
1374 bool ret = true;
1375 HTaskInfo hTaskInfo = nullptr;
1376 bool masterTask = false;
1377 while (true) {
1378 // Some basic commands do not have a local task constructor. example: Interactive shell, some uinty commands
1379 if (NeedNewTaskInfo(command, masterTask)) {
1380 WRITE_LOG(LOG_DEBUG, "New HTaskInfo channelId:%u command:%u", channelId, command);
1381 hTaskInfo = new(std::nothrow) TaskInformation();
1382 if (hTaskInfo == nullptr) {
1383 WRITE_LOG(LOG_FATAL, "DispatchTaskData new hTaskInfo failed");
1384 break;
1385 }
1386 hTaskInfo->channelId = channelId;
1387 hTaskInfo->sessionId = hSession->sessionId;
1388 hTaskInfo->runLoop = &hSession->childLoop;
1389 hTaskInfo->serverOrDaemon = serverOrDaemon;
1390 hTaskInfo->masterSlave = masterTask;
1391 hTaskInfo->closeRetryCount = 0;
1392 hTaskInfo->channelTask = false;
1393
1394 int addTaskRetry = 3; // try 3 time
1395 while (addTaskRetry > 0) {
1396 if (AdminTask(OP_ADD, hSession, channelId, hTaskInfo)) {
1397 break;
1398 }
1399 sleep(1);
1400 --addTaskRetry;
1401 }
1402
1403 if (addTaskRetry == 0) {
1404 #ifndef HDC_HOST
1405 LogMsg(hTaskInfo->sessionId, hTaskInfo->channelId,
1406 MSG_FAIL, "hdc thread pool busy, may cause reset later");
1407 #endif
1408 delete hTaskInfo;
1409 hTaskInfo = nullptr;
1410 ret = false;
1411 break;
1412 }
1413 } else {
1414 hTaskInfo = AdminTask(OP_QUERY, hSession, channelId, nullptr);
1415 }
1416 if (!hTaskInfo || hTaskInfo->taskStop || hTaskInfo->taskFree) {
1417 WRITE_LOG(LOG_ALL, "Dead HTaskInfo, ignore, channelId:%u command:%u", channelId, command);
1418 break;
1419 }
1420 ret = RedirectToTask(hTaskInfo, hSession, channelId, command, payload, payloadSize);
1421 break;
1422 }
1423 return ret;
1424 }
1425
PostStopInstanceMessage(bool restart)1426 void HdcSessionBase::PostStopInstanceMessage(bool restart)
1427 {
1428 PushAsyncMessage(0, ASYNC_STOP_MAINLOOP, nullptr, 0);
1429 WRITE_LOG(LOG_DEBUG, "StopDaemon has sended restart %d", restart);
1430 wantRestart = restart;
1431 }
1432 } // namespace Hdc
1433