• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "channel.h"
16 namespace Hdc {
HdcChannelBase(const bool serverOrClient,const string & addrString,uv_loop_t * loopMainIn)17 HdcChannelBase::HdcChannelBase(const bool serverOrClient, const string &addrString, uv_loop_t *loopMainIn)
18 {
19     SetChannelTCPString(addrString);
20     isServerOrClient = serverOrClient;
21     loopMain = loopMainIn;
22     threadChanneMain = uv_thread_self();
23     uv_rwlock_init(&mainAsync);
24     uv_async_init(loopMain, &asyncMainLoop, MainAsyncCallback);
25     uv_rwlock_init(&lockMapChannel);
26 }
27 
~HdcChannelBase()28 HdcChannelBase::~HdcChannelBase()
29 {
30     ClearChannels();
31     // clear
32     if (!uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
33         uv_close((uv_handle_t *)&asyncMainLoop, nullptr);
34     }
35 
36     uv_rwlock_destroy(&mainAsync);
37     uv_rwlock_destroy(&lockMapChannel);
38 }
39 
GetChannelHandshake(string & connectKey) const40 vector<uint8_t> HdcChannelBase::GetChannelHandshake(string &connectKey) const
41 {
42     vector<uint8_t> ret;
43     struct ChannelHandShake handshake = {};
44     if (strcpy_s(handshake.banner, sizeof(handshake.banner), HANDSHAKE_MESSAGE.c_str()) != EOK) {
45         return ret;
46     }
47     if (strcpy_s(handshake.connectKey, sizeof(handshake.connectKey), connectKey.c_str()) != EOK) {
48         return ret;
49     }
50     ret.insert(ret.begin(), (uint8_t *)&handshake, (uint8_t *)&handshake + sizeof(ChannelHandShake));
51     return ret;
52 }
53 
SetChannelTCPString(const string & addrString)54 bool HdcChannelBase::SetChannelTCPString(const string &addrString)
55 {
56     bool ret = false;
57     while (true) {
58         if (addrString.find(":") == string::npos) {
59             break;
60         }
61         std::size_t found = addrString.find_last_of(":");
62         if (found == string::npos) {
63             break;
64         }
65 
66         string host = addrString.substr(0, found);
67         string port = addrString.substr(found + 1);
68 
69         channelPort = std::atoi(port.c_str());
70         sockaddr_in addrv4;
71         sockaddr_in6 addrv6;
72         if (!channelPort) {
73             break;
74         }
75 
76         if (uv_ip6_addr(host.c_str(), channelPort, &addrv6) != 0 &&
77             uv_ip4_addr(host.c_str(), channelPort, &addrv4) != 0) {
78             break;
79         }
80         channelHost = host;
81         channelHostPort = addrString;
82         ret = true;
83         break;
84     }
85     if (!ret) {
86         channelPort = 0;
87         channelHost = STRING_EMPTY;
88         channelHostPort = STRING_EMPTY;
89     }
90     return ret;
91 }
92 
ClearChannels()93 void HdcChannelBase::ClearChannels()
94 {
95     for (auto v : mapChannel) {
96         HChannel hChannel = (HChannel)v.second;
97         if (!hChannel->isDead) {
98             FreeChannel(hChannel->channelId);
99         }
100     }
101 }
102 
WorkerPendding()103 void HdcChannelBase::WorkerPendding()
104 {
105     WRITE_LOG(LOG_DEBUG, "Begin host channel pendding");
106     uv_run(loopMain, UV_RUN_DEFAULT);
107     uv_loop_close(loopMain);
108 }
109 
ReadStream(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)110 void HdcChannelBase::ReadStream(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
111 {
112     int size = 0;
113     int indexBuf = 0;
114     int childRet = 0;
115     bool needExit = false;
116     HChannel hChannel = (HChannel)tcp->data;
117     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
118 
119     if (nread == UV_ENOBUFS) {
120         WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream Pipe IOBuf max");
121         return;
122     } else if (nread == 0) {
123         // maybe just afer accept, second client req
124         WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream idle read");
125         return;
126     } else if (nread < 0) {
127         Base::TryCloseHandle((uv_handle_t *)tcp);
128         constexpr int bufSize = 1024;
129         char buffer[bufSize] = { 0 };
130         uv_err_name_r(nread, buffer, bufSize);
131         WRITE_LOG(LOG_DEBUG, "HdcChannelBase::ReadStream failed2:%s", buffer);
132         needExit = true;
133         goto Finish;
134     } else {
135         hChannel->availTailIndex += nread;
136     }
137     while (hChannel->availTailIndex > DWORD_SERIALIZE_SIZE) {
138         size = ntohl(*(uint32_t *)(hChannel->ioBuf + indexBuf));  // big endian
139         if (size <= 0 || (uint32_t)size > HDC_BUF_MAX_BYTES) {
140             needExit = true;
141             break;
142         }
143         if (hChannel->availTailIndex - DWORD_SERIALIZE_SIZE < size) {
144             break;
145         }
146         childRet = thisClass->ReadChannel(hChannel, (uint8_t *)hChannel->ioBuf + DWORD_SERIALIZE_SIZE + indexBuf, size);
147         if (childRet < 0) {
148             if (!hChannel->keepAlive) {
149                 needExit = true;
150                 break;
151             }
152         }
153         // update io
154         hChannel->availTailIndex -= (DWORD_SERIALIZE_SIZE + size);
155         indexBuf += DWORD_SERIALIZE_SIZE + size;
156     }
157     if (indexBuf > 0 && hChannel->availTailIndex > 0) {
158         if (memmove_s(hChannel->ioBuf, hChannel->bufSize, hChannel->ioBuf + indexBuf, hChannel->availTailIndex)) {
159             needExit = true;
160             goto Finish;
161         }
162     }
163 
164 Finish:
165     if (needExit) {
166         thisClass->FreeChannel(hChannel->channelId);
167         WRITE_LOG(LOG_DEBUG, "Read Stream needExit, FreeChannel finish");
168     }
169 }
170 
WriteCallback(uv_write_t * req,int status)171 void HdcChannelBase::WriteCallback(uv_write_t *req, int status)
172 {
173     HChannel hChannel = (HChannel)req->handle->data;
174     --hChannel->ref;
175     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
176     if (status < 0) {
177         Base::TryCloseHandle((uv_handle_t *)req->handle);
178         if (!hChannel->isDead && !hChannel->ref) {
179             thisClass->FreeChannel(hChannel->channelId);
180             WRITE_LOG(LOG_DEBUG, "WriteCallback TryCloseHandle");
181         }
182     }
183     delete[]((uint8_t *)req->data);
184     delete req;
185 }
186 
AsyncMainLoopTask(uv_idle_t * handle)187 void HdcChannelBase::AsyncMainLoopTask(uv_idle_t *handle)
188 {
189     AsyncParam *param = (AsyncParam *)handle->data;
190     HdcChannelBase *thisClass = (HdcChannelBase *)param->thisClass;
191 
192     switch (param->method) {
193         case ASYNC_FREE_CHANNEL: {
194             // alloc/release should pair in main thread.
195             thisClass->FreeChannel(param->sid);
196             break;
197         }
198         default:
199             break;
200     }
201     if (param->data) {
202         delete[]((uint8_t *)param->data);
203     }
204     delete param;
205     uv_close((uv_handle_t *)handle, Base::CloseIdleCallback);
206 }
207 
208 // multiple uv_async_send() calls may be merged by libuv,so not each call will yield callback as expected.
209 // eg: if uv_async_send() 5 times before callback calling,it will be called only once.
210 // if uv_async_send() is called again after callback calling, it will be called again.
MainAsyncCallback(uv_async_t * handle)211 void HdcChannelBase::MainAsyncCallback(uv_async_t *handle)
212 {
213     HdcChannelBase *thisClass = (HdcChannelBase *)handle->data;
214     if (uv_is_closing((uv_handle_t *)thisClass->loopMain)) {
215         return;
216     }
217     list<void *>::iterator i;
218     list<void *> &lst = thisClass->lstMainThreadOP;
219     uv_rwlock_wrlock(&thisClass->mainAsync);
220     for (i = lst.begin(); i != lst.end();) {
221         AsyncParam *param = (AsyncParam *)*i;
222         Base::IdleUvTask(thisClass->loopMain, param, AsyncMainLoopTask);
223         i = lst.erase(i);
224     }
225     uv_rwlock_wrunlock(&thisClass->mainAsync);
226 }
227 
PushAsyncMessage(const uint32_t channelId,const uint8_t method,const void * data,const int dataSize)228 void HdcChannelBase::PushAsyncMessage(const uint32_t channelId, const uint8_t method, const void *data,
229                                       const int dataSize)
230 {
231     if (uv_is_closing((uv_handle_t *)&asyncMainLoop)) {
232         return;
233     }
234     auto param = new AsyncParam();
235     if (!param) {
236         return;
237     }
238     param->sid = channelId;  // Borrow SID storage
239     param->thisClass = this;
240     param->method = method;
241     if (dataSize > 0) {
242         param->dataSize = dataSize;
243         param->data = new uint8_t[param->dataSize]();
244         if (!param->data) {
245             delete param;
246             return;
247         }
248         if (memcpy_s((uint8_t *)param->data, param->dataSize, data, dataSize)) {
249             delete[]((uint8_t *)param->data);
250             delete param;
251             return;
252         }
253     }
254     asyncMainLoop.data = this;
255     uv_rwlock_wrlock(&mainAsync);
256     lstMainThreadOP.push_back(param);
257     uv_rwlock_wrunlock(&mainAsync);
258     uv_async_send(&asyncMainLoop);
259 }
260 
SendChannel(HChannel hChannel,uint8_t * bufPtr,const int size)261 void HdcChannelBase::SendChannel(HChannel hChannel, uint8_t *bufPtr, const int size)
262 {
263     uv_stream_t *sendStream = nullptr;
264     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
265     auto data = new uint8_t[sizeNewBuf]();
266     if (!data) {
267         return;
268     }
269     *(uint32_t *)data = htonl(size);  // big endian
270     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
271         delete[] data;
272         return;
273     }
274     if (hChannel->hWorkThread == uv_thread_self()) {
275         sendStream = (uv_stream_t *)&hChannel->hWorkTCP;
276     } else {
277         sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
278     }
279     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
280         ++hChannel->ref;
281         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
282     } else {
283         delete[] data;
284     }
285 }
286 
287 // works only in current working thread
Send(const uint32_t channelId,uint8_t * bufPtr,const int size)288 void HdcChannelBase::Send(const uint32_t channelId, uint8_t *bufPtr, const int size)
289 {
290     HChannel hChannel = (HChannel)AdminChannel(OP_QUERY_REF, channelId, nullptr);
291     if (!hChannel) {
292         return;
293     }
294     do {
295         if (hChannel->isDead) {
296             break;
297         }
298         SendChannel(hChannel, bufPtr, size);
299     } while (false);
300     --hChannel->ref;
301 }
302 
AllocCallback(uv_handle_t * handle,size_t sizeWanted,uv_buf_t * buf)303 void HdcChannelBase::AllocCallback(uv_handle_t *handle, size_t sizeWanted, uv_buf_t *buf)
304 {
305     HChannel context = (HChannel)handle->data;
306     Base::ReallocBuf(&context->ioBuf, &context->bufSize, Base::GetMaxBufSize() * 4);
307     buf->base = (char *)context->ioBuf + context->availTailIndex;
308     buf->len = context->bufSize - context->availTailIndex;
309 }
310 
GetChannelPseudoUid()311 uint32_t HdcChannelBase::GetChannelPseudoUid()
312 {
313     uint32_t uid = 0;
314     HChannel hInput = nullptr;
315     do {
316         uid = static_cast<uint32_t>(Base::GetRandom());
317     } while ((hInput = AdminChannel(OP_QUERY, uid, nullptr)) != nullptr);
318     return uid;
319 }
320 
MallocChannel(HChannel * hOutChannel)321 uint32_t HdcChannelBase::MallocChannel(HChannel *hOutChannel)
322 {
323     auto hChannel = new HdcChannel();
324     if (!hChannel) {
325         return 0;
326     }
327     uint32_t channelId = GetChannelPseudoUid();
328     if (isServerOrClient) {
329         hChannel->serverOrClient = isServerOrClient;
330         ++channelId;  // Use different value for serverForClient&client in per process
331     }
332     uv_tcp_init(loopMain, &hChannel->hWorkTCP);
333     ++hChannel->uvHandleRef;
334     hChannel->hWorkThread = uv_thread_self();
335     hChannel->hWorkTCP.data = hChannel;
336     hChannel->clsChannel = this;
337     hChannel->channelId = channelId;
338     (void)memset_s(&hChannel->hChildWorkTCP, sizeof(hChannel->hChildWorkTCP), 0, sizeof(uv_tcp_t));
339     AdminChannel(OP_ADD, channelId, hChannel);
340     *hOutChannel = hChannel;
341     WRITE_LOG(LOG_DEBUG, "Mallocchannel:%u", channelId);
342     return channelId;
343 }
344 
345 // work when libuv-handle at struct of HdcSession has all callback finished
FreeChannelFinally(uv_idle_t * handle)346 void HdcChannelBase::FreeChannelFinally(uv_idle_t *handle)
347 {
348     HChannel hChannel = (HChannel)handle->data;
349     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
350     if (hChannel->uvHandleRef > 0) {
351         return;
352     }
353     thisClass->NotifyInstanceChannelFree(hChannel);
354     thisClass->AdminChannel(OP_REMOVE, hChannel->channelId, nullptr);
355     WRITE_LOG(LOG_DEBUG, "!!!FreeChannelFinally channelId:%u finish", hChannel->channelId);
356     if (!hChannel->serverOrClient) {
357         uv_stop(thisClass->loopMain);
358     }
359     delete hChannel;
360     Base::TryCloseHandle((const uv_handle_t *)handle, Base::CloseIdleCallback);
361 }
362 
FreeChannelContinue(HChannel hChannel)363 void HdcChannelBase::FreeChannelContinue(HChannel hChannel)
364 {
365     auto closeChannelHandle = [](uv_handle_t *handle) -> void {
366         HChannel hChannel = (HChannel)handle->data;
367         --hChannel->uvHandleRef;
368         Base::TryCloseHandle((uv_handle_t *)handle);
369     };
370     hChannel->availTailIndex = 0;
371     if (hChannel->ioBuf) {
372         delete[] hChannel->ioBuf;
373         hChannel->ioBuf = nullptr;
374     }
375     if (!hChannel->serverOrClient) {
376         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdinTty, closeChannelHandle);
377         Base::TryCloseHandle((uv_handle_t *)&hChannel->stdoutTty, closeChannelHandle);
378     }
379     if (uv_is_closing((const uv_handle_t *)&hChannel->hWorkTCP)) {
380         --hChannel->uvHandleRef;
381     } else {
382         Base::TryCloseHandle((uv_handle_t *)&hChannel->hWorkTCP, closeChannelHandle);
383     }
384     Base::IdleUvTask(loopMain, hChannel, FreeChannelFinally);
385 }
386 
FreeChannelOpeate(uv_timer_t * handle)387 void HdcChannelBase::FreeChannelOpeate(uv_timer_t *handle)
388 {
389     HChannel hChannel = (HChannel)handle->data;
390     HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
391     if (hChannel->ref > 0) {
392         return;
393     }
394     if (hChannel->hChildWorkTCP.loop) {
395         auto ctrl = HdcSessionBase::BuildCtrlString(SP_DEATCH_CHANNEL, hChannel->channelId, nullptr, 0);
396         thisClass->ChannelSendSessionCtrlMsg(ctrl, hChannel->targetSessionId);
397         auto callbackCheckFreeChannelContinue = [](uv_timer_t *handle) -> void {
398             HChannel hChannel = (HChannel)handle->data;
399             HdcChannelBase *thisClass = (HdcChannelBase *)hChannel->clsChannel;
400             if (!hChannel->childCleared) {
401                 return;
402             }
403             Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
404             thisClass->FreeChannelContinue(hChannel);
405         };
406         Base::TimerUvTask(thisClass->loopMain, hChannel, callbackCheckFreeChannelContinue);
407     } else {
408         thisClass->FreeChannelContinue(hChannel);
409     }
410     Base::TryCloseHandle((uv_handle_t *)handle, Base::CloseTimerCallback);
411 }
412 
FreeChannel(const uint32_t channelId)413 void HdcChannelBase::FreeChannel(const uint32_t channelId)
414 {
415     if (threadChanneMain != uv_thread_self()) {
416         PushAsyncMessage(channelId, ASYNC_FREE_CHANNEL, nullptr, 0);
417         return;
418     }
419     HChannel hChannel = AdminChannel(OP_QUERY, channelId, nullptr);
420     do {
421         if (!hChannel || hChannel->isDead) {
422             break;
423         }
424         WRITE_LOG(LOG_DEBUG, "Begin to free channel, channelid:%u", channelId);
425         Base::TimerUvTask(loopMain, hChannel, FreeChannelOpeate, MINOR_TIMEOUT);  // do immediately
426         hChannel->isDead = true;
427     } while (false);
428 }
429 
AdminChannel(const uint8_t op,const uint32_t channelId,HChannel hInput)430 HChannel HdcChannelBase::AdminChannel(const uint8_t op, const uint32_t channelId, HChannel hInput)
431 {
432     HChannel hRet = nullptr;
433     switch (op) {
434         case OP_ADD:
435             uv_rwlock_wrlock(&lockMapChannel);
436             mapChannel[channelId] = hInput;
437             uv_rwlock_wrunlock(&lockMapChannel);
438             break;
439         case OP_REMOVE:
440             uv_rwlock_wrlock(&lockMapChannel);
441             mapChannel.erase(channelId);
442             uv_rwlock_wrunlock(&lockMapChannel);
443             break;
444         case OP_QUERY:
445             uv_rwlock_rdlock(&lockMapChannel);
446             if (mapChannel.count(channelId)) {
447                 hRet = mapChannel[channelId];
448             }
449             uv_rwlock_rdunlock(&lockMapChannel);
450             break;
451         case OP_QUERY_REF:
452             uv_rwlock_wrlock(&lockMapChannel);
453             if (mapChannel.count(channelId)) {
454                 hRet = mapChannel[channelId];
455                 ++hRet->ref;
456             }
457             uv_rwlock_wrunlock(&lockMapChannel);
458             break;
459         case OP_UPDATE:
460             uv_rwlock_wrlock(&lockMapChannel);
461             // remove old
462             mapChannel.erase(channelId);
463             mapChannel[hInput->channelId] = hInput;
464             uv_rwlock_wrunlock(&lockMapChannel);
465             break;
466         default:
467             break;
468     }
469     return hRet;
470 }
471 
EchoToClient(HChannel hChannel,uint8_t * bufPtr,const int size)472 void HdcChannelBase::EchoToClient(HChannel hChannel, uint8_t *bufPtr, const int size)
473 {
474     uv_stream_t *sendStream = nullptr;
475     int sizeNewBuf = size + DWORD_SERIALIZE_SIZE;
476     auto data = new uint8_t[sizeNewBuf]();
477     if (!data) {
478         return;
479     }
480     *(uint32_t *)data = htonl(size);
481     if (memcpy_s(data + DWORD_SERIALIZE_SIZE, sizeNewBuf - DWORD_SERIALIZE_SIZE, bufPtr, size)) {
482         delete[] data;
483         return;
484     }
485     sendStream = (uv_stream_t *)&hChannel->hChildWorkTCP;
486     if (!uv_is_closing((const uv_handle_t *)sendStream) && uv_is_writable(sendStream)) {
487         ++hChannel->ref;
488         Base::SendToStreamEx(sendStream, data, sizeNewBuf, nullptr, (void *)WriteCallback, data);
489     } else {
490         WRITE_LOG(LOG_WARN, "EchoToClient, channelId:%u is unwritable.", hChannel->channelId);
491         delete[] data;
492     }
493 }
494 
EchoToAllChannelsViaSessionId(uint32_t targetSessionId,const string & echo)495 void HdcChannelBase::EchoToAllChannelsViaSessionId(uint32_t targetSessionId, const string &echo)
496 {
497     for (auto v : mapChannel) {
498         HChannel hChannel = (HChannel)v.second;
499         if (!hChannel->isDead && hChannel->targetSessionId == targetSessionId) {
500             WRITE_LOG(LOG_INFO, "%s:%u %s", __FUNCTION__, targetSessionId, echo.c_str());
501             EchoToClient(hChannel, (uint8_t *)echo.c_str(), echo.size());
502         }
503     }
504 }
505 }
506