• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient,const string & addrString,uv_loop_t * loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19     SetChannelTCPString(addrString);
20     isServerOrClient = serverOrClient;
21     loopMain = loopMainIn;
22     threadChanneMain = uv_thread_self();
23     uv_rwlock_init(&mainAsync);
24     uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25     uv_rwlock_init(&lockMapChannel);
26     sessionIsDead = false;
27 }
28 
~HdcChannelBase()29 HdcChannelBase::~HdcChannelBase()
30 {
31     ClearChannels();
32     // clear
33     if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
34         uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
35     }
36 
37     uv_rwlock_destroy(&mainAsync);
38     uv_rwlock_destroy(&lockMapChannel);
39 }
40 
GetChannelHandshake(string & connectKey) const41 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
42 {
43     vector<uint8_t> ret;
44     struct ChannelHandShake handshake = {};
45     if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
46         return ret;
47     }
48     if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
49         return ret;
50     }
51     ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
52     return ret;
53 }
54 
SetChannelTCPString(const string & addrString)55 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
56 {
57     bool ret = false;
58     while (true) {
59         if (addrString.find(":") == string::npos) {
60             break;
61         }
62         std::size_t found = addrString.find_last_of(":");
63         if (found == string::npos) {
64             break;
65         }
66 
67         string host = addrString.substr(0, found);
68         string port = addrString.substr(found + 1);
69 
70         channelPort = std::atoi(port.c_str());
71         sockaddr_in addrv4;
72         sockaddr_in6 addrv6;
73         if (!channelPort) {
74             break;
75         }
76 
77         if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
78             uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
79             break;
80         }
81         channelHost = host;
82         channelHostPort = addrString;
83         ret = true;
84         break;
85     }
86     if (!ret) {
87         channelPort = 0;
88         channelHost = STRING_EMPTY;
89         channelHostPort = STRING_EMPTY;
90     }
91     return ret;
92 }
93 
ClearChannels()94 void HdcChannelBase::ClearChannels()
95 {
96     for (auto v : mapChannel) {
97         HChannel hChannel = (HChannel)v.second;
98         if (!hChannel->isDead) {
99             FreeChannel(hChannel->channelId);
100         }
101     }
102 }
103 
WorkerPendding()104 void HdcChannelBase::WorkerPendding()
105 {
106     WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
107     uv_run(loopMain, UV_RUN_DEFAULT);
108     uv_loop_close(loopMain);
109 }
110 
ReadStream(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)111 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
112 {
113     StartTraceScope("HdcChannelBase::ReadStream");
114     int size = 0;
115     int indexBuf = 0;
116     int childRet = 0;
117     bool needExit = false;
118     HChannel hChannel = (HChannel)tcp->data;
119     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
120     uint32_t channelId = hChannel->channelId;
121 
122     if (nread == UV_ENOBUFS) {
123         WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId);
124         return;
125     } else if (nread == 0) {
126         // maybe just after accept, second client req
127         WRITE_LOG(LOG_FATAL, "ReadStream idle read channelId:%u", channelId);
128         return;
129     } else if (nread < 0) {
130         Base::TryCloseHandle((uv_handle_t *)tcp);
131         constexpr int bufSize = 1024;
132         char buffer[bufSize] = { 0 };
133         uv_err_name_r(nread, buffer, bufSize);
134         WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer);
135         needExit = true;
136         goto Finish;
137     } else {
138         hChannel->availTailIndex += nread;
139     }
140     while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
141         size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf));  // big endian
142         if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
143             WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId);
144             needExit = true;
145             break;
146         }
147         if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
148             break;
149         }
150         childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
151                                           DWORD_SERIALIZE_SIZE + indexBuf, size);
152         if (childRet < 0) {
153             WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d",
154                 childRet, channelId, hChannel->keepAlive);
155             if (!hChannel->keepAlive) {
156                 needExit = true;
157                 break;
158             }
159         }
160         // update io
161         hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
162         indexBuf += DWORD_SERIALIZE_SIZE + size;
163     }
164     if (indexBuf > 0 && hChannel->availTailIndex > 0) {
165         if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
166             needExit = true;
167             goto Finish;
168         }
169     }
170 
171 Finish:
172     if (needExit) {
173         thisClass->FreeChannel(hChannel->channelId);
174         WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId);
175     }
176 }
177 
WriteCallback(uv_write_t * req,int status)178 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
179 {
180     HChannel hChannel = (HChannel)req->handle->data;
181     --hChannel->ref;
182     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
183     if (status < 0) {
184         WRITE_LOG(LOG_WARN, "WriteCallback TryCloseHandle channelId:%u isDead:%d ref:%d",
185             hChannel->channelId, hChannel->isDead, int32_t(hChannel->ref));
186         Base::TryCloseHandle((uv_handle_t *)req->handle);
187         if (!hChannel->isDead && !hChannel->ref) {
188             thisClass->FreeChannel(hChannel->channelId);
189         }
190     }
191     delete[]((uint8_t *)req->data);
192     delete req;
193 }
194 
AsyncMainLoopTask(uv_idle_t * handle)195 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
196 {
197     AsyncParam *param = (AsyncParam *)handle->data;
198     HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
199 
200     switch (param->method) {
201         case ASYNC_FREE_CHANNEL: {
202             // alloc/release should pair in main thread.
203             thisClass->FreeChannel(param->sid);
204             break;
205         }
206         default:
207             break;
208     }
209     if (param->data) {
210         delete[]((uint8_t *)param->data);
211     }
212     delete param;
213     uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
214 }
215 
216 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
217 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
218 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t * handle)219 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
220 {
221     HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
222     if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
223         WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain");
224         return;
225     }
226     list<void *>::iterator i;
227     list<void *> &lst = thisClass->lstMainThreadOP;
228     uv_rwlock_wrlock(&thisClass->mainAsync);
229     for (i = lst.begin(); i != lst.end();) {
230         AsyncParam *param = (AsyncParam *)*i;
231         Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
232         i = lst.erase(i);
233     }
234     uv_rwlock_wrunlock(&thisClass->mainAsync);
235 }
236 
PushAsyncMessage(const uint32_t channelId,const uint8_t method,const void * data,const int dataSize)237 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
238                                       const int dataSize)
239 {
240     if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
241         WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop");
242         return;
243     }
244     auto param = new AsyncParam();
245     if (!param) {
246         return;
247     }
248     param->sid = channelId;  // Borrow SID storage
249     param->thisClass = this;
250     param->method = method;
251     if (dataSize > 0) {
252         param->dataSize = dataSize;
253         param->data = new uint8_t[param->dataSize]();
254         if (!param->data) {
255             delete param;
256             return;
257         }
258         if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
259             delete[]((uint8_t *)param->data);
260             delete param;
261             return;
262         }
263     }
264     asyncMainLoop.data = this;
265     uv_rwlock_wrlock(&mainAsync);
266     lstMainThreadOP.push_back(param);
267     uv_rwlock_wrunlock(&mainAsync);
268     uv_async_send(&asyncMainLoop);
269 }
270 
271 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel,const uint16_t commandFlag,uint8_t * bufPtr,const int size)272 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
273 {
274     StartTraceScope("HdcChannelBase::SendChannelWithCmd");
275     auto data = new uint8_t[size + sizeof(commandFlag)]();
276     if (!data) {
277         return;
278     }
279 
280     if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
281         delete[] data;
282         return;
283     }
284 
285     if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
286         delete[] data;
287         return;
288     }
289 
290     SendChannel(hChannel, data, size + sizeof(commandFlag));
291     delete[] data;
292 }
293 
SendWithCmd(const uint32_t channelId,const uint16_t commandFlag,uint8_t * bufPtr,const int size)294 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
295 {
296     StartTraceScope("HdcChannelBase::SendWithCmd");
297     HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
298     if (!hChannel) {
299         WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId);
300         return;
301     }
302     do {
303         if (hChannel->isDead) {
304             WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId);
305             break;
306         }
307         SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
308     } while (false);
309     --hChannel->ref;
310 }
311 
SendChannel(HChannel hChannel,uint8_t * bufPtr,const int size)312 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)
313 {
314     StartTraceScope("HdcChannelBase::SendChannel");
315     uv_stream_t *sendStream = nullptr;
316     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
317     auto data = new uint8_t[sizeNewBuf]();
318     if (!data) {
319         return;
320     }
321     *reinterpret_cast<uint32_t *>(data) = htonl(size);  // big endian
322     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
323         delete[] data;
324         return;
325     }
326     if (hChannel->hWorkThread == uv_thread_self()) {
327         sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
328     } else {
329         sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
330     }
331     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
332         ++hChannel->ref;
333         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
334     } else {
335         delete[] data;
336     }
337 }
338 
339 // works only in current working thread
Send(const uint32_t channelId,uint8_t * bufPtr,const int size)340 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
341 {
342     StartTraceScope("HdcChannelBase::Send");
343     HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
344     if (!hChannel) {
345         WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId);
346         return;
347     }
348     do {
349         if (hChannel->isDead) {
350             WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId);
351             break;
352         }
353         SendChannel(hChannel, bufPtr, size);
354     } while (false);
355     --hChannel->ref;
356 }
357 
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)358 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
359 {
360     HChannel context = (HChannel)handle->data;
361     Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * 4);
362     buf->base = (char *)context->ioBuf + context->availTailIndex;
363     buf->len = context->bufSize - context->availTailIndex;
364 }
365 
GetChannelPseudoUid()366 uint32_t HdcChannelBase::GetChannelPseudoUid()
367 {
368     uint32_t uid = 0;
369     do {
370         uid = static_cast<uint32_t>(Base::GetRandom());
371     } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
372     return uid;
373 }
374 
MallocChannel(HChannel * hOutChannel)375 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
376 {
377 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
378     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
379     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
380 #endif
381     auto hChannel = new HdcChannel();
382     if (!hChannel) {
383         WRITE_LOG(LOG_FATAL, "malloc channel failed");
384         return 0;
385     }
386     hChannel->stdinTty.data = nullptr;
387     hChannel->stdoutTty.data = nullptr;
388     uint32_t channelId = GetChannelPseudoUid();
389     if (isServerOrClient) {
390         hChannel->serverOrClient = isServerOrClient;
391         ++channelId;  // Use different value for serverForClient&client in per process
392     }
393     uv_tcp_init(loopMain, &hChannel->hWorkTCP);
394     ++hChannel->uvHandleRef;
395     hChannel->hWorkThread = uv_thread_self();
396     hChannel->hWorkTCP.data = hChannel;
397     hChannel->clsChannel = this;
398     hChannel->channelId = channelId;
399     (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
400     AdminChannel(OP_ADD, channelId, hChannel);
401     *hOutChannel = hChannel;
402     WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
403     return channelId;
404 }
405 
406 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t * handle)407 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
408 {
409     HChannel hChannel = (HChannel)handle->data;
410     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
411     if (hChannel->uvHandleRef > 0) {
412         WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u",
413             hChannel->uvHandleRef, hChannel->channelId);
414         return;
415     }
416     thisClass->NotifyInstanceChannelFree(hChannel);
417     thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
418     WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u finish", hChannel->channelId);
419     if (!hChannel->serverOrClient) {
420         uv_stop(thisClass->loopMain);
421     }
422     delete hChannel;
423     Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
424 }
425 
FreeChannelContinue(HChannel hChannel)426 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
427 {
428     auto closeChannelHandle = [](uv_handle_t *handle) -> void {
429         if (handle->data == nullptr) {
430             WRITE_LOG(LOG_WARN, "FreeChannelContinue handle->data is nullptr");
431             return;
432         }
433         HChannel channel = reinterpret_cast<HChannel>(handle->data);
434         --channel->uvHandleRef;
435         Base::TryCloseHandle((uv_handle_t *)handle);
436     };
437     hChannel->availTailIndex = 0;
438     if (hChannel->ioBuf) {
439         delete[] hChannel->ioBuf;
440         hChannel->ioBuf = nullptr;
441     }
442     if (!hChannel->serverOrClient) {
443         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
444         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
445     }
446     if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
447         --hChannel->uvHandleRef;
448     } else {
449         Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
450     }
451     Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
452 }
453 
FreeChannelOpeate(uv_timer_t * handle)454 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
455 {
456     HChannel hChannel = (HChannel)handle->data;
457     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
458     if (hChannel->ref > 0) {
459         return;
460     }
461     if (hChannel->hChildWorkTCP.loop) {
462         auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
463         thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
464         auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
465             HChannel hChannel = (HChannel)handle->data;
466             HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
467             if (!thisClass->sessionIsDead && !hChannel->childCleared) {
468                 WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u",
469                     hChannel->childCleared, hChannel->channelId);
470                 return;
471             }
472             Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
473             thisClass->FreeChannelContinue(hChannel);
474         };
475         Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
476     } else {
477         thisClass->FreeChannelContinue(hChannel);
478     }
479     Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
480 }
481 
FreeChannel(const uint32_t channelId)482 void HdcChannelBase::FreeChannel(const uint32_t channelId)
483 {
484     if (threadChanneMain != uv_thread_self()) {
485         PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
486         WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId);
487         return;
488     }
489     HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
490     do {
491         if (!hChannel || hChannel->isDead) {
492             WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId);
493             break;
494         }
495         WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
496         Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT);  // do immediately
497         hChannel->isDead = true;
498     } while (false);
499 }
500 
AdminChannel(const uint8_t op,const uint32_t channelId,HChannel hInput)501 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
502 {
503     HChannel hRet = nullptr;
504     switch (op) {
505         case OP_ADD:
506             uv_rwlock_wrlock(&lockMapChannel);
507             mapChannel[channelId] = hInput;
508             uv_rwlock_wrunlock(&lockMapChannel);
509             break;
510         case OP_REMOVE:
511             uv_rwlock_wrlock(&lockMapChannel);
512             mapChannel.erase(channelId);
513             uv_rwlock_wrunlock(&lockMapChannel);
514             break;
515         case OP_QUERY:
516             uv_rwlock_rdlock(&lockMapChannel);
517             if (mapChannel.count(channelId)) {
518                 hRet = mapChannel[channelId];
519             }
520             uv_rwlock_rdunlock(&lockMapChannel);
521             break;
522         case OP_QUERY_REF:
523             uv_rwlock_wrlock(&lockMapChannel);
524             if (mapChannel.count(channelId)) {
525                 hRet = mapChannel[channelId];
526                 ++hRet->ref;
527             }
528             uv_rwlock_wrunlock(&lockMapChannel);
529             break;
530         case OP_UPDATE:
531             uv_rwlock_wrlock(&lockMapChannel);
532             // remove old
533             mapChannel.erase(channelId);
534             mapChannel[hInput->channelId] = hInput;
535             uv_rwlock_wrunlock(&lockMapChannel);
536             break;
537         default:
538             break;
539     }
540     return hRet;
541 }
542 
EchoToClient(HChannel hChannel,uint8_t * bufPtr,const int size)543 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
544 {
545     StartTraceScope("HdcChannelBase::EchoToClient");
546     uv_stream_t *sendStream = nullptr;
547     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
548     auto data = new uint8_t[sizeNewBuf]();
549     if (!data) {
550         return;
551     }
552     *reinterpret_cast<uint32_t *>(data) = htonl(size);
553     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
554         delete[] data;
555         return;
556     }
557     sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
558     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
559         ++hChannel->ref;
560         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
561     } else {
562         WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
563         delete[] data;
564     }
565 }
566 
EchoToAllChannelsViaSessionId(uint32_t targetSessionId,const string & echo)567 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
568 {
569     for (auto v : mapChannel) {
570         HChannel hChannel = (HChannel)v.second;
571         if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
572             WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
573             EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
574         }
575     }
576 }
577 }
578