1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient,const string & addrString,uv_loop_t * loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19 SetChannelTCPString(addrString);
20 isServerOrClient = serverOrClient;
21 loopMain = loopMainIn;
22 threadChanneMain = uv_thread_self();
23 uv_rwlock_init(&mainAsync);
24 uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25 uv_rwlock_init(&lockMapChannel);
26 }
27
~HdcChannelBase()28 HdcChannelBase::~HdcChannelBase()
29 {
30 ClearChannels();
31 // clear
32 if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
33 uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
34 }
35
36 uv_rwlock_destroy(&mainAsync);
37 uv_rwlock_destroy(&lockMapChannel);
38 }
39
GetChannelHandshake(string & connectKey) const40 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
41 {
42 vector<uint8_t> ret;
43 struct ChannelHandShake handshake = {};
44 if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
45 return ret;
46 }
47 if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
48 return ret;
49 }
50 ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
51 return ret;
52 }
53
SetChannelTCPString(const string & addrString)54 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
55 {
56 bool ret = false;
57 while (true) {
58 if (addrString.find(":") == string::npos) {
59 break;
60 }
61 std::size_t found = addrString.find_last_of(":");
62 if (found == string::npos) {
63 break;
64 }
65
66 string host = addrString.substr(0, found);
67 string port = addrString.substr(found + 1);
68
69 channelPort = std::atoi(port.c_str());
70 sockaddr_in addrv4;
71 sockaddr_in6 addrv6;
72 if (!channelPort) {
73 break;
74 }
75
76 if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
77 uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
78 break;
79 }
80 channelHost = host;
81 channelHostPort = addrString;
82 ret = true;
83 break;
84 }
85 if (!ret) {
86 channelPort = 0;
87 channelHost = STRING_EMPTY;
88 channelHostPort = STRING_EMPTY;
89 }
90 return ret;
91 }
92
ClearChannels()93 void HdcChannelBase::ClearChannels()
94 {
95 for (auto v : mapChannel) {
96 HChannel hChannel = (HChannel)v.second;
97 if (!hChannel->isDead) {
98 FreeChannel(hChannel->channelId);
99 }
100 }
101 }
102
WorkerPendding()103 void HdcChannelBase::WorkerPendding()
104 {
105 WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
106 uv_run(loopMain, UV_RUN_DEFAULT);
107 uv_loop_close(loopMain);
108 }
109
ReadStream(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)110 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
111 {
112 int size = 0;
113 int indexBuf = 0;
114 int childRet = 0;
115 bool needExit = false;
116 HChannel hChannel = (HChannel)tcp->data;
117 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
118
119 if (nread == UV_ENOBUFS) {
120 WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream Pipe IOBuf max");
121 return;
122 } else if (nread == 0) {
123 // maybe just after accept, second client req
124 WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream idle read");
125 return;
126 } else if (nread < 0) {
127 Base::TryCloseHandle((uv_handle_t *)tcp);
128 constexpr int bufSize = 1024;
129 char buffer[bufSize] = { 0 };
130 uv_err_name_r(nread, buffer, bufSize);
131 WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream failed2:%s", buffer);
132 needExit = true;
133 goto Finish;
134 } else {
135 hChannel->availTailIndex += nread;
136 }
137 while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
138 size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf)); // big endian
139 if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
140 needExit = true;
141 break;
142 }
143 if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
144 break;
145 }
146 childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
147 DWORD_SERIALIZE_SIZE + indexBuf, size);
148 if (childRet < 0) {
149 if (!hChannel->keepAlive) {
150 needExit = true;
151 break;
152 }
153 }
154 // update io
155 hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
156 indexBuf += DWORD_SERIALIZE_SIZE + size;
157 }
158 if (indexBuf > 0 && hChannel->availTailIndex > 0) {
159 if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
160 needExit = true;
161 goto Finish;
162 }
163 }
164
165 Finish:
166 if (needExit) {
167 thisClass->FreeChannel(hChannel->channelId);
168 WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish");
169 }
170 }
171
WriteCallback(uv_write_t * req,int status)172 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
173 {
174 HChannel hChannel = (HChannel)req->handle->data;
175 --hChannel->ref;
176 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
177 if (status < 0) {
178 Base::TryCloseHandle((uv_handle_t *)req->handle);
179 if (!hChannel->isDead && !hChannel->ref) {
180 thisClass->FreeChannel(hChannel->channelId);
181 WRITE_LOG(LOG_DEBUG, "WriteCallback TryCloseHandle");
182 }
183 }
184 delete[]((uint8_t *)req->data);
185 delete req;
186 }
187
AsyncMainLoopTask(uv_idle_t * handle)188 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
189 {
190 AsyncParam *param = (AsyncParam *)handle->data;
191 HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
192
193 switch (param->method) {
194 case ASYNC_FREE_CHANNEL: {
195 // alloc/release should pair in main thread.
196 thisClass->FreeChannel(param->sid);
197 break;
198 }
199 default:
200 break;
201 }
202 if (param->data) {
203 delete[]((uint8_t *)param->data);
204 }
205 delete param;
206 uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
207 }
208
209 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
210 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
211 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t * handle)212 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
213 {
214 HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
215 if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
216 return;
217 }
218 list<void *>::iterator i;
219 list<void *> &lst = thisClass->lstMainThreadOP;
220 uv_rwlock_wrlock(&thisClass->mainAsync);
221 for (i = lst.begin(); i != lst.end();) {
222 AsyncParam *param = (AsyncParam *)*i;
223 Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
224 i = lst.erase(i);
225 }
226 uv_rwlock_wrunlock(&thisClass->mainAsync);
227 }
228
PushAsyncMessage(const uint32_t channelId,const uint8_t method,const void * data,const int dataSize)229 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
230 const int dataSize)
231 {
232 if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
233 return;
234 }
235 auto param = new AsyncParam();
236 if (!param) {
237 return;
238 }
239 param->sid = channelId; // Borrow SID storage
240 param->thisClass = this;
241 param->method = method;
242 if (dataSize > 0) {
243 param->dataSize = dataSize;
244 param->data = new uint8_t[param->dataSize]();
245 if (!param->data) {
246 delete param;
247 return;
248 }
249 if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
250 delete[]((uint8_t *)param->data);
251 delete param;
252 return;
253 }
254 }
255 asyncMainLoop.data = this;
256 uv_rwlock_wrlock(&mainAsync);
257 lstMainThreadOP.push_back(param);
258 uv_rwlock_wrunlock(&mainAsync);
259 uv_async_send(&asyncMainLoop);
260 }
261
262 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel,const uint16_t commandFlag,uint8_t * bufPtr,const int size)263 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
264 {
265 auto data = new uint8_t[size + sizeof(commandFlag)]();
266 if (!data) {
267 return;
268 }
269
270 if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
271 delete[] data;
272 return;
273 }
274
275 if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
276 delete[] data;
277 return;
278 }
279
280 SendChannel(hChannel, data, size + sizeof(commandFlag));
281 delete[] data;
282 }
283
SendWithCmd(const uint32_t channelId,const uint16_t commandFlag,uint8_t * bufPtr,const int size)284 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
285 {
286 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
287 if (!hChannel) {
288 return;
289 }
290 do {
291 if (hChannel->isDead) {
292 break;
293 }
294 SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
295 } while (false);
296 --hChannel->ref;
297 }
298
SendChannel(HChannel hChannel,uint8_t * bufPtr,const int size)299 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)
300 {
301 uv_stream_t *sendStream = nullptr;
302 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
303 auto data = new uint8_t[sizeNewBuf]();
304 if (!data) {
305 return;
306 }
307 *reinterpret_cast<uint32_t *>(data) = htonl(size); // big endian
308 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
309 delete[] data;
310 return;
311 }
312 if (hChannel->hWorkThread == uv_thread_self()) {
313 sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
314 } else {
315 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
316 }
317 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
318 ++hChannel->ref;
319 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
320 } else {
321 delete[] data;
322 }
323 }
324
325 // works only in current working thread
Send(const uint32_t channelId,uint8_t * bufPtr,const int size)326 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
327 {
328 HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
329 if (!hChannel) {
330 return;
331 }
332 do {
333 if (hChannel->isDead) {
334 break;
335 }
336 SendChannel(hChannel, bufPtr, size);
337 } while (false);
338 --hChannel->ref;
339 }
340
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)341 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
342 {
343 HChannel context = (HChannel)handle->data;
344 Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * 4);
345 buf->base = (char *)context->ioBuf + context->availTailIndex;
346 buf->len = context->bufSize - context->availTailIndex;
347 }
348
GetChannelPseudoUid()349 uint32_t HdcChannelBase::GetChannelPseudoUid()
350 {
351 uint32_t uid = 0;
352 do {
353 uid = static_cast<uint32_t>(Base::GetRandom());
354 } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
355 return uid;
356 }
357
MallocChannel(HChannel * hOutChannel)358 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
359 {
360 auto hChannel = new HdcChannel();
361 if (!hChannel) {
362 return 0;
363 }
364 hChannel->stdinTty.data = nullptr;
365 hChannel->stdoutTty.data = nullptr;
366 uint32_t channelId = GetChannelPseudoUid();
367 if (isServerOrClient) {
368 hChannel->serverOrClient = isServerOrClient;
369 ++channelId; // Use different value for serverForClient&client in per process
370 }
371 uv_tcp_init(loopMain, &hChannel->hWorkTCP);
372 ++hChannel->uvHandleRef;
373 hChannel->hWorkThread = uv_thread_self();
374 hChannel->hWorkTCP.data = hChannel;
375 hChannel->clsChannel = this;
376 hChannel->channelId = channelId;
377 (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
378 AdminChannel(OP_ADD, channelId, hChannel);
379 *hOutChannel = hChannel;
380 WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
381 return channelId;
382 }
383
384 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t * handle)385 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
386 {
387 HChannel hChannel = (HChannel)handle->data;
388 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
389 if (hChannel->uvHandleRef > 0) {
390 return;
391 }
392 thisClass->NotifyInstanceChannelFree(hChannel);
393 thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
394 WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u finish", hChannel->channelId);
395 if (!hChannel->serverOrClient) {
396 uv_stop(thisClass->loopMain);
397 }
398 delete hChannel;
399 Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
400 }
401
FreeChannelContinue(HChannel hChannel)402 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
403 {
404 auto closeChannelHandle = [](uv_handle_t *handle) -> void {
405 if (handle->data == nullptr) {
406 WRITE_LOG(LOG_WARN, "FreeChannelContinue handle->data is nullptr");
407 return;
408 }
409 HChannel hChannel = reinterpret_cast<HChannel>(handle->data);
410 --hChannel->uvHandleRef;
411 Base::TryCloseHandle((uv_handle_t *)handle);
412 };
413 hChannel->availTailIndex = 0;
414 if (hChannel->ioBuf) {
415 delete[] hChannel->ioBuf;
416 hChannel->ioBuf = nullptr;
417 }
418 if (!hChannel->serverOrClient) {
419 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
420 Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
421 }
422 if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
423 --hChannel->uvHandleRef;
424 } else {
425 Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
426 }
427 Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
428 }
429
FreeChannelOpeate(uv_timer_t * handle)430 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
431 {
432 HChannel hChannel = (HChannel)handle->data;
433 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
434 if (hChannel->ref > 0) {
435 return;
436 }
437 if (hChannel->hChildWorkTCP.loop) {
438 auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
439 thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
440 auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
441 HChannel hChannel = (HChannel)handle->data;
442 HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
443 if (!hChannel->childCleared) {
444 return;
445 }
446 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
447 thisClass->FreeChannelContinue(hChannel);
448 };
449 Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
450 } else {
451 thisClass->FreeChannelContinue(hChannel);
452 }
453 Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
454 }
455
FreeChannel(const uint32_t channelId)456 void HdcChannelBase::FreeChannel(const uint32_t channelId)
457 {
458 if (threadChanneMain != uv_thread_self()) {
459 PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
460 return;
461 }
462 HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
463 do {
464 if (!hChannel || hChannel->isDead) {
465 break;
466 }
467 WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
468 Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT); // do immediately
469 hChannel->isDead = true;
470 } while (false);
471 }
472
AdminChannel(const uint8_t op,const uint32_t channelId,HChannel hInput)473 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
474 {
475 HChannel hRet = nullptr;
476 switch (op) {
477 case OP_ADD:
478 uv_rwlock_wrlock(&lockMapChannel);
479 mapChannel[channelId] = hInput;
480 uv_rwlock_wrunlock(&lockMapChannel);
481 break;
482 case OP_REMOVE:
483 uv_rwlock_wrlock(&lockMapChannel);
484 mapChannel.erase(channelId);
485 uv_rwlock_wrunlock(&lockMapChannel);
486 break;
487 case OP_QUERY:
488 uv_rwlock_rdlock(&lockMapChannel);
489 if (mapChannel.count(channelId)) {
490 hRet = mapChannel[channelId];
491 }
492 uv_rwlock_rdunlock(&lockMapChannel);
493 break;
494 case OP_QUERY_REF:
495 uv_rwlock_wrlock(&lockMapChannel);
496 if (mapChannel.count(channelId)) {
497 hRet = mapChannel[channelId];
498 ++hRet->ref;
499 }
500 uv_rwlock_wrunlock(&lockMapChannel);
501 break;
502 case OP_UPDATE:
503 uv_rwlock_wrlock(&lockMapChannel);
504 // remove old
505 mapChannel.erase(channelId);
506 mapChannel[hInput->channelId] = hInput;
507 uv_rwlock_wrunlock(&lockMapChannel);
508 break;
509 default:
510 break;
511 }
512 return hRet;
513 }
514
EchoToClient(HChannel hChannel,uint8_t * bufPtr,const int size)515 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
516 {
517 uv_stream_t *sendStream = nullptr;
518 int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
519 auto data = new uint8_t[sizeNewBuf]();
520 if (!data) {
521 return;
522 }
523 *reinterpret_cast<uint32_t *>(data) = htonl(size);
524 if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
525 delete[] data;
526 return;
527 }
528 sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
529 if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
530 ++hChannel->ref;
531 Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
532 } else {
533 WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
534 delete[] data;
535 }
536 }
537
EchoToAllChannelsViaSessionId(uint32_t targetSessionId,const string & echo)538 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
539 {
540 for (auto v : mapChannel) {
541 HChannel hChannel = (HChannel)v.second;
542 if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
543 WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
544 EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
545 }
546 }
547 }
548 }
549