• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient,const string & addrString,uv_loop_t * loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19     SetChannelTCPString(addrString);
20     isServerOrClient = serverOrClient;
21     loopMain = loopMainIn;
22     threadChanneMain = uv_thread_self();
23     uv_rwlock_init(&mainAsync);
24     uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25     uv_rwlock_init(&lockMapChannel);
26     queuedPackages.store(0);
27 }
28 
~HdcChannelBase()29 HdcChannelBase::~HdcChannelBase()
30 {
31     ClearChannels();
32     // clear
33     if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
34         uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
35     }
36 
37     uv_rwlock_destroy(&mainAsync);
38     uv_rwlock_destroy(&lockMapChannel);
39 }
40 
GetChannelHandshake(string & connectKey) const41 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
42 {
43     vector<uint8_t> ret;
44     struct ChannelHandShake handshake = {};
45     if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
46         return ret;
47     }
48     if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
49         return ret;
50     }
51     ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
52     return ret;
53 }
54 
SetChannelTCPString(const string & addrString)55 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
56 {
57     bool ret = false;
58     while (true) {
59         if (addrString.find(":") == string::npos) {
60             break;
61         }
62         std::size_t found = addrString.find_last_of(":");
63         if (found == string::npos) {
64             break;
65         }
66 
67         string host = addrString.substr(0, found);
68         string port = addrString.substr(found + 1);
69 
70         channelPort = std::atoi(port.c_str());
71         sockaddr_in addrv4;
72         sockaddr_in6 addrv6;
73         if (!channelPort) {
74             break;
75         }
76 
77         if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
78             uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
79             break;
80         }
81         channelHost = host;
82         channelHostPort = addrString;
83         ret = true;
84         break;
85     }
86     if (!ret) {
87         channelPort = 0;
88         channelHost = STRING_EMPTY;
89         channelHostPort = STRING_EMPTY;
90     }
91     return ret;
92 }
93 
ClearChannels()94 void HdcChannelBase::ClearChannels()
95 {
96     for (auto v : mapChannel) {
97         HChannel hChannel = (HChannel)v.second;
98         if (!hChannel->isDead) {
99             FreeChannel(hChannel->channelId);
100         }
101     }
102 }
103 
WorkerPendding()104 void HdcChannelBase::WorkerPendding()
105 {
106     WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
107     uv_run(loopMain, UV_RUN_DEFAULT);
108     uv_loop_close(loopMain);
109 }
110 
ReadStream(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)111 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
112 {
113     StartTraceScope("HdcChannelBase::ReadStream");
114     int size = 0;
115     int indexBuf = 0;
116     int childRet = 0;
117     bool needExit = false;
118     HChannel hChannel = (HChannel)tcp->data;
119     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
120     uint32_t channelId = hChannel->channelId;
121 
122     if (nread == UV_ENOBUFS) {
123         WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId);
124         return;
125     } else if (nread == 0) {
126         // maybe just after accept, second client req
127         WRITE_LOG(LOG_DEBUG, "ReadStream idle read channelId:%u", channelId);
128         return;
129     } else if (nread < 0) {
130         Base::TryCloseHandle((uv_handle_t *)tcp);
131         constexpr int bufSize = 1024;
132         char buffer[bufSize] = { 0 };
133         uv_err_name_r(nread, buffer, bufSize);
134         WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer);
135         needExit = true;
136         goto Finish;
137     } else {
138         hChannel->availTailIndex += nread;
139     }
140     while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
141         size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf));  // big endian
142         if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
143             WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId);
144             needExit = true;
145             break;
146         }
147         if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
148             break;
149         }
150         childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
151                                           DWORD_SERIALIZE_SIZE + indexBuf, size);
152         if (childRet < 0) {
153             WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d",
154                 childRet, channelId, hChannel->keepAlive);
155             if (!hChannel->keepAlive) {
156                 needExit = true;
157                 break;
158             }
159         }
160         // update io
161         hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
162         indexBuf += DWORD_SERIALIZE_SIZE + size;
163     }
164     if (indexBuf > 0 && hChannel->availTailIndex > 0) {
165         if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
166             needExit = true;
167             goto Finish;
168         }
169     }
170 
171 Finish:
172     if (needExit) {
173         thisClass->FreeChannel(hChannel->channelId);
174         WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId);
175     }
176 }
177 
FileCmdWriteCallback(uv_write_t * req,int status)178 void HdcChannelBase::FileCmdWriteCallback(uv_write_t *req, int status)
179 {
180 #ifdef HDC_HOST
181         HChannel hChannel = (HChannel)req->handle->data;
182         HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
183         thisClass->queuedPackages.fetch_sub(1, std::memory_order_relaxed);
184 #endif
185     WriteCallback(req, status);
186 }
187 
WriteCallback(uv_write_t * req,int status)188 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
189 {
190     HChannel hChannel = (HChannel)req->handle->data;
191     --hChannel->ref;
192     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
193     if (status < 0) {
194         hChannel->writeFailedTimes++;
195         Base::TryCloseHandle((uv_handle_t *)req->handle);
196         if (!hChannel->isDead && !hChannel->ref) {
197             thisClass->FreeChannel(hChannel->channelId);
198         }
199     }
200     delete[]((uint8_t *)req->data);
201     delete req;
202 }
203 
AsyncMainLoopTask(uv_idle_t * handle)204 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
205 {
206     AsyncParam *param = (AsyncParam *)handle->data;
207     HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
208 
209     switch (param->method) {
210         case ASYNC_FREE_CHANNEL: {
211             // alloc/release should pair in main thread.
212             thisClass->FreeChannel(param->sid);
213             break;
214         }
215         default:
216             break;
217     }
218     if (param->data) {
219         delete[]((uint8_t *)param->data);
220     }
221     delete param;
222     uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
223 }
224 
225 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
226 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
227 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t * handle)228 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
229 {
230     HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
231     if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
232         WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain");
233         return;
234     }
235     list<void *>::iterator i;
236     list<void *> &lst = thisClass->lstMainThreadOP;
237     uv_rwlock_wrlock(&thisClass->mainAsync);
238     for (i = lst.begin(); i != lst.end();) {
239         AsyncParam *param = (AsyncParam *)*i;
240         Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
241         i = lst.erase(i);
242     }
243     uv_rwlock_wrunlock(&thisClass->mainAsync);
244 }
245 
PushAsyncMessage(const uint32_t channelId,const uint8_t method,const void * data,const int dataSize)246 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
247                                       const int dataSize)
248 {
249     if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
250         WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop");
251         return;
252     }
253     auto param = new AsyncParam();
254     if (!param) {
255         return;
256     }
257     param->sid = channelId;  // Borrow SID storage
258     param->thisClass = this;
259     param->method = method;
260     if (dataSize > 0) {
261         param->dataSize = dataSize;
262         param->data = new uint8_t[param->dataSize]();
263         if (!param->data) {
264             delete param;
265             return;
266         }
267         if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
268             delete[]((uint8_t *)param->data);
269             delete param;
270             return;
271         }
272     }
273     asyncMainLoop.data = this;
274     uv_rwlock_wrlock(&mainAsync);
275     lstMainThreadOP.push_back(param);
276     uv_rwlock_wrunlock(&mainAsync);
277     uv_async_send(&asyncMainLoop);
278 }
279 
280 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel,const uint16_t commandFlag,uint8_t * bufPtr,const int size)281 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
282 {
283     StartTraceScope("HdcChannelBase::SendChannelWithCmd");
284     if (size < 0) {
285         WRITE_LOG(LOG_WARN, "SendChannelWithCmd size %d", size);
286         return;
287     }
288     auto data = new uint8_t[size + sizeof(commandFlag)]();
289     if (!data) {
290         WRITE_LOG(LOG_WARN, "malloc failed");
291         return;
292     }
293 
294     if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
295         delete[] data;
296         return;
297     }
298 
299     if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
300         delete[] data;
301         return;
302     }
303 
304     SendChannel(hChannel, data, size + sizeof(commandFlag), commandFlag);
305     delete[] data;
306 }
307 
SendWithCmd(const uint32_t channelId,const uint16_t commandFlag,uint8_t * bufPtr,const int size)308 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
309 {
310     StartTraceScope("HdcChannelBase::SendWithCmd");
311     HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
312     if (!hChannel) {
313         WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId);
314         return;
315     }
316     do {
317         if (hChannel->isDead) {
318             WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId);
319             break;
320         }
321         SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
322     } while (false);
323     --hChannel->ref;
324 }
325 
SendChannel(HChannel hChannel,uint8_t * bufPtr,const int size,const uint16_t commandFlag)326 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size, const uint16_t commandFlag)
327 {
328     StartTraceScope("HdcChannelBase::SendChannel");
329     uv_stream_t *sendStream = nullptr;
330     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
331     auto data = new uint8_t[sizeNewBuf]();
332     if (!data) {
333         return;
334     }
335     *reinterpret_cast<uint32_t *>(data) = htonl(size);  // big endian
336     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
337         delete[] data;
338         return;
339     }
340     if (hChannel->hWorkThread == uv_thread_self()) {
341         sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
342     } else {
343         sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
344     }
345     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
346         ++hChannel->ref;
347         if (commandFlag == CMD_FILE_DATA || commandFlag == CMD_APP_DATA) {
348             Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)FileCmdWriteCallback, data);
349         } else {
350             Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
351         }
352     } else {
353         delete[] data;
354     }
355 }
356 
357 // works only in current working thread
Send(const uint32_t channelId,uint8_t * bufPtr,const int size)358 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
359 {
360     StartTraceScope("HdcChannelBase::Send");
361     HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
362     if (!hChannel) {
363         WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId);
364         return;
365     }
366     do {
367         if (hChannel->isDead) {
368             WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId);
369             break;
370         }
371         SendChannel(hChannel, bufPtr, size);
372     } while (false);
373     --hChannel->ref;
374 }
375 
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)376 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
377 {
378     HChannel context = (HChannel)handle->data;
379     Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * BUF_EXTEND_SIZE);
380     buf->base = (char *)context->ioBuf + context->availTailIndex;
381     int size = context->bufSize - context->availTailIndex;
382     buf->len = std::min(size, static_cast<int>(sizeWanted));
383 }
384 
GetChannelPseudoUid()385 uint32_t HdcChannelBase::GetChannelPseudoUid()
386 {
387     uint32_t uid = 0;
388     do {
389         uid = Base::GetSecureRandom();
390     } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
391     return uid;
392 }
393 
MallocChannel(HChannel * hOutChannel)394 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
395 {
396 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
397     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
398     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
399 #endif
400     auto hChannel = new HdcChannel();
401     if (!hChannel) {
402         WRITE_LOG(LOG_FATAL, "malloc channel failed");
403         return 0;
404     }
405     hChannel->stdinTty.data = nullptr;
406     hChannel->stdoutTty.data = nullptr;
407     uint32_t channelId = GetChannelPseudoUid();
408     if (isServerOrClient) {
409         hChannel->serverOrClient = isServerOrClient;
410         ++channelId;  // Use different value for serverForClient&client in per process
411     }
412     uv_tcp_init(loopMain, &hChannel->hWorkTCP);
413     ++hChannel->uvHandleRef;
414     hChannel->hWorkThread = uv_thread_self();
415     hChannel->hWorkTCP.data = hChannel;
416     hChannel->clsChannel = this;
417     hChannel->channelId = channelId;
418     (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
419     AdminChannel(OP_ADD, channelId, hChannel);
420     *hOutChannel = hChannel;
421     if (isServerOrClient) {
422         WRITE_LOG(LOG_INFO, "Mallocchannel:%u", channelId);
423     } else {
424         WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
425     }
426     return channelId;
427 }
428 
429 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t * handle)430 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
431 {
432     HChannel hChannel = (HChannel)handle->data;
433     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
434     if (hChannel->uvHandleRef > 0) {
435         if (hChannel->serverOrClient) {
436             WRITE_LOG(LOG_INFO, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
437                 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
438         } else {
439             WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
440                 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
441         }
442         return;
443     }
444     thisClass->NotifyInstanceChannelFree(hChannel);
445     thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
446 
447     if (!hChannel->serverOrClient) {
448         WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u sid:%u finish",
449             hChannel->channelId, hChannel->targetSessionId);
450         uv_stop(thisClass->loopMain);
451     } else {
452         WRITE_LOG(LOG_INFO, "!!!FreeChannelFinally channelId:%u sid:%u finish",
453             hChannel->channelId, hChannel->targetSessionId);
454     }
455 #ifdef HDC_HOST
456     Base::TryCloseHandle((const uv_handle_t *)&hChannel->hChildWorkTCP);
457 #endif
458     delete hChannel;
459     Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
460 }
461 
FreeChannelContinue(HChannel hChannel)462 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
463 {
464     auto closeChannelHandle = [](uv_handle_t *handle) -> void {
465         if (handle->data == nullptr) {
466             WRITE_LOG(LOG_DEBUG, "FreeChannelContinue handle->data is nullptr");
467             return;
468         }
469         HChannel channel = reinterpret_cast<HChannel>(handle->data);
470         --channel->uvHandleRef;
471         Base::TryCloseHandle((uv_handle_t *)handle);
472     };
473     hChannel->availTailIndex = 0;
474     if (hChannel->ioBuf) {
475         delete[] hChannel->ioBuf;
476         hChannel->ioBuf = nullptr;
477     }
478     if (!hChannel->serverOrClient) {
479         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
480         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
481     }
482     if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
483         --hChannel->uvHandleRef;
484     } else {
485         Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
486     }
487     Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
488 }
489 
FreeChannelOpeate(uv_timer_t * handle)490 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
491 {
492     HChannel hChannel = (HChannel)handle->data;
493     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
494     if (hChannel->ref > 0) {
495         return;
496     }
497     thisClass->DispMntnInfo(hChannel);
498     if (hChannel->hChildWorkTCP.loop) {
499         auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
500         bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
501         if (!ret) {
502             WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u",
503                 hChannel->channelId, hChannel->targetSessionId);
504             hChannel->childCleared = true;
505         }
506         auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
507             HChannel hChannel = (HChannel)handle->data;
508             HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
509             if (!hChannel->childCleared) {
510                 WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u",
511                     hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId);
512                 return;
513             }
514             Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
515             thisClass->FreeChannelContinue(hChannel);
516         };
517         Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
518     } else {
519         thisClass->FreeChannelContinue(hChannel);
520     }
521     Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
522 }
523 
FreeChannel(const uint32_t channelId)524 void HdcChannelBase::FreeChannel(const uint32_t channelId)
525 {
526     if (threadChanneMain != uv_thread_self()) {
527         PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
528         WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId);
529         return;
530     }
531     HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
532     do {
533         if (!hChannel || hChannel->isDead) {
534             WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId);
535             break;
536         }
537         WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
538         Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT);  // do immediately
539         hChannel->isDead = true;
540     } while (false);
541 }
542 
AdminChannel(const uint8_t op,const uint32_t channelId,HChannel hInput)543 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
544 {
545     HChannel hRet = nullptr;
546     switch (op) {
547         case OP_ADD:
548             uv_rwlock_wrlock(&lockMapChannel);
549             mapChannel[channelId] = hInput;
550             uv_rwlock_wrunlock(&lockMapChannel);
551             break;
552         case OP_REMOVE:
553             uv_rwlock_wrlock(&lockMapChannel);
554             mapChannel.erase(channelId);
555             uv_rwlock_wrunlock(&lockMapChannel);
556             break;
557         case OP_QUERY:
558             uv_rwlock_rdlock(&lockMapChannel);
559             if (mapChannel.count(channelId)) {
560                 hRet = mapChannel[channelId];
561             }
562             uv_rwlock_rdunlock(&lockMapChannel);
563             break;
564         case OP_QUERY_REF:
565             uv_rwlock_wrlock(&lockMapChannel);
566             if (mapChannel.count(channelId)) {
567                 hRet = mapChannel[channelId];
568                 ++hRet->ref;
569             }
570             uv_rwlock_wrunlock(&lockMapChannel);
571             break;
572         case OP_UPDATE:
573             uv_rwlock_wrlock(&lockMapChannel);
574             // remove old
575             mapChannel.erase(channelId);
576             mapChannel[hInput->channelId] = hInput;
577             uv_rwlock_wrunlock(&lockMapChannel);
578             break;
579         default:
580             break;
581     }
582     return hRet;
583 }
584 
EchoToClient(HChannel hChannel,uint8_t * bufPtr,const int size)585 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
586 {
587     StartTraceScope("HdcChannelBase::EchoToClient");
588     uv_stream_t *sendStream = nullptr;
589     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
590     auto data = new uint8_t[sizeNewBuf]();
591     if (!data) {
592         return;
593     }
594     *reinterpret_cast<uint32_t *>(data) = htonl(size);
595     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
596         delete[] data;
597         return;
598     }
599     sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
600     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
601         ++hChannel->ref;
602         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
603     } else {
604         WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
605         delete[] data;
606     }
607 }
608 
EchoToAllChannelsViaSessionId(uint32_t targetSessionId,const string & echo)609 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
610 {
611     for (auto v : mapChannel) {
612         HChannel hChannel = (HChannel)v.second;
613         if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
614             WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
615             EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
616         }
617     }
618 }
619 
DispMntnInfo(HChannel hChannel)620 void HdcChannelBase::DispMntnInfo(HChannel hChannel)
621 {
622     if (!hChannel) {
623         WRITE_LOG(LOG_WARN, "prt is null");
624         return;
625     }
626     WRITE_LOG(LOG_DEBUG, "channel info: id:%u isDead:%d ref:%u, writeFailedTimes:%u",
627         hChannel->channelId, hChannel->isDead, uint32_t(hChannel->ref), uint32_t(hChannel->writeFailedTimes));
628 }
629 }
630