1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "session.h"
16 #include "serial_struct.h"
17
18 namespace Hdc {
HdcSessionBase(bool serverOrDaemonIn)19 HdcSessionBase::HdcSessionBase(bool serverOrDaemonIn)
20 {
21 // print version pid
22 WRITE_LOG(LOG_INFO, "Program running. %s Pid:%u", Base::GetVersion().c_str(), getpid());
23 // server/daemon common initialization code
24 threadPoolCount = SIZE_THREAD_POOL;
25 string uvThreadEnv("UV_THREADPOOL_SIZE");
26 string uvThreadVal = std::to_string(threadPoolCount);
27 #ifdef _WIN32
28 uvThreadEnv += "=";
29 uvThreadEnv += uvThreadVal;
30 _putenv(uvThreadEnv.c_str());
31 #else
32 setenv(uvThreadEnv.c_str(), uvThreadVal.c_str(), 1);
33 #endif
34 uv_loop_init(&loopMain);
35 WRITE_LOG(LOG_DEBUG, "loopMain init");
36 uv_rwlock_init(&mainAsync);
37 uv_async_init(&loopMain, &asyncMainLoop, MainAsyncCallback);
38 uv_rwlock_init(&lockMapSession);
39 serverOrDaemon = serverOrDaemonIn;
40 ctxUSB = nullptr;
41 wantRestart = false;
42 threadSessionMain = uv_thread_self();
43
44 #ifdef HDC_HOST
45 if (serverOrDaemon) {
46 if (libusb_init((libusb_context **)&ctxUSB) != 0) {
47 ctxUSB = nullptr;
48 }
49 }
50 #endif
51 }
52
~HdcSessionBase()53 HdcSessionBase::~HdcSessionBase()
54 {
55 Base::TryCloseHandle((uv_handle_t *)&asyncMainLoop);
56 uv_loop_close(&loopMain);
57 // clear base
58 uv_rwlock_destroy(&mainAsync);
59 uv_rwlock_destroy(&lockMapSession);
60 #ifdef HDC_HOST
61 if (serverOrDaemon and ctxUSB != nullptr) {
62 libusb_exit((libusb_context *)ctxUSB);
63 }
64 #endif
65 WRITE_LOG(LOG_DEBUG, "~HdcSessionBase free sessionRef:%u instance:%s", uint32_t(sessionRef),
66 serverOrDaemon ? "server" : "daemon");
67 }
68
69 // remove step2
TryRemoveTask(HTaskInfo hTask)70 bool HdcSessionBase::TryRemoveTask(HTaskInfo hTask)
71 {
72 if (hTask->taskFree) {
73 return true;
74 }
75 bool ret = RemoveInstanceTask(OP_REMOVE, hTask);
76 if (ret) {
77 hTask->taskFree = true;
78 } else {
79 // This is used to check that the memory cannot be cleaned up. If the memory cannot be released, break point
80 // here to see which task has not been released
81 // print task clear
82 }
83 return ret;
84 }
85
86 // remove step1
BeginRemoveTask(HTaskInfo hTask)87 bool HdcSessionBase::BeginRemoveTask(HTaskInfo hTask)
88 {
89 bool ret = true;
90
91 if (hTask->taskStop || hTask->taskFree) {
92 return true;
93 }
94
95 WRITE_LOG(LOG_DEBUG, "BeginRemoveTask taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
96 ret = RemoveInstanceTask(OP_CLEAR, hTask);
97 auto taskClassDeleteRetry = [](uv_timer_t *handle) -> void {
98 HTaskInfo hTask = (HTaskInfo)handle->data;
99 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
100 WRITE_LOG(LOG_DEBUG, "TaskDelay task remove current try count %d/%d, channelId:%u, sessionId:%u",
101 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->channelId, hTask->sessionId);
102 if (!thisClass->TryRemoveTask(hTask)) {
103 return;
104 }
105 WRITE_LOG(LOG_DEBUG, "TaskDelay task remove finish, channelId:%u", hTask->channelId);
106 if (hTask != nullptr) {
107 delete hTask;
108 hTask = nullptr;
109 }
110 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
111 };
112 Base::TimerUvTask(hTask->runLoop, hTask, taskClassDeleteRetry, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
113
114 hTask->taskStop = true;
115 ret = true;
116 return ret;
117 }
118
119 // Clear all Task or a single Task, the regular situation is stopped first, and the specific class memory is cleaned up
120 // after the end of the LOOP.
121 // When ChannelIdinput == 0, at this time, all of the LOOP ends, all runs in the class end, so directly skip STOP,
122 // physical memory deletion class trimming
ClearOwnTasks(HSession hSession,const uint32_t channelIDInput)123 void HdcSessionBase::ClearOwnTasks(HSession hSession, const uint32_t channelIDInput)
124 {
125 // First case: normal task cleanup process (STOP Remove)
126 // Second: The task is cleaned up, the session ends
127 // Third: The task is cleaned up, and the session is directly over the session.
128 hSession->mapTaskMutex.lock();
129 map<uint32_t, HTaskInfo>::iterator iter;
130 for (iter = hSession->mapTask->begin(); iter != hSession->mapTask->end();) {
131 uint32_t channelId = iter->first;
132 HTaskInfo hTask = iter->second;
133 if (channelIDInput != 0) { // single
134 WRITE_LOG(LOG_DEBUG, "channelIDInput = %u, cur = %u", channelIDInput, channelId);
135 if (channelIDInput != channelId) {
136 ++iter;
137 continue;
138 }
139 BeginRemoveTask(hTask);
140 WRITE_LOG(LOG_DEBUG, "ClearOwnTasks OP_CLEAR finish, session:%p channelIDInput:%u", hSession,
141 channelIDInput);
142 iter = hSession->mapTask->erase(iter);
143 break;
144 }
145 // multi
146 BeginRemoveTask(hTask);
147 iter = hSession->mapTask->erase(iter);
148 }
149 hSession->mapTaskMutex.unlock();
150 }
151
ClearSessions()152 void HdcSessionBase::ClearSessions()
153 {
154 // no need to lock mapSession
155 // broadcast free singal
156 for (auto v : mapSession) {
157 HSession hSession = (HSession)v.second;
158 if (!hSession->isDead) {
159 FreeSession(hSession->sessionId);
160 }
161 }
162 }
163
ReMainLoopForInstanceClear()164 void HdcSessionBase::ReMainLoopForInstanceClear()
165 { // reloop
166 auto clearSessionsForFinish = [](uv_idle_t *handle) -> void {
167 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
168 if (thisClass->sessionRef > 0) {
169 return;
170 }
171 // all task has been free
172 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
173 uv_stop(&thisClass->loopMain);
174 };
175 Base::IdleUvTask(&loopMain, this, clearSessionsForFinish);
176 uv_run(&loopMain, UV_RUN_DEFAULT);
177 };
178
179 #ifdef HDC_SUPPORT_UART
EnumUARTDeviceRegister(UartKickoutZombie kickOut)180 void HdcSessionBase::EnumUARTDeviceRegister(UartKickoutZombie kickOut)
181 {
182 uv_rwlock_rdlock(&lockMapSession);
183 map<uint32_t, HSession>::iterator i;
184 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
185 HSession hs = i->second;
186 if ((hs->connType != CONN_SERIAL) or (hs->hUART == nullptr)) {
187 continue;
188 }
189 kickOut(hs);
190 break;
191 }
192 uv_rwlock_rdunlock(&lockMapSession);
193 }
194 #endif
195
EnumUSBDeviceRegister(void (* pCallBack)(HSession hSession))196 void HdcSessionBase::EnumUSBDeviceRegister(void (*pCallBack)(HSession hSession))
197 {
198 if (!pCallBack) {
199 return;
200 }
201 uv_rwlock_rdlock(&lockMapSession);
202 map<uint32_t, HSession>::iterator i;
203 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
204 HSession hs = i->second;
205 if (hs->connType != CONN_USB) {
206 continue;
207 }
208 if (hs->hUSB == nullptr) {
209 continue;
210 }
211 if (pCallBack) {
212 pCallBack(hs);
213 }
214 break;
215 }
216 uv_rwlock_rdunlock(&lockMapSession);
217 }
218
219 // The PC side gives the device information, determines if the USB device is registered
220 // PDEV and Busid Devid two choices
QueryUSBDeviceRegister(void * pDev,uint8_t busIDIn,uint8_t devIDIn)221 HSession HdcSessionBase::QueryUSBDeviceRegister(void *pDev, uint8_t busIDIn, uint8_t devIDIn)
222 {
223 #ifdef HDC_HOST
224 libusb_device *dev = (libusb_device *)pDev;
225 HSession hResult = nullptr;
226 if (!mapSession.size()) {
227 return nullptr;
228 }
229 uint8_t busId = 0;
230 uint8_t devId = 0;
231 if (pDev) {
232 busId = libusb_get_bus_number(dev);
233 devId = libusb_get_device_address(dev);
234 } else {
235 busId = busIDIn;
236 devId = devIDIn;
237 }
238 uv_rwlock_rdlock(&lockMapSession);
239 map<uint32_t, HSession>::iterator i;
240 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
241 HSession hs = i->second;
242 if (hs->connType == CONN_USB) {
243 continue;
244 }
245 if (hs->hUSB == nullptr) {
246 continue;
247 }
248 if (hs->hUSB->devId != devId || hs->hUSB->busId != busId) {
249 continue;
250 }
251 hResult = hs;
252 break;
253 }
254 uv_rwlock_rdunlock(&lockMapSession);
255 return hResult;
256 #else
257 return nullptr;
258 #endif
259 }
260
AsyncMainLoopTask(uv_idle_t * handle)261 void HdcSessionBase::AsyncMainLoopTask(uv_idle_t *handle)
262 {
263 AsyncParam *param = (AsyncParam *)handle->data;
264 HdcSessionBase *thisClass = (HdcSessionBase *)param->thisClass;
265 switch (param->method) {
266 case ASYNC_FREE_SESSION:
267 // Destruction is unified in the main thread
268 thisClass->FreeSession(param->sid);
269 break;
270 case ASYNC_STOP_MAINLOOP:
271 uv_stop(&thisClass->loopMain);
272 break;
273 default:
274 break;
275 }
276 if (param->data) {
277 delete[]((uint8_t *)param->data);
278 }
279 delete param;
280 param = nullptr;
281 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseIdleCallback);
282 }
283
MainAsyncCallback(uv_async_t * handle)284 void HdcSessionBase::MainAsyncCallback(uv_async_t *handle)
285 {
286 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
287 list<void *>::iterator i;
288 list<void *> &lst = thisClass->lstMainThreadOP;
289 uv_rwlock_wrlock(&thisClass->mainAsync);
290 for (i = lst.begin(); i != lst.end();) {
291 AsyncParam *param = (AsyncParam *)*i;
292 Base::IdleUvTask(&thisClass->loopMain, param, AsyncMainLoopTask);
293 i = lst.erase(i);
294 }
295 uv_rwlock_wrunlock(&thisClass->mainAsync);
296 }
297
PushAsyncMessage(const uint32_t sessionId,const uint8_t method,const void * data,const int dataSize)298 void HdcSessionBase::PushAsyncMessage(const uint32_t sessionId, const uint8_t method, const void *data,
299 const int dataSize)
300 {
301 AsyncParam *param = new AsyncParam();
302 if (!param) {
303 return;
304 }
305 param->sid = sessionId;
306 param->thisClass = this;
307 param->method = method;
308 if (dataSize > 0) {
309 param->dataSize = dataSize;
310 param->data = new uint8_t[param->dataSize]();
311 if (!param->data) {
312 delete param;
313 return;
314 }
315 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
316 delete[]((uint8_t *)param->data);
317 delete param;
318 return;
319 }
320 }
321
322 asyncMainLoop.data = this;
323 uv_rwlock_wrlock(&mainAsync);
324 lstMainThreadOP.push_back(param);
325 uv_rwlock_wrunlock(&mainAsync);
326 uv_async_send(&asyncMainLoop);
327 }
328
WorkerPendding()329 void HdcSessionBase::WorkerPendding()
330 {
331 uv_run(&loopMain, UV_RUN_DEFAULT);
332 ClearInstanceResource();
333 }
334
MallocSessionByConnectType(HSession hSession)335 int HdcSessionBase::MallocSessionByConnectType(HSession hSession)
336 {
337 int ret = 0;
338 switch (hSession->connType) {
339 case CONN_TCP: {
340 uv_tcp_init(&loopMain, &hSession->hWorkTCP);
341 ++hSession->uvHandleRef;
342 hSession->hWorkTCP.data = hSession;
343 break;
344 }
345 case CONN_USB: {
346 // Some members need to be placed at the primary thread
347 HUSB hUSB = new HdcUSB();
348 if (!hUSB) {
349 ret = -1;
350 break;
351 }
352 hSession->hUSB = hUSB;
353 hSession->hUSB->wMaxPacketSizeSend = MAX_PACKET_SIZE_HISPEED;
354 break;
355 }
356 #ifdef HDC_SUPPORT_UART
357 case CONN_SERIAL: {
358 HUART hUART = new HdcUART();
359 if (!hUART) {
360 ret = -1;
361 break;
362 }
363 hSession->hUART = hUART;
364 break;
365 }
366 #endif // HDC_SUPPORT_UART
367 default:
368 ret = -1;
369 break;
370 }
371 return ret;
372 }
373
374 // Avoid unit test when client\server\daemon on the same host, maybe get the same ID value
GetSessionPseudoUid()375 uint32_t HdcSessionBase::GetSessionPseudoUid()
376 {
377 uint32_t uid = 0;
378 HSession hInput = nullptr;
379 do {
380 uid = static_cast<uint32_t>(Base::GetRandom());
381 } while ((hInput = AdminSession(OP_QUERY, uid, nullptr)) != nullptr);
382 return uid;
383 }
384
385 // when client 0 to automatic generated,when daemon First place 1 followed by
MallocSession(bool serverOrDaemon,const ConnType connType,void * classModule,uint32_t sessionId)386 HSession HdcSessionBase::MallocSession(bool serverOrDaemon, const ConnType connType, void *classModule,
387 uint32_t sessionId)
388 {
389 HSession hSession = new(std::nothrow) HdcSession();
390 if (!hSession) {
391 WRITE_LOG(LOG_FATAL, "MallocSession new hSession failed");
392 return nullptr;
393 }
394 int ret = 0;
395 ++sessionRef;
396 (void)memset_s(hSession->ctrlFd, sizeof(hSession->ctrlFd), 0, sizeof(hSession->ctrlFd));
397 hSession->classInstance = this;
398 hSession->connType = connType;
399 hSession->classModule = classModule;
400 hSession->isDead = false;
401 hSession->sessionId = ((sessionId == 0) ? GetSessionPseudoUid() : sessionId);
402 hSession->serverOrDaemon = serverOrDaemon;
403 hSession->hWorkThread = uv_thread_self();
404 hSession->mapTask = new(std::nothrow) map<uint32_t, HTaskInfo>();
405 if (hSession->mapTask == nullptr) {
406 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->mapTask failed");
407 delete hSession;
408 hSession = nullptr;
409 return nullptr;
410 }
411 hSession->listKey = new(std::nothrow) list<void *>;
412 if (hSession->listKey == nullptr) {
413 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->listKey failed");
414 delete hSession;
415 hSession = nullptr;
416 return nullptr;
417 }
418 uv_loop_init(&hSession->childLoop);
419 hSession->uvHandleRef = 0;
420 // pullup child
421 WRITE_LOG(LOG_DEBUG, "HdcSessionBase NewSession, sessionId:%u, connType:%d.",
422 hSession->sessionId, hSession->connType);
423 uv_tcp_init(&loopMain, &hSession->ctrlPipe[STREAM_MAIN]);
424 (void)memset_s(&hSession->ctrlPipe[STREAM_WORK], sizeof(hSession->ctrlPipe[STREAM_WORK]),
425 0, sizeof(uv_tcp_t));
426 ++hSession->uvHandleRef;
427 Base::CreateSocketPair(hSession->ctrlFd);
428 uv_tcp_open(&hSession->ctrlPipe[STREAM_MAIN], hSession->ctrlFd[STREAM_MAIN]);
429 uv_read_start((uv_stream_t *)&hSession->ctrlPipe[STREAM_MAIN], Base::AllocBufferCallback, ReadCtrlFromSession);
430 hSession->ctrlPipe[STREAM_MAIN].data = hSession;
431 hSession->ctrlPipe[STREAM_WORK].data = hSession;
432 // Activate USB DAEMON's data channel, may not for use
433 uv_tcp_init(&loopMain, &hSession->dataPipe[STREAM_MAIN]);
434 ++hSession->uvHandleRef;
435 Base::CreateSocketPair(hSession->dataFd);
436 uv_tcp_open(&hSession->dataPipe[STREAM_MAIN], hSession->dataFd[STREAM_MAIN]);
437 hSession->dataPipe[STREAM_MAIN].data = hSession;
438 hSession->dataPipe[STREAM_WORK].data = hSession;
439 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN]);
440 ret = MallocSessionByConnectType(hSession);
441 if (ret) {
442 delete hSession;
443 hSession = nullptr;
444 } else {
445 AdminSession(OP_ADD, hSession->sessionId, hSession);
446 }
447 return hSession;
448 }
449
FreeSessionByConnectType(HSession hSession)450 void HdcSessionBase::FreeSessionByConnectType(HSession hSession)
451 {
452 WRITE_LOG(LOG_DEBUG, "FreeSessionByConnectType %s", hSession->ToDebugString().c_str());
453
454 if (CONN_USB == hSession->connType) {
455 // ibusb All context is applied for sub-threaded, so it needs to be destroyed in the subline
456 if (!hSession->hUSB) {
457 return;
458 }
459 HUSB hUSB = hSession->hUSB;
460 if (!hUSB) {
461 return;
462 }
463 #ifdef HDC_HOST
464 if (hUSB->devHandle) {
465 libusb_release_interface(hUSB->devHandle, hUSB->interfaceNumber);
466 libusb_close(hUSB->devHandle);
467 hUSB->devHandle = nullptr;
468 }
469 #else
470 if (hUSB->bulkIn > 0) {
471 close(hUSB->bulkIn);
472 hUSB->bulkIn = 0;
473 }
474 if (hUSB->bulkOut > 0) {
475 close(hUSB->bulkOut);
476 hUSB->bulkOut = 0;
477 }
478 #endif
479 delete hSession->hUSB;
480 hSession->hUSB = nullptr;
481 }
482 #ifdef HDC_SUPPORT_UART
483 if (CONN_SERIAL == hSession->connType) {
484 if (!hSession->hUART) {
485 return;
486 }
487 HUART hUART = hSession->hUART;
488 if (!hUART) {
489 return;
490 }
491 HdcUARTBase *uartBase = (HdcUARTBase *)hSession->classModule;
492 // tell uart session will be free
493 uartBase->StopSession(hSession);
494 #ifdef HDC_HOST
495 #ifdef HOST_MINGW
496 if (hUART->devUartHandle != INVALID_HANDLE_VALUE) {
497 CloseHandle(hUART->devUartHandle);
498 hUART->devUartHandle = INVALID_HANDLE_VALUE;
499 }
500 #elif defined(HOST_LINUX)
501 if (hUART->devUartHandle >= 0) {
502 close(hUART->devUartHandle);
503 hUART->devUartHandle = -1;
504 }
505 #endif // _WIN32
506 #endif
507 delete hSession->hUART;
508 hSession->hUART = nullptr;
509 }
510 #endif
511 }
512
513 // work when libuv-handle at struct of HdcSession has all callback finished
FreeSessionFinally(uv_idle_t * handle)514 void HdcSessionBase::FreeSessionFinally(uv_idle_t *handle)
515 {
516 HSession hSession = (HSession)handle->data;
517 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
518 if (hSession->uvHandleRef > 0) {
519 return;
520 }
521 // Notify Server or Daemon, just UI or display commandline
522 thisClass->NotifyInstanceSessionFree(hSession, true);
523 // all hsession uv handle has been clear
524 thisClass->AdminSession(OP_REMOVE, hSession->sessionId, nullptr);
525 WRITE_LOG(LOG_DEBUG, "!!!FreeSessionFinally sessionId:%u finish", hSession->sessionId);
526 HdcAuth::FreeKey(!hSession->serverOrDaemon, hSession->listKey);
527 delete hSession;
528 hSession = nullptr; // fix CodeMars SetNullAfterFree issue
529 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
530 --thisClass->sessionRef;
531 }
532
533 // work when child-work thread finish
FreeSessionContinue(HSession hSession)534 void HdcSessionBase::FreeSessionContinue(HSession hSession)
535 {
536 auto closeSessionTCPHandle = [](uv_handle_t *handle) -> void {
537 HSession hSession = (HSession)handle->data;
538 --hSession->uvHandleRef;
539 Base::TryCloseHandle((uv_handle_t *)handle);
540 };
541 if (CONN_TCP == hSession->connType) {
542 // Turn off TCP to prevent continuing writing
543 Base::TryCloseHandle((uv_handle_t *)&hSession->hWorkTCP, true, closeSessionTCPHandle);
544 }
545 hSession->availTailIndex = 0;
546 if (hSession->ioBuf) {
547 delete[] hSession->ioBuf;
548 hSession->ioBuf = nullptr;
549 }
550 Base::TryCloseHandle((uv_handle_t *)&hSession->ctrlPipe[STREAM_MAIN], true, closeSessionTCPHandle);
551 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_MAIN], true, closeSessionTCPHandle);
552 FreeSessionByConnectType(hSession);
553 // finish
554 Base::IdleUvTask(&loopMain, hSession, FreeSessionFinally);
555 }
556
FreeSessionOpeate(uv_timer_t * handle)557 void HdcSessionBase::FreeSessionOpeate(uv_timer_t *handle)
558 {
559 HSession hSession = (HSession)handle->data;
560 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
561 if (hSession->ref > 0) {
562 return;
563 }
564 WRITE_LOG(LOG_DEBUG, "FreeSessionOpeate ref:%u", uint32_t(hSession->ref));
565 #ifdef HDC_HOST
566 if (hSession->hUSB != nullptr
567 && (!hSession->hUSB->hostBulkIn.isShutdown || !hSession->hUSB->hostBulkOut.isShutdown)) {
568 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
569 pUSB->CancelUsbIo(hSession);
570 return;
571 }
572 #endif
573 // wait workthread to free
574 if (hSession->ctrlPipe[STREAM_WORK].loop) {
575 auto ctrl = BuildCtrlString(SP_STOP_SESSION, 0, nullptr, 0);
576 Base::SendToStream((uv_stream_t *)&hSession->ctrlPipe[STREAM_MAIN], ctrl.data(), ctrl.size());
577 WRITE_LOG(LOG_DEBUG, "FreeSessionOpeate, send workthread fo free. sessionId:%u", hSession->sessionId);
578 auto callbackCheckFreeSessionContinue = [](uv_timer_t *handle) -> void {
579 HSession hSession = (HSession)handle->data;
580 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
581 if (!hSession->childCleared) {
582 return;
583 }
584 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
585 thisClass->FreeSessionContinue(hSession);
586 };
587 Base::TimerUvTask(&thisClass->loopMain, hSession, callbackCheckFreeSessionContinue);
588 } else {
589 thisClass->FreeSessionContinue(hSession);
590 }
591 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
592 }
593
FreeSession(const uint32_t sessionId)594 void HdcSessionBase::FreeSession(const uint32_t sessionId)
595 {
596 if (threadSessionMain != uv_thread_self()) {
597 PushAsyncMessage(sessionId, ASYNC_FREE_SESSION, nullptr, 0);
598 return;
599 }
600 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
601 WRITE_LOG(LOG_DEBUG, "Begin to free session, sessionid:%u", sessionId);
602 do {
603 if (!hSession || hSession->isDead) {
604 break;
605 }
606 hSession->isDead = true;
607 Base::TimerUvTask(&loopMain, hSession, FreeSessionOpeate);
608 NotifyInstanceSessionFree(hSession, false);
609 WRITE_LOG(LOG_DEBUG, "FreeSession sessionId:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
610 } while (false);
611 }
612
AdminSession(const uint8_t op,const uint32_t sessionId,HSession hInput)613 HSession HdcSessionBase::AdminSession(const uint8_t op, const uint32_t sessionId, HSession hInput)
614 {
615 HSession hRet = nullptr;
616 switch (op) {
617 case OP_ADD:
618 uv_rwlock_wrlock(&lockMapSession);
619 mapSession[sessionId] = hInput;
620 uv_rwlock_wrunlock(&lockMapSession);
621 break;
622 case OP_REMOVE:
623 uv_rwlock_wrlock(&lockMapSession);
624 mapSession.erase(sessionId);
625 uv_rwlock_wrunlock(&lockMapSession);
626 break;
627 case OP_QUERY:
628 uv_rwlock_rdlock(&lockMapSession);
629 if (mapSession.count(sessionId)) {
630 hRet = mapSession[sessionId];
631 }
632 uv_rwlock_rdunlock(&lockMapSession);
633 break;
634 case OP_QUERY_REF:
635 uv_rwlock_wrlock(&lockMapSession);
636 if (mapSession.count(sessionId)) {
637 hRet = mapSession[sessionId];
638 ++hRet->ref;
639 }
640 uv_rwlock_wrunlock(&lockMapSession);
641 break;
642 case OP_UPDATE:
643 uv_rwlock_wrlock(&lockMapSession);
644 // remove old
645 mapSession.erase(sessionId);
646 mapSession[hInput->sessionId] = hInput;
647 uv_rwlock_wrunlock(&lockMapSession);
648 break;
649 case OP_VOTE_RESET:
650 if (mapSession.count(sessionId) == 0) {
651 break;
652 }
653 bool needReset;
654 if (serverOrDaemon) {
655 uv_rwlock_wrlock(&lockMapSession);
656 hRet = mapSession[sessionId];
657 hRet->voteReset = true;
658 needReset = true;
659 for (auto &kv : mapSession) {
660 if (sessionId == kv.first) {
661 continue;
662 }
663 WRITE_LOG(LOG_DEBUG, "session:%u vote reset, session %u is %s",
664 sessionId, kv.first, kv.second->voteReset ? "YES" : "NO");
665 if (!kv.second->voteReset) {
666 needReset = false;
667 }
668 }
669 uv_rwlock_wrunlock(&lockMapSession);
670 } else {
671 needReset = true;
672 }
673 if (needReset) {
674 WRITE_LOG(LOG_FATAL, "!! session:%u vote reset, passed unanimously !!", sessionId);
675 abort();
676 }
677 break;
678 default:
679 break;
680 }
681 return hRet;
682 }
683
684 // All in the corresponding sub-thread, no need locks
AdminTask(const uint8_t op,HSession hSession,const uint32_t channelId,HTaskInfo hInput)685 HTaskInfo HdcSessionBase::AdminTask(const uint8_t op, HSession hSession, const uint32_t channelId, HTaskInfo hInput)
686 {
687 HTaskInfo hRet = nullptr;
688 map<uint32_t, HTaskInfo> &mapTask = *hSession->mapTask;
689
690 switch (op) {
691 case OP_ADD:
692 #ifndef HDC_HOST
693 // uv sub-thread confiured by threadPoolCount, reserve 2 for main & communicate
694 if (mapTask.size() >= (threadPoolCount - 2)) {
695 WRITE_LOG(LOG_WARN, "mapTask.size:%d, hdc is busy", mapTask.size());
696 break;
697 }
698 #endif
699 hRet = mapTask[channelId];
700 if (hRet != nullptr) {
701 delete hRet;
702 }
703 mapTask[channelId] = hInput;
704 hRet = hInput;
705
706 WRITE_LOG(LOG_DEBUG, "AdminTask add session %u, channelId %u, mapTask size: %zu",
707 hSession->sessionId, channelId, mapTask.size());
708
709 break;
710 case OP_REMOVE:
711 mapTask.erase(channelId);
712 WRITE_LOG(LOG_DEBUG, "AdminTask rm session %u, channelId %u, mapTask size: %zu",
713 hSession->sessionId, channelId, mapTask.size());
714 break;
715 case OP_QUERY:
716 if (mapTask.count(channelId)) {
717 hRet = mapTask[channelId];
718 }
719 break;
720 case OP_VOTE_RESET:
721 AdminSession(op, hSession->sessionId, nullptr);
722 break;
723 default:
724 break;
725 }
726 return hRet;
727 }
728
SendByProtocol(HSession hSession,uint8_t * bufPtr,const int bufLen,bool echo)729 int HdcSessionBase::SendByProtocol(HSession hSession, uint8_t *bufPtr, const int bufLen, bool echo )
730 {
731 if (hSession->isDead) {
732 WRITE_LOG(LOG_WARN, "SendByProtocol session dead error");
733 return ERR_SESSION_NOFOUND;
734 }
735 int ret = 0;
736 switch (hSession->connType) {
737 case CONN_TCP: {
738 WRITE_LOG(LOG_WARN, "CONN_TCP serverOrDaemon:%d", hSession->serverOrDaemon);
739 if (echo && !hSession->serverOrDaemon) {
740 ret = Base::SendToStreamEx((uv_stream_t *)&hSession->hChildWorkTCP, bufPtr, bufLen,
741 nullptr, (void *)FinishWriteSessionTCP, bufPtr);
742 } else {
743 if (hSession->hWorkThread == uv_thread_self()) {
744
745 WRITE_LOG(LOG_WARN, "CONN_TCP:%s serverOrDaemon:%d", bufPtr,
746 hSession->serverOrDaemon);
747 WRITE_LOG(LOG_WARN, "SendByProtocol %p size:%lu", hSession->hWorkTCP, bufLen);
748 ret = Base::SendToStreamEx((uv_stream_t *)&hSession->hWorkTCP, bufPtr, bufLen,
749 nullptr, (void *)FinishWriteSessionTCP, bufPtr);
750 } else if (hSession->hWorkChildThread == uv_thread_self()) {
751 WRITE_LOG(LOG_WARN, "CONN_TCP 2:%s serverOrDaemon:%d", bufPtr,
752 hSession->serverOrDaemon);
753 ret = Base::SendToStreamEx((uv_stream_t *)&hSession->hChildWorkTCP, bufPtr,
754 bufLen, nullptr, (void *)FinishWriteSessionTCP,
755 bufPtr);
756 } else {
757 WRITE_LOG(LOG_FATAL, "SendByProtocol uncontrol send");
758 ret = ERR_API_FAIL;
759 }
760 }
761 if (ret > 0) {
762 ++hSession->ref;
763 }
764 break;
765 }
766 case CONN_USB: {
767 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
768 ret = pUSB->SendUSBBlock(hSession, bufPtr, bufLen);
769 delete[] bufPtr;
770 break;
771 }
772 #ifdef HDC_SUPPORT_UART
773 case CONN_SERIAL: {
774 HdcUARTBase *pUART = ((HdcUARTBase *)hSession->classModule);
775 ret = pUART->SendUARTData(hSession, bufPtr, bufLen);
776 delete[] bufPtr;
777 break;
778 }
779 #endif
780 default:
781 break;
782 }
783 return ret;
784 }
785
Send(const uint32_t sessionId,const uint32_t channelId,const uint16_t commandFlag,const uint8_t * data,const int dataSize)786 int HdcSessionBase::Send(const uint32_t sessionId, const uint32_t channelId, const uint16_t commandFlag,
787 const uint8_t *data, const int dataSize)
788 {
789 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
790 if (!hSession) {
791 WRITE_LOG(LOG_DEBUG, "Send to offline device, drop it, sessionId:%u", sessionId);
792 return ERR_SESSION_NOFOUND;
793 }
794 PayloadProtect protectBuf; // noneed convert to big-endian
795 protectBuf.channelId = channelId;
796 protectBuf.commandFlag = commandFlag;
797 protectBuf.checkSum = (ENABLE_IO_CHECKSUM && dataSize > 0) ? Base::CalcCheckSum(data, dataSize) : 0;
798 protectBuf.vCode = payloadProtectStaticVcode;
799 string s = SerialStruct::SerializeToString(protectBuf);
800 // reserve for encrypt here
801 // xx-encrypt
802
803 PayloadHead payloadHead = {}; // need convert to big-endian
804 payloadHead.flag[0] = PACKET_FLAG.at(0);
805 payloadHead.flag[1] = PACKET_FLAG.at(1);
806 payloadHead.protocolVer = VER_PROTOCOL;
807 payloadHead.headSize = htons(s.size());
808 payloadHead.dataSize = htonl(dataSize);
809 int finalBufSize = sizeof(PayloadHead) + s.size() + dataSize;
810 uint8_t *finayBuf = new uint8_t[finalBufSize]();
811 if (finayBuf == nullptr) {
812 WRITE_LOG(LOG_WARN, "send allocmem err");
813 return ERR_BUF_ALLOC;
814 }
815 bool bufRet = false;
816 do {
817 if (memcpy_s(finayBuf, sizeof(PayloadHead), reinterpret_cast<uint8_t *>(&payloadHead), sizeof(PayloadHead))) {
818 WRITE_LOG(LOG_WARN, "send copyhead err for dataSize:%d", dataSize);
819 break;
820 }
821 if (memcpy_s(finayBuf + sizeof(PayloadHead), s.size(),
822 reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size())) {
823 WRITE_LOG(LOG_WARN, "send copyProtbuf err for dataSize:%d", dataSize);
824 break;
825 }
826 if (dataSize > 0 && memcpy_s(finayBuf + sizeof(PayloadHead) + s.size(), dataSize, data, dataSize)) {
827 WRITE_LOG(LOG_WARN, "send copyDatabuf err for dataSize:%d", dataSize);
828 break;
829 }
830 bufRet = true;
831 } while (false);
832 if (!bufRet) {
833 delete[] finayBuf;
834 WRITE_LOG(LOG_WARN, "send copywholedata err for dataSize:%d", dataSize);
835 return ERR_BUF_COPY;
836 }
837 if (CMD_KERNEL_ECHO == commandFlag) {
838 return SendByProtocol(hSession, finayBuf, finalBufSize, true);
839 } else {
840 return SendByProtocol(hSession, finayBuf, finalBufSize);
841 }
842 }
843
DecryptPayload(HSession hSession,PayloadHead * payloadHeadBe,uint8_t * encBuf)844 int HdcSessionBase::DecryptPayload(HSession hSession, PayloadHead *payloadHeadBe, uint8_t *encBuf)
845 {
846 PayloadProtect protectBuf = {};
847 uint16_t headSize = ntohs(payloadHeadBe->headSize);
848 int dataSize = ntohl(payloadHeadBe->dataSize);
849 string encString(reinterpret_cast<char *>(encBuf), headSize);
850 SerialStruct::ParseFromString(protectBuf, encString);
851 if (protectBuf.vCode != payloadProtectStaticVcode) {
852 WRITE_LOG(LOG_FATAL, "Session recv static vcode failed");
853 return ERR_BUF_CHECK;
854 }
855 uint8_t *data = encBuf + headSize;
856 if (ENABLE_IO_CHECKSUM && protectBuf.checkSum != 0 && (protectBuf.checkSum != Base::CalcCheckSum(data, dataSize))) {
857 WRITE_LOG(LOG_FATAL, "Session recv CalcCheckSum failed");
858 return ERR_BUF_CHECK;
859 }
860 if (!FetchCommand(hSession, protectBuf.channelId, protectBuf.commandFlag, data, dataSize)) {
861 WRITE_LOG(LOG_WARN, "FetchCommand failed: channelId %x commandFlag %x",
862 protectBuf.channelId, protectBuf.commandFlag);
863 return ERR_GENERIC;
864 }
865 return RET_SUCCESS;
866 }
867
OnRead(HSession hSession,uint8_t * bufPtr,const int bufLen)868 int HdcSessionBase::OnRead(HSession hSession, uint8_t *bufPtr, const int bufLen)
869 {
870 int ret = ERR_GENERIC;
871 if (memcmp(bufPtr, PACKET_FLAG.c_str(), PACKET_FLAG.size())) {
872 WRITE_LOG(LOG_FATAL, "PACKET_FLAG incorrect %x %x", bufPtr[0], bufPtr[1]);
873 return ERR_BUF_CHECK;
874 }
875 struct PayloadHead *payloadHead = (struct PayloadHead *)bufPtr;
876 int tobeReadLen = ntohl(payloadHead->dataSize) + ntohs(payloadHead->headSize);
877 int packetHeadSize = sizeof(struct PayloadHead);
878 if (tobeReadLen <= 0 || (uint32_t)tobeReadLen > HDC_BUF_MAX_BYTES) {
879 WRITE_LOG(LOG_FATAL, "Packet size incorrect");
880 return ERR_BUF_CHECK;
881 }
882 if (bufLen - packetHeadSize < tobeReadLen) {
883 return 0;
884 }
885 if (DecryptPayload(hSession, payloadHead, bufPtr + packetHeadSize)) {
886 WRITE_LOG(LOG_WARN, "decrypt plhead error");
887 return ERR_BUF_CHECK;
888 }
889 ret = packetHeadSize + tobeReadLen;
890 return ret;
891 }
892
893 // Returns <0 error;> 0 receives the number of bytes; 0 untreated
FetchIOBuf(HSession hSession,uint8_t * ioBuf,int read)894 int HdcSessionBase::FetchIOBuf(HSession hSession, uint8_t *ioBuf, int read)
895 {
896 HdcSessionBase *ptrConnect = (HdcSessionBase *)hSession->classInstance;
897 int indexBuf = 0;
898 int childRet = 0;
899 if (read < 0) {
900 constexpr int bufSize = 1024;
901 char buf[bufSize] = { 0 };
902 uv_strerror_r(read, buf, bufSize);
903 WRITE_LOG(LOG_FATAL, "HdcSessionBase read io failed,%s", buf);
904 return ERR_IO_FAIL;
905 }
906 hSession->availTailIndex += read;
907 while (!hSession->isDead && hSession->availTailIndex > static_cast<int>(sizeof(PayloadHead))) {
908 childRet = ptrConnect->OnRead(hSession, ioBuf + indexBuf, hSession->availTailIndex);
909 if (childRet > 0) {
910 hSession->availTailIndex -= childRet;
911 indexBuf += childRet;
912 } else if (childRet == 0) {
913 // Not enough a IO
914 break;
915 } else { // <0
916 hSession->availTailIndex = 0; // Preventing malicious data packages
917 indexBuf = ERR_BUF_SIZE;
918 break;
919 }
920 // It may be multi-time IO to merge in a BUF, need to loop processing
921 }
922 if (indexBuf > 0 && hSession->availTailIndex > 0) {
923 if (memmove_s(hSession->ioBuf, hSession->bufSize, hSession->ioBuf + indexBuf, hSession->availTailIndex)
924 != EOK) {
925 return ERR_BUF_COPY;
926 };
927 uint8_t *bufToZero = (uint8_t *)(hSession->ioBuf + hSession->availTailIndex);
928 Base::ZeroBuf(bufToZero, hSession->bufSize - hSession->availTailIndex);
929 }
930 return indexBuf;
931 }
932
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)933 void HdcSessionBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
934 {
935 HSession context = (HSession)handle->data;
936 Base::ReallocBuf(&context->ioBuf, &context->bufSize, HDC_SOCKETPAIR_SIZE);
937 buf->base = (char *)context->ioBuf + context->availTailIndex;
938 int size = context->bufSize - context->availTailIndex;
939 buf->len = std::min(size, static_cast<int>(sizeWanted));
940 }
941
FinishWriteSessionTCP(uv_write_t * req,int status)942 void HdcSessionBase::FinishWriteSessionTCP(uv_write_t *req, int status)
943 {
944 HSession hSession = (HSession)req->handle->data;
945 --hSession->ref;
946 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
947 if (status < 0) {
948 Base::TryCloseHandle((uv_handle_t *)req->handle);
949 if (!hSession->isDead && !hSession->ref) {
950 WRITE_LOG(LOG_DEBUG, "FinishWriteSessionTCP freesession :%p", hSession);
951 thisClass->FreeSession(hSession->sessionId);
952 }
953 }
954 delete[]((uint8_t *)req->data);
955 delete req;
956 }
957
DispatchSessionThreadCommand(uv_stream_t * uvpipe,HSession hSession,const uint8_t * baseBuf,const int bytesIO)958 bool HdcSessionBase::DispatchSessionThreadCommand(uv_stream_t *uvpipe, HSession hSession, const uint8_t *baseBuf,
959 const int bytesIO)
960 {
961 bool ret = true;
962 uint8_t flag = *(uint8_t *)baseBuf;
963
964 switch (flag) {
965 case SP_JDWP_NEWFD: {
966 JdwpNewFileDescriptor(baseBuf, bytesIO);
967 break;
968 }
969 default:
970 WRITE_LOG(LOG_WARN, "Not support session command");
971 break;
972 }
973 return ret;
974 }
975
ReadCtrlFromSession(uv_stream_t * uvpipe,ssize_t nread,const uv_buf_t * buf)976 void HdcSessionBase::ReadCtrlFromSession(uv_stream_t *uvpipe, ssize_t nread, const uv_buf_t *buf)
977 {
978 HSession hSession = (HSession)uvpipe->data;
979 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
980 while (true) {
981 if (nread < 0) {
982 constexpr int bufSize = 1024;
983 char buffer[bufSize] = { 0 };
984 uv_strerror_r((int)nread, buffer, bufSize);
985 WRITE_LOG(LOG_DEBUG, "SessionCtrl failed,%s", buffer);
986 uv_read_stop(uvpipe);
987 break;
988 }
989 if (nread > 64) { // 64 : max length
990 WRITE_LOG(LOG_WARN, "HdcSessionBase read overlap data");
991 break;
992 }
993 // only one command, no need to split command from stream
994 // if add more commands, consider the format command
995 hSessionBase->DispatchSessionThreadCommand(uvpipe, hSession, (uint8_t *)buf->base, nread);
996 break;
997 }
998 delete[] buf->base;
999 }
1000
WorkThreadStartSession(HSession hSession)1001 bool HdcSessionBase::WorkThreadStartSession(HSession hSession)
1002 {
1003 bool regOK = false;
1004 int childRet = 0;
1005 if (hSession->connType == CONN_TCP) {
1006 HdcTCPBase *pTCPBase = (HdcTCPBase *)hSession->classModule;
1007 hSession->hChildWorkTCP.data = hSession;
1008 if ((childRet = uv_tcp_init(&hSession->childLoop, &hSession->hChildWorkTCP)) < 0) {
1009 WRITE_LOG(LOG_DEBUG, "HdcSessionBase SessionCtrl failed 1");
1010 return false;
1011 }
1012 if ((childRet = uv_tcp_open(&hSession->hChildWorkTCP, hSession->fdChildWorkTCP)) < 0) {
1013 constexpr int bufSize = 1024;
1014 char buf[bufSize] = { 0 };
1015 uv_strerror_r(childRet, buf, bufSize);
1016 WRITE_LOG(LOG_DEBUG, "SessionCtrl failed 2,fd:%d,str:%s", hSession->fdChildWorkTCP, buf);
1017 return false;
1018 }
1019 Base::SetTcpOptions((uv_tcp_t *)&hSession->hChildWorkTCP);
1020 uv_read_start((uv_stream_t *)&hSession->hChildWorkTCP, AllocCallback, pTCPBase->ReadStream);
1021 regOK = true;
1022 #ifdef HDC_SUPPORT_UART
1023 } else if (hSession->connType == CONN_SERIAL) { // UART
1024 HdcUARTBase *pUARTBase = (HdcUARTBase *)hSession->classModule;
1025 WRITE_LOG(LOG_DEBUG, "UART ReadyForWorkThread");
1026 regOK = pUARTBase->ReadyForWorkThread(hSession);
1027 #endif
1028 } else { // USB
1029 HdcUSBBase *pUSBBase = (HdcUSBBase *)hSession->classModule;
1030 WRITE_LOG(LOG_DEBUG, "USB ReadyForWorkThread");
1031 regOK = pUSBBase->ReadyForWorkThread(hSession);
1032 }
1033
1034 if (regOK && hSession->serverOrDaemon) {
1035 // session handshake step1
1036 SessionHandShake handshake = {};
1037 handshake.banner = HANDSHAKE_MESSAGE;
1038 handshake.sessionId = hSession->sessionId;
1039 handshake.connectKey = hSession->connectKey;
1040 handshake.authType = AUTH_NONE;
1041 string hs = SerialStruct::SerializeToString(handshake);
1042 #ifdef HDC_SUPPORT_UART
1043 WRITE_LOG(LOG_DEBUG, "WorkThreadStartSession session %u auth %u send handshake hs: %s",
1044 hSession->sessionId, handshake.authType, hs.c_str());
1045 #endif
1046 Send(hSession->sessionId, 0, CMD_KERNEL_HANDSHAKE, (uint8_t *)hs.c_str(), hs.size());
1047 }
1048 return regOK;
1049 }
1050
BuildCtrlString(InnerCtrlCommand command,uint32_t channelId,uint8_t * data,int dataSize)1051 vector<uint8_t> HdcSessionBase::BuildCtrlString(InnerCtrlCommand command, uint32_t channelId, uint8_t *data,
1052 int dataSize)
1053 {
1054 vector<uint8_t> ret;
1055 while (true) {
1056 if (dataSize > BUF_SIZE_MICRO) {
1057 break;
1058 }
1059 CtrlStruct ctrl = {};
1060 ctrl.command = command;
1061 ctrl.channelId = channelId;
1062 ctrl.dataSize = dataSize;
1063 if (dataSize > 0 && data != nullptr && memcpy_s(ctrl.data, sizeof(ctrl.data), data, dataSize) != EOK) {
1064 break;
1065 }
1066 uint8_t *buf = reinterpret_cast<uint8_t *>(&ctrl);
1067 ret.insert(ret.end(), buf, buf + sizeof(CtrlStruct));
1068 break;
1069 }
1070 return ret;
1071 }
1072
DispatchMainThreadCommand(HSession hSession,const CtrlStruct * ctrl)1073 bool HdcSessionBase::DispatchMainThreadCommand(HSession hSession, const CtrlStruct *ctrl)
1074 {
1075 bool ret = true;
1076 uint32_t channelId = ctrl->channelId; // if send not set, it is zero
1077 switch (ctrl->command) {
1078 case SP_START_SESSION: {
1079 WRITE_LOG(LOG_DEBUG, "Dispatch MainThreadCommand START_SESSION sessionId:%u instance:%s",
1080 hSession->sessionId, hSession->serverOrDaemon ? "server" : "daemon");
1081 ret = WorkThreadStartSession(hSession);
1082 break;
1083 }
1084 case SP_STOP_SESSION: {
1085 WRITE_LOG(LOG_DEBUG, "Dispatch MainThreadCommand STOP_SESSION sessionId:%u", hSession->sessionId);
1086 auto closeSessionChildThreadTCPHandle = [](uv_handle_t *handle) -> void {
1087 HSession hSession = (HSession)handle->data;
1088 Base::TryCloseHandle((uv_handle_t *)handle);
1089 if (--hSession->uvChildRef == 0) {
1090 uv_stop(&hSession->childLoop);
1091 };
1092 };
1093 hSession->uvChildRef += 2;
1094 if (hSession->connType == CONN_TCP && hSession->hChildWorkTCP.loop) { // maybe not use it
1095 ++hSession->uvChildRef;
1096 Base::TryCloseHandle((uv_handle_t *)&hSession->hChildWorkTCP, true, closeSessionChildThreadTCPHandle);
1097 }
1098 Base::TryCloseHandle((uv_handle_t *)&hSession->ctrlPipe[STREAM_WORK], true,
1099 closeSessionChildThreadTCPHandle);
1100 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_WORK], true,
1101 closeSessionChildThreadTCPHandle);
1102 break;
1103 }
1104 case SP_ATTACH_CHANNEL: {
1105 if (!serverOrDaemon) {
1106 break; // Only Server has this feature
1107 }
1108 AttachChannel(hSession, channelId);
1109 break;
1110 }
1111 case SP_DEATCH_CHANNEL: {
1112 if (!serverOrDaemon) {
1113 break; // Only Server has this feature
1114 }
1115 DeatchChannel(hSession, channelId);
1116 break;
1117 }
1118 default:
1119 WRITE_LOG(LOG_WARN, "Not support main command");
1120 ret = false;
1121 break;
1122 }
1123 return ret;
1124 }
1125
1126 // Several bytes of control instructions, generally do not stick
ReadCtrlFromMain(uv_stream_t * uvpipe,ssize_t nread,const uv_buf_t * buf)1127 void HdcSessionBase::ReadCtrlFromMain(uv_stream_t *uvpipe, ssize_t nread, const uv_buf_t *buf)
1128 {
1129 HSession hSession = (HSession)uvpipe->data;
1130 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1131 int formatCommandSize = sizeof(CtrlStruct);
1132 int index = 0;
1133 bool ret = true;
1134 while (true) {
1135 if (nread < 0) {
1136 constexpr int bufSize = 1024;
1137 char buffer[bufSize] = { 0 };
1138 uv_strerror_r((int)nread, buffer, bufSize);
1139 WRITE_LOG(LOG_DEBUG, "SessionCtrl failed,%s", buffer);
1140 ret = false;
1141 break;
1142 }
1143 if (nread % formatCommandSize != 0) {
1144 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain size failed, nread == %d", nread);
1145 ret = false;
1146 break;
1147 }
1148 CtrlStruct *ctrl = reinterpret_cast<CtrlStruct *>(buf->base + index);
1149 if (!(ret = hSessionBase->DispatchMainThreadCommand(hSession, ctrl))) {
1150 ret = false;
1151 break;
1152 }
1153 index += sizeof(CtrlStruct);
1154 if (index >= nread) {
1155 break;
1156 }
1157 }
1158 delete[] buf->base;
1159 }
1160
ReChildLoopForSessionClear(HSession hSession)1161 void HdcSessionBase::ReChildLoopForSessionClear(HSession hSession)
1162 {
1163 // Restart loop close task
1164 ClearOwnTasks(hSession, 0);
1165 WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear sessionId:%u", hSession->sessionId);
1166 auto clearTaskForSessionFinish = [](uv_timer_t *handle) -> void {
1167 HSession hSession = (HSession)handle->data;
1168 for (auto v : *hSession->mapTask) {
1169 HTaskInfo hTask = (HTaskInfo)v.second;
1170 uint8_t level;
1171 if (hTask->closeRetryCount < GLOBAL_TIMEOUT / 2) {
1172 level = LOG_DEBUG;
1173 } else {
1174 level = LOG_WARN;
1175 }
1176 WRITE_LOG(level, "wait task free retry %d/%d, channelId:%u, sessionId:%u",
1177 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->channelId, hTask->sessionId);
1178 if (hTask->closeRetryCount++ >= GLOBAL_TIMEOUT) {
1179 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
1180 hSession = thisClass->AdminSession(OP_QUERY, hTask->sessionId, nullptr);
1181 thisClass->AdminTask(OP_VOTE_RESET, hSession, hTask->channelId, nullptr);
1182 }
1183 if (!hTask->taskFree)
1184 return;
1185 }
1186 // all task has been free
1187 uv_close((uv_handle_t *)handle, Base::CloseTimerCallback);
1188 uv_stop(&hSession->childLoop); // stop ReChildLoopForSessionClear pendding
1189 };
1190 Base::TimerUvTask(&hSession->childLoop, hSession, clearTaskForSessionFinish, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
1191 uv_run(&hSession->childLoop, UV_RUN_DEFAULT);
1192 // clear
1193 Base::TryCloseLoop(&hSession->childLoop, "Session childUV");
1194 }
1195
SessionWorkThread(uv_work_t * arg)1196 void HdcSessionBase::SessionWorkThread(uv_work_t *arg)
1197 {
1198 int childRet = 0;
1199 HSession hSession = (HSession)arg->data;
1200 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1201 hSession->hWorkChildThread = uv_thread_self();
1202 if ((childRet = uv_tcp_init(&hSession->childLoop, &hSession->ctrlPipe[STREAM_WORK])) < 0) {
1203 constexpr int bufSize = 1024;
1204 char buf[bufSize] = { 0 };
1205 uv_strerror_r(childRet, buf, bufSize);
1206 WRITE_LOG(LOG_DEBUG, "SessionCtrl err1, %s", buf);
1207 }
1208 if ((childRet = uv_tcp_open(&hSession->ctrlPipe[STREAM_WORK], hSession->ctrlFd[STREAM_WORK])) < 0) {
1209 constexpr int bufSize = 1024;
1210 char buf[bufSize] = { 0 };
1211 uv_strerror_r(childRet, buf, bufSize);
1212 WRITE_LOG(LOG_DEBUG, "SessionCtrl err2, %s fd:%d", buf, hSession->ctrlFd[STREAM_WORK]);
1213 }
1214 uv_read_start((uv_stream_t *)&hSession->ctrlPipe[STREAM_WORK], Base::AllocBufferCallback, ReadCtrlFromMain);
1215 WRITE_LOG(LOG_DEBUG, "!!!Workthread run begin, sessionId:%u instance:%s", hSession->sessionId,
1216 thisClass->serverOrDaemon ? "server" : "daemon");
1217 uv_run(&hSession->childLoop, UV_RUN_DEFAULT); // work pendding
1218 WRITE_LOG(LOG_DEBUG, "!!!Workthread run again, sessionId:%u", hSession->sessionId);
1219 // main loop has exit
1220 thisClass->ReChildLoopForSessionClear(hSession); // work pending again
1221 hSession->childCleared = true;
1222 WRITE_LOG(LOG_DEBUG, "!!!Workthread run finish, sessionId:%u workthread:%p", hSession->sessionId, uv_thread_self());
1223 }
1224
1225 // clang-format off
LogMsg(const uint32_t sessionId,const uint32_t channelId,MessageLevel level,const char * msg,...)1226 void HdcSessionBase::LogMsg(const uint32_t sessionId, const uint32_t channelId,
1227 MessageLevel level, const char *msg, ...)
1228 // clang-format on
1229 {
1230 va_list vaArgs;
1231 va_start(vaArgs, msg);
1232 string log = Base::StringFormat(msg, vaArgs);
1233 va_end(vaArgs);
1234 vector<uint8_t> buf;
1235 buf.push_back(level);
1236 buf.insert(buf.end(), log.c_str(), log.c_str() + log.size());
1237 ServerCommand(sessionId, channelId, CMD_KERNEL_ECHO, buf.data(), buf.size());
1238 }
1239
NeedNewTaskInfo(const uint16_t command,bool & masterTask)1240 bool HdcSessionBase::NeedNewTaskInfo(const uint16_t command, bool &masterTask)
1241 {
1242 // referer from HdcServerForClient::DoCommandRemote
1243 bool ret = false;
1244 bool taskMasterInit = false;
1245 masterTask = false;
1246 switch (command) {
1247 case CMD_FILE_INIT:
1248 case CMD_FORWARD_INIT:
1249 case CMD_APP_INIT:
1250 case CMD_APP_UNINSTALL:
1251 case CMD_UNITY_BUGREPORT_INIT:
1252 case CMD_APP_SIDELOAD:
1253 taskMasterInit = true;
1254 break;
1255 default:
1256 break;
1257 }
1258 if (!serverOrDaemon
1259 && (command == CMD_SHELL_INIT || (command > CMD_UNITY_COMMAND_HEAD && command < CMD_UNITY_COMMAND_TAIL))) {
1260 // daemon's single side command
1261 ret = true;
1262 } else if (command == CMD_KERNEL_WAKEUP_SLAVETASK) {
1263 // slave tasks
1264 ret = true;
1265 } else if (taskMasterInit) {
1266 // task init command
1267 masterTask = true;
1268 ret = true;
1269 }
1270 return ret;
1271 }
1272 // Heavy and time-consuming work was putted in the new thread to do, and does
1273 // not occupy the main thread
DispatchTaskData(HSession hSession,const uint32_t channelId,const uint16_t command,uint8_t * payload,int payloadSize)1274 bool HdcSessionBase::DispatchTaskData(HSession hSession, const uint32_t channelId, const uint16_t command,
1275 uint8_t *payload, int payloadSize)
1276 {
1277 bool ret = true;
1278 HTaskInfo hTaskInfo = nullptr;
1279 bool masterTask = false;
1280 while (true) {
1281 // Some basic commands do not have a local task constructor. example: Interactive shell, some uinty commands
1282 if (NeedNewTaskInfo(command, masterTask)) {
1283 WRITE_LOG(LOG_DEBUG, "New HTaskInfo channelId:%u command:%u", channelId, command);
1284 hTaskInfo = new(std::nothrow) TaskInformation();
1285 if (hTaskInfo == nullptr) {
1286 WRITE_LOG(LOG_FATAL, "DispatchTaskData new hTaskInfo failed");
1287 break;
1288 }
1289 hTaskInfo->channelId = channelId;
1290 hTaskInfo->sessionId = hSession->sessionId;
1291 hTaskInfo->runLoop = &hSession->childLoop;
1292 hTaskInfo->serverOrDaemon = serverOrDaemon;
1293 hTaskInfo->masterSlave = masterTask;
1294 hTaskInfo->closeRetryCount = 0;
1295
1296 int addTaskRetry = 3; // try 3 time
1297 while (addTaskRetry > 0) {
1298 if (AdminTask(OP_ADD, hSession, channelId, hTaskInfo)) {
1299 break;
1300 }
1301 sleep(1);
1302 --addTaskRetry;
1303 }
1304
1305 if (addTaskRetry == 0) {
1306 #ifndef HDC_HOST
1307 LogMsg(hTaskInfo->sessionId, hTaskInfo->channelId, MSG_FAIL, "hdc thread pool busy, may cause reset later");
1308 #endif
1309 delete hTaskInfo;
1310 hTaskInfo = nullptr;
1311 ret = false;
1312 break;
1313 }
1314 } else {
1315 hTaskInfo = AdminTask(OP_QUERY, hSession, channelId, nullptr);
1316 }
1317 if (!hTaskInfo || hTaskInfo->taskStop || hTaskInfo->taskFree) {
1318 WRITE_LOG(LOG_ALL, "Dead HTaskInfo, ignore, channelId:%u command:%u", channelId, command);
1319 break;
1320 }
1321 ret = RedirectToTask(hTaskInfo, hSession, channelId, command, payload, payloadSize);
1322 break;
1323 }
1324 return ret;
1325 }
1326
PostStopInstanceMessage(bool restart)1327 void HdcSessionBase::PostStopInstanceMessage(bool restart)
1328 {
1329 PushAsyncMessage(0, ASYNC_STOP_MAINLOOP, nullptr, 0);
1330 WRITE_LOG(LOG_DEBUG, "StopDaemon has sended restart %d", restart);
1331 wantRestart = restart;
1332 }
1333 } // namespace Hdc
1334