1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient,const string & addrString,uv_loop_t * loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19 SetChannelTCPString(addrString);
20 isServerOrClient = serverOrClient;
21 loopMain = loopMainIn;
22 threadChanneMain = uv_thread_self();
23 uv_rwlock_init(&mainAsync);
24 uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25 uv_rwlock_init(&lockMapChannel);
26 queuedPackages.store(0);
27 }
28
~HdcChannelBase()29 HdcChannelBase::~HdcChannelBase()
30 {
31 ClearChannels();
32 // clear
33 if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
34 uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
35 }
36
37 uv_rwlock_destroy(&mainAsync);
38 uv_rwlock_destroy(&lockMapChannel);
39 }
40
GetChannelHandshake(string & connectKey) const41 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
42 {
43 vector<uint8_t> ret;
44 struct ChannelHandShake handshake = {};
45 if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
46 return ret;
47 }
48 if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
49 return ret;
50 }
51 ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
52 return ret;
53 }
54
SetChannelTCPString(const string & addrString)55 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
56 {
57 bool ret = false;
58 while (true) {
59 if (addrString.find(":") == string::npos) {
60 break;
61 }
62 std::size_t found = addrString.find_last_of(":");
63 if (found == string::npos) {
64 break;
65 }
66
67 string host = addrString.substr(0, found);
68 string port = addrString.substr(found + 1);
69
70 channelPort = std::atoi(port.c_str());
71 sockaddr_in addrv4;
72 sockaddr_in6 addrv6;
73 if (!channelPort) {
74 break;
75 }
76
77 if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
78 uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
79 break;
80 }
81 channelHost = host;
82 channelHostPort = addrString;
83 ret = true;
84 break;
85 }
86 if (!ret) {
87 channelPort = 0;
88 channelHost = STRING_EMPTY;
89 channelHostPort = STRING_EMPTY;
90 }
91 return ret;
92 }
93
ClearChannels()94 void HdcChannelBase::ClearChannels()
95 {
96 for (auto v : mapChannel) {
97 HChannel hChannel = (HChannel)v.second;
98 if (!hChannel->isDead) {
99 FreeChannel(hChannel->channelId);
100 }
101 }
102 }
103
WorkerPendding()104 void HdcChannelBase::WorkerPendding()
105 {
106 WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
107 uv_run(loopMain, UV_RUN_DEFAULT);
108 uv_loop_close(loopMain);
109 }
110
ReadStream(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)111 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
112 {
113 StartTraceScope("HdcChannelBase::ReadStream");
114 int size = 0;
115 int indexBuf = 0;
116 int childRet = 0;
117 bool needExit = false;
118 HChannel hChannel = (HChannel)tcp->data;
119 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
120 uint32_t channelId = hChannel->channelId;
121
122 if (nread == UV_ENOBUFS) {
123 WRITE_LOG(LOG_FATAL, "ReadStream nobufs channelId:%u", channelId);
124 return;
125 } else if (nread == 0) {
126 // maybe just after accept, second client req
127 WRITE_LOG(LOG_DEBUG, "ReadStream idle read channelId:%u", channelId);
128 return;
129 } else if (nread < 0) {
130 Base::TryCloseHandle((uv_handle_t *)tcp);
131 constexpr int bufSize = 1024;
132 char buffer[bufSize] = { 0 };
133 uv_err_name_r(nread, buffer, bufSize);
134 WRITE_LOG(LOG_DEBUG, "ReadStream channelId:%u failed:%s", channelId, buffer);
135 needExit = true;
136 goto Finish;
137 } else {
138 hChannel->availTailIndex += nread;
139 }
140 while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
141 size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf)); // big endian
142 if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
143 WRITE_LOG(LOG_FATAL, "ReadStream size:%d channelId:%u", size, channelId);
144 needExit = true;
145 break;
146 }
147 if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
148 break;
149 }
150 childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
151 DWORD_SERIALIZE_SIZE + indexBuf, size);
152 if (childRet < 0) {
153 WRITE_LOG(LOG_WARN, "ReadStream childRet:%d channelId:%u keepAlive:%d",
154 childRet, channelId, hChannel->keepAlive);
155 if (!hChannel->keepAlive) {
156 needExit = true;
157 break;
158 }
159 }
160 // update io
161 hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
162 indexBuf += DWORD_SERIALIZE_SIZE + size;
163 }
164 if (indexBuf > 0 && hChannel->availTailIndex > 0) {
165 if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
166 needExit = true;
167 goto Finish;
168 }
169 }
170
171 Finish:
172 if (needExit) {
173 thisClass->FreeChannel(hChannel->channelId);
174 WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish channelId:%u", channelId);
175 }
176 }
177
FileCmdWriteCallback(uv_write_t * req,int status)178 void HdcChannelBase::FileCmdWriteCallback(uv_write_t *req, int status)
179 {
180 #ifdef HDC_HOST
181 HChannel hChannel = (HChannel)req->handle->data;
182 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
183 thisClass->queuedPackages.fetch_sub(1, std::memory_order_relaxed);
184 #endif
185 WriteCallback(req, status);
186 }
187
WriteCallback(uv_write_t * req,int status)188 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
189 {
190 HChannel hChannel = (HChannel)req->handle->data;
191 --hChannel->ref;
192 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
193 if (status < 0) {
194 hChannel->writeFailedTimes++;
195 Base::TryCloseHandle((uv_handle_t *)req->handle);
196 if (!hChannel->isDead && !hChannel->ref) {
197 thisClass->FreeChannel(hChannel->channelId);
198 }
199 }
200 delete[]((uint8_t *)req->data);
201 delete req;
202 }
203
AsyncMainLoopTask(uv_idle_t * handle)204 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
205 {
206 AsyncParam *param = (AsyncParam *)handle->data;
207 HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
208
209 switch (param->method) {
210 case ASYNC_FREE_CHANNEL: {
211 // alloc/release should pair in main thread.
212 thisClass->FreeChannel(param->sid);
213 break;
214 }
215 default:
216 break;
217 }
218 if (param->data) {
219 delete[]((uint8_t *)param->data);
220 }
221 delete param;
222 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
223 }
224
225 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
226 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
227 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t * handle)228 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
229 {
230 HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
231 if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
232 WRITE_LOG(LOG_WARN, "MainAsyncCallback uv_is_closing loopMain");
233 return;
234 }
235 list<void *>::iterator i;
236 list<void *> &lst = thisClass->lstMainThreadOP;
237 uv_rwlock_wrlock(&thisClass->mainAsync);
238 for (i = lst.begin(); i != lst.end();) {
239 AsyncParam *param = (AsyncParam *)*i;
240 Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
241 i = lst.erase(i);
242 }
243 uv_rwlock_wrunlock(&thisClass->mainAsync);
244 }
245
PushAsyncMessage(const uint32_t channelId,const uint8_t method,const void * data,const int dataSize)246 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
247 const int dataSize)
248 {
249 if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
250 WRITE_LOG(LOG_WARN, "PushAsyncMessage uv_is_closing asyncMainLoop");
251 return;
252 }
253 auto param = new AsyncParam();
254 if (!param) {
255 return;
256 }
257 param->sid = channelId; // Borrow SID storage
258 param->thisClass = this;
259 param->method = method;
260 if (dataSize > 0) {
261 param->dataSize = dataSize;
262 param->data = new uint8_t[param->dataSize]();
263 if (!param->data) {
264 delete param;
265 return;
266 }
267 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
268 delete[]((uint8_t *)param->data);
269 delete param;
270 return;
271 }
272 }
273 asyncMainLoop.data = this;
274 uv_rwlock_wrlock(&mainAsync);
275 lstMainThreadOP.push_back(param);
276 uv_rwlock_wrunlock(&mainAsync);
277 uv_async_send(&asyncMainLoop);
278 }
279
280 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel,const uint16_t commandFlag,uint8_t * bufPtr,const int size)281 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
282 {
283 StartTraceScope("HdcChannelBase::SendChannelWithCmd");
284 if (size < 0) {
285 WRITE_LOG(LOG_WARN, "SendChannelWithCmd size %d", size);
286 return;
287 }
288 auto data = new uint8_t[size + sizeof(commandFlag)]();
289 if (!data) {
290 WRITE_LOG(LOG_WARN, "malloc failed");
291 return;
292 }
293
294 if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
295 delete[] data;
296 return;
297 }
298
299 if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
300 delete[] data;
301 return;
302 }
303
304 SendChannel(hChannel, data, size + sizeof(commandFlag), commandFlag);
305 delete[] data;
306 }
307
SendWithCmd(const uint32_t channelId,const uint16_t commandFlag,uint8_t * bufPtr,const int size)308 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
309 {
310 StartTraceScope("HdcChannelBase::SendWithCmd");
311 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
312 if (!hChannel) {
313 WRITE_LOG(LOG_FATAL, "SendWithCmd hChannel nullptr channelId:%u", channelId);
314 return;
315 }
316 do {
317 if (hChannel->isDead) {
318 WRITE_LOG(LOG_FATAL, "SendWithCmd isDead channelId:%u", channelId);
319 break;
320 }
321 SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
322 } while (false);
323 --hChannel->ref;
324 }
325
SendChannel(HChannel hChannel,uint8_t * bufPtr,const int size,const uint16_t commandFlag)326 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size, const uint16_t commandFlag)
327 {
328 StartTraceScope("HdcChannelBase::SendChannel");
329 uv_stream_t *sendStream = nullptr;
330 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
331 auto data = new uint8_t[sizeNewBuf]();
332 if (!data) {
333 return;
334 }
335 *reinterpret_cast<uint32_t *>(data) = htonl(size); // big endian
336 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
337 delete[] data;
338 return;
339 }
340 if (hChannel->hWorkThread == uv_thread_self()) {
341 sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
342 } else {
343 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
344 }
345 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
346 ++hChannel->ref;
347 if (commandFlag == CMD_FILE_DATA || commandFlag == CMD_APP_DATA) {
348 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)FileCmdWriteCallback, data);
349 } else {
350 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
351 }
352 } else {
353 delete[] data;
354 }
355 }
356
357 // works only in current working thread
Send(const uint32_t channelId,uint8_t * bufPtr,const int size)358 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
359 {
360 StartTraceScope("HdcChannelBase::Send");
361 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
362 if (!hChannel) {
363 WRITE_LOG(LOG_FATAL, "Send hChannel nullptr channelId:%u", channelId);
364 return;
365 }
366 do {
367 if (hChannel->isDead) {
368 WRITE_LOG(LOG_FATAL, "Send isDead channelId:%u", channelId);
369 break;
370 }
371 SendChannel(hChannel, bufPtr, size);
372 } while (false);
373 --hChannel->ref;
374 }
375
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)376 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
377 {
378 HChannel context = (HChannel)handle->data;
379 Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * BUF_EXTEND_SIZE);
380 buf->base = (char *)context->ioBuf + context->availTailIndex;
381 int size = context->bufSize - context->availTailIndex;
382 buf->len = std::min(size, static_cast<int>(sizeWanted));
383 }
384
GetChannelPseudoUid()385 uint32_t HdcChannelBase::GetChannelPseudoUid()
386 {
387 uint32_t uid = 0;
388 do {
389 uid = Base::GetSecureRandom();
390 } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
391 return uid;
392 }
393
MallocChannel(HChannel * hOutChannel)394 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
395 {
396 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
397 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
398 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
399 #endif
400 auto hChannel = new HdcChannel();
401 if (!hChannel) {
402 WRITE_LOG(LOG_FATAL, "malloc channel failed");
403 return 0;
404 }
405 hChannel->stdinTty.data = nullptr;
406 hChannel->stdoutTty.data = nullptr;
407 uint32_t channelId = GetChannelPseudoUid();
408 if (isServerOrClient) {
409 hChannel->serverOrClient = isServerOrClient;
410 ++channelId; // Use different value for serverForClient&client in per process
411 }
412 uv_tcp_init(loopMain, &hChannel->hWorkTCP);
413 ++hChannel->uvHandleRef;
414 hChannel->hWorkThread = uv_thread_self();
415 hChannel->hWorkTCP.data = hChannel;
416 hChannel->clsChannel = this;
417 hChannel->channelId = channelId;
418 (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
419 AdminChannel(OP_ADD, channelId, hChannel);
420 *hOutChannel = hChannel;
421 if (isServerOrClient) {
422 WRITE_LOG(LOG_INFO, "Mallocchannel:%u", channelId);
423 } else {
424 WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
425 }
426 return channelId;
427 }
428
429 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t * handle)430 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
431 {
432 HChannel hChannel = (HChannel)handle->data;
433 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
434 if (hChannel->uvHandleRef > 0) {
435 if (hChannel->serverOrClient) {
436 WRITE_LOG(LOG_INFO, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
437 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
438 } else {
439 WRITE_LOG(LOG_DEBUG, "FreeChannelFinally uvHandleRef:%d channelId:%u sid:%u",
440 hChannel->uvHandleRef, hChannel->channelId, hChannel->targetSessionId);
441 }
442 return;
443 }
444 thisClass->NotifyInstanceChannelFree(hChannel);
445 thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
446
447 if (!hChannel->serverOrClient) {
448 WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u sid:%u finish",
449 hChannel->channelId, hChannel->targetSessionId);
450 uv_stop(thisClass->loopMain);
451 } else {
452 WRITE_LOG(LOG_INFO, "!!!FreeChannelFinally channelId:%u sid:%u finish",
453 hChannel->channelId, hChannel->targetSessionId);
454 }
455 #ifdef HDC_HOST
456 Base::TryCloseHandle((const uv_handle_t *)&hChannel->hChildWorkTCP);
457 #endif
458 delete hChannel;
459 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
460 }
461
FreeChannelContinue(HChannel hChannel)462 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
463 {
464 auto closeChannelHandle = [](uv_handle_t *handle) -> void {
465 if (handle->data == nullptr) {
466 WRITE_LOG(LOG_DEBUG, "FreeChannelContinue handle->data is nullptr");
467 return;
468 }
469 HChannel channel = reinterpret_cast<HChannel>(handle->data);
470 --channel->uvHandleRef;
471 Base::TryCloseHandle((uv_handle_t *)handle);
472 };
473 hChannel->availTailIndex = 0;
474 if (hChannel->ioBuf) {
475 delete[] hChannel->ioBuf;
476 hChannel->ioBuf = nullptr;
477 }
478 if (!hChannel->serverOrClient) {
479 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
480 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
481 }
482 if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
483 --hChannel->uvHandleRef;
484 } else {
485 Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
486 }
487 Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
488 }
489
FreeChannelOpeate(uv_timer_t * handle)490 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
491 {
492 HChannel hChannel = (HChannel)handle->data;
493 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
494 if (hChannel->ref > 0) {
495 return;
496 }
497 thisClass->DispMntnInfo(hChannel);
498 if (hChannel->hChildWorkTCP.loop) {
499 auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
500 bool ret = thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
501 if (!ret) {
502 WRITE_LOG(LOG_WARN, "FreeChannelOpeate deatch failed channelId:%u sid:%u",
503 hChannel->channelId, hChannel->targetSessionId);
504 hChannel->childCleared = true;
505 }
506 auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
507 HChannel hChannel = (HChannel)handle->data;
508 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
509 if (!hChannel->childCleared) {
510 WRITE_LOG(LOG_WARN, "FreeChannelOpeate childCleared:%d channelId:%u sid:%u",
511 hChannel->childCleared, hChannel->channelId, hChannel->targetSessionId);
512 return;
513 }
514 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
515 thisClass->FreeChannelContinue(hChannel);
516 };
517 Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
518 } else {
519 thisClass->FreeChannelContinue(hChannel);
520 }
521 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
522 }
523
FreeChannel(const uint32_t channelId)524 void HdcChannelBase::FreeChannel(const uint32_t channelId)
525 {
526 if (threadChanneMain != uv_thread_self()) {
527 PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
528 WRITE_LOG(LOG_INFO, "FreeChannel not uv_thread_self channelid:%u", channelId);
529 return;
530 }
531 HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
532 do {
533 if (!hChannel || hChannel->isDead) {
534 WRITE_LOG(LOG_WARN, "FreeChannel hChannel nullptr or isDead channelid:%u", channelId);
535 break;
536 }
537 WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
538 Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT); // do immediately
539 hChannel->isDead = true;
540 } while (false);
541 }
542
AdminChannel(const uint8_t op,const uint32_t channelId,HChannel hInput)543 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
544 {
545 HChannel hRet = nullptr;
546 switch (op) {
547 case OP_ADD:
548 uv_rwlock_wrlock(&lockMapChannel);
549 mapChannel[channelId] = hInput;
550 uv_rwlock_wrunlock(&lockMapChannel);
551 break;
552 case OP_REMOVE:
553 uv_rwlock_wrlock(&lockMapChannel);
554 mapChannel.erase(channelId);
555 uv_rwlock_wrunlock(&lockMapChannel);
556 break;
557 case OP_QUERY:
558 uv_rwlock_rdlock(&lockMapChannel);
559 if (mapChannel.count(channelId)) {
560 hRet = mapChannel[channelId];
561 }
562 uv_rwlock_rdunlock(&lockMapChannel);
563 break;
564 case OP_QUERY_REF:
565 uv_rwlock_wrlock(&lockMapChannel);
566 if (mapChannel.count(channelId)) {
567 hRet = mapChannel[channelId];
568 ++hRet->ref;
569 }
570 uv_rwlock_wrunlock(&lockMapChannel);
571 break;
572 case OP_UPDATE:
573 uv_rwlock_wrlock(&lockMapChannel);
574 // remove old
575 mapChannel.erase(channelId);
576 mapChannel[hInput->channelId] = hInput;
577 uv_rwlock_wrunlock(&lockMapChannel);
578 break;
579 default:
580 break;
581 }
582 return hRet;
583 }
584
EchoToClient(HChannel hChannel,uint8_t * bufPtr,const int size)585 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
586 {
587 StartTraceScope("HdcChannelBase::EchoToClient");
588 uv_stream_t *sendStream = nullptr;
589 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
590 auto data = new uint8_t[sizeNewBuf]();
591 if (!data) {
592 return;
593 }
594 *reinterpret_cast<uint32_t *>(data) = htonl(size);
595 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
596 delete[] data;
597 return;
598 }
599 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
600 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
601 ++hChannel->ref;
602 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
603 } else {
604 WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
605 delete[] data;
606 }
607 }
608
EchoToAllChannelsViaSessionId(uint32_t targetSessionId,const string & echo)609 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
610 {
611 for (auto v : mapChannel) {
612 HChannel hChannel = (HChannel)v.second;
613 if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
614 WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
615 EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
616 }
617 }
618 }
619
DispMntnInfo(HChannel hChannel)620 void HdcChannelBase::DispMntnInfo(HChannel hChannel)
621 {
622 if (!hChannel) {
623 WRITE_LOG(LOG_WARN, "prt is null");
624 return;
625 }
626 WRITE_LOG(LOG_DEBUG, "channel info: id:%u isDead:%d ref:%u, writeFailedTimes:%u",
627 hChannel->channelId, hChannel->isDead, uint32_t(hChannel->ref), uint32_t(hChannel->writeFailedTimes));
628 }
629 }
630