• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient,const string & addrString,uv_loop_t * loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19     SetChannelTCPString(addrString);
20     isServerOrClient = serverOrClient;
21     loopMain = loopMainIn;
22     threadChanneMain = uv_thread_self();
23     uv_rwlock_init(&mainAsync);
24     uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25     uv_rwlock_init(&lockMapChannel);
26 }
27 
~HdcChannelBase()28 HdcChannelBase::~HdcChannelBase()
29 {
30     ClearChannels();
31     // clear
32     if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
33         uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
34     }
35 
36     uv_rwlock_destroy(&mainAsync);
37     uv_rwlock_destroy(&lockMapChannel);
38 }
39 
GetChannelHandshake(string & connectKey) const40 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
41 {
42     vector<uint8_t> ret;
43     struct ChannelHandShake handshake = {};
44     if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
45         return ret;
46     }
47     if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
48         return ret;
49     }
50     ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
51     return ret;
52 }
53 
SetChannelTCPString(const string & addrString)54 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
55 {
56     bool ret = false;
57     while (true) {
58         if (addrString.find(":") == string::npos) {
59             break;
60         }
61         std::size_t found = addrString.find_last_of(":");
62         if (found == string::npos) {
63             break;
64         }
65 
66         string host = addrString.substr(0, found);
67         string port = addrString.substr(found + 1);
68 
69         channelPort = std::atoi(port.c_str());
70         sockaddr_in addrv4;
71         sockaddr_in6 addrv6;
72         if (!channelPort) {
73             break;
74         }
75 
76         if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
77             uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
78             break;
79         }
80         channelHost = host;
81         channelHostPort = addrString;
82         ret = true;
83         break;
84     }
85     if (!ret) {
86         channelPort = 0;
87         channelHost = STRING_EMPTY;
88         channelHostPort = STRING_EMPTY;
89     }
90     return ret;
91 }
92 
ClearChannels()93 void HdcChannelBase::ClearChannels()
94 {
95     for (auto v : mapChannel) {
96         HChannel hChannel = (HChannel)v.second;
97         if (!hChannel->isDead) {
98             FreeChannel(hChannel->channelId);
99         }
100     }
101 }
102 
WorkerPendding()103 void HdcChannelBase::WorkerPendding()
104 {
105     WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
106     uv_run(loopMain, UV_RUN_DEFAULT);
107     uv_loop_close(loopMain);
108 }
109 
ReadStream(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)110 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
111 {
112     StartTraceScope("HdcChannelBase::ReadStream");
113     int size = 0;
114     int indexBuf = 0;
115     int childRet = 0;
116     bool needExit = false;
117     HChannel hChannel = (HChannel)tcp->data;
118     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
119 
120     if (nread == UV_ENOBUFS) {
121         WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream Pipe IOBuf max");
122         return;
123     } else if (nread == 0) {
124         // maybe just after accept, second client req
125         WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream idle read");
126         return;
127     } else if (nread < 0) {
128         Base::TryCloseHandle((uv_handle_t *)tcp);
129         constexpr int bufSize = 1024;
130         char buffer[bufSize] = { 0 };
131         uv_err_name_r(nread, buffer, bufSize);
132         WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream failed2:%s", buffer);
133         needExit = true;
134         goto Finish;
135     } else {
136         hChannel->availTailIndex += nread;
137     }
138     while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
139         size = ntohl(*reinterpret_cast<uint32_t *>(hChannel->ioBuf + indexBuf));  // big endian
140         if (size <= 0 || static_cast<uint32_t>(size) > HDC_BUF_MAX_BYTES) {
141             needExit = true;
142             break;
143         }
144         if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
145             break;
146         }
147         childRet = thisClass->ReadChannel(hChannel, reinterpret_cast<uint8_t *>(hChannel->ioBuf) +
148                                           DWORD_SERIALIZE_SIZE + indexBuf, size);
149         if (childRet < 0) {
150             if (!hChannel->keepAlive) {
151                 needExit = true;
152                 break;
153             }
154         }
155         // update io
156         hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
157         indexBuf += DWORD_SERIALIZE_SIZE + size;
158     }
159     if (indexBuf > 0 && hChannel->availTailIndex > 0) {
160         if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
161             needExit = true;
162             goto Finish;
163         }
164     }
165 
166 Finish:
167     if (needExit) {
168         thisClass->FreeChannel(hChannel->channelId);
169         WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish");
170     }
171 }
172 
WriteCallback(uv_write_t * req,int status)173 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
174 {
175     HChannel hChannel = (HChannel)req->handle->data;
176     --hChannel->ref;
177     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
178     if (status < 0) {
179         Base::TryCloseHandle((uv_handle_t *)req->handle);
180         if (!hChannel->isDead && !hChannel->ref) {
181             thisClass->FreeChannel(hChannel->channelId);
182             WRITE_LOG(LOG_DEBUG, "WriteCallback TryCloseHandle");
183         }
184     }
185     delete[]((uint8_t *)req->data);
186     delete req;
187 }
188 
AsyncMainLoopTask(uv_idle_t * handle)189 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
190 {
191     AsyncParam *param = (AsyncParam *)handle->data;
192     HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
193 
194     switch (param->method) {
195         case ASYNC_FREE_CHANNEL: {
196             // alloc/release should pair in main thread.
197             thisClass->FreeChannel(param->sid);
198             break;
199         }
200         default:
201             break;
202     }
203     if (param->data) {
204         delete[]((uint8_t *)param->data);
205     }
206     delete param;
207     uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
208 }
209 
210 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
211 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
212 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t * handle)213 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
214 {
215     HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
216     if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
217         return;
218     }
219     list<void *>::iterator i;
220     list<void *> &lst = thisClass->lstMainThreadOP;
221     uv_rwlock_wrlock(&thisClass->mainAsync);
222     for (i = lst.begin(); i != lst.end();) {
223         AsyncParam *param = (AsyncParam *)*i;
224         Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
225         i = lst.erase(i);
226     }
227     uv_rwlock_wrunlock(&thisClass->mainAsync);
228 }
229 
PushAsyncMessage(const uint32_t channelId,const uint8_t method,const void * data,const int dataSize)230 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
231                                       const int dataSize)
232 {
233     if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
234         return;
235     }
236     auto param = new AsyncParam();
237     if (!param) {
238         return;
239     }
240     param->sid = channelId;  // Borrow SID storage
241     param->thisClass = this;
242     param->method = method;
243     if (dataSize > 0) {
244         param->dataSize = dataSize;
245         param->data = new uint8_t[param->dataSize]();
246         if (!param->data) {
247             delete param;
248             return;
249         }
250         if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
251             delete[]((uint8_t *)param->data);
252             delete param;
253             return;
254         }
255     }
256     asyncMainLoop.data = this;
257     uv_rwlock_wrlock(&mainAsync);
258     lstMainThreadOP.push_back(param);
259     uv_rwlock_wrunlock(&mainAsync);
260     uv_async_send(&asyncMainLoop);
261 }
262 
263 // add commandflag ahead real buf data
SendChannelWithCmd(HChannel hChannel,const uint16_t commandFlag,uint8_t * bufPtr,const int size)264 void HdcChannelBase::SendChannelWithCmd(HChannel hChannel, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
265 {
266     StartTraceScope("HdcChannelBase::SendChannelWithCmd");
267     auto data = new uint8_t[size + sizeof(commandFlag)]();
268     if (!data) {
269         return;
270     }
271 
272     if (memcpy_s(data, size + sizeof(commandFlag), &commandFlag, sizeof(commandFlag))) {
273         delete[] data;
274         return;
275     }
276 
277     if (size > 0 && memcpy_s(data + sizeof(commandFlag), size, bufPtr, size)) {
278         delete[] data;
279         return;
280     }
281 
282     SendChannel(hChannel, data, size + sizeof(commandFlag));
283     delete[] data;
284 }
285 
SendWithCmd(const uint32_t channelId,const uint16_t commandFlag,uint8_t * bufPtr,const int size)286 void HdcChannelBase::SendWithCmd(const uint32_t channelId, const uint16_t commandFlag, uint8_t *bufPtr, const int size)
287 {
288     StartTraceScope("HdcChannelBase::SendWithCmd");
289     HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
290     if (!hChannel) {
291         return;
292     }
293     do {
294         if (hChannel->isDead) {
295             break;
296         }
297         SendChannelWithCmd(hChannel, commandFlag, bufPtr, size);
298     } while (false);
299     --hChannel->ref;
300 }
301 
SendChannel(HChannel hChannel,uint8_t * bufPtr,const int size)302 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)
303 {
304     StartTraceScope("HdcChannelBase::SendChannel");
305     uv_stream_t *sendStream = nullptr;
306     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
307     auto data = new uint8_t[sizeNewBuf]();
308     if (!data) {
309         return;
310     }
311     *reinterpret_cast<uint32_t *>(data) = htonl(size);  // big endian
312     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
313         delete[] data;
314         return;
315     }
316     if (hChannel->hWorkThread == uv_thread_self()) {
317         sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
318     } else {
319         sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
320     }
321     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
322         ++hChannel->ref;
323         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
324     } else {
325         delete[] data;
326     }
327 }
328 
329 // works only in current working thread
Send(const uint32_t channelId,uint8_t * bufPtr,const int size)330 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
331 {
332     StartTraceScope("HdcChannelBase::Send");
333     HChannel hChannel = reinterpret_cast<HChannel>(AdminChannel(OP_QUERY_REF, channelId, nullptr));
334     if (!hChannel) {
335         return;
336     }
337     do {
338         if (hChannel->isDead) {
339             break;
340         }
341         SendChannel(hChannel, bufPtr, size);
342     } while (false);
343     --hChannel->ref;
344 }
345 
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)346 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
347 {
348     HChannel context = (HChannel)handle->data;
349     Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * 4);
350     buf->base = (char *)context->ioBuf + context->availTailIndex;
351     buf->len = context->bufSize - context->availTailIndex;
352 }
353 
GetChannelPseudoUid()354 uint32_t HdcChannelBase::GetChannelPseudoUid()
355 {
356     uint32_t uid = 0;
357     do {
358         uid = static_cast<uint32_t>(Base::GetRandom());
359     } while (AdminChannel(OP_QUERY, uid, nullptr) != nullptr);
360     return uid;
361 }
362 
MallocChannel(HChannel * hOutChannel)363 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
364 {
365     auto hChannel = new HdcChannel();
366     if (!hChannel) {
367         return 0;
368     }
369     hChannel->stdinTty.data = nullptr;
370     hChannel->stdoutTty.data = nullptr;
371     uint32_t channelId = GetChannelPseudoUid();
372     if (isServerOrClient) {
373         hChannel->serverOrClient = isServerOrClient;
374         ++channelId;  // Use different value for serverForClient&client in per process
375     }
376     uv_tcp_init(loopMain, &hChannel->hWorkTCP);
377     ++hChannel->uvHandleRef;
378     hChannel->hWorkThread = uv_thread_self();
379     hChannel->hWorkTCP.data = hChannel;
380     hChannel->clsChannel = this;
381     hChannel->channelId = channelId;
382     (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
383     AdminChannel(OP_ADD, channelId, hChannel);
384     *hOutChannel = hChannel;
385     WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
386     return channelId;
387 }
388 
389 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t * handle)390 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
391 {
392     HChannel hChannel = (HChannel)handle->data;
393     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
394     if (hChannel->uvHandleRef > 0) {
395         return;
396     }
397     thisClass->NotifyInstanceChannelFree(hChannel);
398     thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
399     WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u finish", hChannel->channelId);
400     if (!hChannel->serverOrClient) {
401         uv_stop(thisClass->loopMain);
402     }
403     delete hChannel;
404     Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
405 }
406 
FreeChannelContinue(HChannel hChannel)407 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
408 {
409     auto closeChannelHandle = [](uv_handle_t *handle) -> void {
410         if (handle->data == nullptr) {
411             WRITE_LOG(LOG_WARN, "FreeChannelContinue handle->data is nullptr");
412             return;
413         }
414         HChannel channel = reinterpret_cast<HChannel>(handle->data);
415         --channel->uvHandleRef;
416         Base::TryCloseHandle((uv_handle_t *)handle);
417     };
418     hChannel->availTailIndex = 0;
419     if (hChannel->ioBuf) {
420         delete[] hChannel->ioBuf;
421         hChannel->ioBuf = nullptr;
422     }
423     if (!hChannel->serverOrClient) {
424         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
425         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
426     }
427     if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
428         --hChannel->uvHandleRef;
429     } else {
430         Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
431     }
432     Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
433 }
434 
FreeChannelOpeate(uv_timer_t * handle)435 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
436 {
437     HChannel hChannel = (HChannel)handle->data;
438     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
439     if (hChannel->ref > 0) {
440         return;
441     }
442     if (hChannel->hChildWorkTCP.loop) {
443         auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
444         thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
445         auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
446             HChannel hChannel = (HChannel)handle->data;
447             HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
448             if (!hChannel->childCleared) {
449                 return;
450             }
451             Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
452             thisClass->FreeChannelContinue(hChannel);
453         };
454         Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
455     } else {
456         thisClass->FreeChannelContinue(hChannel);
457     }
458     Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
459 }
460 
FreeChannel(const uint32_t channelId)461 void HdcChannelBase::FreeChannel(const uint32_t channelId)
462 {
463     if (threadChanneMain != uv_thread_self()) {
464         PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
465         return;
466     }
467     HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
468     do {
469         if (!hChannel || hChannel->isDead) {
470             break;
471         }
472         WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
473         Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT);  // do immediately
474         hChannel->isDead = true;
475     } while (false);
476 }
477 
AdminChannel(const uint8_t op,const uint32_t channelId,HChannel hInput)478 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
479 {
480     HChannel hRet = nullptr;
481     switch (op) {
482         case OP_ADD:
483             uv_rwlock_wrlock(&lockMapChannel);
484             mapChannel[channelId] = hInput;
485             uv_rwlock_wrunlock(&lockMapChannel);
486             break;
487         case OP_REMOVE:
488             uv_rwlock_wrlock(&lockMapChannel);
489             mapChannel.erase(channelId);
490             uv_rwlock_wrunlock(&lockMapChannel);
491             break;
492         case OP_QUERY:
493             uv_rwlock_rdlock(&lockMapChannel);
494             if (mapChannel.count(channelId)) {
495                 hRet = mapChannel[channelId];
496             }
497             uv_rwlock_rdunlock(&lockMapChannel);
498             break;
499         case OP_QUERY_REF:
500             uv_rwlock_wrlock(&lockMapChannel);
501             if (mapChannel.count(channelId)) {
502                 hRet = mapChannel[channelId];
503                 ++hRet->ref;
504             }
505             uv_rwlock_wrunlock(&lockMapChannel);
506             break;
507         case OP_UPDATE:
508             uv_rwlock_wrlock(&lockMapChannel);
509             // remove old
510             mapChannel.erase(channelId);
511             mapChannel[hInput->channelId] = hInput;
512             uv_rwlock_wrunlock(&lockMapChannel);
513             break;
514         default:
515             break;
516     }
517     return hRet;
518 }
519 
EchoToClient(HChannel hChannel,uint8_t * bufPtr,const int size)520 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
521 {
522     StartTraceScope("HdcChannelBase::EchoToClient");
523     uv_stream_t *sendStream = nullptr;
524     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
525     auto data = new uint8_t[sizeNewBuf]();
526     if (!data) {
527         return;
528     }
529     *reinterpret_cast<uint32_t *>(data) = htonl(size);
530     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
531         delete[] data;
532         return;
533     }
534     sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
535     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
536         ++hChannel->ref;
537         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
538     } else {
539         WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
540         delete[] data;
541     }
542 }
543 
EchoToAllChannelsViaSessionId(uint32_t targetSessionId,const string & echo)544 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
545 {
546     for (auto v : mapChannel) {
547         HChannel hChannel = (HChannel)v.second;
548         if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
549             WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
550             EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
551         }
552     }
553 }
554 }
555