1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient,const string & addrString,uv_loop_t * loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19 SetChannelTCPString(addrString);
20 isServerOrClient = serverOrClient;
21 loopMain = loopMainIn;
22 threadChanneMain = uv_thread_self();
23 uv_rwlock_init(&mainAsync);
24 uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25 uv_rwlock_init(&lockMapChannel);
26 }
27
~HdcChannelBase()28 HdcChannelBase::~HdcChannelBase()
29 {
30 ClearChannels();
31 // clear
32 if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
33 uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
34 }
35
36 uv_rwlock_destroy(&mainAsync);
37 uv_rwlock_destroy(&lockMapChannel);
38 }
39
GetChannelHandshake(string & connectKey) const40 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
41 {
42 vector<uint8_t> ret;
43 struct ChannelHandShake handshake = {};
44 if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
45 return ret;
46 }
47 if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
48 return ret;
49 }
50 ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
51 return ret;
52 }
53
SetChannelTCPString(const string & addrString)54 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
55 {
56 bool ret = false;
57 while (true) {
58 if (addrString.find(":") == string::npos) {
59 break;
60 }
61 std::size_t found = addrString.find_last_of(":");
62 if (found == string::npos) {
63 break;
64 }
65
66 string host = addrString.substr(0, found);
67 string port = addrString.substr(found + 1);
68
69 channelPort = std::atoi(port.c_str());
70 sockaddr_in addrv4;
71 sockaddr_in6 addrv6;
72 if (!channelPort) {
73 break;
74 }
75
76 if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
77 uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
78 break;
79 }
80 channelHost = host;
81 channelHostPort = addrString;
82 ret = true;
83 break;
84 }
85 if (!ret) {
86 channelPort = 0;
87 channelHost = STRING_EMPTY;
88 channelHostPort = STRING_EMPTY;
89 }
90 return ret;
91 }
92
ClearChannels()93 void HdcChannelBase::ClearChannels()
94 {
95 for (auto v : mapChannel) {
96 HChannel hChannel = (HChannel)v.second;
97 if (!hChannel->isDead) {
98 FreeChannel(hChannel->channelId);
99 }
100 }
101 }
102
WorkerPendding()103 void HdcChannelBase::WorkerPendding()
104 {
105 WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
106 uv_run(loopMain, UV_RUN_DEFAULT);
107 uv_loop_close(loopMain);
108 }
109
ReadStream(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)110 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
111 {
112 StartTraceScope("HdcChannelBase::ReadStream");
113 int size = 0;
114 int indexBuf = 0;
115 int childRet = 0;
116 bool needExit = false;
117 HChannel hChannel = (HChannel)tcp->data;
118 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
119
120 if (nread == UV_ENOBUFS) {
121 WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream Pipe IOBuf max");
122 return;
123 } else if (nread == 0) {
124 // maybe just after accept, second client req
125 WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream idle read");
126 return;
127 } else if (nread < 0) {
128 Base::TryCloseHandle((uv_handle_t *)tcp);
129 constexpr int bufSize = 1024;
130 char buffer[bufSize] = { 0 };
131 uv_err_name_r(nread, buffer, bufSize);
132 WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream failed2:%s", buffer);
133 needExit = true;
134 goto Finish;
135 } else {
136 hChannel->availTailIndex += nread;
137 }
138 while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
139 size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf)); // big endian
140 if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
141 needExit = true;
142 break;
143 }
144 if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
145 break;
146 }
147 childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
148 DWORD_SERIALIZE_SIZE + indexBuf, size);
149 if (childRet < 0) {
150 if (!hChannel->keepAlive) {
151 needExit = true;
152 break;
153 }
154 }
155 // update io
156 hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
157 indexBuf += DWORD_SERIALIZE_SIZE + size;
158 }
159 if (indexBuf > 0 && hChannel->availTailIndex > 0) {
160 if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
161 needExit = true;
162 goto Finish;
163 }
164 }
165
166 Finish:
167 if (needExit) {
168 thisClass->FreeChannel(hChannel->channelId);
169 WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish");
170 }
171 }
172
WriteCallback(uv_write_t * req,int status)173 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
174 {
175 HChannel hChannel = (HChannel)req->handle->data;
176 --hChannel->ref;
177 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
178 if (status < 0) {
179 Base::TryCloseHandle((uv_handle_t *)req->handle);
180 if (!hChannel->isDead && !hChannel->ref) {
181 thisClass->FreeChannel(hChannel->channelId);
182 WRITE_LOG(LOG_DEBUG, "WriteCallback TryCloseHandle");
183 }
184 }
185 delete[]((uint8_t *)req->data);
186 delete req;
187 }
188
AsyncMainLoopTask(uv_idle_t * handle)189 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
190 {
191 AsyncParam *param = (AsyncParam *)handle->data;
192 HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
193
194 switch (param->method) {
195 case ASYNC_FREE_CHANNEL: {
196 // alloc/release should pair in main thread.
197 thisClass->FreeChannel(param->sid);
198 break;
199 }
200 default:
201 break;
202 }
203 if (param->data) {
204 delete[]((uint8_t *)param->data);
205 }
206 delete param;
207 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
208 }
209
210 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
211 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
212 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t * handle)213 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
214 {
215 HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
216 if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
217 return;
218 }
219 list<void *>::iterator i;
220 list<void *> &lst = thisClass->lstMainThreadOP;
221 uv_rwlock_wrlock(&thisClass->mainAsync);
222 for (i = lst.begin(); i != lst.end();) {
223 AsyncParam *param = (AsyncParam *)*i;
224 Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
225 i = lst.erase(i);
226 }
227 uv_rwlock_wrunlock(&thisClass->mainAsync);
228 }
229
PushAsyncMessage(const uint32_t channelId,const uint8_t method,const void * data,const int dataSize)230 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
231 const int dataSize)
232 {
233 if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
234 return;
235 }
236 auto param = new AsyncParam();
237 if (!param) {
238 return;
239 }
240 param->sid = channelId; // Borrow SID storage
241 param->thisClass = this;
242 param->method = method;
243 if (dataSize > 0) {
244 param->dataSize = dataSize;
245 param->data = new uint8_t[param->dataSize]();
246 if (!param->data) {
247 delete param;
248 return;
249 }
250 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
251 delete[]((uint8_t *)param->data);
252 delete param;
253 return;
254 }
255 }
256 asyncMainLoop.data = this;
257 uv_rwlock_wrlock(&mainAsync);
258 lstMainThreadOP.push_back(param);
259 uv_rwlock_wrunlock(&mainAsync);
260 uv_async_send(&asyncMainLoop);
261 }
262
263 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel,const uint16_t commandFlag,uint8_t * bufPtr,const int size)264 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
265 {
266 StartTraceScope("HdcChannelBase::SendChannelWithCmd");
267 auto data = new uint8_t[size + sizeof(commandFlag)]();
268 if (!data) {
269 return;
270 }
271
272 if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
273 delete[] data;
274 return;
275 }
276
277 if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
278 delete[] data;
279 return;
280 }
281
282 SendChannel(hChannel, data, size + sizeof(commandFlag));
283 delete[] data;
284 }
285
SendWithCmd(const uint32_t channelId,const uint16_t commandFlag,uint8_t * bufPtr,const int size)286 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
287 {
288 StartTraceScope("HdcChannelBase::SendWithCmd");
289 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
290 if (!hChannel) {
291 return;
292 }
293 do {
294 if (hChannel->isDead) {
295 break;
296 }
297 SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
298 } while (false);
299 --hChannel->ref;
300 }
301
SendChannel(HChannel hChannel,uint8_t * bufPtr,const int size)302 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)
303 {
304 StartTraceScope("HdcChannelBase::SendChannel");
305 uv_stream_t *sendStream = nullptr;
306 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
307 auto data = new uint8_t[sizeNewBuf]();
308 if (!data) {
309 return;
310 }
311 *reinterpret_cast<uint32_t *>(data) = htonl(size); // big endian
312 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
313 delete[] data;
314 return;
315 }
316 if (hChannel->hWorkThread == uv_thread_self()) {
317 sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
318 } else {
319 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
320 }
321 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
322 ++hChannel->ref;
323 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
324 } else {
325 delete[] data;
326 }
327 }
328
329 // works only in current working thread
Send(const uint32_t channelId,uint8_t * bufPtr,const int size)330 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
331 {
332 StartTraceScope("HdcChannelBase::Send");
333 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
334 if (!hChannel) {
335 return;
336 }
337 do {
338 if (hChannel->isDead) {
339 break;
340 }
341 SendChannel(hChannel, bufPtr, size);
342 } while (false);
343 --hChannel->ref;
344 }
345
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)346 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
347 {
348 HChannel context = (HChannel)handle->data;
349 Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * 4);
350 buf->base = (char *)context->ioBuf + context->availTailIndex;
351 buf->len = context->bufSize - context->availTailIndex;
352 }
353
GetChannelPseudoUid()354 uint32_t HdcChannelBase::GetChannelPseudoUid()
355 {
356 uint32_t uid = 0;
357 do {
358 uid = static_cast<uint32_t>(Base::GetRandom());
359 } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
360 return uid;
361 }
362
MallocChannel(HChannel * hOutChannel)363 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
364 {
365 auto hChannel = new HdcChannel();
366 if (!hChannel) {
367 return 0;
368 }
369 hChannel->stdinTty.data = nullptr;
370 hChannel->stdoutTty.data = nullptr;
371 uint32_t channelId = GetChannelPseudoUid();
372 if (isServerOrClient) {
373 hChannel->serverOrClient = isServerOrClient;
374 ++channelId; // Use different value for serverForClient&client in per process
375 }
376 uv_tcp_init(loopMain, &hChannel->hWorkTCP);
377 ++hChannel->uvHandleRef;
378 hChannel->hWorkThread = uv_thread_self();
379 hChannel->hWorkTCP.data = hChannel;
380 hChannel->clsChannel = this;
381 hChannel->channelId = channelId;
382 (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
383 AdminChannel(OP_ADD, channelId, hChannel);
384 *hOutChannel = hChannel;
385 WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
386 return channelId;
387 }
388
389 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t * handle)390 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
391 {
392 HChannel hChannel = (HChannel)handle->data;
393 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
394 if (hChannel->uvHandleRef > 0) {
395 return;
396 }
397 thisClass->NotifyInstanceChannelFree(hChannel);
398 thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
399 WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u finish", hChannel->channelId);
400 if (!hChannel->serverOrClient) {
401 uv_stop(thisClass->loopMain);
402 }
403 delete hChannel;
404 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
405 }
406
FreeChannelContinue(HChannel hChannel)407 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
408 {
409 auto closeChannelHandle = [](uv_handle_t *handle) -> void {
410 if (handle->data == nullptr) {
411 WRITE_LOG(LOG_WARN, "FreeChannelContinue handle->data is nullptr");
412 return;
413 }
414 HChannel channel = reinterpret_cast<HChannel>(handle->data);
415 --channel->uvHandleRef;
416 Base::TryCloseHandle((uv_handle_t *)handle);
417 };
418 hChannel->availTailIndex = 0;
419 if (hChannel->ioBuf) {
420 delete[] hChannel->ioBuf;
421 hChannel->ioBuf = nullptr;
422 }
423 if (!hChannel->serverOrClient) {
424 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
425 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
426 }
427 if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
428 --hChannel->uvHandleRef;
429 } else {
430 Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
431 }
432 Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
433 }
434
FreeChannelOpeate(uv_timer_t * handle)435 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
436 {
437 HChannel hChannel = (HChannel)handle->data;
438 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
439 if (hChannel->ref > 0) {
440 return;
441 }
442 if (hChannel->hChildWorkTCP.loop) {
443 auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
444 thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
445 auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
446 HChannel hChannel = (HChannel)handle->data;
447 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
448 if (!hChannel->childCleared) {
449 return;
450 }
451 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
452 thisClass->FreeChannelContinue(hChannel);
453 };
454 Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
455 } else {
456 thisClass->FreeChannelContinue(hChannel);
457 }
458 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
459 }
460
FreeChannel(const uint32_t channelId)461 void HdcChannelBase::FreeChannel(const uint32_t channelId)
462 {
463 if (threadChanneMain != uv_thread_self()) {
464 PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
465 return;
466 }
467 HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
468 do {
469 if (!hChannel || hChannel->isDead) {
470 break;
471 }
472 WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
473 Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT); // do immediately
474 hChannel->isDead = true;
475 } while (false);
476 }
477
AdminChannel(const uint8_t op,const uint32_t channelId,HChannel hInput)478 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
479 {
480 HChannel hRet = nullptr;
481 switch (op) {
482 case OP_ADD:
483 uv_rwlock_wrlock(&lockMapChannel);
484 mapChannel[channelId] = hInput;
485 uv_rwlock_wrunlock(&lockMapChannel);
486 break;
487 case OP_REMOVE:
488 uv_rwlock_wrlock(&lockMapChannel);
489 mapChannel.erase(channelId);
490 uv_rwlock_wrunlock(&lockMapChannel);
491 break;
492 case OP_QUERY:
493 uv_rwlock_rdlock(&lockMapChannel);
494 if (mapChannel.count(channelId)) {
495 hRet = mapChannel[channelId];
496 }
497 uv_rwlock_rdunlock(&lockMapChannel);
498 break;
499 case OP_QUERY_REF:
500 uv_rwlock_wrlock(&lockMapChannel);
501 if (mapChannel.count(channelId)) {
502 hRet = mapChannel[channelId];
503 ++hRet->ref;
504 }
505 uv_rwlock_wrunlock(&lockMapChannel);
506 break;
507 case OP_UPDATE:
508 uv_rwlock_wrlock(&lockMapChannel);
509 // remove old
510 mapChannel.erase(channelId);
511 mapChannel[hInput->channelId] = hInput;
512 uv_rwlock_wrunlock(&lockMapChannel);
513 break;
514 default:
515 break;
516 }
517 return hRet;
518 }
519
EchoToClient(HChannel hChannel,uint8_t * bufPtr,const int size)520 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
521 {
522 StartTraceScope("HdcChannelBase::EchoToClient");
523 uv_stream_t *sendStream = nullptr;
524 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
525 auto data = new uint8_t[sizeNewBuf]();
526 if (!data) {
527 return;
528 }
529 *reinterpret_cast<uint32_t *>(data) = htonl(size);
530 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
531 delete[] data;
532 return;
533 }
534 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
535 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
536 ++hChannel->ref;
537 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
538 } else {
539 WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
540 delete[] data;
541 }
542 }
543
EchoToAllChannelsViaSessionId(uint32_t targetSessionId,const string & echo)544 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
545 {
546 for (auto v : mapChannel) {
547 HChannel hChannel = (HChannel)v.second;
548 if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
549 WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
550 EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
551 }
552 }
553 }
554 }
555