1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient,const string & addrString,uv_loop_t * loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 : loopMainStatus(loopMainIn, "ChannelBaseMainLoop")
19 {
20 SetChannelTCPString(addrString);
21 isServerOrClient = serverOrClient;
22 loopMain = loopMainIn;
23 loopMainStatus.StartReportTimer();
24 threadChanneMain = uv_thread_self();
25 uv_rwlock_init(&mainAsync);
26 uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
27 uv_rwlock_init(&lockMapChannel);
28 queuedPackages.store(0);
29 }
30
~HdcChannelBase()31 HdcChannelBase::~HdcChannelBase()
32 {
33 ClearChannels();
34 // clear
35 if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
36 uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
37 }
38
39 uv_rwlock_destroy(&mainAsync);
40 uv_rwlock_destroy(&lockMapChannel);
41 }
42
GetChannelHandshake(string & connectKey) const43 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
44 {
45 vector<uint8_t> ret;
46 struct ChannelHandShake handshake = {};
47 if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
48 return ret;
49 }
50 if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
51 return ret;
52 }
53 ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
54 return ret;
55 }
56
SetChannelTCPString(const string & addrString)57 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
58 {
59 bool ret = false;
60 while (true) {
61 if (addrString.find(":") == string::npos) {
62 break;
63 }
64 std::size_t found = addrString.find_last_of(":");
65 if (found == string::npos) {
66 break;
67 }
68
69 string host = addrString.substr(0, found);
70 string port = addrString.substr(found + 1);
71
72 channelPort = std::atoi(port.c_str());
73 sockaddr_in addrv4;
74 sockaddr_in6 addrv6;
75 if (!channelPort) {
76 break;
77 }
78
79 if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
80 uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
81 break;
82 }
83 channelHost = host;
84 channelHostPort = addrString;
85 ret = true;
86 break;
87 }
88 if (!ret) {
89 channelPort = 0;
90 channelHost = STRING_EMPTY;
91 channelHostPort = STRING_EMPTY;
92 }
93 return ret;
94 }
95
ClearChannels()96 void HdcChannelBase::ClearChannels()
97 {
98 for (auto v : mapChannel) {
99 HChannel hChannel = (HChannel)v.second;
100 if (!hChannel->isDead) {
101 FreeChannel(hChannel->channelId);
102 }
103 }
104 }
105
WorkerPendding()106 void HdcChannelBase::WorkerPendding()
107 {
108 WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
109 uv_run(loopMain, UV_RUN_DEFAULT);
110 uv_loop_close(loopMain);
111 }
112
ReadStream(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)113 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
114 {
115 StartTraceScope("HdcChannelBase::ReadStream");
116 int size = 0;
117 int indexBuf = 0;
118 int childRet = 0;
119 bool needExit = false;
120 HChannel hChannel = (HChannel)tcp->data;
121 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
122 uint32_t channelId = hChannel->channelId;
123 CALLSTAT_GUARD(*(hChannel->loopStatus), tcp->loop, "HdcChannelBase::ReadStream");
124
125 if (nread == UV_ENOBUFS) {
126 WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId);
127 #ifdef HDC_HOST
128 char buffer[BUF_SIZE_DEFAULT] = { 0 };
129 uv_strerror_r(nread, buffer, BUF_SIZE_DEFAULT);
130 thisClass->FillChannelResult(hChannel, false, buffer);
131 #endif
132 return;
133 } else if (nread == 0) {
134 // maybe just after accept, second client req
135 WRITE_LOG(LOG_DEBUG, "ReadStream idle read channelId:%u", channelId);
136 return;
137 } else if (nread < 0) {
138 Base::TryCloseHandle((uv_handle_t *)tcp);
139 constexpr int bufSize = 1024;
140 char buffer[bufSize] = { 0 };
141 uv_err_name_r(nread, buffer, bufSize);
142 WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer);
143 #ifdef HDC_HOST
144 thisClass->FillChannelResult(hChannel, false, buffer);
145 #endif
146 needExit = true;
147 goto Finish;
148 } else {
149 hChannel->availTailIndex += nread;
150 }
151 while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
152 size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf)); // big endian
153 if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
154 WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId);
155 #ifdef HDC_HOST
156 thisClass->FillChannelResult(hChannel, false,
157 "parse error: size field is too big");
158 #endif
159 needExit = true;
160 break;
161 }
162 if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
163 break;
164 }
165 childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
166 DWORD_SERIALIZE_SIZE + indexBuf, size);
167 if (childRet < 0) {
168 WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d",
169 childRet, channelId, hChannel->keepAlive);
170 if (!hChannel->keepAlive) {
171 needExit = true;
172 break;
173 }
174 }
175 // update io
176 hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
177 indexBuf += DWORD_SERIALIZE_SIZE + size;
178 }
179 if (indexBuf > 0 && hChannel->availTailIndex > 0) {
180 if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
181 needExit = true;
182 goto Finish;
183 }
184 }
185
186 Finish:
187 if (needExit) {
188 thisClass->FreeChannel(hChannel->channelId);
189 WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId);
190 } else {
191 #ifdef HDC_HOST
192 hChannel->isSuccess = (hChannel->faultInfo.size() == 0);
193 #endif
194 }
195 }
196
FileCmdWriteCallback(uv_write_t * req,int status)197 void HdcChannelBase::FileCmdWriteCallback(uv_write_t *req, int status)
198 {
199 #ifdef HDC_HOST
200 HChannel hChannel = (HChannel)req->handle->data;
201 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
202 thisClass->queuedPackages.fetch_sub(1, std::memory_order_relaxed);
203 #endif
204 WriteCallback(req, status);
205 }
206
WriteCallback(uv_write_t * req,int status)207 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
208 {
209 HChannel hChannel = (HChannel)req->handle->data;
210 --hChannel->ref;
211 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
212 CALLSTAT_GUARD(*(hChannel->loopStatus), req->handle->loop, "HdcChannelBase::WriteCallback");
213 if (status < 0) {
214 hChannel->writeFailedTimes++;
215 Base::TryCloseHandle((uv_handle_t *)req->handle);
216 if (!hChannel->isDead && !hChannel->ref) {
217 thisClass->FreeChannel(hChannel->channelId);
218 }
219 }
220 delete[]((uint8_t *)req->data);
221 delete req;
222 }
223
AsyncMainLoopTask(uv_idle_t * handle)224 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
225 {
226 AsyncParam *param = (AsyncParam *)handle->data;
227 HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
228 CALLSTAT_GUARD(thisClass->loopMainStatus, handle->loop, "HdcChannelBase::AsyncMainLoopTask");
229 switch (param->method) {
230 case ASYNC_FREE_CHANNEL: {
231 // alloc/release should pair in main thread.
232 thisClass->FreeChannel(param->sid);
233 break;
234 }
235 default:
236 break;
237 }
238 if (param->data) {
239 delete[]((uint8_t *)param->data);
240 }
241 delete param;
242 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
243 }
244
245 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
246 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
247 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t * handle)248 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
249 {
250 HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
251 CALLSTAT_GUARD(thisClass->loopMainStatus, handle->loop, "HdcChannelBase::MainAsyncCallback");
252 if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
253 WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain");
254 return;
255 }
256 list<void *>::iterator i;
257 list<void *> &lst = thisClass->lstMainThreadOP;
258 uv_rwlock_wrlock(&thisClass->mainAsync);
259 for (i = lst.begin(); i != lst.end();) {
260 AsyncParam *param = (AsyncParam *)*i;
261 Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
262 i = lst.erase(i);
263 }
264 uv_rwlock_wrunlock(&thisClass->mainAsync);
265 }
266
PushAsyncMessage(const uint32_t channelId,const uint8_t method,const void * data,const int dataSize)267 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
268 const int dataSize)
269 {
270 if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
271 WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop");
272 return;
273 }
274 auto param = new AsyncParam();
275 if (!param) {
276 return;
277 }
278 param->sid = channelId; // Borrow SID storage
279 param->thisClass = this;
280 param->method = method;
281 if (dataSize > 0) {
282 param->dataSize = dataSize;
283 param->data = new uint8_t[param->dataSize]();
284 if (!param->data) {
285 delete param;
286 return;
287 }
288 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
289 delete[]((uint8_t *)param->data);
290 delete param;
291 return;
292 }
293 }
294 asyncMainLoop.data = this;
295 uv_rwlock_wrlock(&mainAsync);
296 lstMainThreadOP.push_back(param);
297 uv_rwlock_wrunlock(&mainAsync);
298 uv_async_send(&asyncMainLoop);
299 }
300
301 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel,const uint16_t commandFlag,uint8_t * bufPtr,const int size)302 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
303 {
304 StartTraceScope("HdcChannelBase::SendChannelWithCmd");
305 if (size < 0) {
306 WRITE_LOG(LOG_WARN, "SendChannelWithCmd size %d", size);
307 return;
308 }
309 auto data = new uint8_t[size + sizeof(commandFlag)]();
310 if (!data) {
311 WRITE_LOG(LOG_WARN, "malloc failed");
312 return;
313 }
314
315 if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
316 delete[] data;
317 WRITE_LOG(LOG_DEBUG, "memcpy_s failed commandFlag:%u", commandFlag);
318 return;
319 }
320
321 if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
322 delete[] data;
323 WRITE_LOG(LOG_DEBUG, "memcpy_s bufPtr failed size:%d", size);
324 return;
325 }
326
327 SendChannel(hChannel, data, size + sizeof(commandFlag), commandFlag);
328 delete[] data;
329 }
330
SendWithCmd(const uint32_t channelId,const uint16_t commandFlag,uint8_t * bufPtr,const int size)331 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
332 {
333 StartTraceScope("HdcChannelBase::SendWithCmd");
334 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
335 if (!hChannel) {
336 WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId);
337 return;
338 }
339 do {
340 if (hChannel->isDead) {
341 WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId);
342 break;
343 }
344 SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
345 } while (false);
346 --hChannel->ref;
347 }
348
SendChannel(HChannel hChannel,uint8_t * bufPtr,const int size,const uint16_t commandFlag)349 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size, const uint16_t commandFlag)
350 {
351 StartTraceScope("HdcChannelBase::SendChannel");
352 uv_stream_t *sendStream = nullptr;
353 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
354 auto data = new uint8_t[sizeNewBuf]();
355 if (!data) {
356 WRITE_LOG(LOG_DEBUG, "new data nullptr sizeNewBuf:%d", sizeNewBuf);
357 return;
358 }
359 *reinterpret_cast<uint32_t *>(data) = htonl(size); // big endian
360 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
361 delete[] data;
362 WRITE_LOG(LOG_DEBUG, "memcpy_s failed size:%d", size);
363 return;
364 }
365
366 #ifdef HOST_OHOS
367 if (hChannel->hWorkThread == uv_thread_self()) {
368 if (!hChannel->isUds) {
369 sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
370 } else {
371 sendStream = (uv_stream_t *)&hChannel->hWorkUds;
372 }
373 } else {
374 if (!hChannel->isUds) {
375 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
376 } else {
377 sendStream = (uv_stream_t *)&hChannel->hChildWorkUds;
378 }
379 }
380 #else
381 if (hChannel->hWorkThread == uv_thread_self()) {
382 sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
383 } else {
384 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
385 }
386 #endif
387 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
388 ++hChannel->ref;
389 if (commandFlag == CMD_FILE_DATA || commandFlag == CMD_APP_DATA) {
390 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)FileCmdWriteCallback, data);
391 } else {
392 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
393 }
394 } else {
395 delete[] data;
396 }
397 }
398
399 // works only in current working thread
Send(const uint32_t channelId,uint8_t * bufPtr,const int size)400 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
401 {
402 StartTraceScope("HdcChannelBase::Send");
403 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
404 if (!hChannel) {
405 WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId);
406 return;
407 }
408 do {
409 if (hChannel->isDead) {
410 WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId);
411 break;
412 }
413 SendChannel(hChannel, bufPtr, size);
414 } while (false);
415 --hChannel->ref;
416 }
417
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)418 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
419 {
420 HChannel context = (HChannel)handle->data;
421 Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * BUF_EXTEND_SIZE);
422 buf->base = (char *)context->ioBuf + context->availTailIndex;
423 int size = context->bufSize - context->availTailIndex;
424 buf->len = std::min(size, static_cast<int>(sizeWanted));
425 }
426
GetChannelPseudoUid()427 uint32_t HdcChannelBase::GetChannelPseudoUid()
428 {
429 uint32_t uid = 0;
430 do {
431 uid = Base::GetSecureRandom();
432 } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
433 return uid;
434 }
435
436 #ifdef HOST_OHOS
MallocChannel(HChannel * hOutChannel)437 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
438 {
439 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
440 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
441 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
442 #endif
443 auto hChannel = new HdcChannel();
444 if (!hChannel || !(*hOutChannel)) {
445 WRITE_LOG(LOG_FATAL, "malloc channel failed");
446 return 0;
447 }
448 hChannel->isUds = (*hOutChannel)->isUds;
449 hChannel->stdinTty.data = nullptr;
450 hChannel->stdoutTty.data = nullptr;
451 uint32_t channelId = GetChannelPseudoUid();
452 if (isServerOrClient) {
453 hChannel->serverOrClient = isServerOrClient;
454 ++channelId; // Use different value for serverForClient&client in per process
455 }
456 if (!hChannel->isUds) {
457 int rc = uv_tcp_init(loopMain, &hChannel->hWorkTCP);
458 if (rc < 0) {
459 WRITE_LOG(LOG_FATAL, "MallocChannel uv_tcp_init failed, rc:%d cid:%u", rc, channelId);
460 }
461 hChannel->hWorkTCP.data = hChannel;
462 (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
463 } else {
464 int rc = uv_pipe_init(loopMain, &hChannel->hWorkUds, 0);
465 if (rc < 0) {
466 WRITE_LOG(LOG_FATAL, "MallocChannel uv_pipe_init failed, rc:%d cid:%u", rc, channelId);
467 }
468 hChannel->hWorkUds.data = hChannel;
469 (void)memset_s(&hChannel->hChildWorkUds, sizeof(hChannel->hChildWorkUds), 0, sizeof(uv_pipe_t));
470 }
471 ++hChannel->uvHandleRef;
472 hChannel->hWorkThread = uv_thread_self();
473 hChannel->clsChannel = this;
474 hChannel->channelId = channelId;
475 hChannel->loopStatus = &loopMainStatus;
476 AdminChannel(OP_ADD, channelId, hChannel);
477 delete *hOutChannel;
478 *hOutChannel = hChannel;
479 WRITE_LOG(isServerOrClient ? LOG_INFO : LOG_DEBUG, "Mallocchannel:%u", channelId);
480 return channelId;
481 }
482 #else
MallocChannel(HChannel * hOutChannel)483 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
484 {
485 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
486 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
487 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
488 #endif
489 auto hChannel = new HdcChannel();
490 if (!hChannel) {
491 WRITE_LOG(LOG_FATAL, "malloc channel failed");
492 return 0;
493 }
494 hChannel->stdinTty.data = nullptr;
495 hChannel->stdoutTty.data = nullptr;
496 uint32_t channelId = GetChannelPseudoUid();
497 if (isServerOrClient) {
498 hChannel->serverOrClient = isServerOrClient;
499 ++channelId; // Use different value for serverForClient&client in per process
500 }
501 int rc = uv_tcp_init(loopMain, &hChannel->hWorkTCP);
502 if (rc < 0) {
503 WRITE_LOG(LOG_FATAL, "MallocChannel uv_tcp_init failed, rc:%d cid:%u", rc, channelId);
504 }
505
506 ++hChannel->uvHandleRef;
507 hChannel->hWorkThread = uv_thread_self();
508 hChannel->hWorkTCP.data = hChannel;
509 hChannel->clsChannel = this;
510 hChannel->channelId = channelId;
511 hChannel->loopStatus = &loopMainStatus;
512 (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
513 AdminChannel(OP_ADD, channelId, hChannel);
514 *hOutChannel = hChannel;
515 WRITE_LOG(isServerOrClient ? LOG_INFO : LOG_DEBUG, "Mallocchannel:%u", channelId);
516 return channelId;
517 }
518 #endif
519
520 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t * handle)521 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
522 {
523 HChannel hChannel = (HChannel)handle->data;
524 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
525 if (hChannel->uvHandleRef > 0) {
526 if (hChannel->serverOrClient) {
527 WRITE_LOG(LOG_INFO, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
528 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
529 } else {
530 WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
531 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
532 }
533 return;
534 }
535 thisClass->NotifyInstanceChannelFree(hChannel);
536 #ifdef HDC_HOST
537 hChannel->endTime = Base::GetRuntimeMSec();
538 if (hChannel->serverOrClient) {
539 thisClass->AdminChannel(OP_PRINT, hChannel->channelId, nullptr);
540 }
541 #endif
542 thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
543
544 if (!hChannel->serverOrClient) {
545 WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u sid:%u finish",
546 hChannel->channelId, hChannel->targetSessionId);
547 uv_stop(thisClass->loopMain);
548 } else {
549 WRITE_LOG(LOG_INFO, "!!!FreeChannelFinally channelId:%u sid:%u finish",
550 hChannel->channelId, hChannel->targetSessionId);
551 }
552 #ifdef HDC_HOST
553 auto deleteChannel = [](uv_handle_t *handle) -> void {
554 if (handle->data == nullptr) {
555 return;
556 }
557 HChannel hChannel = reinterpret_cast<HChannel>(handle->data);
558 delete hChannel;
559 };
560 Base::TryCloseHandle((const uv_handle_t *)&hChannel->hChildWorkTCP, true, deleteChannel);
561 #else
562 delete hChannel;
563 #endif
564 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
565 }
566
FreeChannelContinue(HChannel hChannel)567 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
568 {
569 StartTraceScope("HdcChannelBase::FreeChannelContinue");
570 auto closeChannelHandle = [](uv_handle_t *handle) -> void {
571 if (handle->data == nullptr) {
572 WRITE_LOG(LOG_DEBUG, "FreeChannelContinue handle->data is nullptr");
573 return;
574 }
575 HChannel channel = reinterpret_cast<HChannel>(handle->data);
576 --channel->uvHandleRef;
577 Base::TryCloseHandle((uv_handle_t *)handle);
578 };
579 hChannel->availTailIndex = 0;
580 if (hChannel->ioBuf) {
581 delete[] hChannel->ioBuf;
582 hChannel->ioBuf = nullptr;
583 }
584 if (!hChannel->serverOrClient) {
585 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
586 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
587 }
588 #ifdef HOST_OHOS
589 if (!hChannel->isUds) {
590 if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
591 --hChannel->uvHandleRef;
592 } else {
593 Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
594 }
595 } else {
596 if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkUds)) {
597 --hChannel->uvHandleRef;
598 } else {
599 Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkUds, closeChannelHandle);
600 }
601 }
602 #else
603 if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
604 --hChannel->uvHandleRef;
605 } else {
606 Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
607 }
608 #endif
609 Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
610 }
611
612 #ifdef HOST_OHOS
FreeChannelOpeate(uv_timer_t * handle)613 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
614 {
615 StartTraceScope("HdcChannelBase::FreeChannelOpeate");
616 HChannel hChannel = (HChannel)handle->data;
617 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
618 if (hChannel->ref > 0) {
619 return;
620 }
621 thisClass->DispMntnInfo(hChannel);
622 if (hChannel->hChildWorkTCP.loop || hChannel->hChildWorkUds.loop) {
623 auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
624 bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
625 if (!ret) {
626 WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u",
627 hChannel->channelId, hChannel->targetSessionId);
628 hChannel->childCleared = true;
629 }
630 auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
631 HChannel hChannel = (HChannel)handle->data;
632 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
633 if (!hChannel->childCleared) {
634 WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u",
635 hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId);
636 return;
637 }
638 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
639 thisClass->FreeChannelContinue(hChannel);
640 };
641 Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
642 } else {
643 thisClass->FreeChannelContinue(hChannel);
644 }
645 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
646 }
647 #else
FreeChannelOpeate(uv_timer_t * handle)648 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
649 {
650 StartTraceScope("HdcChannelBase::FreeChannelOpeate");
651 HChannel hChannel = (HChannel)handle->data;
652 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
653 if (hChannel->ref > 0) {
654 return;
655 }
656 thisClass->DispMntnInfo(hChannel);
657 if (hChannel->hChildWorkTCP.loop) {
658 auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
659 bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
660 if (!ret) {
661 WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u",
662 hChannel->channelId, hChannel->targetSessionId);
663 hChannel->childCleared = true;
664 }
665 auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
666 HChannel hChannel = (HChannel)handle->data;
667 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
668 if (!hChannel->childCleared) {
669 WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u",
670 hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId);
671 return;
672 }
673 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
674 thisClass->FreeChannelContinue(hChannel);
675 };
676 Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
677 } else {
678 thisClass->FreeChannelContinue(hChannel);
679 }
680 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
681 }
682 #endif
683
FreeChannel(const uint32_t channelId)684 void HdcChannelBase::FreeChannel(const uint32_t channelId)
685 {
686 StartTraceScope("HdcChannelBase::FreeChannel");
687 if (threadChanneMain != uv_thread_self()) {
688 PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
689 WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId);
690 return;
691 }
692 HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
693 do {
694 if (!hChannel || hChannel->isDead) {
695 WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId);
696 break;
697 }
698 WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
699 Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT); // do immediately
700 hChannel->isDead = true;
701 } while (false);
702 }
703
704 #ifdef HDC_HOST
PrintChannel(const uint32_t channelId)705 void HdcChannelBase::PrintChannel(const uint32_t channelId)
706 {
707 uv_rwlock_rdlock(&lockMapChannel);
708 for (auto v : mapChannel) {
709 HChannel hChannel = (HChannel)v.second;
710 if (hChannel->channelId == channelId) {
711 auto str = hChannel->ToDisplayChannelStr();
712 WRITE_LOG(LOG_INFO, "%s", str.c_str());
713 break;
714 }
715 }
716 uv_rwlock_rdunlock(&lockMapChannel);
717 }
718 #endif
719
AdminChannel(const uint8_t op,const uint32_t channelId,HChannel hInput)720 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
721 {
722 StartTraceScope("HdcChannelBase::AdminChannel");
723 HChannel hRet = nullptr;
724 switch (op) {
725 case OP_PRINT:
726 #ifdef HDC_HOST
727 PrintChannel(channelId);
728 #endif
729 break;
730 case OP_ADD:
731 uv_rwlock_wrlock(&lockMapChannel);
732 mapChannel[channelId] = hInput;
733 uv_rwlock_wrunlock(&lockMapChannel);
734 break;
735 case OP_REMOVE:
736 uv_rwlock_wrlock(&lockMapChannel);
737 mapChannel.erase(channelId);
738 uv_rwlock_wrunlock(&lockMapChannel);
739 break;
740 case OP_QUERY:
741 uv_rwlock_rdlock(&lockMapChannel);
742 if (mapChannel.count(channelId)) {
743 hRet = mapChannel[channelId];
744 }
745 uv_rwlock_rdunlock(&lockMapChannel);
746 break;
747 case OP_QUERY_REF:
748 uv_rwlock_wrlock(&lockMapChannel);
749 if (mapChannel.count(channelId)) {
750 hRet = mapChannel[channelId];
751 ++hRet->ref;
752 }
753 uv_rwlock_wrunlock(&lockMapChannel);
754 break;
755 case OP_UPDATE:
756 uv_rwlock_wrlock(&lockMapChannel);
757 // remove old
758 mapChannel.erase(channelId);
759 mapChannel[hInput->channelId] = hInput;
760 uv_rwlock_wrunlock(&lockMapChannel);
761 break;
762 default:
763 break;
764 }
765 return hRet;
766 }
767
EchoToClient(HChannel hChannel,uint8_t * bufPtr,const int size)768 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
769 {
770 StartTraceScope("HdcChannelBase::EchoToClient");
771 uv_stream_t *sendStream = nullptr;
772 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
773 auto data = new uint8_t[sizeNewBuf]();
774 if (!data) {
775 return;
776 }
777 *reinterpret_cast<uint32_t *>(data) = htonl(size);
778 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
779 delete[] data;
780 return;
781 }
782 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
783 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
784 ++hChannel->ref;
785 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
786 } else {
787 WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
788 delete[] data;
789 }
790 }
791
EchoToAllChannelsViaSessionId(uint32_t targetSessionId,const string & echo)792 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
793 {
794 for (auto v : mapChannel) {
795 HChannel hChannel = (HChannel)v.second;
796 if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
797 WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
798 EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
799 }
800 }
801 }
802
DispMntnInfo(HChannel hChannel)803 void HdcChannelBase::DispMntnInfo(HChannel hChannel)
804 {
805 if (!hChannel) {
806 WRITE_LOG(LOG_WARN, "prt is null");
807 return;
808 }
809 WRITE_LOG(LOG_DEBUG, "channel info: id:%u isDead:%d ref:%u, writeFailedTimes:%u",
810 hChannel->channelId, hChannel->isDead, uint32_t(hChannel->ref), uint32_t(hChannel->writeFailedTimes));
811 }
812 }
813