1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "session.h"
16 #ifndef TEST_HASH
17 #include "hdc_hash_gen.h"
18 #endif
19 #include "serial_struct.h"
20
21 namespace Hdc {
HdcSessionBase(bool serverOrDaemonIn,size_t uvThreadSize)22 HdcSessionBase::HdcSessionBase(bool serverOrDaemonIn, size_t uvThreadSize)
23 {
24 // print version pid
25 WRITE_LOG(LOG_INFO, "Program running. %s Pid:%u", Base::GetVersion().c_str(), getpid());
26 // server/daemon common initialization code
27 if (uvThreadSize < SIZE_THREAD_POOL_MIN) {
28 uvThreadSize = SIZE_THREAD_POOL_MIN;
29 } else if (uvThreadSize > SIZE_THREAD_POOL_MAX) {
30 uvThreadSize = SIZE_THREAD_POOL_MAX;
31 }
32 threadPoolCount = uvThreadSize;
33 WRITE_LOG(LOG_INFO, "set UV_THREADPOOL_SIZE:%zu", threadPoolCount);
34 string uvThreadEnv("UV_THREADPOOL_SIZE");
35 string uvThreadVal = std::to_string(threadPoolCount);
36 #ifdef _WIN32
37 uvThreadEnv += "=";
38 uvThreadEnv += uvThreadVal;
39 _putenv(uvThreadEnv.c_str());
40 #else
41 setenv(uvThreadEnv.c_str(), uvThreadVal.c_str(), 1);
42 #endif
43 uv_loop_init(&loopMain);
44 WRITE_LOG(LOG_DEBUG, "loopMain init");
45 uv_rwlock_init(&mainAsync);
46 uv_async_init(&loopMain, &asyncMainLoop, MainAsyncCallback);
47 uv_rwlock_init(&lockMapSession);
48 serverOrDaemon = serverOrDaemonIn;
49 ctxUSB = nullptr;
50 wantRestart = false;
51 threadSessionMain = uv_thread_self();
52
53 #ifdef HDC_HOST
54 if (serverOrDaemon) {
55 if (libusb_init((libusb_context **)&ctxUSB) != 0) {
56 ctxUSB = nullptr;
57 }
58 }
59 #endif
60 }
61
~HdcSessionBase()62 HdcSessionBase::~HdcSessionBase()
63 {
64 Base::TryCloseHandle((uv_handle_t *)&asyncMainLoop);
65 uv_loop_close(&loopMain);
66 // clear base
67 uv_rwlock_destroy(&mainAsync);
68 uv_rwlock_destroy(&lockMapSession);
69 #ifdef HDC_HOST
70 if (serverOrDaemon and ctxUSB != nullptr) {
71 libusb_exit((libusb_context *)ctxUSB);
72 }
73 #endif
74 WRITE_LOG(LOG_DEBUG, "~HdcSessionBase free sessionRef:%u instance:%s", uint32_t(sessionRef),
75 serverOrDaemon ? "server" : "daemon");
76 }
77
78 // remove step2
TryRemoveTask(HTaskInfo hTask)79 bool HdcSessionBase::TryRemoveTask(HTaskInfo hTask)
80 {
81 if (hTask->taskFree) {
82 return true;
83 }
84 bool ret = RemoveInstanceTask(OP_REMOVE, hTask);
85 if (ret) {
86 hTask->taskFree = true;
87 } else {
88 // This is used to check that the memory cannot be cleaned up. If the memory cannot be released, break point
89 // here to see which task has not been released
90 // print task clear
91 }
92 return ret;
93 }
94
95 // remove step1
BeginRemoveTask(HTaskInfo hTask)96 void HdcSessionBase::BeginRemoveTask(HTaskInfo hTask)
97 {
98 if (hTask->taskStop || hTask->taskFree) {
99 return;
100 }
101
102 WRITE_LOG(LOG_DEBUG, "BeginRemoveTask taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
103 bool ret = RemoveInstanceTask(OP_CLEAR, hTask);
104 if (!ret) {
105 WRITE_LOG(LOG_INFO, "RemoveInstanceTask false taskType:%d channelId:%u", hTask->taskType, hTask->channelId);
106 }
107 auto taskClassDeleteRetry = [](uv_timer_t *handle) -> void {
108 HTaskInfo hTask = (HTaskInfo)handle->data;
109 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
110 constexpr uint32_t count = 1000;
111 if (hTask->closeRetryCount == 0 || hTask->closeRetryCount > count) {
112 WRITE_LOG(LOG_DEBUG, "TaskDelay task remove retry count %d/%d, taskType:%d channelId:%u, sessionId:%u",
113 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->taskType, hTask->channelId, hTask->sessionId);
114 hTask->closeRetryCount = 1;
115 }
116 hTask->closeRetryCount++;
117 if (!thisClass->TryRemoveTask(hTask)) {
118 return;
119 }
120 WRITE_LOG(LOG_DEBUG, "TaskDelay task remove finish, channelId:%u", hTask->channelId);
121 if (hTask != nullptr) {
122 delete hTask;
123 hTask = nullptr;
124 }
125 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
126 };
127 Base::TimerUvTask(hTask->runLoop, hTask, taskClassDeleteRetry, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
128
129 hTask->taskStop = true;
130 }
131
132 // Clear all Task or a single Task, the regular situation is stopped first, and the specific class memory is cleaned up
133 // after the end of the LOOP.
134 // When ChannelIdinput == 0, at this time, all of the LOOP ends, all runs in the class end, so directly skip STOP,
135 // physical memory deletion class trimming
ClearOwnTasks(HSession hSession,const uint32_t channelIDInput)136 void HdcSessionBase::ClearOwnTasks(HSession hSession, const uint32_t channelIDInput)
137 {
138 // First case: normal task cleanup process (STOP Remove)
139 // Second: The task is cleaned up, the session ends
140 // Third: The task is cleaned up, and the session is directly over the session.
141 hSession->mapTaskMutex.lock();
142 map<uint32_t, HTaskInfo>::iterator iter;
143 for (iter = hSession->mapTask->begin(); iter != hSession->mapTask->end();) {
144 uint32_t channelId = iter->first;
145 HTaskInfo hTask = iter->second;
146 if (channelIDInput != 0) { // single
147 if (channelIDInput != channelId) {
148 ++iter;
149 continue;
150 }
151 BeginRemoveTask(hTask);
152 WRITE_LOG(LOG_DEBUG, "ClearOwnTasks OP_CLEAR finish, session:%p channelIDInput:%u", hSession,
153 channelIDInput);
154 iter = hSession->mapTask->erase(iter);
155 break;
156 }
157 // multi
158 BeginRemoveTask(hTask);
159 iter = hSession->mapTask->erase(iter);
160 }
161 hSession->mapTaskMutex.unlock();
162 }
163
ClearSessions()164 void HdcSessionBase::ClearSessions()
165 {
166 // no need to lock mapSession
167 // broadcast free signal
168 for (auto v : mapSession) {
169 HSession hSession = (HSession)v.second;
170 if (!hSession->isDead) {
171 FreeSession(hSession->sessionId);
172 }
173 }
174 }
175
ReMainLoopForInstanceClear()176 void HdcSessionBase::ReMainLoopForInstanceClear()
177 { // reloop
178 auto clearSessionsForFinish = [](uv_idle_t *handle) -> void {
179 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
180 if (thisClass->sessionRef > 0) {
181 return;
182 }
183 // all task has been free
184 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
185 uv_stop(&thisClass->loopMain);
186 };
187 Base::IdleUvTask(&loopMain, this, clearSessionsForFinish);
188 uv_run(&loopMain, UV_RUN_DEFAULT);
189 };
190
191 #ifdef HDC_SUPPORT_UART
EnumUARTDeviceRegister(UartKickoutZombie kickOut)192 void HdcSessionBase::EnumUARTDeviceRegister(UartKickoutZombie kickOut)
193 {
194 uv_rwlock_rdlock(&lockMapSession);
195 map<uint32_t, HSession>::iterator i;
196 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
197 HSession hs = i->second;
198 if ((hs->connType != CONN_SERIAL) or (hs->hUART == nullptr)) {
199 continue;
200 }
201 kickOut(hs);
202 break;
203 }
204 uv_rwlock_rdunlock(&lockMapSession);
205 }
206 #endif
207
EnumUSBDeviceRegister(void (* pCallBack)(HSession hSession))208 void HdcSessionBase::EnumUSBDeviceRegister(void (*pCallBack)(HSession hSession))
209 {
210 if (!pCallBack) {
211 return;
212 }
213 uv_rwlock_rdlock(&lockMapSession);
214 map<uint32_t, HSession>::iterator i;
215 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
216 HSession hs = i->second;
217 if (hs->connType != CONN_USB) {
218 continue;
219 }
220 if (hs->hUSB == nullptr) {
221 continue;
222 }
223 if (pCallBack) {
224 pCallBack(hs);
225 }
226 break;
227 }
228 uv_rwlock_rdunlock(&lockMapSession);
229 }
230
231 // The PC side gives the device information, determines if the USB device is registered
232 // PDEV and Busid Devid two choices
QueryUSBDeviceRegister(void * pDev,uint8_t busIDIn,uint8_t devIDIn)233 HSession HdcSessionBase::QueryUSBDeviceRegister(void *pDev, uint8_t busIDIn, uint8_t devIDIn)
234 {
235 #ifdef HDC_HOST
236 libusb_device *dev = (libusb_device *)pDev;
237 HSession hResult = nullptr;
238 if (!mapSession.size()) {
239 return nullptr;
240 }
241 uint8_t busId = 0;
242 uint8_t devId = 0;
243 if (pDev) {
244 busId = libusb_get_bus_number(dev);
245 devId = libusb_get_device_address(dev);
246 } else {
247 busId = busIDIn;
248 devId = devIDIn;
249 }
250 uv_rwlock_rdlock(&lockMapSession);
251 map<uint32_t, HSession>::iterator i;
252 for (i = mapSession.begin(); i != mapSession.end(); ++i) {
253 HSession hs = i->second;
254 if (hs->connType == CONN_USB) {
255 continue;
256 }
257 if (hs->hUSB == nullptr) {
258 continue;
259 }
260 if (hs->hUSB->devId != devId || hs->hUSB->busId != busId) {
261 continue;
262 }
263 hResult = hs;
264 break;
265 }
266 uv_rwlock_rdunlock(&lockMapSession);
267 return hResult;
268 #else
269 return nullptr;
270 #endif
271 }
272
AsyncMainLoopTask(uv_idle_t * handle)273 void HdcSessionBase::AsyncMainLoopTask(uv_idle_t *handle)
274 {
275 AsyncParam *param = (AsyncParam *)handle->data;
276 HdcSessionBase *thisClass = (HdcSessionBase *)param->thisClass;
277 switch (param->method) {
278 case ASYNC_FREE_SESSION:
279 // Destruction is unified in the main thread
280 thisClass->FreeSession(param->sid);
281 break;
282 case ASYNC_STOP_MAINLOOP:
283 uv_stop(&thisClass->loopMain);
284 break;
285 default:
286 break;
287 }
288 if (param->data) {
289 delete[]((uint8_t *)param->data);
290 }
291 delete param;
292 param = nullptr;
293 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseIdleCallback);
294 }
295
MainAsyncCallback(uv_async_t * handle)296 void HdcSessionBase::MainAsyncCallback(uv_async_t *handle)
297 {
298 HdcSessionBase *thisClass = (HdcSessionBase *)handle->data;
299 list<void *>::iterator i;
300 list<void *> &lst = thisClass->lstMainThreadOP;
301 uv_rwlock_wrlock(&thisClass->mainAsync);
302 for (i = lst.begin(); i != lst.end();) {
303 AsyncParam *param = (AsyncParam *)*i;
304 Base::IdleUvTask(&thisClass->loopMain, param, AsyncMainLoopTask);
305 i = lst.erase(i);
306 }
307 uv_rwlock_wrunlock(&thisClass->mainAsync);
308 }
309
PushAsyncMessage(const uint32_t sessionId,const uint8_t method,const void * data,const int dataSize)310 void HdcSessionBase::PushAsyncMessage(const uint32_t sessionId, const uint8_t method, const void *data,
311 const int dataSize)
312 {
313 AsyncParam *param = new AsyncParam();
314 if (!param) {
315 return;
316 }
317 param->sid = sessionId;
318 param->thisClass = this;
319 param->method = method;
320 if (dataSize > 0) {
321 param->dataSize = dataSize;
322 param->data = new uint8_t[param->dataSize]();
323 if (!param->data) {
324 delete param;
325 return;
326 }
327 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
328 delete[]((uint8_t *)param->data);
329 delete param;
330 return;
331 }
332 }
333
334 asyncMainLoop.data = this;
335 uv_rwlock_wrlock(&mainAsync);
336 lstMainThreadOP.push_back(param);
337 uv_rwlock_wrunlock(&mainAsync);
338 uv_async_send(&asyncMainLoop);
339 }
340
WorkerPendding()341 void HdcSessionBase::WorkerPendding()
342 {
343 uv_run(&loopMain, UV_RUN_DEFAULT);
344 ClearInstanceResource();
345 }
346
MallocSessionByConnectType(HSession hSession)347 int HdcSessionBase::MallocSessionByConnectType(HSession hSession)
348 {
349 int ret = 0;
350 switch (hSession->connType) {
351 case CONN_TCP: {
352 uv_tcp_init(&loopMain, &hSession->hWorkTCP);
353 ++hSession->uvHandleRef;
354 hSession->hWorkTCP.data = hSession;
355 break;
356 }
357 case CONN_USB: {
358 // Some members need to be placed at the primary thread
359 HUSB hUSB = new HdcUSB();
360 if (!hUSB) {
361 ret = -1;
362 break;
363 }
364 hSession->hUSB = hUSB;
365 hSession->hUSB->wMaxPacketSizeSend = MAX_PACKET_SIZE_HISPEED;
366 break;
367 }
368 #ifdef HDC_SUPPORT_UART
369 case CONN_SERIAL: {
370 HUART hUART = new HdcUART();
371 if (!hUART) {
372 ret = -1;
373 break;
374 }
375 hSession->hUART = hUART;
376 break;
377 }
378 #endif // HDC_SUPPORT_UART
379 default:
380 ret = -1;
381 break;
382 }
383 return ret;
384 }
385
386 // Avoid unit test when client\server\daemon on the same host, maybe get the same ID value
GetSessionPseudoUid()387 uint32_t HdcSessionBase::GetSessionPseudoUid()
388 {
389 uint32_t uid = 0;
390 do {
391 uid = static_cast<uint32_t>(Base::GetRandom());
392 } while (AdminSession(OP_QUERY, uid, nullptr) != nullptr);
393 return uid;
394 }
395
396 // when client 0 to automatic generated,when daemon First place 1 followed by
MallocSession(bool serverOrDaemon,const ConnType connType,void * classModule,uint32_t sessionId)397 HSession HdcSessionBase::MallocSession(bool serverOrDaemon, const ConnType connType, void *classModule,
398 uint32_t sessionId)
399 {
400 HSession hSession = new(std::nothrow) HdcSession();
401 if (!hSession) {
402 WRITE_LOG(LOG_FATAL, "MallocSession new hSession failed");
403 return nullptr;
404 }
405 int ret = 0;
406 ++sessionRef;
407 (void)memset_s(hSession->ctrlFd, sizeof(hSession->ctrlFd), 0, sizeof(hSession->ctrlFd));
408 hSession->classInstance = this;
409 hSession->connType = connType;
410 hSession->classModule = classModule;
411 hSession->isDead = false;
412 hSession->sessionId = ((sessionId == 0) ? GetSessionPseudoUid() : sessionId);
413 hSession->serverOrDaemon = serverOrDaemon;
414 hSession->hWorkThread = uv_thread_self();
415 hSession->mapTask = new(std::nothrow) map<uint32_t, HTaskInfo>();
416 if (hSession->mapTask == nullptr) {
417 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->mapTask failed");
418 delete hSession;
419 hSession = nullptr;
420 return nullptr;
421 }
422 hSession->listKey = new(std::nothrow) list<void *>;
423 if (hSession->listKey == nullptr) {
424 WRITE_LOG(LOG_FATAL, "MallocSession new hSession->listKey failed");
425 delete hSession;
426 hSession = nullptr;
427 return nullptr;
428 }
429 uv_loop_init(&hSession->childLoop);
430 hSession->uvHandleRef = 0;
431 // pullup child
432 WRITE_LOG(LOG_DEBUG, "HdcSessionBase NewSession, sessionId:%u, connType:%d.",
433 hSession->sessionId, hSession->connType);
434 uv_tcp_init(&loopMain, &hSession->ctrlPipe[STREAM_MAIN]);
435 (void)memset_s(&hSession->ctrlPipe[STREAM_WORK], sizeof(hSession->ctrlPipe[STREAM_WORK]),
436 0, sizeof(uv_tcp_t));
437 ++hSession->uvHandleRef;
438 Base::CreateSocketPair(hSession->ctrlFd);
439 uv_tcp_open(&hSession->ctrlPipe[STREAM_MAIN], hSession->ctrlFd[STREAM_MAIN]);
440 uv_read_start((uv_stream_t *)&hSession->ctrlPipe[STREAM_MAIN], Base::AllocBufferCallback, ReadCtrlFromSession);
441 hSession->ctrlPipe[STREAM_MAIN].data = hSession;
442 hSession->ctrlPipe[STREAM_WORK].data = hSession;
443 // Activate USB DAEMON's data channel, may not for use
444 uv_tcp_init(&loopMain, &hSession->dataPipe[STREAM_MAIN]);
445 (void)memset_s(&hSession->dataPipe[STREAM_WORK], sizeof(hSession->dataPipe[STREAM_WORK]),
446 0, sizeof(uv_tcp_t));
447 ++hSession->uvHandleRef;
448 Base::CreateSocketPair(hSession->dataFd);
449 uv_tcp_open(&hSession->dataPipe[STREAM_MAIN], hSession->dataFd[STREAM_MAIN]);
450 hSession->dataPipe[STREAM_MAIN].data = hSession;
451 hSession->dataPipe[STREAM_WORK].data = hSession;
452 Base::SetTcpOptions(&hSession->dataPipe[STREAM_MAIN]);
453 ret = MallocSessionByConnectType(hSession);
454 if (ret) {
455 delete hSession;
456 hSession = nullptr;
457 } else {
458 AdminSession(OP_ADD, hSession->sessionId, hSession);
459 }
460 return hSession;
461 }
462
FreeSessionByConnectType(HSession hSession)463 void HdcSessionBase::FreeSessionByConnectType(HSession hSession)
464 {
465 WRITE_LOG(LOG_DEBUG, "FreeSessionByConnectType %s", hSession->ToDebugString().c_str());
466
467 if (CONN_USB == hSession->connType) {
468 // ibusb All context is applied for sub-threaded, so it needs to be destroyed in the subline
469 if (!hSession->hUSB) {
470 return;
471 }
472 HUSB hUSB = hSession->hUSB;
473 if (!hUSB) {
474 return;
475 }
476 #ifdef HDC_HOST
477 if (hUSB->devHandle) {
478 libusb_release_interface(hUSB->devHandle, hUSB->interfaceNumber);
479 libusb_close(hUSB->devHandle);
480 hUSB->devHandle = nullptr;
481 }
482 #else
483 Base::CloseFd(hUSB->bulkIn);
484 Base::CloseFd(hUSB->bulkOut);
485 #endif
486 delete hSession->hUSB;
487 hSession->hUSB = nullptr;
488 }
489 #ifdef HDC_SUPPORT_UART
490 if (CONN_SERIAL == hSession->connType) {
491 if (!hSession->hUART) {
492 return;
493 }
494 HUART hUART = hSession->hUART;
495 if (!hUART) {
496 return;
497 }
498 HdcUARTBase *uartBase = (HdcUARTBase *)hSession->classModule;
499 // tell uart session will be free
500 uartBase->StopSession(hSession);
501 #ifdef HDC_HOST
502 #ifdef HOST_MINGW
503 if (hUART->devUartHandle != INVALID_HANDLE_VALUE) {
504 CloseHandle(hUART->devUartHandle);
505 hUART->devUartHandle = INVALID_HANDLE_VALUE;
506 }
507 #elif defined(HOST_LINUX)
508 Base::CloseFd(hUART->devUartHandle);
509 #endif // _WIN32
510 #endif
511 delete hSession->hUART;
512 hSession->hUART = nullptr;
513 }
514 #endif
515 }
516
517 // work when libuv-handle at struct of HdcSession has all callback finished
FreeSessionFinally(uv_idle_t * handle)518 void HdcSessionBase::FreeSessionFinally(uv_idle_t *handle)
519 {
520 HSession hSession = (HSession)handle->data;
521 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
522 if (hSession->uvHandleRef > 0) {
523 return;
524 }
525 // Notify Server or Daemon, just UI or display commandline
526 thisClass->NotifyInstanceSessionFree(hSession, true);
527 // all hsession uv handle has been clear
528 thisClass->AdminSession(OP_REMOVE, hSession->sessionId, nullptr);
529 WRITE_LOG(LOG_DEBUG, "!!!FreeSessionFinally sessionId:%u finish", hSession->sessionId);
530 HdcAuth::FreeKey(!hSession->serverOrDaemon, hSession->listKey);
531 delete hSession;
532 hSession = nullptr; // fix CodeMars SetNullAfterFree issue
533 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
534 --thisClass->sessionRef;
535 }
536
537 // work when child-work thread finish
FreeSessionContinue(HSession hSession)538 void HdcSessionBase::FreeSessionContinue(HSession hSession)
539 {
540 auto closeSessionTCPHandle = [](uv_handle_t *handle) -> void {
541 HSession hSession = (HSession)handle->data;
542 --hSession->uvHandleRef;
543 Base::TryCloseHandle((uv_handle_t *)handle);
544 };
545 if (CONN_TCP == hSession->connType) {
546 // Turn off TCP to prevent continuing writing
547 Base::TryCloseHandle((uv_handle_t *)&hSession->hWorkTCP, true, closeSessionTCPHandle);
548 }
549 hSession->availTailIndex = 0;
550 if (hSession->ioBuf) {
551 delete[] hSession->ioBuf;
552 hSession->ioBuf = nullptr;
553 }
554 Base::TryCloseHandle((uv_handle_t *)&hSession->ctrlPipe[STREAM_MAIN], true, closeSessionTCPHandle);
555 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_MAIN], true, closeSessionTCPHandle);
556 FreeSessionByConnectType(hSession);
557 // finish
558 Base::IdleUvTask(&loopMain, hSession, FreeSessionFinally);
559 }
560
FreeSessionOpeate(uv_timer_t * handle)561 void HdcSessionBase::FreeSessionOpeate(uv_timer_t *handle)
562 {
563 HSession hSession = (HSession)handle->data;
564 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
565 if (hSession->ref > 0) {
566 return;
567 }
568 WRITE_LOG(LOG_DEBUG, "FreeSessionOpeate ref:%u", uint32_t(hSession->ref));
569 #ifdef HDC_HOST
570 if (hSession->hUSB != nullptr
571 && (!hSession->hUSB->hostBulkIn.isShutdown || !hSession->hUSB->hostBulkOut.isShutdown)) {
572 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
573 pUSB->CancelUsbIo(hSession);
574 return;
575 }
576 #endif
577 // wait workthread to free
578 if (hSession->ctrlPipe[STREAM_WORK].loop) {
579 auto ctrl = BuildCtrlString(SP_STOP_SESSION, 0, nullptr, 0);
580 Base::SendToStream((uv_stream_t *)&hSession->ctrlPipe[STREAM_MAIN], ctrl.data(), ctrl.size());
581 WRITE_LOG(LOG_DEBUG, "FreeSessionOpeate, send workthread for free. sessionId:%u", hSession->sessionId);
582 auto callbackCheckFreeSessionContinue = [](uv_timer_t *handle) -> void {
583 HSession hSession = (HSession)handle->data;
584 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
585 if (!hSession->childCleared) {
586 return;
587 }
588 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
589 thisClass->FreeSessionContinue(hSession);
590 };
591 Base::TimerUvTask(&thisClass->loopMain, hSession, callbackCheckFreeSessionContinue);
592 } else {
593 thisClass->FreeSessionContinue(hSession);
594 }
595 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
596 }
597
FreeSession(const uint32_t sessionId)598 void HdcSessionBase::FreeSession(const uint32_t sessionId)
599 {
600 if (threadSessionMain != uv_thread_self()) {
601 PushAsyncMessage(sessionId, ASYNC_FREE_SESSION, nullptr, 0);
602 return;
603 }
604 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
605 WRITE_LOG(LOG_DEBUG, "Begin to free session, sessionid:%u", sessionId);
606 do {
607 if (!hSession || hSession->isDead) {
608 break;
609 }
610 hSession->isDead = true;
611 Base::TimerUvTask(&loopMain, hSession, FreeSessionOpeate);
612 NotifyInstanceSessionFree(hSession, false);
613 WRITE_LOG(LOG_DEBUG, "FreeSession sessionId:%u ref:%u", hSession->sessionId, uint32_t(hSession->ref));
614 } while (false);
615 }
616
AdminSession(const uint8_t op,const uint32_t sessionId,HSession hInput)617 HSession HdcSessionBase::AdminSession(const uint8_t op, const uint32_t sessionId, HSession hInput)
618 {
619 HSession hRet = nullptr;
620 switch (op) {
621 case OP_ADD:
622 uv_rwlock_wrlock(&lockMapSession);
623 mapSession[sessionId] = hInput;
624 uv_rwlock_wrunlock(&lockMapSession);
625 break;
626 case OP_REMOVE:
627 uv_rwlock_wrlock(&lockMapSession);
628 mapSession.erase(sessionId);
629 uv_rwlock_wrunlock(&lockMapSession);
630 break;
631 case OP_QUERY:
632 uv_rwlock_rdlock(&lockMapSession);
633 if (mapSession.count(sessionId)) {
634 hRet = mapSession[sessionId];
635 }
636 uv_rwlock_rdunlock(&lockMapSession);
637 break;
638 case OP_QUERY_REF:
639 uv_rwlock_wrlock(&lockMapSession);
640 if (mapSession.count(sessionId)) {
641 hRet = mapSession[sessionId];
642 ++hRet->ref;
643 }
644 uv_rwlock_wrunlock(&lockMapSession);
645 break;
646 case OP_UPDATE:
647 uv_rwlock_wrlock(&lockMapSession);
648 // remove old
649 mapSession.erase(sessionId);
650 mapSession[hInput->sessionId] = hInput;
651 uv_rwlock_wrunlock(&lockMapSession);
652 break;
653 case OP_VOTE_RESET:
654 if (mapSession.count(sessionId) == 0) {
655 break;
656 }
657 bool needReset;
658 if (serverOrDaemon) {
659 uv_rwlock_wrlock(&lockMapSession);
660 hRet = mapSession[sessionId];
661 hRet->voteReset = true;
662 needReset = true;
663 for (auto &kv : mapSession) {
664 if (sessionId == kv.first) {
665 continue;
666 }
667 WRITE_LOG(LOG_DEBUG, "session:%u vote reset, session %u is %s",
668 sessionId, kv.first, kv.second->voteReset ? "YES" : "NO");
669 if (!kv.second->voteReset) {
670 needReset = false;
671 }
672 }
673 uv_rwlock_wrunlock(&lockMapSession);
674 } else {
675 needReset = true;
676 }
677 if (needReset) {
678 WRITE_LOG(LOG_FATAL, "!! session:%u vote reset, passed unanimously !!", sessionId);
679 abort();
680 }
681 break;
682 default:
683 break;
684 }
685 return hRet;
686 }
687
DumpTasksInfo(map<uint32_t,HTaskInfo> & mapTask)688 void HdcSessionBase::DumpTasksInfo(map<uint32_t, HTaskInfo> &mapTask)
689 {
690 int idx = 1;
691 for (auto t : mapTask) {
692 HTaskInfo ti = t.second;
693 WRITE_LOG(LOG_WARN, "%d: channelId: %lu, type: %d, closeRetry: %d\n",
694 idx++, ti->channelId, ti->taskType, ti->closeRetryCount);
695 }
696 }
697
698 // All in the corresponding sub-thread, no need locks
AdminTask(const uint8_t op,HSession hSession,const uint32_t channelId,HTaskInfo hInput)699 HTaskInfo HdcSessionBase::AdminTask(const uint8_t op, HSession hSession, const uint32_t channelId, HTaskInfo hInput)
700 {
701 HTaskInfo hRet = nullptr;
702 map<uint32_t, HTaskInfo> &mapTask = *hSession->mapTask;
703
704 switch (op) {
705 case OP_ADD:
706 #ifndef HDC_HOST
707 // uv sub-thread confiured by threadPoolCount, reserve 2 for main & communicate
708 if (mapTask.size() >= (threadPoolCount - 2)) {
709 WRITE_LOG(LOG_WARN, "mapTask.size:%d, hdc is busy", mapTask.size());
710 WRITE_LOG(LOG_WARN, ">> Session %lu mapTask Dumping Start", hSession->sessionId);
711 DumpTasksInfo(mapTask);
712 WRITE_LOG(LOG_WARN, "<< mapTask Dumping End");
713 break;
714 }
715 #endif
716 hRet = mapTask[channelId];
717 if (hRet != nullptr) {
718 delete hRet;
719 }
720 mapTask[channelId] = hInput;
721 hRet = hInput;
722
723 WRITE_LOG(LOG_DEBUG, "AdminTask add session %u, channelId %u, mapTask size: %zu",
724 hSession->sessionId, channelId, mapTask.size());
725
726 break;
727 case OP_REMOVE:
728 mapTask.erase(channelId);
729 WRITE_LOG(LOG_DEBUG, "AdminTask rm session %u, channelId %u, mapTask size: %zu",
730 hSession->sessionId, channelId, mapTask.size());
731 break;
732 case OP_QUERY:
733 if (mapTask.count(channelId)) {
734 hRet = mapTask[channelId];
735 }
736 break;
737 case OP_VOTE_RESET:
738 AdminSession(op, hSession->sessionId, nullptr);
739 break;
740 default:
741 break;
742 }
743 return hRet;
744 }
745
SendByProtocol(HSession hSession,uint8_t * bufPtr,const int bufLen,bool echo)746 int HdcSessionBase::SendByProtocol(HSession hSession, uint8_t *bufPtr, const int bufLen, bool echo )
747 {
748 if (hSession->isDead) {
749 WRITE_LOG(LOG_WARN, "SendByProtocol session dead error");
750 return ERR_SESSION_NOFOUND;
751 }
752 int ret = 0;
753 switch (hSession->connType) {
754 case CONN_TCP: {
755 if (echo && !hSession->serverOrDaemon) {
756 ret = Base::SendToStreamEx((uv_stream_t *)&hSession->hChildWorkTCP, bufPtr, bufLen,
757 nullptr, (void *)FinishWriteSessionTCP, bufPtr);
758 } else {
759 if (hSession->hWorkThread == uv_thread_self()) {
760 ret = Base::SendToStreamEx((uv_stream_t *)&hSession->hWorkTCP, bufPtr, bufLen,
761 nullptr, (void *)FinishWriteSessionTCP, bufPtr);
762 } else if (hSession->hWorkChildThread == uv_thread_self()) {
763 ret = Base::SendToStreamEx((uv_stream_t *)&hSession->hChildWorkTCP, bufPtr,
764 bufLen, nullptr, (void *)FinishWriteSessionTCP,
765 bufPtr);
766 } else {
767 WRITE_LOG(LOG_FATAL, "SendByProtocol uncontrol send");
768 ret = ERR_API_FAIL;
769 }
770 }
771 if (ret > 0) {
772 ++hSession->ref;
773 }
774 break;
775 }
776 case CONN_USB: {
777 HdcUSBBase *pUSB = ((HdcUSBBase *)hSession->classModule);
778 ret = pUSB->SendUSBBlock(hSession, bufPtr, bufLen);
779 delete[] bufPtr;
780 break;
781 }
782 #ifdef HDC_SUPPORT_UART
783 case CONN_SERIAL: {
784 HdcUARTBase *pUART = ((HdcUARTBase *)hSession->classModule);
785 ret = pUART->SendUARTData(hSession, bufPtr, bufLen);
786 delete[] bufPtr;
787 break;
788 }
789 #endif
790 default:
791 break;
792 }
793 return ret;
794 }
795
Send(const uint32_t sessionId,const uint32_t channelId,const uint16_t commandFlag,const uint8_t * data,const int dataSize)796 int HdcSessionBase::Send(const uint32_t sessionId, const uint32_t channelId, const uint16_t commandFlag,
797 const uint8_t *data, const int dataSize)
798 {
799 HSession hSession = AdminSession(OP_QUERY, sessionId, nullptr);
800 if (!hSession) {
801 WRITE_LOG(LOG_DEBUG, "Send to offline device, drop it, sessionId:%u", sessionId);
802 return ERR_SESSION_NOFOUND;
803 }
804 PayloadProtect protectBuf; // noneed convert to big-endian
805 protectBuf.channelId = channelId;
806 protectBuf.commandFlag = commandFlag;
807 protectBuf.checkSum = (ENABLE_IO_CHECKSUM && dataSize > 0) ? Base::CalcCheckSum(data, dataSize) : 0;
808 protectBuf.vCode = payloadProtectStaticVcode;
809 string s = SerialStruct::SerializeToString(protectBuf);
810 // reserve for encrypt here
811 // xx-encrypt
812
813 PayloadHead payloadHead = {}; // need convert to big-endian
814 payloadHead.flag[0] = PACKET_FLAG.at(0);
815 payloadHead.flag[1] = PACKET_FLAG.at(1);
816 payloadHead.protocolVer = VER_PROTOCOL;
817 payloadHead.headSize = htons(s.size());
818 payloadHead.dataSize = htonl(dataSize);
819 int finalBufSize = sizeof(PayloadHead) + s.size() + dataSize;
820 uint8_t *finayBuf = new(std::nothrow) uint8_t[finalBufSize]();
821 if (finayBuf == nullptr) {
822 WRITE_LOG(LOG_WARN, "send allocmem err");
823 return ERR_BUF_ALLOC;
824 }
825 bool bufRet = false;
826 do {
827 if (memcpy_s(finayBuf, sizeof(PayloadHead), reinterpret_cast<uint8_t *>(&payloadHead), sizeof(PayloadHead))) {
828 WRITE_LOG(LOG_WARN, "send copyhead err for dataSize:%d", dataSize);
829 break;
830 }
831 if (memcpy_s(finayBuf + sizeof(PayloadHead), s.size(),
832 reinterpret_cast<uint8_t *>(const_cast<char *>(s.c_str())), s.size())) {
833 WRITE_LOG(LOG_WARN, "send copyProtbuf err for dataSize:%d", dataSize);
834 break;
835 }
836 if (dataSize > 0 && memcpy_s(finayBuf + sizeof(PayloadHead) + s.size(), dataSize, data, dataSize)) {
837 WRITE_LOG(LOG_WARN, "send copyDatabuf err for dataSize:%d", dataSize);
838 break;
839 }
840 bufRet = true;
841 } while (false);
842 if (!bufRet) {
843 delete[] finayBuf;
844 WRITE_LOG(LOG_WARN, "send copywholedata err for dataSize:%d", dataSize);
845 return ERR_BUF_COPY;
846 }
847 if (CMD_KERNEL_ECHO == commandFlag) {
848 return SendByProtocol(hSession, finayBuf, finalBufSize, true);
849 } else {
850 return SendByProtocol(hSession, finayBuf, finalBufSize);
851 }
852 }
853
DecryptPayload(HSession hSession,PayloadHead * payloadHeadBe,uint8_t * encBuf)854 int HdcSessionBase::DecryptPayload(HSession hSession, PayloadHead *payloadHeadBe, uint8_t *encBuf)
855 {
856 PayloadProtect protectBuf = {};
857 uint16_t headSize = ntohs(payloadHeadBe->headSize);
858 int dataSize = ntohl(payloadHeadBe->dataSize);
859 string encString(reinterpret_cast<char *>(encBuf), headSize);
860 SerialStruct::ParseFromString(protectBuf, encString);
861 if (protectBuf.vCode != payloadProtectStaticVcode) {
862 WRITE_LOG(LOG_FATAL, "Session recv static vcode failed");
863 return ERR_BUF_CHECK;
864 }
865 uint8_t *data = encBuf + headSize;
866 if (ENABLE_IO_CHECKSUM && protectBuf.checkSum != 0 && (protectBuf.checkSum != Base::CalcCheckSum(data, dataSize))) {
867 WRITE_LOG(LOG_FATAL, "Session recv CalcCheckSum failed");
868 return ERR_BUF_CHECK;
869 }
870 if (!FetchCommand(hSession, protectBuf.channelId, protectBuf.commandFlag, data, dataSize)) {
871 WRITE_LOG(LOG_WARN, "FetchCommand failed: channelId %x commandFlag %x",
872 protectBuf.channelId, protectBuf.commandFlag);
873 return ERR_GENERIC;
874 }
875 return RET_SUCCESS;
876 }
877
OnRead(HSession hSession,uint8_t * bufPtr,const int bufLen)878 int HdcSessionBase::OnRead(HSession hSession, uint8_t *bufPtr, const int bufLen)
879 {
880 int ret = ERR_GENERIC;
881 if (memcmp(bufPtr, PACKET_FLAG.c_str(), PACKET_FLAG.size())) {
882 WRITE_LOG(LOG_FATAL, "PACKET_FLAG incorrect %x %x", bufPtr[0], bufPtr[1]);
883 return ERR_BUF_CHECK;
884 }
885 struct PayloadHead *payloadHead = reinterpret_cast<struct PayloadHead *>(bufPtr);
886 int tobeReadLen = ntohl(payloadHead->dataSize) + ntohs(payloadHead->headSize);
887 int packetHeadSize = sizeof(struct PayloadHead);
888 if (tobeReadLen <= 0 || static_cast<uint32_t>(tobeReadLen) > HDC_BUF_MAX_BYTES) {
889 WRITE_LOG(LOG_FATAL, "Packet size incorrect");
890 return ERR_BUF_CHECK;
891 }
892 if (bufLen - packetHeadSize < tobeReadLen) {
893 return 0;
894 }
895 if (DecryptPayload(hSession, payloadHead, bufPtr + packetHeadSize)) {
896 WRITE_LOG(LOG_WARN, "decrypt plhead error");
897 return ERR_BUF_CHECK;
898 }
899 ret = packetHeadSize + tobeReadLen;
900 return ret;
901 }
902
903 // Returns <0 error;> 0 receives the number of bytes; 0 untreated
FetchIOBuf(HSession hSession,uint8_t * ioBuf,int read)904 int HdcSessionBase::FetchIOBuf(HSession hSession, uint8_t *ioBuf, int read)
905 {
906 HdcSessionBase *ptrConnect = (HdcSessionBase *)hSession->classInstance;
907 int indexBuf = 0;
908 int childRet = 0;
909 if (read < 0) {
910 constexpr int bufSize = 1024;
911 char buf[bufSize] = { 0 };
912 uv_strerror_r(read, buf, bufSize);
913 WRITE_LOG(LOG_FATAL, "HdcSessionBase read io failed,%s", buf);
914 return ERR_IO_FAIL;
915 }
916 hSession->availTailIndex += read;
917 while (!hSession->isDead && hSession->availTailIndex > static_cast<int>(sizeof(PayloadHead))) {
918 childRet = ptrConnect->OnRead(hSession, ioBuf + indexBuf, hSession->availTailIndex);
919 if (childRet > 0) {
920 hSession->availTailIndex -= childRet;
921 indexBuf += childRet;
922 } else if (childRet == 0) {
923 // Not enough a IO
924 break;
925 } else { // <0
926 hSession->availTailIndex = 0; // Preventing malicious data packages
927 indexBuf = ERR_BUF_SIZE;
928 break;
929 }
930 // It may be multi-time IO to merge in a BUF, need to loop processing
931 }
932 if (indexBuf > 0 && hSession->availTailIndex > 0) {
933 if (memmove_s(hSession->ioBuf, hSession->bufSize, hSession->ioBuf + indexBuf, hSession->availTailIndex)
934 != EOK) {
935 return ERR_BUF_COPY;
936 };
937 uint8_t *bufToZero = reinterpret_cast<uint8_t *>(hSession->ioBuf + hSession->availTailIndex);
938 Base::ZeroBuf(bufToZero, hSession->bufSize - hSession->availTailIndex);
939 }
940 return indexBuf;
941 }
942
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)943 void HdcSessionBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
944 {
945 HSession context = (HSession)handle->data;
946 Base::ReallocBuf(&context->ioBuf, &context->bufSize, HDC_SOCKETPAIR_SIZE);
947 buf->base = (char *)context->ioBuf + context->availTailIndex;
948 int size = context->bufSize - context->availTailIndex;
949 buf->len = std::min(size, static_cast<int>(sizeWanted));
950 }
951
FinishWriteSessionTCP(uv_write_t * req,int status)952 void HdcSessionBase::FinishWriteSessionTCP(uv_write_t *req, int status)
953 {
954 HSession hSession = (HSession)req->handle->data;
955 --hSession->ref;
956 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
957 if (status < 0) {
958 Base::TryCloseHandle((uv_handle_t *)req->handle);
959 if (!hSession->isDead && !hSession->ref) {
960 WRITE_LOG(LOG_DEBUG, "FinishWriteSessionTCP freesession :%p", hSession);
961 thisClass->FreeSession(hSession->sessionId);
962 }
963 }
964 delete[]((uint8_t *)req->data);
965 delete req;
966 }
967
DispatchSessionThreadCommand(uv_stream_t * uvpipe,HSession hSession,const uint8_t * baseBuf,const int bytesIO)968 bool HdcSessionBase::DispatchSessionThreadCommand(uv_stream_t *uvpipe, HSession hSession, const uint8_t *baseBuf,
969 const int bytesIO)
970 {
971 bool ret = true;
972 uint8_t flag = *const_cast<uint8_t *>(baseBuf);
973
974 switch (flag) {
975 case SP_JDWP_NEWFD: {
976 JdwpNewFileDescriptor(baseBuf, bytesIO);
977 break;
978 }
979 default:
980 WRITE_LOG(LOG_WARN, "Not support session command");
981 break;
982 }
983 return ret;
984 }
985
ReadCtrlFromSession(uv_stream_t * uvpipe,ssize_t nread,const uv_buf_t * buf)986 void HdcSessionBase::ReadCtrlFromSession(uv_stream_t *uvpipe, ssize_t nread, const uv_buf_t *buf)
987 {
988 HSession hSession = (HSession)uvpipe->data;
989 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
990 while (true) {
991 if (nread < 0) {
992 constexpr int bufSize = 1024;
993 char buffer[bufSize] = { 0 };
994 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
995 WRITE_LOG(LOG_DEBUG, "SessionCtrl failed,%s", buffer);
996 uv_read_stop(uvpipe);
997 break;
998 }
999 if (nread > 64) { // 64 : max length
1000 WRITE_LOG(LOG_WARN, "HdcSessionBase read overlap data");
1001 break;
1002 }
1003 // only one command, no need to split command from stream
1004 // if add more commands, consider the format command
1005 hSessionBase->DispatchSessionThreadCommand(uvpipe, hSession, (uint8_t *)buf->base, nread);
1006 break;
1007 }
1008 delete[] buf->base;
1009 }
1010
WorkThreadStartSession(HSession hSession)1011 bool HdcSessionBase::WorkThreadStartSession(HSession hSession)
1012 {
1013 bool regOK = false;
1014 int childRet = 0;
1015 if (hSession->connType == CONN_TCP) {
1016 HdcTCPBase *pTCPBase = (HdcTCPBase *)hSession->classModule;
1017 hSession->hChildWorkTCP.data = hSession;
1018 if (uv_tcp_init(&hSession->childLoop, &hSession->hChildWorkTCP) < 0) {
1019 WRITE_LOG(LOG_DEBUG, "HdcSessionBase SessionCtrl failed 1");
1020 return false;
1021 }
1022 if ((childRet = uv_tcp_open(&hSession->hChildWorkTCP, hSession->fdChildWorkTCP)) < 0) {
1023 constexpr int bufSize = 1024;
1024 char buf[bufSize] = { 0 };
1025 uv_strerror_r(childRet, buf, bufSize);
1026 WRITE_LOG(LOG_DEBUG, "SessionCtrl failed 2,fd:%d,str:%s", hSession->fdChildWorkTCP, buf);
1027 return false;
1028 }
1029 Base::SetTcpOptions((uv_tcp_t *)&hSession->hChildWorkTCP);
1030 uv_read_start((uv_stream_t *)&hSession->hChildWorkTCP, AllocCallback, pTCPBase->ReadStream);
1031 regOK = true;
1032 #ifdef HDC_SUPPORT_UART
1033 } else if (hSession->connType == CONN_SERIAL) { // UART
1034 HdcUARTBase *pUARTBase = (HdcUARTBase *)hSession->classModule;
1035 WRITE_LOG(LOG_DEBUG, "UART ReadyForWorkThread");
1036 regOK = pUARTBase->ReadyForWorkThread(hSession);
1037 #endif
1038 } else { // USB
1039 HdcUSBBase *pUSBBase = (HdcUSBBase *)hSession->classModule;
1040 WRITE_LOG(LOG_DEBUG, "USB ReadyForWorkThread");
1041 regOK = pUSBBase->ReadyForWorkThread(hSession);
1042 }
1043
1044 if (regOK && hSession->serverOrDaemon) {
1045 // session handshake step1
1046 SessionHandShake handshake = {};
1047 handshake.banner = HANDSHAKE_MESSAGE;
1048 handshake.sessionId = hSession->sessionId;
1049 handshake.connectKey = hSession->connectKey;
1050 if (!hSession->isCheck) {
1051 handshake.version = Base::GetVersion() + HDC_MSG_HASH;
1052 WRITE_LOG(LOG_INFO, "set version = %s", handshake.version.c_str());
1053 }
1054 handshake.authType = AUTH_NONE;
1055 string hs = SerialStruct::SerializeToString(handshake);
1056 #ifdef HDC_SUPPORT_UART
1057 WRITE_LOG(LOG_DEBUG, "WorkThreadStartSession session %u auth %u send handshake hs: %s",
1058 hSession->sessionId, handshake.authType, hs.c_str());
1059 #endif
1060 Send(hSession->sessionId, 0, CMD_KERNEL_HANDSHAKE,
1061 reinterpret_cast<uint8_t *>(const_cast<char *>(hs.c_str())), hs.size());
1062 }
1063 return regOK;
1064 }
1065
BuildCtrlString(InnerCtrlCommand command,uint32_t channelId,uint8_t * data,int dataSize)1066 vector<uint8_t> HdcSessionBase::BuildCtrlString(InnerCtrlCommand command, uint32_t channelId, uint8_t *data,
1067 int dataSize)
1068 {
1069 vector<uint8_t> ret;
1070 while (true) {
1071 if (dataSize > BUF_SIZE_MICRO) {
1072 break;
1073 }
1074 CtrlStruct ctrl = {};
1075 ctrl.command = command;
1076 ctrl.channelId = channelId;
1077 ctrl.dataSize = dataSize;
1078 if (dataSize > 0 && data != nullptr && memcpy_s(ctrl.data, sizeof(ctrl.data), data, dataSize) != EOK) {
1079 break;
1080 }
1081 uint8_t *buf = reinterpret_cast<uint8_t *>(&ctrl);
1082 ret.insert(ret.end(), buf, buf + sizeof(CtrlStruct));
1083 break;
1084 }
1085 return ret;
1086 }
1087
DispatchMainThreadCommand(HSession hSession,const CtrlStruct * ctrl)1088 bool HdcSessionBase::DispatchMainThreadCommand(HSession hSession, const CtrlStruct *ctrl)
1089 {
1090 bool ret = true;
1091 uint32_t channelId = ctrl->channelId; // if send not set, it is zero
1092 switch (ctrl->command) {
1093 case SP_START_SESSION: {
1094 WRITE_LOG(LOG_DEBUG, "Dispatch MainThreadCommand START_SESSION sessionId:%u instance:%s",
1095 hSession->sessionId, hSession->serverOrDaemon ? "server" : "daemon");
1096 ret = WorkThreadStartSession(hSession);
1097 break;
1098 }
1099 case SP_STOP_SESSION: {
1100 WRITE_LOG(LOG_DEBUG, "Dispatch MainThreadCommand STOP_SESSION sessionId:%u", hSession->sessionId);
1101 auto closeSessionChildThreadTCPHandle = [](uv_handle_t *handle) -> void {
1102 HSession hSession = (HSession)handle->data;
1103 Base::TryCloseHandle((uv_handle_t *)handle);
1104 if (--hSession->uvChildRef == 0) {
1105 uv_stop(&hSession->childLoop);
1106 };
1107 };
1108 hSession->uvChildRef += 2;
1109 if (hSession->connType == CONN_TCP && hSession->hChildWorkTCP.loop) { // maybe not use it
1110 ++hSession->uvChildRef;
1111 Base::TryCloseHandle((uv_handle_t *)&hSession->hChildWorkTCP, true, closeSessionChildThreadTCPHandle);
1112 }
1113 Base::TryCloseHandle((uv_handle_t *)&hSession->ctrlPipe[STREAM_WORK], true,
1114 closeSessionChildThreadTCPHandle);
1115 Base::TryCloseHandle((uv_handle_t *)&hSession->dataPipe[STREAM_WORK], true,
1116 closeSessionChildThreadTCPHandle);
1117 break;
1118 }
1119 case SP_ATTACH_CHANNEL: {
1120 if (!serverOrDaemon) {
1121 break; // Only Server has this feature
1122 }
1123 AttachChannel(hSession, channelId);
1124 break;
1125 }
1126 case SP_DEATCH_CHANNEL: {
1127 if (!serverOrDaemon) {
1128 break; // Only Server has this feature
1129 }
1130 DeatchChannel(hSession, channelId);
1131 break;
1132 }
1133 default:
1134 WRITE_LOG(LOG_WARN, "Not support main command");
1135 ret = false;
1136 break;
1137 }
1138 return ret;
1139 }
1140
1141 // Several bytes of control instructions, generally do not stick
ReadCtrlFromMain(uv_stream_t * uvpipe,ssize_t nread,const uv_buf_t * buf)1142 void HdcSessionBase::ReadCtrlFromMain(uv_stream_t *uvpipe, ssize_t nread, const uv_buf_t *buf)
1143 {
1144 HSession hSession = (HSession)uvpipe->data;
1145 HdcSessionBase *hSessionBase = (HdcSessionBase *)hSession->classInstance;
1146 int formatCommandSize = sizeof(CtrlStruct);
1147 int index = 0;
1148 while (true) {
1149 if (nread < 0) {
1150 constexpr int bufSize = 1024;
1151 char buffer[bufSize] = { 0 };
1152 uv_strerror_r(static_cast<int>(nread), buffer, bufSize);
1153 WRITE_LOG(LOG_DEBUG, "SessionCtrl failed,%s", buffer);
1154 break;
1155 }
1156 if (nread % formatCommandSize != 0) {
1157 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain size failed, nread == %d", nread);
1158 break;
1159 }
1160 CtrlStruct *ctrl = reinterpret_cast<CtrlStruct *>(buf->base + index);
1161 if (!hSessionBase->DispatchMainThreadCommand(hSession, ctrl)) {
1162 WRITE_LOG(LOG_FATAL, "ReadCtrlFromMain failed sessionId:%u channelId:%u command:%u",
1163 hSession->sessionId, ctrl->channelId, ctrl->command);
1164 break;
1165 }
1166 index += sizeof(CtrlStruct);
1167 if (index >= nread) {
1168 break;
1169 }
1170 }
1171 delete[] buf->base;
1172 }
1173
ReChildLoopForSessionClear(HSession hSession)1174 void HdcSessionBase::ReChildLoopForSessionClear(HSession hSession)
1175 {
1176 // Restart loop close task
1177 ClearOwnTasks(hSession, 0);
1178 WRITE_LOG(LOG_INFO, "ReChildLoopForSessionClear sessionId:%u", hSession->sessionId);
1179 auto clearTaskForSessionFinish = [](uv_timer_t *handle) -> void {
1180 HSession hSession = (HSession)handle->data;
1181 for (auto v : *hSession->mapTask) {
1182 HTaskInfo hTask = (HTaskInfo)v.second;
1183 uint8_t level;
1184 if (hTask->closeRetryCount < GLOBAL_TIMEOUT / 2) {
1185 level = LOG_DEBUG;
1186 } else {
1187 level = LOG_WARN;
1188 }
1189 WRITE_LOG(level, "wait task free retry %d/%d, channelId:%u, sessionId:%u",
1190 hTask->closeRetryCount, GLOBAL_TIMEOUT, hTask->channelId, hTask->sessionId);
1191 if (hTask->closeRetryCount++ >= GLOBAL_TIMEOUT) {
1192 HdcSessionBase *thisClass = (HdcSessionBase *)hTask->ownerSessionClass;
1193 hSession = thisClass->AdminSession(OP_QUERY, hTask->sessionId, nullptr);
1194 thisClass->AdminTask(OP_VOTE_RESET, hSession, hTask->channelId, nullptr);
1195 }
1196 if (!hTask->taskFree)
1197 return;
1198 }
1199 // all task has been free
1200 uv_close((uv_handle_t *)handle, Base::CloseTimerCallback);
1201 uv_stop(&hSession->childLoop); // stop ReChildLoopForSessionClear pendding
1202 };
1203 Base::TimerUvTask(&hSession->childLoop, hSession, clearTaskForSessionFinish, (GLOBAL_TIMEOUT * TIME_BASE) / UV_DEFAULT_INTERVAL);
1204 uv_run(&hSession->childLoop, UV_RUN_DEFAULT);
1205 // clear
1206 Base::TryCloseLoop(&hSession->childLoop, "Session childUV");
1207 }
1208
SessionWorkThread(uv_work_t * arg)1209 void HdcSessionBase::SessionWorkThread(uv_work_t *arg)
1210 {
1211 int childRet = 0;
1212 HSession hSession = (HSession)arg->data;
1213 HdcSessionBase *thisClass = (HdcSessionBase *)hSession->classInstance;
1214 hSession->hWorkChildThread = uv_thread_self();
1215 if ((childRet = uv_tcp_init(&hSession->childLoop, &hSession->ctrlPipe[STREAM_WORK])) < 0) {
1216 constexpr int bufSize = 1024;
1217 char buf[bufSize] = { 0 };
1218 uv_strerror_r(childRet, buf, bufSize);
1219 WRITE_LOG(LOG_DEBUG, "SessionCtrl err1, %s", buf);
1220 }
1221 if ((childRet = uv_tcp_open(&hSession->ctrlPipe[STREAM_WORK], hSession->ctrlFd[STREAM_WORK])) < 0) {
1222 constexpr int bufSize = 1024;
1223 char buf[bufSize] = { 0 };
1224 uv_strerror_r(childRet, buf, bufSize);
1225 WRITE_LOG(LOG_DEBUG, "SessionCtrl err2, %s fd:%d", buf, hSession->ctrlFd[STREAM_WORK]);
1226 }
1227 uv_read_start((uv_stream_t *)&hSession->ctrlPipe[STREAM_WORK], Base::AllocBufferCallback, ReadCtrlFromMain);
1228 WRITE_LOG(LOG_DEBUG, "!!!Workthread run begin, sessionId:%u instance:%s", hSession->sessionId,
1229 thisClass->serverOrDaemon ? "server" : "daemon");
1230 uv_run(&hSession->childLoop, UV_RUN_DEFAULT); // work pendding
1231 WRITE_LOG(LOG_DEBUG, "!!!Workthread run again, sessionId:%u", hSession->sessionId);
1232 // main loop has exit
1233 thisClass->ReChildLoopForSessionClear(hSession); // work pending again
1234 hSession->childCleared = true;
1235 WRITE_LOG(LOG_DEBUG, "!!!Workthread run finish, sessionId:%u workthread:%p", hSession->sessionId, uv_thread_self());
1236 }
1237
1238 // clang-format off
LogMsg(const uint32_t sessionId,const uint32_t channelId,MessageLevel level,const char * msg,...)1239 void HdcSessionBase::LogMsg(const uint32_t sessionId, const uint32_t channelId,
1240 MessageLevel level, const char *msg, ...)
1241 // clang-format on
1242 {
1243 va_list vaArgs;
1244 va_start(vaArgs, msg);
1245 string log = Base::StringFormat(msg, vaArgs);
1246 va_end(vaArgs);
1247 vector<uint8_t> buf;
1248 buf.push_back(level);
1249 buf.insert(buf.end(), log.c_str(), log.c_str() + log.size());
1250 ServerCommand(sessionId, channelId, CMD_KERNEL_ECHO, buf.data(), buf.size());
1251 }
1252
NeedNewTaskInfo(const uint16_t command,bool & masterTask)1253 bool HdcSessionBase::NeedNewTaskInfo(const uint16_t command, bool &masterTask)
1254 {
1255 // referer from HdcServerForClient::DoCommandRemote
1256 bool ret = false;
1257 bool taskMasterInit = false;
1258 masterTask = false;
1259 switch (command) {
1260 case CMD_FILE_INIT:
1261 case CMD_FLASHD_FLASH_INIT:
1262 case CMD_FLASHD_UPDATE_INIT:
1263 case CMD_FLASHD_ERASE:
1264 case CMD_FLASHD_FORMAT:
1265 case CMD_FORWARD_INIT:
1266 case CMD_APP_INIT:
1267 case CMD_APP_UNINSTALL:
1268 case CMD_UNITY_BUGREPORT_INIT:
1269 case CMD_APP_SIDELOAD:
1270 taskMasterInit = true;
1271 break;
1272 default:
1273 break;
1274 }
1275 if (!serverOrDaemon
1276 && (command == CMD_SHELL_INIT || (command > CMD_UNITY_COMMAND_HEAD && command < CMD_UNITY_COMMAND_TAIL))) {
1277 // daemon's single side command
1278 ret = true;
1279 } else if (command == CMD_KERNEL_WAKEUP_SLAVETASK) {
1280 // slave tasks
1281 ret = true;
1282 } else if (taskMasterInit) {
1283 // task init command
1284 masterTask = true;
1285 ret = true;
1286 }
1287 return ret;
1288 }
1289 // Heavy and time-consuming work was putted in the new thread to do, and does
1290 // not occupy the main thread
DispatchTaskData(HSession hSession,const uint32_t channelId,const uint16_t command,uint8_t * payload,int payloadSize)1291 bool HdcSessionBase::DispatchTaskData(HSession hSession, const uint32_t channelId, const uint16_t command,
1292 uint8_t *payload, int payloadSize)
1293 {
1294 bool ret = true;
1295 HTaskInfo hTaskInfo = nullptr;
1296 bool masterTask = false;
1297 while (true) {
1298 // Some basic commands do not have a local task constructor. example: Interactive shell, some uinty commands
1299 if (NeedNewTaskInfo(command, masterTask)) {
1300 WRITE_LOG(LOG_DEBUG, "New HTaskInfo channelId:%u command:%u", channelId, command);
1301 hTaskInfo = new(std::nothrow) TaskInformation();
1302 if (hTaskInfo == nullptr) {
1303 WRITE_LOG(LOG_FATAL, "DispatchTaskData new hTaskInfo failed");
1304 break;
1305 }
1306 hTaskInfo->channelId = channelId;
1307 hTaskInfo->sessionId = hSession->sessionId;
1308 hTaskInfo->runLoop = &hSession->childLoop;
1309 hTaskInfo->serverOrDaemon = serverOrDaemon;
1310 hTaskInfo->masterSlave = masterTask;
1311 hTaskInfo->closeRetryCount = 0;
1312 hTaskInfo->channelTask = false;
1313
1314 int addTaskRetry = 3; // try 3 time
1315 while (addTaskRetry > 0) {
1316 if (AdminTask(OP_ADD, hSession, channelId, hTaskInfo)) {
1317 break;
1318 }
1319 sleep(1);
1320 --addTaskRetry;
1321 }
1322
1323 if (addTaskRetry == 0) {
1324 #ifndef HDC_HOST
1325 LogMsg(hTaskInfo->sessionId, hTaskInfo->channelId, MSG_FAIL, "hdc thread pool busy, may cause reset later");
1326 #endif
1327 delete hTaskInfo;
1328 hTaskInfo = nullptr;
1329 ret = false;
1330 break;
1331 }
1332 } else {
1333 hTaskInfo = AdminTask(OP_QUERY, hSession, channelId, nullptr);
1334 }
1335 if (!hTaskInfo || hTaskInfo->taskStop || hTaskInfo->taskFree) {
1336 WRITE_LOG(LOG_ALL, "Dead HTaskInfo, ignore, channelId:%u command:%u", channelId, command);
1337 break;
1338 }
1339 ret = RedirectToTask(hTaskInfo, hSession, channelId, command, payload, payloadSize);
1340 break;
1341 }
1342 return ret;
1343 }
1344
PostStopInstanceMessage(bool restart)1345 void HdcSessionBase::PostStopInstanceMessage(bool restart)
1346 {
1347 PushAsyncMessage(0, ASYNC_STOP_MAINLOOP, nullptr, 0);
1348 WRITE_LOG(LOG_DEBUG, "StopDaemon has sended restart %d", restart);
1349 wantRestart = restart;
1350 }
1351 } // namespace Hdc
1352