1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "forward.h"
16 #include "base.h"
17
18 namespace Hdc {
19 #ifdef HDC_HOST
20 static string g_forwardListenIp = DEFAULT_SERVER_ADDR_IP;
21 #endif
HdcForwardBase(HTaskInfo hTaskInfo)22 HdcForwardBase::HdcForwardBase(HTaskInfo hTaskInfo)
23 : HdcTaskBase(hTaskInfo)
24 {
25 fds[0] = -1;
26 fds[1] = -1;
27 }
28
~HdcForwardBase()29 HdcForwardBase::~HdcForwardBase()
30 {
31 WRITE_LOG(LOG_DEBUG, "~HdcForwardBase channelId:%u", taskInfo->channelId);
32 };
33
ReadyForRelease()34 bool HdcForwardBase::ReadyForRelease()
35 {
36 if (!HdcTaskBase::ReadyForRelease()) {
37 WRITE_LOG(LOG_WARN, "not ready for release channelId:%u", taskInfo->channelId);
38 return false;
39 }
40 return true;
41 }
42
StopTask()43 void HdcForwardBase::StopTask()
44 {
45 ctxPointMutex.lock();
46 vector<HCtxForward> ctxs;
47 map<uint32_t, HCtxForward>::iterator iter;
48 for (iter = mapCtxPoint.begin(); iter != mapCtxPoint.end(); ++iter) {
49 HCtxForward ctx = iter->second;
50 ctxs.push_back(ctx);
51 }
52 // FREECONTEXT in the STOP is triggered by the other party sector, no longer notifying each other.
53 mapCtxPoint.clear();
54 ctxPointMutex.unlock();
55 for (auto ctx: ctxs) {
56 FreeContext(ctx, 0, false);
57 }
58 }
59
OnAccept(uv_stream_t * server,HCtxForward ctxClient,uv_stream_t * client)60 void HdcForwardBase::OnAccept(uv_stream_t *server, HCtxForward ctxClient, uv_stream_t *client)
61 {
62 HCtxForward ctxListen = (HCtxForward)server->data;
63 char buf[BUF_SIZE_DEFAULT] = { 0 };
64 bool ret = false;
65 while (true) {
66 if (uv_accept(server, client)) {
67 WRITE_LOG(LOG_FATAL, "OnAccept uv_accept failed Listenid:%u type:%d remoteParamenters:%s cid:%u sid:%u",
68 ctxListen->id, ctxListen->type, ctxListen->remoteParamenters.c_str(), taskInfo->channelId,
69 taskInfo->sessionId);
70 break;
71 }
72 ctxClient->type = ctxListen->type;
73 ctxClient->remoteParamenters = ctxListen->remoteParamenters;
74 int maxSize = sizeof(buf) - forwardParameterBufSize;
75 // clang-format off
76 if (snprintf_s(buf + forwardParameterBufSize, maxSize, maxSize - 1, "%s",
77 ctxClient->remoteParamenters.c_str()) < 0) {
78 WRITE_LOG(LOG_FATAL, "OnAccept prepare buffer failed Listenid:%u type:%d remotePara:%s cid:%u sid:%u",
79 ctxListen->id, ctxListen->type, ctxListen->remoteParamenters.c_str(), taskInfo->channelId,
80 taskInfo->sessionId);
81 break;
82 }
83 WRITE_LOG(LOG_INFO, "Forward OnAccept ctxclientid:%u type:%d remoteParamenters:%s cid:%u sid:%u",
84 ctxClient->id, ctxClient->type, ctxClient->remoteParamenters.c_str(), taskInfo->channelId,
85 taskInfo->sessionId);
86 SendToTask(ctxClient->id, CMD_FORWARD_ACTIVE_SLAVE, reinterpret_cast<uint8_t *>(buf),
87 strlen(buf + forwardParameterBufSize) + 9); // 9: pre 8bytes preserve for param bits
88 ret = true;
89 break;
90 }
91 if (!ret) {
92 FreeContext(ctxClient, 0, false);
93 }
94 }
95
ListenCallback(uv_stream_t * server,const int status)96 void HdcForwardBase::ListenCallback(uv_stream_t *server, const int status)
97 {
98 HCtxForward ctxListen = (HCtxForward)server->data;
99 HdcForwardBase *thisClass = ctxListen->thisClass;
100 uv_stream_t *client = nullptr;
101 CALLSTAT_GUARD(*(ctxListen->thisClass->loopTaskStatus), server->loop, "HdcForwardBase::ListenCallback");
102
103 if (status == -1 || !ctxListen->ready) {
104 WRITE_LOG(LOG_FATAL, "ListenCallback status:%d id:%u ready:%d cid:%u sid:%u",
105 status, ctxListen->id, ctxListen->ready, thisClass->taskInfo->channelId, thisClass->taskInfo->sessionId);
106 thisClass->FreeContext(ctxListen, 0, false);
107 thisClass->TaskFinish();
108 return;
109 }
110 HCtxForward ctxClient = (HCtxForward)thisClass->MallocContext(true);
111 if (!ctxClient) {
112 WRITE_LOG(LOG_FATAL, "ListenCallback ctxClient is null, ctxListenid:%u cid:%u sid:%u",
113 ctxListen->id, thisClass->taskInfo->channelId, thisClass->taskInfo->sessionId);
114 return;
115 }
116 if (ctxListen->type == FORWARD_TCP) {
117 uv_tcp_init(ctxClient->thisClass->loopTask, &ctxClient->tcp);
118 client = (uv_stream_t *)&ctxClient->tcp;
119 } else {
120 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
121 uv_pipe_init(ctxClient->thisClass->loopTask, &ctxClient->pipe, 0);
122 client = (uv_stream_t *)&ctxClient->pipe;
123 }
124 thisClass->OnAccept(server, ctxClient, client);
125 }
126
MallocContext(bool masterSlave)127 void *HdcForwardBase::MallocContext(bool masterSlave)
128 {
129 HCtxForward ctx = nullptr;
130 if ((ctx = new ContextForward()) == nullptr) {
131 return nullptr;
132 }
133 ctx->id = Base::GetRandomU32();
134 ctx->masterSlave = masterSlave;
135 ctx->thisClass = this;
136 ctx->fd = -1;
137 ctx->fdClass = nullptr;
138 ctx->tcp.data = ctx;
139 ctx->pipe.data = ctx;
140 AdminContext(OP_ADD, ctx->id, ctx);
141 refCount++;
142 return ctx;
143 }
144
FreeContextCallBack(HCtxForward ctx)145 void HdcForwardBase::FreeContextCallBack(HCtxForward ctx)
146 {
147 Base::DoNextLoop(loopTask, ctx, [this](const uint8_t flag, string &msg, const void *data) {
148 HCtxForward ctx = (HCtxForward)data;
149 AdminContext(OP_REMOVE, ctx->id, nullptr);
150 if (ctx != nullptr) {
151 WRITE_LOG(LOG_DEBUG, "Finally to delete id:%u", ctx->id);
152 delete ctx;
153 ctx = nullptr;
154 }
155 if (refCount > 0) {
156 --refCount;
157 }
158 });
159 }
160
FreeJDWP(HCtxForward ctx)161 void HdcForwardBase::FreeJDWP(HCtxForward ctx)
162 {
163 Base::CloseFd(ctx->fd);
164 if (ctx->fdClass) {
165 ctx->fdClass->StopWorkOnThread(false, nullptr);
166
167 auto funcReqClose = [](uv_idle_t *handle) -> void {
168 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void {
169 HCtxForward ctx = (HCtxForward)handle->data;
170 ctx->thisClass->FreeContextCallBack(ctx);
171 delete (uv_idle_t *)handle;
172 };
173 HCtxForward context = (HCtxForward)handle->data;
174 if (context->fdClass->ReadyForRelease()) {
175 delete context->fdClass;
176 context->fdClass = nullptr;
177 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose);
178 }
179 };
180 Base::IdleUvTask(loopTask, ctx, funcReqClose);
181 } else {
182 auto funcReqClose = [](uv_idle_t *handle) -> void {
183 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void {
184 HCtxForward ctx = (HCtxForward)handle->data;
185 ctx->thisClass->FreeContextCallBack(ctx);
186 delete (uv_idle_t *)handle;
187 };
188 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose);
189 };
190 Base::IdleUvTask(loopTask, ctx, funcReqClose);
191 }
192 }
193
FreeContext(HCtxForward ctxIn,const uint32_t id,bool bNotifyRemote)194 void HdcForwardBase::FreeContext(HCtxForward ctxIn, const uint32_t id, bool bNotifyRemote)
195 {
196 std::lock_guard<std::mutex> lock(ctxFreeMutex);
197 HCtxForward ctx = nullptr;
198 if (!ctxIn) {
199 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
200 WRITE_LOG(LOG_DEBUG, "FreeContext Query id:%u failed cid:%u sid:%u", id, taskInfo->channelId,
201 taskInfo->sessionId);
202 return;
203 }
204 } else {
205 ctx = ctxIn;
206 }
207 WRITE_LOG(LOG_INFO, "FreeContext id:%u, bNotifyRemote:%d, finish:%d cid:%u sid:%u",
208 ctx->id, bNotifyRemote, ctx->finish, taskInfo->channelId, taskInfo->sessionId);
209 if (ctx->finish) {
210 return;
211 }
212 if (bNotifyRemote) {
213 SendToTask(ctx->id, CMD_FORWARD_FREE_CONTEXT, nullptr, 0);
214 }
215 uv_close_cb funcHandleClose = [](uv_handle_t *handle) -> void {
216 HCtxForward ctx = (HCtxForward)handle->data;
217 ctx->thisClass->FreeContextCallBack(ctx);
218 };
219 switch (ctx->type) {
220 case FORWARD_TCP:
221 case FORWARD_JDWP:
222 case FORWARD_ARK:
223 Base::TryCloseHandle((uv_handle_t *)&ctx->tcp, true, funcHandleClose);
224 break;
225 case FORWARD_ABSTRACT:
226 case FORWARD_RESERVED:
227 case FORWARD_FILESYSTEM:
228 Base::TryCloseHandle((uv_handle_t *)&ctx->pipe, true, funcHandleClose);
229 break;
230 case FORWARD_DEVICE: {
231 FreeJDWP(ctx);
232 break;
233 }
234 default:
235 break;
236 }
237 ctx->finish = true;
238 }
239
SendToTask(const uint32_t cid,const uint16_t command,uint8_t * bufPtr,const int bufSize)240 bool HdcForwardBase::SendToTask(const uint32_t cid, const uint16_t command, uint8_t *bufPtr, const int bufSize)
241 {
242 StartTraceScope("HdcForwardBase::SendToTask");
243 bool ret = false;
244 // usually MAX_SIZE_IOBUF*2 from HdcFileDescriptor maxIO
245 if (bufSize > Base::GetMaxBufSizeStable() * BUF_MULTIPLE) {
246 WRITE_LOG(LOG_FATAL, "SendToTask bufSize:%d", bufSize);
247 return false;
248 }
249 auto newBuf = new uint8_t[bufSize + BUF_EXTEND_SIZE];
250 if (!newBuf) {
251 return false;
252 }
253 *reinterpret_cast<uint32_t *>(newBuf) = htonl(cid);
254 if (bufSize > 0 && bufPtr != nullptr && memcpy_s(newBuf + BUF_EXTEND_SIZE, bufSize, bufPtr, bufSize) != EOK) {
255 delete[] newBuf;
256 return false;
257 }
258 ret = SendToAnother(command, newBuf, bufSize + BUF_EXTEND_SIZE);
259 delete[] newBuf;
260 return ret;
261 }
262
263 // Forward flow is small and frequency is fast
AllocForwardBuf(uv_handle_t * handle,size_t sizeSuggested,uv_buf_t * buf)264 void HdcForwardBase::AllocForwardBuf(uv_handle_t *handle, size_t sizeSuggested, uv_buf_t *buf)
265 {
266 size_t size = sizeSuggested;
267 if (size > MAX_USBFFS_BULK) {
268 size = MAX_USBFFS_BULK;
269 }
270 buf->base = (char *)new char[size];
271 if (buf->base) {
272 buf->len = (size > 0) ? (size - 1) : 0;
273 } else {
274 WRITE_LOG(LOG_WARN, "AllocForwardBuf == null");
275 }
276 }
277
ReadForwardBuf(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)278 void HdcForwardBase::ReadForwardBuf(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
279 {
280 HCtxForward ctx = (HCtxForward)stream->data;
281 CALLSTAT_GUARD(*(ctx->thisClass->loopTaskStatus), stream->loop, "HdcForwardBase::ReadForwardBuf");
282 if (nread < 0) {
283 WRITE_LOG(LOG_INFO, "ReadForwardBuf nread:%zd id:%u cid:%u sid:%u", nread, ctx->id,
284 ctx->thisClass->taskInfo->channelId, ctx->thisClass->taskInfo->sessionId);
285 ctx->thisClass->FreeContext(ctx, 0, true);
286 delete[] buf->base;
287 return;
288 }
289 if (nread == 0) {
290 WRITE_LOG(LOG_INFO, "ReadForwardBuf nread:0 id:%u cid:%u sid:%u", nread, ctx->id,
291 ctx->thisClass->taskInfo->channelId, ctx->thisClass->taskInfo->sessionId);
292 delete[] buf->base;
293 return;
294 }
295 ctx->thisClass->SendToTask(ctx->id, CMD_FORWARD_DATA, (uint8_t *)buf->base, nread);
296 // clear
297 delete[] buf->base;
298 }
299
ConnectTarget(uv_connect_t * connection,int status)300 void HdcForwardBase::ConnectTarget(uv_connect_t *connection, int status)
301 {
302 HCtxForward ctx = (HCtxForward)connection->data;
303 HdcForwardBase *thisClass = ctx->thisClass;
304 CALLSTAT_GUARD(*(ctx->thisClass->loopTaskStatus), connection->handle->loop, "HdcForwardBase::ConnectTarget");
305 delete connection;
306 if (status < 0) {
307 constexpr int bufSize = 1024;
308 char buf[bufSize] = { 0 };
309 uv_err_name_r(status, buf, bufSize);
310 WRITE_LOG(LOG_WARN, "Forward connect result:%d error:%s", status, buf);
311 }
312 thisClass->SetupPointContinue(ctx, status);
313 }
314
CheckNodeInfo(const char * nodeInfo,string as[2])315 bool HdcForwardBase::CheckNodeInfo(const char *nodeInfo, string as[2])
316 {
317 if (nodeInfo == nullptr) {
318 return false;
319 }
320 string str = nodeInfo;
321 size_t strLen = str.size();
322 if (strLen < 1) {
323 return false;
324 }
325 size_t pos = str.find(':');
326 if (pos != string::npos) {
327 if (pos == 0 || pos == strLen - 1) {
328 return false;
329 }
330 as[0] = str.substr(0, pos);
331 as[1] = str.substr(pos + 1);
332 } else {
333 return false;
334 }
335 if (as[0] == "tcp") {
336 if (as[1].size() > std::to_string(MAX_IP_PORT).size()) {
337 return false;
338 }
339 int port = atoi(as[1].c_str());
340 if (port <= 0 || port > MAX_IP_PORT) {
341 return false;
342 }
343 }
344 return true;
345 }
346
SetupPointContinue(HCtxForward ctx,int status)347 bool HdcForwardBase::SetupPointContinue(HCtxForward ctx, int status)
348 {
349 if (ctx->checkPoint) {
350 // send to active
351 uint8_t flag = status > 0;
352 SendToTask(ctx->id, CMD_FORWARD_CHECK_RESULT, &flag, 1);
353 FreeContext(ctx, 0, false);
354 return true;
355 }
356 if (status < 0) {
357 FreeContext(ctx, 0, true);
358 return false;
359 }
360 // send to active
361 if (!SendToTask(ctx->id, CMD_FORWARD_ACTIVE_MASTER, nullptr, 0)) {
362 WRITE_LOG(LOG_FATAL, "SetupPointContinue SendToTask failed id:%u", ctx->id);
363 FreeContext(ctx, 0, true);
364 return false;
365 }
366 return DoForwardBegin(ctx);
367 }
368
DetechForwardType(HCtxForward ctxPoint)369 bool HdcForwardBase::DetechForwardType(HCtxForward ctxPoint)
370 {
371 string &sFType = ctxPoint->localArgs[0];
372 string &sNodeCfg = ctxPoint->localArgs[1];
373 // string to enum
374 if (sFType == "tcp") {
375 ctxPoint->type = FORWARD_TCP;
376 } else if (sFType == "dev") {
377 ctxPoint->type = FORWARD_DEVICE;
378 } else if (sFType == "localabstract") {
379 // daemon shell: /system/bin/socat abstract-listen:linux-abstract -
380 // daemon shell: /system/bin/socat - abstract-connect:linux-abstract
381 // host: hdc fport tcp:8080 localabstract:linux-abstract
382 ctxPoint->type = FORWARD_ABSTRACT;
383 } else if (sFType == "localreserved") {
384 sNodeCfg = harmonyReservedSocketPrefix + sNodeCfg;
385 ctxPoint->type = FORWARD_RESERVED;
386 } else if (sFType == "localfilesystem") {
387 sNodeCfg = filesystemSocketPrefix + sNodeCfg;
388 ctxPoint->type = FORWARD_FILESYSTEM;
389 } else if (sFType == "jdwp") {
390 ctxPoint->type = FORWARD_JDWP;
391 } else if (sFType == "ark") {
392 ctxPoint->type = FORWARD_ARK;
393 } else {
394 return false;
395 }
396 return true;
397 }
398
399 #ifdef HDC_HOST
SetForwardListenIP(string & ip)400 void HdcForwardBase::SetForwardListenIP(string &ip)
401 {
402 g_forwardListenIp = ip;
403 }
404
SetupTcpListenAllIp(HCtxForward ctxPoint,int port,int & rc)405 bool HdcForwardBase::SetupTcpListenAllIp(HCtxForward ctxPoint, int port, int &rc)
406 {
407 struct sockaddr_in addrv4;
408 struct sockaddr_in6 addrv6;
409 char buffer[BUF_SIZE_DEFAULT] = { 0 };
410 rc = uv_ip6_addr(g_forwardListenIp.c_str(), port, &addrv6);
411 if (rc != 0) {
412 uv_strerror_r(rc, buffer, BUF_SIZE_DEFAULT);
413 WRITE_LOG(LOG_FATAL, "SetupTcpListenAllIp uv_ip6_addr %d %s", rc, buffer);
414 return false;
415 }
416 rc = uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addrv6, 0);
417 if (rc != 0) {
418 WRITE_LOG(LOG_WARN, "SetupTcpListenAllIp uv_tcp_bind ipv6 %d", rc);
419 if (rc == -EAFNOSUPPORT) {
420 size_t index = g_forwardListenIp.find(IPV4_MAPPING_PREFIX);
421 size_t size = IPV4_MAPPING_PREFIX.size();
422 if (index == std::string::npos) {
423 return false;
424 }
425 std::string ipv4 = g_forwardListenIp.substr(index + size);
426 uv_ip4_addr(ipv4.c_str(), port, &addrv4);
427 rc = uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addrv4, 0);
428 if (rc != 0) {
429 uv_strerror_r(rc, buffer, BUF_SIZE_DEFAULT);
430 WRITE_LOG(LOG_FATAL, "SetupTcpListenAllIp uv_tcp_bind ipv4 %s failed %d %s",
431 ipv4.c_str(), rc, buffer);
432 return false;
433 }
434 } else {
435 uv_strerror_r(rc, buffer, BUF_SIZE_DEFAULT);
436 WRITE_LOG(LOG_FATAL, "SetupTcpListenAllIp uv_tcp_bind %d %s", rc, buffer);
437 return false;
438 }
439 }
440 return true;
441 }
442 #endif
443
SetupTCPPoint(HCtxForward ctxPoint)444 bool HdcForwardBase::SetupTCPPoint(HCtxForward ctxPoint)
445 {
446 string &sNodeCfg = ctxPoint->localArgs[1];
447 int port = atoi(sNodeCfg.c_str());
448 ctxPoint->tcp.data = ctxPoint;
449 int rc = uv_tcp_init(loopTask, &ctxPoint->tcp);
450 if (rc < 0) {
451 WRITE_LOG(LOG_FATAL, "SetupTCPPoint uv_tcp_init failed, rc:%d port:%d ctxid:%u cid:%u sid:%u", rc, port,
452 ctxPoint->id, taskInfo->channelId, taskInfo->sessionId);
453 return false;
454 }
455 struct sockaddr_in addr;
456 if (ctxPoint->masterSlave) {
457 #ifdef HDC_HOST
458 bool ret = SetupTcpListenAllIp(ctxPoint, port, rc);
459 WRITE_LOG(LOG_DEBUG, "SetupTcpListenAllIp ret:%d, rc:%d", ret, rc);
460 #else
461 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
462 rc = uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addr, 0);
463 #endif
464 if (rc < 0) {
465 WRITE_LOG(LOG_FATAL, "SetupTCPPoint master uv_tcp_bind failed, rc:%d port:%d ctxid:%u cid:%u sid:%u",
466 rc, port, ctxPoint->id, taskInfo->channelId, taskInfo->sessionId);
467 return false;
468 }
469 WRITE_LOG(LOG_INFO, "SetupTCPPoint master uv_listen port:%d ctxid:%u cid:%u sid:%u", port, ctxPoint->id,
470 taskInfo->channelId, taskInfo->sessionId);
471 if (uv_listen((uv_stream_t *)&ctxPoint->tcp, UV_LISTEN_LBACKOG, ListenCallback)) {
472 ctxPoint->lastError = "TCP Port listen failed at " + sNodeCfg;
473 WRITE_LOG(LOG_FATAL, "SetupTCPPoint master uv_listen failed port:%d ctxid:%u cid:%u sid:%u",
474 port, ctxPoint->id, taskInfo->channelId, taskInfo->sessionId);
475 return false;
476 }
477 } else {
478 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
479 uv_connect_t *conn = new(std::nothrow) uv_connect_t();
480 if (conn == nullptr) {
481 WRITE_LOG(LOG_FATAL, "SetupTCPPoint new conn failed port:%d ctxid:%u cid:%u sid:%u", port, ctxPoint->id,
482 taskInfo->channelId, taskInfo->sessionId);
483 return false;
484 }
485 conn->data = ctxPoint;
486 WRITE_LOG(LOG_INFO, "SetupTCPPoint slave uv_tcp_connect port:%d ctxid:%u cid:%u sid:%u", port, ctxPoint->id,
487 taskInfo->channelId, taskInfo->sessionId);
488 uv_tcp_connect(conn, (uv_tcp_t *)&ctxPoint->tcp, (const struct sockaddr *)&addr, ConnectTarget);
489 }
490 return true;
491 }
492
SetupDevicePoint(HCtxForward ctxPoint)493 bool HdcForwardBase::SetupDevicePoint(HCtxForward ctxPoint)
494 {
495 uint8_t flag = 1;
496 string &sNodeCfg = ctxPoint->localArgs[1];
497 string resolvedPath = Base::CanonicalizeSpecPath(sNodeCfg);
498 if ((ctxPoint->fd = open(resolvedPath.c_str(), O_RDWR)) < 0) {
499 ctxPoint->lastError = "Open unix-dev failed";
500 flag = -1;
501 }
502 auto funcRead = [&](const void *a, uint8_t *b, const int c) -> bool {
503 HCtxForward ctx = (HCtxForward)a;
504 return SendToTask(ctx->id, CMD_FORWARD_DATA, b, c);
505 };
506 auto funcFinish = [&](const void *a, const bool b, const string c) -> bool {
507 HCtxForward ctx = (HCtxForward)a;
508 WRITE_LOG(LOG_DEBUG, "funcFinish id:%u ret:%d reason:%s", ctx->id, b, c.c_str());
509 FreeContext(ctx, 0, true);
510 return false;
511 };
512 ctxPoint->fdClass = new(std::nothrow) HdcFileDescriptor(loopTask, ctxPoint->fd, ctxPoint, funcRead,
513 funcFinish, true);
514 if (ctxPoint->fdClass == nullptr) {
515 WRITE_LOG(LOG_FATAL, "SetupDevicePoint new ctxPoint->fdClass failed");
516 return false;
517 }
518 SetupPointContinue(ctxPoint, flag);
519 return true;
520 }
521
LocalAbstractConnect(uv_pipe_t * pipe,string & sNodeCfg)522 bool HdcForwardBase::LocalAbstractConnect(uv_pipe_t *pipe, string &sNodeCfg)
523 {
524 bool abstractRet = false;
525 #ifndef _WIN32
526 int s = 0;
527 do {
528 if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
529 break;
530 }
531 fcntl(s, F_SETFD, FD_CLOEXEC);
532 struct sockaddr_un addr;
533 Base::ZeroStruct(addr);
534 int addrLen = sNodeCfg.size() + offsetof(struct sockaddr_un, sun_path) + 1;
535 addr.sun_family = AF_LOCAL;
536 addr.sun_path[0] = 0;
537
538 if (memcpy_s(addr.sun_path + 1, sizeof(addr.sun_path) - 1, sNodeCfg.c_str(), sNodeCfg.size()) != EOK) {
539 break;
540 };
541 struct timeval timeout = { 3, 0 };
542 setsockopt(s, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
543 if (connect(s, reinterpret_cast<struct sockaddr *>(&addr), addrLen) < 0) {
544 WRITE_LOG(LOG_FATAL, "LocalAbstractConnect failed errno:%d", errno);
545 break;
546 }
547 if (uv_pipe_open(pipe, s)) {
548 break;
549 }
550 abstractRet = true;
551 } while (false);
552 if (!abstractRet) {
553 Base::CloseFd(s);
554 }
555 #endif
556 return abstractRet;
557 }
558
SetupFilePoint(HCtxForward ctxPoint)559 bool HdcForwardBase::SetupFilePoint(HCtxForward ctxPoint)
560 {
561 string &sNodeCfg = ctxPoint->localArgs[1];
562 ctxPoint->pipe.data = ctxPoint;
563 uv_pipe_init(loopTask, &ctxPoint->pipe, 0);
564 if (ctxPoint->masterSlave) {
565 if (ctxPoint->type == FORWARD_RESERVED || ctxPoint->type == FORWARD_FILESYSTEM) {
566 unlink(sNodeCfg.c_str());
567 }
568 if (uv_pipe_bind(&ctxPoint->pipe, sNodeCfg.c_str())) {
569 ctxPoint->lastError = "Unix pipe bind failed";
570 return false;
571 }
572 if (uv_listen((uv_stream_t *)&ctxPoint->pipe, UV_LISTEN_LBACKOG, ListenCallback)) {
573 ctxPoint->lastError = "Unix pipe listen failed";
574 return false;
575 }
576 } else {
577 if (ctxPoint->type == FORWARD_ABSTRACT) {
578 bool abstractRet = LocalAbstractConnect(&ctxPoint->pipe, sNodeCfg);
579 SetupPointContinue(ctxPoint, abstractRet ? 0 : -1);
580 if (!abstractRet) {
581 ctxPoint->lastError = "LocalAbstractConnect failed";
582 return false;
583 }
584 } else {
585 uv_connect_t *connect = new(std::nothrow) uv_connect_t();
586 if (connect == nullptr) {
587 WRITE_LOG(LOG_FATAL, "SetupFilePoint new connect failed");
588 return false;
589 }
590 connect->data = ctxPoint;
591 uv_pipe_connect(connect, &ctxPoint->pipe, sNodeCfg.c_str(), ConnectTarget);
592 }
593 }
594 return true;
595 }
596
SetupPoint(HCtxForward ctxPoint)597 bool HdcForwardBase::SetupPoint(HCtxForward ctxPoint)
598 {
599 bool ret = true;
600 if (!DetechForwardType(ctxPoint)) {
601 return false;
602 }
603 switch (ctxPoint->type) {
604 case FORWARD_TCP:
605 if (!SetupTCPPoint(ctxPoint)) {
606 ret = false;
607 };
608 break;
609 #ifndef _WIN32
610 case FORWARD_DEVICE:
611 if (!SetupDevicePoint(ctxPoint)) {
612 ret = false;
613 };
614 break;
615 case FORWARD_JDWP:
616 if (!SetupJdwpPoint(ctxPoint)) {
617 ret = false;
618 };
619 break;
620 case FORWARD_ABSTRACT:
621 case FORWARD_RESERVED:
622 case FORWARD_FILESYSTEM:
623 if (!SetupFilePoint(ctxPoint)) {
624 ret = false;
625 };
626 break;
627 #else
628 case FORWARD_DEVICE:
629 case FORWARD_JDWP:
630 case FORWARD_ABSTRACT:
631 case FORWARD_RESERVED:
632 case FORWARD_FILESYSTEM:
633 ctxPoint->lastError = "Not supoort forward-type";
634 ret = false;
635 break;
636 #endif
637 default:
638 ctxPoint->lastError = "Not supoort forward-type";
639 ret = false;
640 break;
641 }
642 return ret;
643 }
644
BeginForward(const string & command,string & sError)645 bool HdcForwardBase::BeginForward(const string &command, string &sError)
646 {
647 bool ret = false;
648 int argc = 0;
649 char bufString[BUF_SIZE_SMALL] = "";
650 HCtxForward ctxPoint = (HCtxForward)MallocContext(true);
651 if (!ctxPoint) {
652 WRITE_LOG(LOG_FATAL, "MallocContext failed");
653 return false;
654 }
655 char **argv = Base::SplitCommandToArgs(command.c_str(), &argc);
656 if (argv == nullptr) {
657 WRITE_LOG(LOG_FATAL, "SplitCommandToArgs failed");
658 return false;
659 }
660 while (true) {
661 if (argc < CMD_ARG1_COUNT) {
662 break;
663 }
664 if (strlen(argv[0]) > BUF_SIZE_SMALL || strlen(argv[1]) > BUF_SIZE_SMALL) {
665 break;
666 }
667 if (!CheckNodeInfo(argv[0], ctxPoint->localArgs)) {
668 break;
669 }
670 if (!CheckNodeInfo(argv[1], ctxPoint->remoteArgs)) {
671 break;
672 }
673 ctxPoint->remoteParamenters = argv[1];
674 if (!SetupPoint(ctxPoint)) {
675 break;
676 }
677
678 ret = true;
679 break;
680 }
681 sError = ctxPoint->lastError;
682 if (ret) {
683 // First 8-byte parameter bit
684 int maxBufSize = sizeof(bufString) - forwardParameterBufSize;
685 if (snprintf_s(bufString + forwardParameterBufSize, maxBufSize, maxBufSize - 1, "%s", argv[1]) > 0) {
686 SendToTask(ctxPoint->id, CMD_FORWARD_CHECK, reinterpret_cast<uint8_t *>(bufString),
687 forwardParameterBufSize + strlen(bufString + forwardParameterBufSize) + 1);
688 taskCommand = command;
689 }
690 }
691 delete[](reinterpret_cast<char *>(argv));
692 return ret;
693 }
694
FilterCommand(uint8_t * bufCmdIn,uint32_t * idOut,uint8_t ** pContentBuf)695 inline bool HdcForwardBase::FilterCommand(uint8_t *bufCmdIn, uint32_t *idOut, uint8_t **pContentBuf)
696 {
697 *pContentBuf = bufCmdIn + DWORD_SERIALIZE_SIZE;
698 *idOut = ntohl(*reinterpret_cast<uint32_t *>(bufCmdIn));
699 return true;
700 }
701
SlaveConnect(uint8_t * bufCmd,const int bufSize,bool bCheckPoint,string & sError)702 bool HdcForwardBase::SlaveConnect(uint8_t *bufCmd, const int bufSize, bool bCheckPoint, string &sError)
703 {
704 if (bufSize <= DWORD_SERIALIZE_SIZE + forwardParameterBufSize) {
705 WRITE_LOG(LOG_FATAL, "Illegal payloadSize, shorter than forward header");
706 return false;
707 }
708 bool ret = false;
709 char *content = nullptr;
710 uint32_t idSlaveOld = 0;
711 HCtxForward ctxPoint = (HCtxForward)MallocContext(false);
712 if (!ctxPoint) {
713 WRITE_LOG(LOG_FATAL, "MallocContext failed");
714 return false;
715 }
716 idSlaveOld = ctxPoint->id;
717 ctxPoint->checkPoint = bCheckPoint;
718 // refresh another id,8byte param
719 FilterCommand(bufCmd, &ctxPoint->id, reinterpret_cast<uint8_t **>(&content));
720 AdminContext(OP_UPDATE, idSlaveOld, ctxPoint);
721 content += forwardParameterBufSize;
722 if (!CheckNodeInfo(content, ctxPoint->localArgs)) {
723 WRITE_LOG(LOG_FATAL, "SlaveConnect CheckNodeInfo failed content:%s", content);
724 goto Finish;
725 }
726 if (!DetechForwardType(ctxPoint)) {
727 WRITE_LOG(LOG_FATAL, "SlaveConnect DetechForwardType failed content:%s", content);
728 goto Finish;
729 }
730 WRITE_LOG(LOG_INFO, "SlaveConnect ctxid:%u type:%d cid:%u sid:%u", ctxPoint->id, ctxPoint->type,
731 taskInfo->channelId, taskInfo->sessionId);
732 if (ctxPoint->type == FORWARD_ARK) {
733 if (ctxPoint->checkPoint) {
734 if (!SetupArkPoint(ctxPoint)) {
735 sError = ctxPoint->lastError;
736 WRITE_LOG(LOG_FATAL, "SlaveConnect SetupArkPoint failed content:%s", content);
737 goto Finish;
738 }
739 } else {
740 SetupPointContinue(ctxPoint, 0);
741 }
742 ret = true;
743 } else {
744 if (!ctxPoint->checkPoint) {
745 if (!SetupPoint(ctxPoint)) {
746 sError = ctxPoint->lastError;
747 WRITE_LOG(LOG_FATAL, "SlaveConnect SetupPoint failed content:%s", content);
748 goto Finish;
749 }
750 } else {
751 SetupPointContinue(ctxPoint, 0);
752 }
753 ret = true;
754 }
755 Finish:
756 if (!ret) {
757 FreeContext(ctxPoint, 0, true);
758 }
759 return ret;
760 }
761
DoForwardBegin(HCtxForward ctx)762 bool HdcForwardBase::DoForwardBegin(HCtxForward ctx)
763 {
764 switch (ctx->type) {
765 case FORWARD_TCP:
766 case FORWARD_JDWP: // jdwp use tcp ->socketpair->jvm
767 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1);
768 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf);
769 break;
770 case FORWARD_ARK:
771 WRITE_LOG(LOG_DEBUG, "DoForwardBegin ark socketpair id:%u fds[0]:%d", ctx->id, fds[0]);
772 uv_tcp_init(loopTask, &ctx->tcp);
773 uv_tcp_open(&ctx->tcp, fds[0]);
774 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1);
775 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf);
776 break;
777 case FORWARD_ABSTRACT:
778 case FORWARD_RESERVED:
779 case FORWARD_FILESYSTEM:
780 uv_read_start((uv_stream_t *)&ctx->pipe, AllocForwardBuf, ReadForwardBuf);
781 break;
782 case FORWARD_DEVICE: {
783 ctx->fdClass->StartWorkOnThread();
784 break;
785 }
786 default:
787 break;
788 }
789 ctx->ready = true;
790 return true;
791 }
792
AdminContext(const uint8_t op,const uint32_t id,HCtxForward hInput)793 void *HdcForwardBase::AdminContext(const uint8_t op, const uint32_t id, HCtxForward hInput)
794 {
795 ctxPointMutex.lock();
796 void *hRet = nullptr;
797 map<uint32_t, HCtxForward> &mapCtx = mapCtxPoint;
798 switch (op) {
799 case OP_ADD:
800 mapCtx[id] = hInput;
801 break;
802 case OP_REMOVE:
803 mapCtx.erase(id);
804 break;
805 case OP_QUERY:
806 if (mapCtx.count(id)) {
807 hRet = mapCtx[id];
808 }
809 break;
810 case OP_UPDATE:
811 mapCtx.erase(id);
812 mapCtx[hInput->id] = hInput;
813 break;
814 default:
815 break;
816 }
817 ctxPointMutex.unlock();
818 return hRet;
819 }
820
SendCallbackForwardBuf(uv_write_t * req,int status)821 void HdcForwardBase::SendCallbackForwardBuf(uv_write_t *req, int status)
822 {
823 ContextForwardIO *ctxIO = (ContextForwardIO *)req->data;
824 HCtxForward ctx = reinterpret_cast<HCtxForward>(ctxIO->ctxForward);
825 if (status < 0 && !ctx->finish) {
826 WRITE_LOG(LOG_DEBUG, "SendCallbackForwardBuf ctx->type:%d, status:%d finish", ctx->type, status);
827 ctx->thisClass->FreeContext(ctx, 0, true);
828 }
829 delete[] ctxIO->bufIO;
830 delete ctxIO;
831 delete req;
832 }
833
SendForwardBuf(HCtxForward ctx,uint8_t * bufPtr,const int size)834 int HdcForwardBase::SendForwardBuf(HCtxForward ctx, uint8_t *bufPtr, const int size)
835 {
836 int nRet = 0;
837 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
838 WRITE_LOG(LOG_WARN, "SendForwardBuf size:%d > HDC_BUF_MAX_BYTES, ctxId:%u", size, ctx->id);
839 return -1;
840 }
841 if (size <= 0) {
842 WRITE_LOG(LOG_WARN, "SendForwardBuf failed size:%d, ctxId:%u", size, ctx->id);
843 return -1;
844 }
845 auto pDynBuf = new(std::nothrow) uint8_t[size];
846 if (!pDynBuf) {
847 WRITE_LOG(LOG_WARN, "SendForwardBuf new DynBuf failed size:%d, ctxId:%u", size, ctx->id);
848 return -1;
849 }
850 (void)memcpy_s(pDynBuf, size, bufPtr, size);
851 if (ctx->type == FORWARD_DEVICE) {
852 nRet = ctx->fdClass->WriteWithMem(pDynBuf, size);
853 } else {
854 auto ctxIO = new ContextForwardIO();
855 if (!ctxIO) {
856 WRITE_LOG(LOG_WARN, "SendForwardBuf new ContextForwardIO failed, ctxId:%u", ctx->id);
857 delete[] pDynBuf;
858 return -1;
859 }
860 ctxIO->ctxForward = ctx;
861 ctxIO->bufIO = pDynBuf;
862 if (ctx->type == FORWARD_TCP || ctx->type == FORWARD_JDWP || ctx->type == FORWARD_ARK) {
863 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->tcp, pDynBuf, size, nullptr,
864 (void *)SendCallbackForwardBuf, (void *)ctxIO);
865 } else {
866 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
867 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->pipe, pDynBuf, size, nullptr,
868 (void *)SendCallbackForwardBuf, (void *)ctxIO);
869 }
870 if (nRet < 0) {
871 WRITE_LOG(LOG_WARN, "SendForwardBuf SendToStreamEx ret:%d, size:%d ctxId:%u type:%d",
872 nRet, size, ctx->id, ctx->type);
873 delete[] pDynBuf;
874 delete ctxIO;
875 }
876 }
877 return nRet;
878 }
879
CommandForwardCheckResult(HCtxForward ctx,uint8_t * payload)880 bool HdcForwardBase::CommandForwardCheckResult(HCtxForward ctx, uint8_t *payload)
881 {
882 bool ret = true;
883 bool bCheck = static_cast<bool>(payload);
884 LogMsg(bCheck ? MSG_OK : MSG_FAIL, "Forwardport result:%s", bCheck ? "OK" : "Failed");
885 if (bCheck) {
886 string mapInfo = taskInfo->serverOrDaemon ? "1|" : "0|";
887 mapInfo += taskCommand;
888 ctx->ready = true;
889 ServerCommand(CMD_FORWARD_SUCCESS, reinterpret_cast<uint8_t *>(const_cast<char *>(mapInfo.c_str())),
890 mapInfo.size() + 1);
891 } else {
892 ret = false;
893 FreeContext(ctx, 0, false);
894 }
895 return ret;
896 }
897
ForwardCommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)898 bool HdcForwardBase::ForwardCommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
899 {
900 if (payloadSize <= DWORD_SERIALIZE_SIZE && command != CMD_FORWARD_FREE_CONTEXT
901 && command != CMD_FORWARD_ACTIVE_MASTER) {
902 WRITE_LOG(LOG_FATAL, "Illegal payloadSize, shorter than forward command header");
903 return false;
904 }
905 bool ret = true;
906 uint8_t *pContent = nullptr;
907 int sizeContent = 0;
908 uint32_t id = 0;
909 HCtxForward ctx = nullptr;
910 FilterCommand(payload, &id, &pContent);
911 sizeContent = payloadSize - DWORD_SERIALIZE_SIZE;
912 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
913 return true;
914 }
915 switch (command) {
916 case CMD_FORWARD_CHECK_RESULT: {
917 ret = CommandForwardCheckResult(ctx, pContent);
918 break;
919 }
920 case CMD_FORWARD_ACTIVE_MASTER: {
921 ret = DoForwardBegin(ctx);
922 break;
923 }
924 case CMD_FORWARD_DATA: {
925 if (ctx->finish) {
926 break;
927 }
928 if (SendForwardBuf(ctx, pContent, sizeContent) < 0) {
929 WRITE_LOG(LOG_WARN, "ForwardCommandDispatch SendForwardBuf rc < 0, ctxid:%u cid:%u sid:%u", id,
930 taskInfo->channelId, taskInfo->sessionId);
931 FreeContext(ctx, 0, true);
932 }
933 break;
934 }
935 case CMD_FORWARD_FREE_CONTEXT: {
936 FreeContext(ctx, 0, false);
937 break;
938 }
939 default:
940 ret = false;
941 break;
942 }
943 if (!ret) {
944 if (ctx) {
945 FreeContext(ctx, 0, true);
946 } else {
947 WRITE_LOG(LOG_DEBUG, "ctx==nullptr raw free");
948 TaskFinish();
949 }
950 }
951 return ret;
952 }
953
CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)954 bool HdcForwardBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
955 {
956 if (command != CMD_FORWARD_DATA) {
957 WRITE_LOG(LOG_WARN, "CommandDispatch command:%d payloadSize:%d cid:%u sid:%u", command, payloadSize,
958 taskInfo->channelId, taskInfo->sessionId);
959 }
960 bool ret = true;
961 string sError;
962 // prepare
963 if (command == CMD_FORWARD_INIT) {
964 string strCommand(reinterpret_cast<char *>(payload), payloadSize);
965 if (!BeginForward(strCommand, sError)) {
966 ret = false;
967 goto Finish;
968 }
969 return true;
970 } else if (command == CMD_FORWARD_CHECK) {
971 // Detect remote if it's reachable
972 if (!SlaveConnect(payload, payloadSize, true, sError)) {
973 ret = false;
974 goto Finish;
975 }
976 return true;
977 } else if (command == CMD_FORWARD_ACTIVE_SLAVE) {
978 // slave connect target port when activating
979 if (!SlaveConnect(payload, payloadSize, false, sError)) {
980 ret = false;
981 goto Finish;
982 }
983 return true;
984 }
985 if (!ForwardCommandDispatch(command, payload, payloadSize)) {
986 ret = false;
987 goto Finish;
988 }
989 Finish:
990 if (!ret) {
991 if (!sError.size()) {
992 LogMsg(MSG_FAIL, "Forward parament failed");
993 } else {
994 LogMsg(MSG_FAIL, const_cast<char *>(sError.c_str()));
995 WRITE_LOG(LOG_WARN, const_cast<char *>(sError.c_str()));
996 }
997 }
998 return ret;
999 }
1000 } // namespace Hdc
1001