1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "session.h"
16 #ifndef TEST_HASH
17 #include "hdc_hash_gen.h"
18 #endif
19 #include "serial_struct.h"
20
21 namespace Hdc {
HdcSessionBase(bool serverOrDaemonIn,size_t uvThreadSize)22 HdcSessionBase::HdcSessionBase(bool serverOrDaemonIn, size_t uvThreadSize)
23 {
24 // print version pid
25 WRITE_LOG(LOG_INFO, "Program running. %s Pid:%u", Base::GetVersion().c_str(), getpid());
26 // server/daemon common initialization code
27 if (uvThreadSize < SIZE_THREAD_POOL_MIN) {
28 uvThreadSize = SIZE_THREAD_POOL_MIN;
29 } else if (uvThreadSize > SIZE_THREAD_POOL_MAX) {
30 uvThreadSize = SIZE_THREAD_POOL_MAX;
31 }
32 threadPoolCount = uvThreadSize;
33 WRITE_LOG(LOG_INFO, "set UV_THREADPOOL_SIZE:%zu", threadPoolCount);
34 string uvThreadEnv("UV_THREADPOOL_SIZE");
35 string uvThreadVal = std::to_string(threadPoolCount);
36 #ifdef _WIN32
37 uvThreadEnv += "=";
38 uvThreadEnv += uvThreadVal;
39 _putenv(uvThreadEnv.c_str());
40 #else
41 setenv(uvThreadEnv.c_str(), uvThreadVal.c_str(), 1);
42 #endif
43 uv_loop_init(&loopMain);
44 WRITE_LOG(LOG_DEBUG, "loopMain init");
45 uv_rwlock_init(&mainAsync);
46 uv_async_init(&loopMain, &asyncMainLoop, MainAsyncCallback);
47 uv_rwlock_init(&lockMapSession);
48 serverOrDaemon = serverOrDaemonIn;
49 ctxUSB = nullptr;
50 wantRestart = false;
51 threadSessionMain = uv_thread_self();
52
53 #ifdef HDC_HOST
54 if (serverOrDaemon) {
55 if (libusb_init((libusb_context **)&ctxUSB) != 0) {
56 ctxUSB = nullptr;
57 WRITE_LOG(LOG_FATAL, "libusb_init failed ctxUSB is nullptr");
58 }
59 }
60 #endif
61 }
62
~HdcSessionBase()63 HdcSessionBase::~HdcSessionBase()
64 {
65 Base::TryCloseHandle((uv_handle_t *)&asyncMainLoop);
66 uv_loop_close(&loopMain);
67 // clear base
68 uv_rwlock_destroy(&mainAsync);
69 uv_rwlock_destroy(&lockMapSession);
70 #ifdef HDC_HOST
71 if (serverOrDaemon and ctxUSB != nullptr) {
72 libusb_exit((libusb_context *)ctxUSB);
73 }
74 #endif
75 WRITE_LOG(LOG_WARN, "~HdcSessionBase free sessionRef:%u instance:%s", uint32_t(sessionRef),
76 serverOrDaemon ? "server" : "daemon");
77 }
78
79 // remove step2
TryRemoveTask(HTaskInfo hTask)80 bool HdcSessionBase::TryRemoveTask(HTaskInfo hTask)
81 {
82 if (hTask->taskFree) {
83 WRITE_LOG(LOG_WARN, "TryRemoveTask channelId:%u", hTask->channelId);
84 return true;
85 }
86 bool ret = RemoveInstanceTask(OP_REMOVE, hTask);
87 if (ret) {
88 hTask->taskFree = true;
89 } else {
90 // This is used to check that the memory cannot be cleaned up. If the memory cannot be released, break point
91 // here to see which task has not been released
92 // print task clear
93 }
94 return ret;
95 }
96
97 // remove step1
BeginRemoveTask(HTaskInfo hTask)98 void HdcSessionBase::BeginRemoveTask(HTaskInfo hTask)
99 {
100 StartTraceScope("HdcSessionBase::BeginRemoveTask");
101 if (hTask->taskStop || hTask->taskFree) {
102 WRITE_LOG(LOG_WARN, "BeginRemoveTask channelId:%u taskStop:%d taskFree:%d",
103 hTask->channelId, hTask->taskStop, hTask->taskFree);
104 return;
105 }
106
107 WRITE_LOG(LOG_WARN, "BeginRemoveTask taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
108 auto taskClassDeleteRetry = [](uv_timer_t *handle) -> void {
109 StartTraceScope("HdcSessionBase::BeginRemoveTask taskClassDeleteRetry");
110 HTaskInfo hTask = (HTaskInfo)handle->data;
111 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
112 if (hTask->isCleared == false) {
113 hTask->isCleared = true;
114 WRITE_LOG(LOG_WARN, "taskClassDeleteRetry start clear task, taskType:%d cid:%u sid:%u",
115 hTask->taskType, hTask->channelId, hTask->sessionId);
116 bool ret = thisClass->RemoveInstanceTask(OP_CLEAR, hTask);
117 if (!ret) {
118 WRITE_LOG(LOG_WARN, "taskClassDeleteRetry RemoveInstanceTask return false taskType:%d cid:%u sid:%u",
119 hTask->taskType, hTask->channelId, hTask->sessionId);
120 }
121 }
122
123 constexpr uint32_t count = 1000;
124 if (hTask->closeRetryCount == 0 || hTask->closeRetryCount > count) {
125 WRITE_LOG(LOG_DEBUG, "TaskDelay task remove retry count %d/%d, taskType:%d channelId:%u, sessionId:%u",
126 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->taskType, hTask->channelId, hTask->sessionId);
127 hTask->closeRetryCount = 1;
128 }
129 hTask->closeRetryCount++;
130 if (!thisClass->TryRemoveTask(hTask)) {
131 WRITE_LOG(LOG_WARN, "TaskDelay TryRemoveTask false channelId:%u", hTask->channelId);
132 return;
133 }
134 WRITE_LOG(LOG_WARN, "TaskDelay task remove finish, channelId:%u", hTask->channelId);
135 if (hTask != nullptr) {
136 delete hTask;
137 hTask = nullptr;
138 }
139 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
140 };
141 Base::TimerUvTask(hTask->runLoop, hTask, taskClassDeleteRetry, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
142
143 hTask->taskStop = true;
144 }
145
146 // Clear all Task or a single Task, the regular situation is stopped first, and the specific class memory is cleaned up
147 // after the end of the LOOP.
148 // When ChannelIdinput == 0, at this time, all of the LOOP ends, all runs in the class end, so directly skip STOP,
149 // physical memory deletion class trimming
ClearOwnTasks(HSession hSession,const uint32_t channelIDInput)150 void HdcSessionBase::ClearOwnTasks(HSession hSession, const uint32_t channelIDInput)
151 {
152 // First case: normal task cleanup process (STOP Remove)
153 // Second: The task is cleaned up, the session ends
154 // Third: The task is cleaned up, and the session is directly over the session.
155 StartTraceScope("HdcSessionBase::ClearOwnTasks");
156 hSession->mapTaskMutex.lock();
157 map<uint32_t, HTaskInfo>::iterator iter;
158 for (iter = hSession->mapTask->begin(); iter != hSession->mapTask->end();) {
159 uint32_t channelId = iter->first;
160 HTaskInfo hTask = iter->second;
161 if (channelIDInput != 0) { // single
162 if (channelIDInput != channelId) {
163 ++iter;
164 continue;
165 }
166 BeginRemoveTask(hTask);
167 WRITE_LOG(LOG_WARN, "ClearOwnTasks OP_CLEAR finish, sessionId:%u channelIDInput:%u",
168 hSession->sessionId, channelIDInput);
169 iter = hSession->mapTask->erase(iter);
170 break;
171 }
172 // multi
173 BeginRemoveTask(hTask);
174 iter = hSession->mapTask->erase(iter);
175 }
176 hSession->mapTaskMutex.unlock();
177 }
178
ClearSessions()179 void HdcSessionBase::ClearSessions()
180 {
181 // no need to lock mapSession
182 // broadcast free signal
183 for (auto v : mapSession) {
184 HSession hSession = (HSession)v.second;
185 if (!hSession->isDead) {
186 FreeSession(hSession->sessionId);
187 }
188 }
189 }
190
ReMainLoopForInstanceClear()191 void HdcSessionBase::ReMainLoopForInstanceClear()
192 { // reloop
193 StartTraceScope("HdcSessionBase::ReMainLoopForInstanceClear");
194 auto clearSessionsForFinish = [](uv_idle_t *handle) -> void {
195 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
196 if (thisClass->sessionRef > 0) {
197 return;
198 }
199 // all task has been free
200 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
201 uv_stop(&thisClass->loopMain);
202 };
203 Base::IdleUvTask(&loopMain, this, clearSessionsForFinish);
204 uv_run(&loopMain, UV_RUN_DEFAULT);
205 };
206
#ifdef HDC_SUPPORT_UART
// Invokes kickOut on the first session in mapSession that is a serial (CONN_SERIAL) session
// with a non-null hUART, then stops. The map is read under the read lock.
void HdcSessionBase::EnumUARTDeviceRegister(UartKickoutZombie kickOut)
{
    uv_rwlock_rdlock(&lockMapSession);
    for (const auto &entry : mapSession) {
        HSession session = entry.second;
        const bool isUartSession = (session->connType == CONN_SERIAL) && (session->hUART != nullptr);
        if (isUartSession) {
            kickOut(session);
            break;
        }
    }
    uv_rwlock_rdunlock(&lockMapSession);
}
#endif
223
EnumUSBDeviceRegister(void (* pCallBack)(HSession hSession))224 void HdcSessionBase::EnumUSBDeviceRegister(void (*pCallBack)(HSession hSession))
225 {
226 if (!pCallBack) {
227 return;
228 }
229 uv_rwlock_rdlock(&lockMapSession);
230 map<uint32_t, HSession>::iterator i;
231 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
232 HSession hs = i->second;
233 if (hs->connType != CONN_USB) {
234 continue;
235 }
236 if (hs->hUSB == nullptr) {
237 continue;
238 }
239 if (pCallBack) {
240 pCallBack(hs);
241 }
242 break;
243 }
244 uv_rwlock_rdunlock(&lockMapSession);
245 }
246
247 // The PC side gives the device information, determines if the USB device is registered
248 // PDEV and Busid Devid two choices
QueryUSBDeviceRegister(void * pDev,uint8_t busIDIn,uint8_t devIDIn)249 HSession HdcSessionBase::QueryUSBDeviceRegister(void *pDev, uint8_t busIDIn, uint8_t devIDIn)
250 {
251 #ifdef HDC_HOST
252 libusb_device *dev = (libusb_device *)pDev;
253 HSession hResult = nullptr;
254 if (!mapSession.size()) {
255 return nullptr;
256 }
257 uint8_t busId = 0;
258 uint8_t devId = 0;
259 if (pDev) {
260 busId = libusb_get_bus_number(dev);
261 devId = libusb_get_device_address(dev);
262 } else {
263 busId = busIDIn;
264 devId = devIDIn;
265 }
266 uv_rwlock_rdlock(&lockMapSession);
267 map<uint32_t, HSession>::iterator i;
268 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
269 HSession hs = i->second;
270 if (hs->connType == CONN_USB) {
271 continue;
272 }
273 if (hs->hUSB == nullptr) {
274 continue;
275 }
276 if (hs->hUSB->devId != devId || hs->hUSB->busId != busId) {
277 continue;
278 }
279 hResult = hs;
280 break;
281 }
282 uv_rwlock_rdunlock(&lockMapSession);
283 return hResult;
284 #else
285 return nullptr;
286 #endif
287 }
288
AsyncMainLoopTask(uv_idle_t * handle)289 void HdcSessionBase::AsyncMainLoopTask(uv_idle_t *handle)
290 {
291 AsyncParam *param = (AsyncParam *)handle->data;
292 HdcSessionBase *thisClass = (HdcSessionBase *)param->thisClass;
293 switch (param->method) {
294 case ASYNC_FREE_SESSION:
295 // Destruction is unified in the main thread
296 thisClass->FreeSession(param->sid);
297 break;
298 case ASYNC_STOP_MAINLOOP:
299 uv_stop(&thisClass->loopMain);
300 break;
301 default:
302 break;
303 }
304 if (param->data) {
305 delete[]((uint8_t *)param->data);
306 }
307 delete param;
308 param = nullptr;
309 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseIdleCallback);
310 }
311
MainAsyncCallback(uv_async_t * handle)312 void HdcSessionBase::MainAsyncCallback(uv_async_t *handle)
313 {
314 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
315 list<void *>::iterator i;
316 list<void *> &lst = thisClass->lstMainThreadOP;
317 uv_rwlock_wrlock(&thisClass->mainAsync);
318 for (i = lst.begin(); i != lst.end();) {
319 AsyncParam *param = (AsyncParam *)*i;
320 Base::IdleUvTask(&thisClass->loopMain, param, AsyncMainLoopTask);
321 i = lst.erase(i);
322 }
323 uv_rwlock_wrunlock(&thisClass->mainAsync);
324 }
325
PushAsyncMessage(const uint32_t sessionId,const uint8_t method,const void * data,const int dataSize)326 void HdcSessionBase::PushAsyncMessage(const uint32_t sessionId, const uint8_t method, const void *data,
327 const int dataSize)
328 {
329 AsyncParam *param = new AsyncParam();
330 if (!param) {
331 return;
332 }
333 param->sid = sessionId;
334 param->thisClass = this;
335 param->method = method;
336 if (dataSize > 0) {
337 param->dataSize = dataSize;
338 param->data = new uint8_t[param->dataSize]();
339 if (!param->data) {
340 delete param;
341 return;
342 }
343 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
344 delete[]((uint8_t *)param->data);
345 delete param;
346 return;
347 }
348 }
349
350 asyncMainLoop.data = this;
351 uv_rwlock_wrlock(&mainAsync);
352 lstMainThreadOP.push_back(param);
353 uv_rwlock_wrunlock(&mainAsync);
354 uv_async_send(&asyncMainLoop);
355 }
356
WorkerPendding()357 void HdcSessionBase::WorkerPendding()
358 {
359 uv_run(&loopMain, UV_RUN_DEFAULT);
360 ClearInstanceResource();
361 }
362
MallocSessionByConnectType(HSession hSession)363 int HdcSessionBase::MallocSessionByConnectType(HSession hSession)
364 {
365 int ret = 0;
366 switch (hSession->connType) {
367 case CONN_TCP: {
368 uv_tcp_init(&loopMain, &hSession->hWorkTCP);
369 ++hSession->uvHandleRef;
370 hSession->hWorkTCP.data = hSession;
371 break;
372 }
373 case CONN_USB: {
374 // Some members need to be placed at the primary thread
375 HUSB hUSB = new HdcUSB();
376 if (!hUSB) {
377 ret = -1;
378 break;
379 }
380 hSession->hUSB = hUSB;
381 hSession->hUSB->wMaxPacketSizeSend = MAX_PACKET_SIZE_HISPEED;
382 break;
383 }
384 #ifdef HDC_SUPPORT_UART
385 case CONN_SERIAL: {
386 HUART hUART = new HdcUART();
387 if (!hUART) {
388 ret = -1;
389 break;
390 }
391 hSession->hUART = hUART;
392 break;
393 }
394 #endif // HDC_SUPPORT_UART
395 default:
396 ret = -1;
397 break;
398 }
399 return ret;
400 }
401
402 // Avoid unit test when client\server\daemon on the same host, maybe get the same ID value
GetSessionPseudoUid()403 uint32_t HdcSessionBase::GetSessionPseudoUid()
404 {
405 uint32_t uid = 0;
406 do {
407 uid = Base::GetSecureRandom();
408 } while (AdminSession(OP_QUERY, uid, nullptr) != nullptr);
409 return uid;
410 }
411
412 // when client 0 to automatic generated,when daemon First place 1 followed by
MallocSession(bool serverOrDaemon,const ConnType connType,void * classModule,uint32_t sessionId)413 HSession HdcSessionBase::MallocSession(bool serverOrDaemon, const ConnType connType, void *classModule,
414 uint32_t sessionId)
415 {
416 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
417 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
418 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
419 #endif
420 HSession hSession = new(std::nothrow) HdcSession();
421 if (!hSession) {
422 WRITE_LOG(LOG_FATAL, "MallocSession new hSession failed");
423 return nullptr;
424 }
425 int ret = 0;
426 ++sessionRef;
427 hSession->classInstance = this;
428 hSession->connType = connType;
429 hSession->classModule = classModule;
430 hSession->isDead = false;
431 hSession->sessionId = ((sessionId == 0) ? GetSessionPseudoUid() : sessionId);
432 hSession->serverOrDaemon = serverOrDaemon;
433 hSession->hWorkThread = uv_thread_self();
434 hSession->mapTask = new(std::nothrow) map<uint32_t, HTaskInfo>();
435 if (hSession->mapTask == nullptr) {
436 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->mapTask failed");
437 delete hSession;
438 hSession = nullptr;
439 return nullptr;
440 }
441 hSession->listKey = new(std::nothrow) list<void *>;
442 if (hSession->listKey == nullptr) {
443 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->listKey failed");
444 delete hSession;
445 hSession = nullptr;
446 return nullptr;
447 }
448 uv_loop_init(&hSession->childLoop);
449 hSession->uvHandleRef = 0;
450 // pullup child
451 WRITE_LOG(LOG_INFO, "HdcSessionBase NewSession, sessionId:%u, connType:%d.",
452 hSession->sessionId, hSession->connType);
453 ++hSession->uvHandleRef;
454 Base::CreateSocketPair(hSession->ctrlFd);
455 size_t handleSize = sizeof(uv_poll_t);
456 hSession->pollHandle[STREAM_WORK] = (uv_poll_t *)malloc(handleSize);
457 hSession->pollHandle[STREAM_MAIN] = (uv_poll_t *)malloc(handleSize);
458 uv_poll_t *pollHandleMain = hSession->pollHandle[STREAM_MAIN];
459 if (pollHandleMain == nullptr || hSession->pollHandle[STREAM_WORK] == nullptr) {
460 WRITE_LOG(LOG_FATAL, "MallocSession malloc hSession->pollHandle failed");
461 delete hSession;
462 hSession = nullptr;
463 return nullptr;
464 }
465 uv_poll_init_socket(&loopMain, pollHandleMain, hSession->ctrlFd[STREAM_MAIN]);
466 uv_poll_start(pollHandleMain, UV_READABLE, ReadCtrlFromSession);
467 hSession->pollHandle[STREAM_MAIN]->data = hSession;
468 hSession->pollHandle[STREAM_WORK]->data = hSession;
469 // Activate USB DAEMON's data channel, may not for use
470 uv_tcp_init(&loopMain, &hSession->dataPipe[STREAM_MAIN]);
471 (void)memset_s(&hSession->dataPipe[STREAM_WORK], sizeof(hSession->dataPipe[STREAM_WORK]),
472 0, sizeof(uv_tcp_t));
473 ++hSession->uvHandleRef;
474 Base::CreateSocketPair(hSession->dataFd);
475 uv_tcp_open(&hSession->dataPipe[STREAM_MAIN], hSession->dataFd[STREAM_MAIN]);
476 hSession->dataPipe[STREAM_MAIN].data = hSession;
477 hSession->dataPipe[STREAM_WORK].data = hSession;
478 #ifdef HDC_HOST
479 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN], HOST_SOCKETPAIR_SIZE);
480 #else
481 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN]);
482 #endif
483 ret = MallocSessionByConnectType(hSession);
484 if (ret) {
485 delete hSession;
486 hSession = nullptr;
487 } else {
488 AdminSession(OP_ADD, hSession->sessionId, hSession);
489 }
490 return hSession;
491 }
492
FreeSessionByConnectType(HSession hSession)493 void HdcSessionBase::FreeSessionByConnectType(HSession hSession)
494 {
495 WRITE_LOG(LOG_DEBUG, "FreeSessionByConnectType %s", hSession->ToDebugString().c_str());
496
497 if (hSession->connType == CONN_USB) {
498 // ibusb All context is applied for sub-threaded, so it needs to be destroyed in the subline
499 if (!hSession->hUSB) {
500 return;
501 }
502 HUSB hUSB = hSession->hUSB;
503 if (!hUSB) {
504 return;
505 }
506 #ifdef HDC_HOST
507 if (hUSB->devHandle) {
508 libusb_release_interface(hUSB->devHandle, hUSB->interfaceNumber);
509 libusb_close(hUSB->devHandle);
510 hUSB->devHandle = nullptr;
511 }
512 #else
513 Base::CloseFd(hUSB->bulkIn);
514 Base::CloseFd(hUSB->bulkOut);
515 #endif
516 delete hSession->hUSB;
517 hSession->hUSB = nullptr;
518 }
519 #ifdef HDC_SUPPORT_UART
520 if (CONN_SERIAL == hSession->connType) {
521 if (!hSession->hUART) {
522 return;
523 }
524 HUART hUART = hSession->hUART;
525 if (!hUART) {
526 return;
527 }
528 HdcUARTBase *uartBase = (HdcUARTBase *)hSession->classModule;
529 // tell uart session will be free
530 uartBase->StopSession(hSession);
531 #ifdef HDC_HOST
532 #ifdef HOST_MINGW
533 if (hUART->devUartHandle != INVALID_HANDLE_VALUE) {
534 CloseHandle(hUART->devUartHandle);
535 hUART->devUartHandle = INVALID_HANDLE_VALUE;
536 }
537 #elif defined(HOST_LINUX)
538 Base::CloseFd(hUART->devUartHandle);
539 #endif // _WIN32
540 #endif
541 delete hSession->hUART;
542 hSession->hUART = nullptr;
543 }
544 #endif
545 }
546
547 // work when libuv-handle at struct of HdcSession has all callback finished
FreeSessionFinally(uv_idle_t * handle)548 void HdcSessionBase::FreeSessionFinally(uv_idle_t *handle)
549 {
550 HSession hSession = (HSession)handle->data;
551 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
552 if (hSession->uvHandleRef > 0) {
553 WRITE_LOG(LOG_INFO, "FreeSessionFinally uvHandleRef:%d sessionId:%u",
554 hSession->uvHandleRef, hSession->sessionId);
555 return;
556 }
557 // Notify Server or Daemon, just UI or display commandline
558 thisClass->NotifyInstanceSessionFree(hSession, true);
559 // all hsession uv handle has been clear
560 thisClass->AdminSession(OP_REMOVE, hSession->sessionId, nullptr);
561 WRITE_LOG(LOG_INFO, "!!!FreeSessionFinally sessionId:%u finish", hSession->sessionId);
562 HdcAuth::FreeKey(!hSession->serverOrDaemon, hSession->listKey);
563 delete hSession;
564 hSession = nullptr; // fix CodeMars SetNullAfterFree issue
565 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
566 --thisClass->sessionRef;
567 }
568
569 // work when child-work thread finish
FreeSessionContinue(HSession hSession)570 void HdcSessionBase::FreeSessionContinue(HSession hSession)
571 {
572 auto closeSessionTCPHandle = [](uv_handle_t *handle) -> void {
573 HSession hSession = (HSession)handle->data;
574 --hSession->uvHandleRef;
575 Base::TryCloseHandle((uv_handle_t *)handle);
576 if (handle == reinterpret_cast<uv_handle_t *>(hSession->pollHandle[STREAM_MAIN])) {
577 Base::CloseFd(hSession->ctrlFd[STREAM_MAIN]);
578 Base::CloseFd(hSession->ctrlFd[STREAM_WORK]);
579 free(hSession->pollHandle[STREAM_MAIN]);
580 }
581 };
582 if (hSession->connType == CONN_TCP) {
583 // Turn off TCP to prevent continuing writing
584 Base::TryCloseHandle((uv_handle_t *)&hSession->hWorkTCP, true, closeSessionTCPHandle);
585 Base::CloseFd(hSession->dataFd[STREAM_WORK]);
586 }
587 hSession->availTailIndex = 0;
588 if (hSession->ioBuf) {
589 delete[] hSession->ioBuf;
590 hSession->ioBuf = nullptr;
591 }
592 Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_MAIN], true, closeSessionTCPHandle);
593 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_MAIN], true, closeSessionTCPHandle);
594 FreeSessionByConnectType(hSession);
595 // finish
596 Base::IdleUvTask(&loopMain, hSession, FreeSessionFinally);
597 }
598
FreeSessionOpeate(uv_timer_t * handle)599 void HdcSessionBase::FreeSessionOpeate(uv_timer_t *handle)
600 {
601 StartTraceScope("HdcSessionBase::FreeSessionOpeate");
602 HSession hSession = (HSession)handle->data;
603 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
604 if (hSession->ref > 0) {
605 WRITE_LOG(LOG_WARN, "FreeSessionOpeate sid:%u ref:%u > 0", hSession->sessionId, uint32_t(hSession->ref));
606 return;
607 }
608 WRITE_LOG(LOG_INFO, "FreeSessionOpeate sid:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
609 #ifdef HDC_HOST
610 if (hSession->hUSB != nullptr
611 && (!hSession->hUSB->hostBulkIn.isShutdown || !hSession->hUSB->hostBulkOut.isShutdown)) {
612 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
613 pUSB->CancelUsbIo(hSession);
614 return;
615 }
616 #endif
617 // wait workthread to free
618 if (hSession->pollHandle[STREAM_WORK]->loop) {
619 auto ctrl = BuildCtrlString(SP_STOP_SESSION, 0, nullptr, 0);
620 Base::SendToPollFd(hSession->ctrlFd[STREAM_MAIN], ctrl.data(), ctrl.size());
621 WRITE_LOG(LOG_INFO, "FreeSessionOpeate, send workthread for free. sessionId:%u", hSession->sessionId);
622 auto callbackCheckFreeSessionContinue = [](uv_timer_t *handle) -> void {
623 HSession hSession = (HSession)handle->data;
624 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
625 if (!hSession->childCleared) {
626 WRITE_LOG(LOG_INFO, "FreeSessionOpeate childCleared:%d sessionId:%u",
627 hSession->childCleared, hSession->sessionId);
628 return;
629 }
630 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
631 thisClass->FreeSessionContinue(hSession);
632 };
633 Base::TimerUvTask(&thisClass->loopMain, hSession, callbackCheckFreeSessionContinue);
634 } else {
635 thisClass->FreeSessionContinue(hSession);
636 }
637 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
638 }
639
FreeSession(const uint32_t sessionId)640 void HdcSessionBase::FreeSession(const uint32_t sessionId)
641 {
642 StartTraceScope("HdcSessionBase::FreeSession");
643 AddDeletedSessionId(sessionId);
644 if (threadSessionMain != uv_thread_self()) {
645 PushAsyncMessage(sessionId, ASYNC_FREE_SESSION, nullptr, 0);
646 return;
647 }
648 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
649 WRITE_LOG(LOG_INFO, "Begin to free session, sessionid:%u", sessionId);
650 do {
651 if (!hSession || hSession->isDead) {
652 WRITE_LOG(LOG_WARN, "FreeSession hSession nullptr or isDead sessionId:%u", sessionId);
653 break;
654 }
655 WRITE_LOG(LOG_INFO, "dataFdSend:%llu, dataFdRecv:%llu",
656 uint64_t(hSession->stat.dataSendBytes),
657 uint64_t(hSession->stat.dataRecvBytes));
658 hSession->isDead = true;
659 Base::TimerUvTask(&loopMain, hSession, FreeSessionOpeate);
660 NotifyInstanceSessionFree(hSession, false);
661 WRITE_LOG(LOG_INFO, "FreeSession sessionId:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
662 } while (false);
663 }
664
AdminSession(const uint8_t op,const uint32_t sessionId,HSession hInput)665 HSession HdcSessionBase::AdminSession(const uint8_t op, const uint32_t sessionId, HSession hInput)
666 {
667 HSession hRet = nullptr;
668 switch (op) {
669 case OP_ADD:
670 uv_rwlock_wrlock(&lockMapSession);
671 mapSession[sessionId] = hInput;
672 uv_rwlock_wrunlock(&lockMapSession);
673 break;
674 case OP_REMOVE:
675 uv_rwlock_wrlock(&lockMapSession);
676 mapSession.erase(sessionId);
677 uv_rwlock_wrunlock(&lockMapSession);
678 break;
679 case OP_QUERY:
680 uv_rwlock_rdlock(&lockMapSession);
681 if (mapSession.count(sessionId)) {
682 hRet = mapSession[sessionId];
683 }
684 uv_rwlock_rdunlock(&lockMapSession);
685 break;
686 case OP_QUERY_REF:
687 uv_rwlock_wrlock(&lockMapSession);
688 if (mapSession.count(sessionId)) {
689 hRet = mapSession[sessionId];
690 ++hRet->ref;
691 }
692 uv_rwlock_wrunlock(&lockMapSession);
693 break;
694 case OP_UPDATE:
695 uv_rwlock_wrlock(&lockMapSession);
696 // remove old
697 mapSession.erase(sessionId);
698 mapSession[hInput->sessionId] = hInput;
699 uv_rwlock_wrunlock(&lockMapSession);
700 break;
701 case OP_VOTE_RESET:
702 if (mapSession.count(sessionId) == 0) {
703 break;
704 }
705 bool needReset;
706 if (serverOrDaemon) {
707 uv_rwlock_wrlock(&lockMapSession);
708 hRet = mapSession[sessionId];
709 hRet->voteReset = true;
710 needReset = true;
711 for (auto &kv : mapSession) {
712 if (sessionId == kv.first) {
713 continue;
714 }
715 WRITE_LOG(LOG_DEBUG, "session:%u vote reset, session %u is %s",
716 sessionId, kv.first, kv.second->voteReset ? "YES" : "NO");
717 if (!kv.second->voteReset) {
718 needReset = false;
719 }
720 }
721 uv_rwlock_wrunlock(&lockMapSession);
722 } else {
723 needReset = true;
724 }
725 if (needReset) {
726 WRITE_LOG(LOG_FATAL, "!! session:%u vote reset, passed unanimously !!", sessionId);
727 abort();
728 }
729 break;
730 default:
731 break;
732 }
733 return hRet;
734 }
735
AddDeletedSessionId(uint32_t sessionId)736 void HdcSessionBase::AddDeletedSessionId(uint32_t sessionId)
737 {
738 std::unique_lock<std::shared_mutex> lock(deletedSessionIdRecordMutex);
739 if (deletedSessionIdSet.find(sessionId) != deletedSessionIdSet.end()) {
740 WRITE_LOG(LOG_INFO, "SessionId:%u is already in the cache", sessionId);
741 return;
742 }
743 WRITE_LOG(LOG_INFO, "AddDeletedSessionId:%u", sessionId);
744 deletedSessionIdSet.insert(sessionId);
745 deletedSessionIdQueue.push(sessionId);
746
747 // Delete old records and only save MAX_DELETED_SESSION_ID_RECORD_COUNT records
748 if (deletedSessionIdQueue.size() > MAX_DELETED_SESSION_ID_RECORD_COUNT) {
749 uint32_t id = deletedSessionIdQueue.front();
750 WRITE_LOG(LOG_INFO, "deletedSessionIdQueue size:%u, deletedSessionIdSet size:%u, pop session id:%u",
751 deletedSessionIdQueue.size(), deletedSessionIdSet.size(), id);
752 deletedSessionIdQueue.pop();
753 deletedSessionIdSet.erase(id);
754 }
755 }
756
IsSessionDeleted(uint32_t sessionId) const757 bool HdcSessionBase::IsSessionDeleted(uint32_t sessionId) const
758 {
759 std::shared_lock<std::shared_mutex> lock(deletedSessionIdRecordMutex);
760 if (deletedSessionIdSet.find(sessionId) != deletedSessionIdSet.end()) {
761 return true;
762 }
763 return false;
764 }
765
DumpTasksInfo(map<uint32_t,HTaskInfo> & mapTask)766 void HdcSessionBase::DumpTasksInfo(map<uint32_t, HTaskInfo> &mapTask)
767 {
768 int idx = 1;
769 for (auto t : mapTask) {
770 HTaskInfo ti = t.second;
771 WRITE_LOG(LOG_WARN, "%d: channelId: %lu, type: %d, closeRetry: %d\n",
772 idx++, ti->channelId, ti->taskType, ti->closeRetryCount);
773 }
774 }
775
776 // All in the corresponding sub-thread, no need locks
AdminTask(const uint8_t op,HSession hSession,const uint32_t channelId,HTaskInfo hInput)777 HTaskInfo HdcSessionBase::AdminTask(const uint8_t op, HSession hSession, const uint32_t channelId, HTaskInfo hInput)
778 {
779 HTaskInfo hRet = nullptr;
780 map<uint32_t, HTaskInfo> &mapTask = *hSession->mapTask;
781
782 switch (op) {
783 case OP_ADD:
784 hRet = mapTask[channelId];
785 if (hRet != nullptr) {
786 delete hRet;
787 }
788 mapTask[channelId] = hInput;
789 hRet = hInput;
790
791 WRITE_LOG(LOG_WARN, "AdminTask add session %u, channelId %u, mapTask size: %zu",
792 hSession->sessionId, channelId, mapTask.size());
793
794 break;
795 case OP_REMOVE:
796 mapTask.erase(channelId);
797 WRITE_LOG(LOG_DEBUG, "AdminTask rm session %u, channelId %u, mapTask size: %zu",
798 hSession->sessionId, channelId, mapTask.size());
799 break;
800 case OP_QUERY:
801 if (mapTask.count(channelId)) {
802 hRet = mapTask[channelId];
803 }
804 break;
805 case OP_VOTE_RESET:
806 AdminSession(op, hSession->sessionId, nullptr);
807 break;
808 default:
809 break;
810 }
811 return hRet;
812 }
813
SendByProtocol(HSession hSession,uint8_t * bufPtr,const int bufLen,bool echo)814 int HdcSessionBase::SendByProtocol(HSession hSession, uint8_t *bufPtr, const int bufLen, bool echo)
815 {
816 StartTraceScope("HdcSessionBase::SendByProtocol");
817 if (hSession->isDead) {
818 delete[] bufPtr;
819 WRITE_LOG(LOG_WARN, "SendByProtocol session dead error");
820 return ERR_SESSION_NOFOUND;
821 }
822 int ret = 0;
823 switch (hSession->connType) {
824 case CONN_TCP: {
825 HdcTCPBase *pTCP = ((HdcTCPBase *)hSession->classModule);
826 if (echo && !hSession->serverOrDaemon) {
827 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
828 } else {
829 if (hSession->hWorkThread == uv_thread_self()) {
830 ret = pTCP->WriteUvTcpFd(&hSession->hWorkTCP, bufPtr, bufLen);
831 } else {
832 ret = pTCP->WriteUvTcpFd(&hSession->hChildWorkTCP, bufPtr, bufLen);
833 }
834 }
835 break;
836 }
837 case CONN_USB: {
838 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
839 ret = pUSB->SendUSBBlock(hSession, bufPtr, bufLen);
840 delete[] bufPtr;
841 break;
842 }
843 #ifdef HDC_SUPPORT_UART
844 case CONN_SERIAL: {
845 HdcUARTBase *pUART = ((HdcUARTBase *)hSession->classModule);
846 ret = pUART->SendUARTData(hSession, bufPtr, bufLen);
847 delete[] bufPtr;
848 break;
849 }
850 #endif
851 default:
852 break;
853 }
854 return ret;
855 }
856
Send(const uint32_t sessionId,const uint32_t channelId,const uint16_t commandFlag,const uint8_t * data,const int dataSize)857 int HdcSessionBase::Send(const uint32_t sessionId, const uint32_t channelId, const uint16_t commandFlag,
858 const uint8_t *data, const int dataSize)
859 {
860 StartTraceScope("HdcSessionBase::Send");
861 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
862 if (!hSession) {
863 WRITE_LOG(LOG_WARN, "Send to offline device, drop it, sessionId:%u", sessionId);
864 return ERR_SESSION_NOFOUND;
865 }
866 PayloadProtect protectBuf; // noneed convert to big-endian
867 protectBuf.channelId = channelId;
868 protectBuf.commandFlag = commandFlag;
869 protectBuf.checkSum = (ENABLE_IO_CHECKSUM && dataSize > 0) ? Base::CalcCheckSum(data, dataSize) : 0;
870 protectBuf.vCode = payloadProtectStaticVcode;
871 string s = SerialStruct::SerializeToString(protectBuf);
872 // reserve for encrypt here
873 // xx-encrypt
874
875 PayloadHead payloadHead = {}; // need convert to big-endian
876 payloadHead.flag[0] = PACKET_FLAG.at(0);
877 payloadHead.flag[1] = PACKET_FLAG.at(1);
878 payloadHead.protocolVer = VER_PROTOCOL;
879 payloadHead.headSize = htons(s.size());
880 payloadHead.dataSize = htonl(dataSize);
881 int finalBufSize = sizeof(PayloadHead) + s.size() + dataSize;
882 uint8_t *finayBuf = new(std::nothrow) uint8_t[finalBufSize]();
883 if (finayBuf == nullptr) {
884 WRITE_LOG(LOG_WARN, "send allocmem err");
885 return ERR_BUF_ALLOC;
886 }
887 bool bufRet = false;
888 do {
889 if (memcpy_s(finayBuf, sizeof(PayloadHead), reinterpret_cast<uint8_t *>(&payloadHead), sizeof(PayloadHead))) {
890 WRITE_LOG(LOG_WARN, "send copyhead err for dataSize:%d", dataSize);
891 break;
892 }
893 if (memcpy_s(finayBuf + sizeof(PayloadHead), s.size(),
894 reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size())) {
895 WRITE_LOG(LOG_WARN, "send copyProtbuf err for dataSize:%d", dataSize);
896 break;
897 }
898 if (dataSize > 0 && memcpy_s(finayBuf + sizeof(PayloadHead) + s.size(), dataSize, data, dataSize)) {
899 WRITE_LOG(LOG_WARN, "send copyDatabuf err for dataSize:%d", dataSize);
900 break;
901 }
902 bufRet = true;
903 } while (false);
904 if (!bufRet) {
905 delete[] finayBuf;
906 WRITE_LOG(LOG_WARN, "send copywholedata err for dataSize:%d", dataSize);
907 return ERR_BUF_COPY;
908 }
909 if (CMD_KERNEL_ECHO == commandFlag) {
910 return SendByProtocol(hSession, finayBuf, finalBufSize, true);
911 } else {
912 return SendByProtocol(hSession, finayBuf, finalBufSize);
913 }
914 }
915
DecryptPayload(HSession hSession,PayloadHead * payloadHeadBe,uint8_t * encBuf)916 int HdcSessionBase::DecryptPayload(HSession hSession, PayloadHead *payloadHeadBe, uint8_t *encBuf)
917 {
918 StartTraceScope("HdcSessionBase::DecryptPayload");
919 PayloadProtect protectBuf = {};
920 uint16_t headSize = ntohs(payloadHeadBe->headSize);
921 int dataSize = ntohl(payloadHeadBe->dataSize);
922 string encString(reinterpret_cast<char *>(encBuf), headSize);
923 SerialStruct::ParseFromString(protectBuf, encString);
924 if (protectBuf.vCode != payloadProtectStaticVcode) {
925 WRITE_LOG(LOG_FATAL, "Session recv static vcode failed");
926 return ERR_BUF_CHECK;
927 }
928 uint8_t *data = encBuf + headSize;
929 if (ENABLE_IO_CHECKSUM && protectBuf.checkSum != 0 && (protectBuf.checkSum != Base::CalcCheckSum(data, dataSize))) {
930 WRITE_LOG(LOG_FATAL, "Session recv CalcCheckSum failed");
931 return ERR_BUF_CHECK;
932 }
933 if (!FetchCommand(hSession, protectBuf.channelId, protectBuf.commandFlag, data, dataSize)) {
934 WRITE_LOG(LOG_WARN, "FetchCommand failed: channelId %x commandFlag %x",
935 protectBuf.channelId, protectBuf.commandFlag);
936 return ERR_GENERIC;
937 }
938 return RET_SUCCESS;
939 }
940
OnRead(HSession hSession,uint8_t * bufPtr,const int bufLen)941 int HdcSessionBase::OnRead(HSession hSession, uint8_t *bufPtr, const int bufLen)
942 {
943 int ret = ERR_GENERIC;
944 StartTraceScope("HdcSessionBase::OnRead");
945 if (memcmp(bufPtr, PACKET_FLAG.c_str(), PACKET_FLAG.size())) {
946 WRITE_LOG(LOG_FATAL, "PACKET_FLAG incorrect %x %x", bufPtr[0], bufPtr[1]);
947 return ERR_BUF_CHECK;
948 }
949 struct PayloadHead *payloadHead = reinterpret_cast<struct PayloadHead *>(bufPtr);
950 // to prevent integer overflow caused by the add operation of two input num
951 uint64_t payloadHeadSize = static_cast<uint64_t>(ntohl(payloadHead->dataSize)) +
952 static_cast<uint64_t>(ntohs(payloadHead->headSize));
953 int packetHeadSize = sizeof(struct PayloadHead);
954 if (payloadHeadSize == 0 || payloadHeadSize > static_cast<uint64_t>(HDC_BUF_MAX_BYTES)) {
955 WRITE_LOG(LOG_FATAL, "Packet size incorrect");
956 return ERR_BUF_CHECK;
957 }
958
959 // 0 < payloadHeadSize < HDC_BUF_MAX_BYTES
960 int tobeReadLen = static_cast<int>(payloadHeadSize);
961 if (bufLen - packetHeadSize < tobeReadLen) {
962 return 0;
963 }
964 if (DecryptPayload(hSession, payloadHead, bufPtr + packetHeadSize)) {
965 WRITE_LOG(LOG_WARN, "decrypt plhead error");
966 return ERR_BUF_CHECK;
967 }
968 ret = packetHeadSize + tobeReadLen;
969 return ret;
970 }
971
972 // Returns <0 error;> 0 receives the number of bytes; 0 untreated
FetchIOBuf(HSession hSession,uint8_t * ioBuf,int read)973 int HdcSessionBase::FetchIOBuf(HSession hSession, uint8_t *ioBuf, int read)
974 {
975 HdcSessionBase *ptrConnect = (HdcSessionBase *)hSession->classInstance;
976 int indexBuf = 0;
977 int childRet = 0;
978 StartTraceScope("HdcSessionBase::FetchIOBuf");
979 if (read < 0) {
980 constexpr int bufSize = 1024;
981 char buf[bufSize] = { 0 };
982 uv_strerror_r(read, buf, bufSize);
983 WRITE_LOG(LOG_FATAL, "FetchIOBuf read io failed,%s", buf);
984 return ERR_IO_FAIL;
985 }
986 hSession->stat.dataRecvBytes += read;
987 hSession->availTailIndex += read;
988 while (!hSession->isDead && hSession->availTailIndex > static_cast<int>(sizeof(PayloadHead))) {
989 childRet = ptrConnect->OnRead(hSession, ioBuf + indexBuf, hSession->availTailIndex);
990 if (childRet > 0) {
991 hSession->availTailIndex -= childRet;
992 indexBuf += childRet;
993 } else if (childRet == 0) {
994 // Not enough a IO
995 break;
996 } else { // <0
997 WRITE_LOG(LOG_FATAL, "FetchIOBuf error childRet:%d sessionId:%u", childRet, hSession->sessionId);
998 hSession->availTailIndex = 0; // Preventing malicious data packages
999 indexBuf = ERR_BUF_SIZE;
1000 break;
1001 }
1002 // It may be multi-time IO to merge in a BUF, need to loop processing
1003 }
1004 if (indexBuf > 0 && hSession->availTailIndex > 0) {
1005 if (memmove_s(hSession->ioBuf, hSession->bufSize, hSession->ioBuf + indexBuf, hSession->availTailIndex)
1006 != EOK) {
1007 return ERR_BUF_COPY;
1008 };
1009 uint8_t *bufToZero = reinterpret_cast<uint8_t *>(hSession->ioBuf + hSession->availTailIndex);
1010 Base::ZeroBuf(bufToZero, hSession->bufSize - hSession->availTailIndex);
1011 }
1012 return indexBuf;
1013 }
1014
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)1015 void HdcSessionBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
1016 {
1017 HSession context = (HSession)handle->data;
1018 Base::ReallocBuf(&context->ioBuf, &context->bufSize, HDC_SOCKETPAIR_SIZE);
1019 buf->base = (char *)context->ioBuf + context->availTailIndex;
1020 int size = context->bufSize - context->availTailIndex;
1021 buf->len = std::min(size, static_cast<int>(sizeWanted));
1022 }
1023
FinishWriteSessionTCP(uv_write_t * req,int status)1024 void HdcSessionBase::FinishWriteSessionTCP(uv_write_t *req, int status)
1025 {
1026 HSession hSession = (HSession)req->handle->data;
1027 --hSession->ref;
1028 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1029 if (status < 0) {
1030 WRITE_LOG(LOG_WARN, "FinishWriteSessionTCP status:%d sessionId:%u isDead:%d ref:%u",
1031 status, hSession->sessionId, hSession->isDead, uint32_t(hSession->ref));
1032 Base::TryCloseHandle((uv_handle_t *)req->handle);
1033 if (!hSession->isDead && !hSession->ref) {
1034 WRITE_LOG(LOG_DEBUG, "FinishWriteSessionTCP freesession :%u", hSession->sessionId);
1035 thisClass->FreeSession(hSession->sessionId);
1036 }
1037 }
1038 delete[]((uint8_t *)req->data);
1039 delete req;
1040 }
1041
DispatchSessionThreadCommand(HSession hSession,const uint8_t * baseBuf,const int bytesIO)1042 bool HdcSessionBase::DispatchSessionThreadCommand(HSession hSession, const uint8_t *baseBuf,
1043 const int bytesIO)
1044 {
1045 bool ret = true;
1046 uint8_t flag = *const_cast<uint8_t *>(baseBuf);
1047
1048 switch (flag) {
1049 case SP_JDWP_NEWFD:
1050 case SP_ARK_NEWFD: {
1051 JdwpNewFileDescriptor(baseBuf, bytesIO);
1052 break;
1053 }
1054 default:
1055 WRITE_LOG(LOG_WARN, "Not support session command");
1056 break;
1057 }
1058 return ret;
1059 }
1060
ReadCtrlFromSession(uv_poll_t * poll,int status,int events)1061 void HdcSessionBase::ReadCtrlFromSession(uv_poll_t *poll, int status, int events)
1062 {
1063 HSession hSession = (HSession)poll->data;
1064 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1065 const int size = Base::GetMaxBufSizeStable();
1066 char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1067 ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_MAIN], buf, size);
1068 while (true) {
1069 if (nread < 0) {
1070 constexpr int bufSize = 1024;
1071 char buffer[bufSize] = { 0 };
1072 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1073 WRITE_LOG(LOG_WARN, "ReadCtrlFromSession failed,%s", buffer);
1074 uv_poll_stop(poll);
1075 break;
1076 }
1077 if (nread == 0) {
1078 WRITE_LOG(LOG_FATAL, "ReadCtrlFromSession read data zero byte");
1079 break;
1080 }
1081 // only one command, no need to split command from stream
1082 // if add more commands, consider the format command
1083 hSessionBase->DispatchSessionThreadCommand(hSession, reinterpret_cast<uint8_t *>(buf), nread);
1084 break;
1085 }
1086 delete[] buf;
1087 }
1088
WorkThreadInitSession(HSession hSession,SessionHandShake & handshake)1089 void HdcSessionBase::WorkThreadInitSession(HSession hSession, SessionHandShake &handshake)
1090 {
1091 handshake.banner = HANDSHAKE_MESSAGE;
1092 handshake.sessionId = hSession->sessionId;
1093 handshake.connectKey = hSession->connectKey;
1094 if (!hSession->isCheck) {
1095 handshake.version = Base::GetVersion() + HDC_MSG_HASH;
1096 WRITE_LOG(LOG_INFO, "set version = %s", handshake.version.c_str());
1097 }
1098 handshake.authType = AUTH_NONE;
1099 // told daemon, we support RSA_3072_SHA512 auth
1100 Base::TlvAppend(handshake.buf, TAG_AUTH_TYPE, std::to_string(AuthVerifyType::RSA_3072_SHA512));
1101 }
1102
WorkThreadStartSession(HSession hSession)1103 bool HdcSessionBase::WorkThreadStartSession(HSession hSession)
1104 {
1105 bool regOK = false;
1106 int childRet = 0;
1107 if (hSession->connType == CONN_TCP) {
1108 HdcTCPBase *pTCPBase = (HdcTCPBase *)hSession->classModule;
1109 hSession->hChildWorkTCP.data = hSession;
1110 if (uv_tcp_init(&hSession->childLoop, &hSession->hChildWorkTCP) < 0) {
1111 WRITE_LOG(LOG_WARN, "HdcSessionBase SessionCtrl failed 1");
1112 return false;
1113 }
1114 if ((childRet = uv_tcp_open(&hSession->hChildWorkTCP, hSession->fdChildWorkTCP)) < 0) {
1115 constexpr int bufSize = 1024;
1116 char buf[bufSize] = { 0 };
1117 uv_strerror_r(childRet, buf, bufSize);
1118 WRITE_LOG(LOG_WARN, "SessionCtrl failed 2,fd:%d,str:%s", hSession->fdChildWorkTCP, buf);
1119 return false;
1120 }
1121 Base::SetTcpOptions((uv_tcp_t *)&hSession->hChildWorkTCP);
1122 uv_read_start((uv_stream_t *)&hSession->hChildWorkTCP, AllocCallback, pTCPBase->ReadStream);
1123 regOK = true;
1124 #ifdef HDC_SUPPORT_UART
1125 } else if (hSession->connType == CONN_SERIAL) { // UART
1126 HdcUARTBase *pUARTBase = (HdcUARTBase *)hSession->classModule;
1127 WRITE_LOG(LOG_DEBUG, "UART ReadyForWorkThread");
1128 regOK = pUARTBase->ReadyForWorkThread(hSession);
1129 #endif
1130 } else { // USB
1131 HdcUSBBase *pUSBBase = (HdcUSBBase *)hSession->classModule;
1132 WRITE_LOG(LOG_DEBUG, "USB ReadyForWorkThread");
1133 regOK = pUSBBase->ReadyForWorkThread(hSession);
1134 }
1135
1136 if (regOK && hSession->serverOrDaemon) {
1137 // session handshake step1
1138 SessionHandShake handshake = {};
1139 WorkThreadInitSession(hSession, handshake);
1140 string hs = SerialStruct::SerializeToString(handshake);
1141 #ifdef HDC_SUPPORT_UART
1142 WRITE_LOG(LOG_DEBUG, "WorkThreadStartSession session %u auth %u send handshake hs: %s",
1143 hSession->sessionId, handshake.authType, hs.c_str());
1144 #endif
1145 Send(hSession->sessionId, 0, CMD_KERNEL_HANDSHAKE,
1146 reinterpret_cast<uint8_t *>(const_cast<char *>(hs.c_str())), hs.size());
1147 }
1148 return regOK;
1149 }
1150
BuildCtrlString(InnerCtrlCommand command,uint32_t channelId,uint8_t * data,int dataSize)1151 vector<uint8_t> HdcSessionBase::BuildCtrlString(InnerCtrlCommand command, uint32_t channelId, uint8_t *data,
1152 int dataSize)
1153 {
1154 vector<uint8_t> ret;
1155 while (true) {
1156 if (dataSize > BUF_SIZE_MICRO) {
1157 WRITE_LOG(LOG_WARN, "BuildCtrlString dataSize:%d", dataSize);
1158 break;
1159 }
1160 CtrlStruct ctrl = {};
1161 ctrl.command = command;
1162 ctrl.channelId = channelId;
1163 ctrl.dataSize = dataSize;
1164 if (dataSize > 0 && data != nullptr && memcpy_s(ctrl.data, sizeof(ctrl.data), data, dataSize) != EOK) {
1165 break;
1166 }
1167 uint8_t *buf = reinterpret_cast<uint8_t *>(&ctrl);
1168 ret.insert(ret.end(), buf, buf + sizeof(CtrlStruct));
1169 break;
1170 }
1171 return ret;
1172 }
1173
DispatchMainThreadCommand(HSession hSession,const CtrlStruct * ctrl)1174 bool HdcSessionBase::DispatchMainThreadCommand(HSession hSession, const CtrlStruct *ctrl)
1175 {
1176 bool ret = true;
1177 uint32_t channelId = ctrl->channelId; // if send not set, it is zero
1178 switch (ctrl->command) {
1179 case SP_START_SESSION: {
1180 WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand START_SESSION sessionId:%u instance:%s",
1181 hSession->sessionId, hSession->serverOrDaemon ? "server" : "daemon");
1182 ret = WorkThreadStartSession(hSession);
1183 break;
1184 }
1185 case SP_STOP_SESSION: {
1186 WRITE_LOG(LOG_WARN, "Dispatch MainThreadCommand STOP_SESSION sessionId:%u", hSession->sessionId);
1187 auto closeSessionChildThreadTCPHandle = [](uv_handle_t *handle) -> void {
1188 HSession hSession = (HSession)handle->data;
1189 Base::TryCloseHandle((uv_handle_t *)handle);
1190 if (handle == (uv_handle_t *)hSession->pollHandle[STREAM_WORK]) {
1191 free(hSession->pollHandle[STREAM_WORK]);
1192 }
1193 if (--hSession->uvChildRef == 0) {
1194 uv_stop(&hSession->childLoop);
1195 };
1196 };
1197 constexpr int uvChildRefOffset = 2;
1198 hSession->uvChildRef += uvChildRefOffset;
1199 if (hSession->connType == CONN_TCP && hSession->hChildWorkTCP.loop) { // maybe not use it
1200 ++hSession->uvChildRef;
1201 Base::TryCloseHandle((uv_handle_t *)&hSession->hChildWorkTCP, true, closeSessionChildThreadTCPHandle);
1202 }
1203 Base::TryCloseHandle((uv_handle_t *)hSession->pollHandle[STREAM_WORK], true,
1204 closeSessionChildThreadTCPHandle);
1205 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_WORK], true,
1206 closeSessionChildThreadTCPHandle);
1207 break;
1208 }
1209 case SP_ATTACH_CHANNEL: {
1210 if (!serverOrDaemon) {
1211 break; // Only Server has this feature
1212 }
1213 AttachChannel(hSession, channelId);
1214 break;
1215 }
1216 case SP_DEATCH_CHANNEL: {
1217 if (!serverOrDaemon) {
1218 break; // Only Server has this feature
1219 }
1220 DeatchChannel(hSession, channelId);
1221 break;
1222 }
1223 default:
1224 WRITE_LOG(LOG_WARN, "Not support main command");
1225 ret = false;
1226 break;
1227 }
1228 return ret;
1229 }
1230
1231 // Several bytes of control instructions, generally do not stick
ReadCtrlFromMain(uv_poll_t * poll,int status,int events)1232 void HdcSessionBase::ReadCtrlFromMain(uv_poll_t *poll, int status, int events)
1233 {
1234 HSession hSession = (HSession)poll->data;
1235 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1236 int formatCommandSize = sizeof(CtrlStruct);
1237 int index = 0;
1238 const int size = Base::GetMaxBufSizeStable();
1239 char *buf = reinterpret_cast<char *>(new uint8_t[size]());
1240 ssize_t nread = Base::ReadFromFd(hSession->ctrlFd[STREAM_WORK], buf, size);
1241 while (true) {
1242 if (nread < 0) {
1243 constexpr int bufSize = 1024;
1244 char buffer[bufSize] = { 0 };
1245 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1246 WRITE_LOG(LOG_WARN, "SessionCtrl failed,%s", buffer);
1247 break;
1248 }
1249 if (nread % formatCommandSize != 0) {
1250 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain size failed, nread == %d", nread);
1251 break;
1252 }
1253 CtrlStruct *ctrl = reinterpret_cast<CtrlStruct *>(buf + index);
1254 if (!hSessionBase->DispatchMainThreadCommand(hSession, ctrl)) {
1255 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain failed sessionId:%u channelId:%u command:%u",
1256 hSession->sessionId, ctrl->channelId, ctrl->command);
1257 break;
1258 }
1259 index += sizeof(CtrlStruct);
1260 if (index >= nread) {
1261 break;
1262 }
1263 }
1264 delete[] buf;
1265 }
1266
ReChildLoopForSessionClear(HSession hSession)1267 void HdcSessionBase::ReChildLoopForSessionClear(HSession hSession)
1268 {
1269 // Restart loop close task
1270 ClearOwnTasks(hSession, 0);
1271 WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear sessionId:%u", hSession->sessionId);
1272 auto clearTaskForSessionFinish = [](uv_timer_t *handle) -> void {
1273 HSession hSession = (HSession)handle->data;
1274 for (auto v : *hSession->mapTask) {
1275 HTaskInfo hTask = (HTaskInfo)v.second;
1276 uint8_t level;
1277 if (hTask->closeRetryCount < GLOBAL_TIMEOUT / 2) {
1278 level = LOG_DEBUG;
1279 } else {
1280 level = LOG_WARN;
1281 }
1282 WRITE_LOG(level, "wait task free retry %d/%d, channelId:%u, sessionId:%u",
1283 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->channelId, hTask->sessionId);
1284 if (hTask->closeRetryCount++ >= GLOBAL_TIMEOUT) {
1285 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
1286 hSession = thisClass->AdminSession(OP_QUERY, hTask->sessionId, nullptr);
1287 thisClass->AdminTask(OP_VOTE_RESET, hSession, hTask->channelId, nullptr);
1288 }
1289 if (!hTask->taskFree)
1290 return;
1291 }
1292 // all task has been free
1293 uv_close((uv_handle_t *)handle, Base::CloseTimerCallback);
1294 uv_stop(&hSession->childLoop); // stop ReChildLoopForSessionClear pendding
1295 };
1296 Base::TimerUvTask(
1297 &hSession->childLoop, hSession, clearTaskForSessionFinish, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
1298 uv_run(&hSession->childLoop, UV_RUN_DEFAULT);
1299 // clear
1300 Base::TryCloseChildLoop(&hSession->childLoop, "Session childUV");
1301 }
1302
SessionWorkThread(uv_work_t * arg)1303 void HdcSessionBase::SessionWorkThread(uv_work_t *arg)
1304 {
1305 HSession hSession = (HSession)arg->data;
1306 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1307 hSession->hWorkChildThread = uv_thread_self();
1308
1309 uv_poll_t *pollHandle = hSession->pollHandle[STREAM_WORK];
1310 pollHandle->data = hSession;
1311 uv_poll_init_socket(&hSession->childLoop, pollHandle, hSession->ctrlFd[STREAM_WORK]);
1312 uv_poll_start(pollHandle, UV_READABLE, ReadCtrlFromMain);
1313 WRITE_LOG(LOG_DEBUG, "!!!Workthread run begin, sessionId:%u instance:%s", hSession->sessionId,
1314 thisClass->serverOrDaemon ? "server" : "daemon");
1315 uv_run(&hSession->childLoop, UV_RUN_DEFAULT); // work pendding
1316 WRITE_LOG(LOG_DEBUG, "!!!Workthread run again, sessionId:%u", hSession->sessionId);
1317 // main loop has exit
1318 thisClass->ReChildLoopForSessionClear(hSession); // work pending again
1319 hSession->childCleared = true;
1320 WRITE_LOG(LOG_WARN, "!!!Workthread run finish, sessionId:%u", hSession->sessionId);
1321 }
1322
1323 // clang-format off
LogMsg(const uint32_t sessionId,const uint32_t channelId,MessageLevel level,const char * msg,...)1324 void HdcSessionBase::LogMsg(const uint32_t sessionId, const uint32_t channelId,
1325 MessageLevel level, const char *msg, ...)
1326 // clang-format on
1327 {
1328 va_list vaArgs;
1329 va_start(vaArgs, msg);
1330 string log = Base::StringFormat(msg, vaArgs);
1331 va_end(vaArgs);
1332 vector<uint8_t> buf;
1333 buf.push_back(level);
1334 buf.insert(buf.end(), log.c_str(), log.c_str() + log.size());
1335 ServerCommand(sessionId, channelId, CMD_KERNEL_ECHO, buf.data(), buf.size());
1336 }
1337
NeedNewTaskInfo(const uint16_t command,bool & masterTask)1338 bool HdcSessionBase::NeedNewTaskInfo(const uint16_t command, bool &masterTask)
1339 {
1340 // referer from HdcServerForClient::DoCommandRemote
1341 bool ret = false;
1342 bool taskMasterInit = false;
1343 masterTask = false;
1344 switch (command) {
1345 case CMD_FILE_INIT:
1346 case CMD_FLASHD_FLASH_INIT:
1347 case CMD_FLASHD_UPDATE_INIT:
1348 case CMD_FLASHD_ERASE:
1349 case CMD_FLASHD_FORMAT:
1350 case CMD_FORWARD_INIT:
1351 case CMD_APP_INIT:
1352 case CMD_APP_UNINSTALL:
1353 case CMD_APP_SIDELOAD:
1354 taskMasterInit = true;
1355 break;
1356 default:
1357 break;
1358 }
1359 if (!serverOrDaemon
1360 && (command == CMD_SHELL_INIT || (command > CMD_UNITY_COMMAND_HEAD && command < CMD_UNITY_COMMAND_TAIL))) {
1361 // daemon's single side command
1362 ret = true;
1363 } else if (command == CMD_KERNEL_WAKEUP_SLAVETASK) {
1364 // slave tasks
1365 ret = true;
1366 } else if (command == CMD_UNITY_BUGREPORT_INIT) {
1367 ret = true;
1368 } else if (taskMasterInit) {
1369 // task init command
1370 masterTask = true;
1371 ret = true;
1372 }
1373 return ret;
1374 }
1375 // Heavy and time-consuming work was putted in the new thread to do, and does
1376 // not occupy the main thread
DispatchTaskData(HSession hSession,const uint32_t channelId,const uint16_t command,uint8_t * payload,int payloadSize)1377 bool HdcSessionBase::DispatchTaskData(HSession hSession, const uint32_t channelId, const uint16_t command,
1378 uint8_t *payload, int payloadSize)
1379 {
1380 StartTraceScope("HdcSessionBase::DispatchTaskData");
1381 bool ret = true;
1382 HTaskInfo hTaskInfo = nullptr;
1383 bool masterTask = false;
1384 while (true) {
1385 // Some basic commands do not have a local task constructor. example: Interactive shell, some uinty commands
1386 if (NeedNewTaskInfo(command, masterTask)) {
1387 WRITE_LOG(LOG_DEBUG, "New HTaskInfo channelId:%u command:%u", channelId, command);
1388 hTaskInfo = new(std::nothrow) TaskInformation();
1389 if (hTaskInfo == nullptr) {
1390 WRITE_LOG(LOG_FATAL, "DispatchTaskData new hTaskInfo failed");
1391 break;
1392 }
1393 hTaskInfo->channelId = channelId;
1394 hTaskInfo->sessionId = hSession->sessionId;
1395 hTaskInfo->runLoop = &hSession->childLoop;
1396 hTaskInfo->serverOrDaemon = serverOrDaemon;
1397 hTaskInfo->masterSlave = masterTask;
1398 hTaskInfo->closeRetryCount = 0;
1399 hTaskInfo->channelTask = false;
1400 hTaskInfo->isCleared = false;
1401
1402 int addTaskRetry = 3; // try 3 time
1403 while (addTaskRetry > 0) {
1404 if (AdminTask(OP_ADD, hSession, channelId, hTaskInfo)) {
1405 break;
1406 }
1407 sleep(1);
1408 --addTaskRetry;
1409 }
1410
1411 if (addTaskRetry == 0) {
1412 #ifndef HDC_HOST
1413 LogMsg(hTaskInfo->sessionId, hTaskInfo->channelId,
1414 MSG_FAIL, "hdc thread pool busy, may cause reset later");
1415 #endif
1416 delete hTaskInfo;
1417 hTaskInfo = nullptr;
1418 ret = false;
1419 break;
1420 }
1421 } else {
1422 hTaskInfo = AdminTask(OP_QUERY, hSession, channelId, nullptr);
1423 }
1424 if (!hTaskInfo || hTaskInfo->taskStop || hTaskInfo->taskFree) {
1425 WRITE_LOG(LOG_ALL, "Dead HTaskInfo, ignore, channelId:%u command:%u", channelId, command);
1426 break;
1427 }
1428 ret = RedirectToTask(hTaskInfo, hSession, channelId, command, payload, payloadSize);
1429 break;
1430 }
1431 return ret;
1432 }
1433
PostStopInstanceMessage(bool restart)1434 void HdcSessionBase::PostStopInstanceMessage(bool restart)
1435 {
1436 PushAsyncMessage(0, ASYNC_STOP_MAINLOOP, nullptr, 0);
1437 WRITE_LOG(LOG_DEBUG, "StopDaemon has sended restart %d", restart);
1438 wantRestart = restart;
1439 }
1440 } // namespace Hdc
1441