1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "forward.h"
16 #include "base.h"
17
18 namespace Hdc {
HdcForwardBase(HTaskInfo hTaskInfo)19 HdcForwardBase::HdcForwardBase(HTaskInfo hTaskInfo)
20 : HdcTaskBase(hTaskInfo)
21 {
22 fds[0] = -1;
23 fds[1] = -1;
24 }
25
~HdcForwardBase()26 HdcForwardBase::~HdcForwardBase()
27 {
28 WRITE_LOG(LOG_DEBUG, "~HdcForwardBase channelId:%u", taskInfo->channelId);
29 };
30
ReadyForRelease()31 bool HdcForwardBase::ReadyForRelease()
32 {
33 if (!HdcTaskBase::ReadyForRelease()) {
34 WRITE_LOG(LOG_WARN, "not ready for release channelId:%u", taskInfo->channelId);
35 return false;
36 }
37 return true;
38 }
39
StopTask()40 void HdcForwardBase::StopTask()
41 {
42 ctxPointMutex.lock();
43 vector<HCtxForward> ctxs;
44 map<uint32_t, HCtxForward>::iterator iter;
45 for (iter = mapCtxPoint.begin(); iter != mapCtxPoint.end(); ++iter) {
46 HCtxForward ctx = iter->second;
47 ctxs.push_back(ctx);
48 }
49 // FREECONTEXT in the STOP is triggered by the other party sector, no longer notifying each other.
50 mapCtxPoint.clear();
51 ctxPointMutex.unlock();
52 for (auto ctx: ctxs) {
53 FreeContext(ctx, 0, false);
54 }
55 }
56
OnAccept(uv_stream_t * server,HCtxForward ctxClient,uv_stream_t * client)57 void HdcForwardBase::OnAccept(uv_stream_t *server, HCtxForward ctxClient, uv_stream_t *client)
58 {
59 HCtxForward ctxListen = (HCtxForward)server->data;
60 char buf[BUF_SIZE_DEFAULT] = { 0 };
61 bool ret = false;
62 while (true) {
63 if (uv_accept(server, client)) {
64 WRITE_LOG(LOG_FATAL, "uv_accept id:%u type:%d remoteParamenters:%s",
65 ctxListen->id, ctxListen->type, ctxListen->remoteParamenters.c_str());
66 break;
67 }
68 ctxClient->type = ctxListen->type;
69 ctxClient->remoteParamenters = ctxListen->remoteParamenters;
70 int maxSize = sizeof(buf) - forwardParameterBufSize;
71 // clang-format off
72 if (snprintf_s(buf + forwardParameterBufSize, maxSize, maxSize - 1, "%s",
73 ctxClient->remoteParamenters.c_str()) < 0) {
74 break;
75 }
76 WRITE_LOG(LOG_DEBUG, "OnAccept id:%u type:%d remoteParamenters:%s",
77 ctxClient->id, ctxClient->type, ctxClient->remoteParamenters.c_str());
78 SendToTask(ctxClient->id, CMD_FORWARD_ACTIVE_SLAVE, reinterpret_cast<uint8_t *>(buf),
79 strlen(buf + forwardParameterBufSize) + 9); // 9: pre 8bytes preserve for param bits
80 ret = true;
81 break;
82 }
83 if (!ret) {
84 FreeContext(ctxClient, 0, false);
85 }
86 }
87
ListenCallback(uv_stream_t * server,const int status)88 void HdcForwardBase::ListenCallback(uv_stream_t *server, const int status)
89 {
90 HCtxForward ctxListen = (HCtxForward)server->data;
91 HdcForwardBase *thisClass = ctxListen->thisClass;
92 uv_stream_t *client = nullptr;
93
94 if (status == -1 || !ctxListen->ready) {
95 WRITE_LOG(LOG_FATAL, "ListenCallback status:%d id:%u ready:%d",
96 status, ctxListen->id, ctxListen->ready);
97 thisClass->FreeContext(ctxListen, 0, false);
98 thisClass->TaskFinish();
99 return;
100 }
101 HCtxForward ctxClient = (HCtxForward)thisClass->MallocContext(true);
102 if (!ctxClient) {
103 return;
104 }
105 if (ctxListen->type == FORWARD_TCP) {
106 uv_tcp_init(ctxClient->thisClass->loopTask, &ctxClient->tcp);
107 client = (uv_stream_t *)&ctxClient->tcp;
108 } else {
109 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
110 uv_pipe_init(ctxClient->thisClass->loopTask, &ctxClient->pipe, 0);
111 client = (uv_stream_t *)&ctxClient->pipe;
112 }
113 thisClass->OnAccept(server, ctxClient, client);
114 }
115
MallocContext(bool masterSlave)116 void *HdcForwardBase::MallocContext(bool masterSlave)
117 {
118 HCtxForward ctx = nullptr;
119 if ((ctx = new ContextForward()) == nullptr) {
120 return nullptr;
121 }
122 ctx->id = Base::GetRandomU32();
123 ctx->masterSlave = masterSlave;
124 ctx->thisClass = this;
125 ctx->fd = -1;
126 ctx->fdClass = nullptr;
127 ctx->tcp.data = ctx;
128 ctx->pipe.data = ctx;
129 AdminContext(OP_ADD, ctx->id, ctx);
130 refCount++;
131 return ctx;
132 }
133
FreeContextCallBack(HCtxForward ctx)134 void HdcForwardBase::FreeContextCallBack(HCtxForward ctx)
135 {
136 Base::DoNextLoop(loopTask, ctx, [this](const uint8_t flag, string &msg, const void *data) {
137 HCtxForward ctx = (HCtxForward)data;
138 AdminContext(OP_REMOVE, ctx->id, nullptr);
139 if (ctx != nullptr) {
140 WRITE_LOG(LOG_DEBUG, "Finally to delete id:%u", ctx->id);
141 delete ctx;
142 ctx = nullptr;
143 }
144 if (refCount > 0) {
145 --refCount;
146 }
147 });
148 }
149
FreeJDWP(HCtxForward ctx)150 void HdcForwardBase::FreeJDWP(HCtxForward ctx)
151 {
152 Base::CloseFd(ctx->fd);
153 if (ctx->fdClass) {
154 ctx->fdClass->StopWorkOnThread(false, nullptr);
155
156 auto funcReqClose = [](uv_idle_t *handle) -> void {
157 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void {
158 HCtxForward ctx = (HCtxForward)handle->data;
159 ctx->thisClass->FreeContextCallBack(ctx);
160 delete (uv_idle_t *)handle;
161 };
162 HCtxForward context = (HCtxForward)handle->data;
163 if (context->fdClass->ReadyForRelease()) {
164 delete context->fdClass;
165 context->fdClass = nullptr;
166 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose);
167 }
168 };
169 Base::IdleUvTask(loopTask, ctx, funcReqClose);
170 } else {
171 auto funcReqClose = [](uv_idle_t *handle) -> void {
172 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void {
173 HCtxForward ctx = (HCtxForward)handle->data;
174 ctx->thisClass->FreeContextCallBack(ctx);
175 delete (uv_idle_t *)handle;
176 };
177 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose);
178 };
179 Base::IdleUvTask(loopTask, ctx, funcReqClose);
180 }
181 }
182
FreeContext(HCtxForward ctxIn,const uint32_t id,bool bNotifyRemote)183 void HdcForwardBase::FreeContext(HCtxForward ctxIn, const uint32_t id, bool bNotifyRemote)
184 {
185 std::lock_guard<std::mutex> lock(ctxFreeMutex);
186 HCtxForward ctx = nullptr;
187 if (!ctxIn) {
188 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
189 WRITE_LOG(LOG_DEBUG, "Query id:%u failed", id);
190 return;
191 }
192 } else {
193 ctx = ctxIn;
194 }
195 WRITE_LOG(LOG_DEBUG, "FreeContext id:%u, bNotifyRemote:%d, finish:%d",
196 ctx->id, bNotifyRemote, ctx->finish);
197 if (ctx->finish) {
198 return;
199 }
200 if (bNotifyRemote) {
201 SendToTask(ctx->id, CMD_FORWARD_FREE_CONTEXT, nullptr, 0);
202 }
203 uv_close_cb funcHandleClose = [](uv_handle_t *handle) -> void {
204 HCtxForward ctx = (HCtxForward)handle->data;
205 ctx->thisClass->FreeContextCallBack(ctx);
206 };
207 switch (ctx->type) {
208 case FORWARD_TCP:
209 case FORWARD_JDWP:
210 case FORWARD_ARK:
211 Base::TryCloseHandle((uv_handle_t *)&ctx->tcp, true, funcHandleClose);
212 break;
213 case FORWARD_ABSTRACT:
214 case FORWARD_RESERVED:
215 case FORWARD_FILESYSTEM:
216 Base::TryCloseHandle((uv_handle_t *)&ctx->pipe, true, funcHandleClose);
217 break;
218 case FORWARD_DEVICE: {
219 FreeJDWP(ctx);
220 break;
221 }
222 default:
223 break;
224 }
225 ctx->finish = true;
226 }
227
SendToTask(const uint32_t cid,const uint16_t command,uint8_t * bufPtr,const int bufSize)228 bool HdcForwardBase::SendToTask(const uint32_t cid, const uint16_t command, uint8_t *bufPtr, const int bufSize)
229 {
230 StartTraceScope("HdcForwardBase::SendToTask");
231 bool ret = false;
232 // usually MAX_SIZE_IOBUF*2 from HdcFileDescriptor maxIO
233 if (bufSize > Base::GetMaxBufSizeStable() * BUF_MULTIPLE) {
234 WRITE_LOG(LOG_FATAL, "SendToTask bufSize:%d", bufSize);
235 return false;
236 }
237 auto newBuf = new uint8_t[bufSize + BUF_EXTEND_SIZE];
238 if (!newBuf) {
239 return false;
240 }
241 *reinterpret_cast<uint32_t *>(newBuf) = htonl(cid);
242 if (bufSize > 0 && bufPtr != nullptr && memcpy_s(newBuf + BUF_EXTEND_SIZE, bufSize, bufPtr, bufSize) != EOK) {
243 delete[] newBuf;
244 return false;
245 }
246 ret = SendToAnother(command, newBuf, bufSize + BUF_EXTEND_SIZE);
247 delete[] newBuf;
248 return ret;
249 }
250
251 // Forward flow is small and frequency is fast
AllocForwardBuf(uv_handle_t * handle,size_t sizeSuggested,uv_buf_t * buf)252 void HdcForwardBase::AllocForwardBuf(uv_handle_t *handle, size_t sizeSuggested, uv_buf_t *buf)
253 {
254 size_t size = sizeSuggested;
255 if (size > MAX_USBFFS_BULK) {
256 size = MAX_USBFFS_BULK;
257 }
258 buf->base = (char *)new char[size];
259 if (buf->base) {
260 buf->len = (size > 0) ? (size - 1) : 0;
261 } else {
262 WRITE_LOG(LOG_WARN, "AllocForwardBuf == null");
263 }
264 }
265
ReadForwardBuf(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)266 void HdcForwardBase::ReadForwardBuf(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
267 {
268 HCtxForward ctx = (HCtxForward)stream->data;
269 if (nread < 0) {
270 WRITE_LOG(LOG_INFO, "ReadForwardBuf nread:%zd id:%u", nread, ctx->id);
271 ctx->thisClass->FreeContext(ctx, 0, true);
272 delete[] buf->base;
273 return;
274 }
275 if (nread == 0) {
276 WRITE_LOG(LOG_INFO, "ReadForwardBuf nread:0 id:%u", ctx->id);
277 delete[] buf->base;
278 return;
279 }
280 ctx->thisClass->SendToTask(ctx->id, CMD_FORWARD_DATA, (uint8_t *)buf->base, nread);
281 // clear
282 delete[] buf->base;
283 }
284
ConnectTarget(uv_connect_t * connection,int status)285 void HdcForwardBase::ConnectTarget(uv_connect_t *connection, int status)
286 {
287 HCtxForward ctx = (HCtxForward)connection->data;
288 HdcForwardBase *thisClass = ctx->thisClass;
289 delete connection;
290 if (status < 0) {
291 constexpr int bufSize = 1024;
292 char buf[bufSize] = { 0 };
293 uv_err_name_r(status, buf, bufSize);
294 WRITE_LOG(LOG_WARN, "Forward connect result:%d error:%s", status, buf);
295 }
296 thisClass->SetupPointContinue(ctx, status);
297 }
298
CheckNodeInfo(const char * nodeInfo,string as[2])299 bool HdcForwardBase::CheckNodeInfo(const char *nodeInfo, string as[2])
300 {
301 string str = nodeInfo;
302 size_t strLen = str.size();
303 if (strLen < 1) {
304 return false;
305 }
306 size_t pos = str.find(':');
307 if (pos != string::npos) {
308 if (pos == 0 || pos == strLen - 1) {
309 return false;
310 }
311 as[0] = str.substr(0, pos);
312 as[1] = str.substr(pos + 1);
313 } else {
314 return false;
315 }
316 if (as[0] == "tcp") {
317 if (as[1].size() > std::to_string(MAX_IP_PORT).size()) {
318 return false;
319 }
320 int port = atoi(as[1].c_str());
321 if (port <= 0 || port > MAX_IP_PORT) {
322 return false;
323 }
324 }
325 return true;
326 }
327
SetupPointContinue(HCtxForward ctx,int status)328 bool HdcForwardBase::SetupPointContinue(HCtxForward ctx, int status)
329 {
330 if (ctx->checkPoint) {
331 // send to active
332 uint8_t flag = status > 0;
333 SendToTask(ctx->id, CMD_FORWARD_CHECK_RESULT, &flag, 1);
334 FreeContext(ctx, 0, false);
335 return true;
336 }
337 if (status < 0) {
338 FreeContext(ctx, 0, true);
339 return false;
340 }
341 // send to active
342 if (!SendToTask(ctx->id, CMD_FORWARD_ACTIVE_MASTER, nullptr, 0)) {
343 WRITE_LOG(LOG_FATAL, "SetupPointContinue SendToTask failed id:%u", ctx->id);
344 FreeContext(ctx, 0, true);
345 return false;
346 }
347 return DoForwardBegin(ctx);
348 }
349
DetechForwardType(HCtxForward ctxPoint)350 bool HdcForwardBase::DetechForwardType(HCtxForward ctxPoint)
351 {
352 string &sFType = ctxPoint->localArgs[0];
353 string &sNodeCfg = ctxPoint->localArgs[1];
354 // string to enum
355 if (sFType == "tcp") {
356 ctxPoint->type = FORWARD_TCP;
357 } else if (sFType == "dev") {
358 ctxPoint->type = FORWARD_DEVICE;
359 } else if (sFType == "localabstract") {
360 // daemon shell: /system/bin/socat abstract-listen:linux-abstract -
361 // daemon shell: /system/bin/socat - abstract-connect:linux-abstract
362 // host: hdc fport tcp:8080 localabstract:linux-abstract
363 ctxPoint->type = FORWARD_ABSTRACT;
364 } else if (sFType == "localreserved") {
365 sNodeCfg = harmonyReservedSocketPrefix + sNodeCfg;
366 ctxPoint->type = FORWARD_RESERVED;
367 } else if (sFType == "localfilesystem") {
368 sNodeCfg = filesystemSocketPrefix + sNodeCfg;
369 ctxPoint->type = FORWARD_FILESYSTEM;
370 } else if (sFType == "jdwp") {
371 ctxPoint->type = FORWARD_JDWP;
372 } else if (sFType == "ark") {
373 ctxPoint->type = FORWARD_ARK;
374 } else {
375 return false;
376 }
377 return true;
378 }
379
SetupTCPPoint(HCtxForward ctxPoint)380 bool HdcForwardBase::SetupTCPPoint(HCtxForward ctxPoint)
381 {
382 string &sNodeCfg = ctxPoint->localArgs[1];
383 int port = atoi(sNodeCfg.c_str());
384 ctxPoint->tcp.data = ctxPoint;
385 uv_tcp_init(loopTask, &ctxPoint->tcp);
386 struct sockaddr_in addr;
387 if (ctxPoint->masterSlave) {
388 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
389 uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addr, 0);
390 if (uv_listen((uv_stream_t *)&ctxPoint->tcp, UV_LISTEN_LBACKOG, ListenCallback)) {
391 ctxPoint->lastError = "TCP Port listen failed at " + sNodeCfg;
392 return false;
393 }
394 } else {
395 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
396 uv_connect_t *conn = new(std::nothrow) uv_connect_t();
397 if (conn == nullptr) {
398 WRITE_LOG(LOG_FATAL, "SetupTCPPoint new conn failed");
399 return false;
400 }
401 conn->data = ctxPoint;
402 uv_tcp_connect(conn, (uv_tcp_t *)&ctxPoint->tcp, (const struct sockaddr *)&addr, ConnectTarget);
403 }
404 return true;
405 }
406
SetupDevicePoint(HCtxForward ctxPoint)407 bool HdcForwardBase::SetupDevicePoint(HCtxForward ctxPoint)
408 {
409 uint8_t flag = 1;
410 string &sNodeCfg = ctxPoint->localArgs[1];
411 string resolvedPath = Base::CanonicalizeSpecPath(sNodeCfg);
412 if ((ctxPoint->fd = open(resolvedPath.c_str(), O_RDWR)) < 0) {
413 ctxPoint->lastError = "Open unix-dev failed";
414 flag = -1;
415 }
416 auto funcRead = [&](const void *a, uint8_t *b, const int c) -> bool {
417 HCtxForward ctx = (HCtxForward)a;
418 return SendToTask(ctx->id, CMD_FORWARD_DATA, b, c);
419 };
420 auto funcFinish = [&](const void *a, const bool b, const string c) -> bool {
421 HCtxForward ctx = (HCtxForward)a;
422 WRITE_LOG(LOG_DEBUG, "funcFinish id:%u ret:%d reason:%s", ctx->id, b, c.c_str());
423 FreeContext(ctx, 0, true);
424 return false;
425 };
426 ctxPoint->fdClass = new(std::nothrow) HdcFileDescriptor(loopTask, ctxPoint->fd, ctxPoint, funcRead,
427 funcFinish, true);
428 if (ctxPoint->fdClass == nullptr) {
429 WRITE_LOG(LOG_FATAL, "SetupDevicePoint new ctxPoint->fdClass failed");
430 return false;
431 }
432 SetupPointContinue(ctxPoint, flag);
433 return true;
434 }
435
LocalAbstractConnect(uv_pipe_t * pipe,string & sNodeCfg)436 bool HdcForwardBase::LocalAbstractConnect(uv_pipe_t *pipe, string &sNodeCfg)
437 {
438 bool abstractRet = false;
439 #ifndef _WIN32
440 int s = 0;
441 do {
442 if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
443 break;
444 }
445 fcntl(s, F_SETFD, FD_CLOEXEC);
446 struct sockaddr_un addr;
447 Base::ZeroStruct(addr);
448 int addrLen = sNodeCfg.size() + offsetof(struct sockaddr_un, sun_path) + 1;
449 addr.sun_family = AF_LOCAL;
450 addr.sun_path[0] = 0;
451
452 if (memcpy_s(addr.sun_path + 1, sizeof(addr.sun_path) - 1, sNodeCfg.c_str(), sNodeCfg.size()) != EOK) {
453 break;
454 };
455 struct timeval timeout = { 3, 0 };
456 setsockopt(s, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
457 if (connect(s, reinterpret_cast<struct sockaddr *>(&addr), addrLen) < 0) {
458 WRITE_LOG(LOG_FATAL, "LocalAbstractConnect failed errno:%d", errno);
459 break;
460 }
461 if (uv_pipe_open(pipe, s)) {
462 break;
463 }
464 abstractRet = true;
465 } while (false);
466 if (!abstractRet) {
467 Base::CloseFd(s);
468 }
469 #endif
470 return abstractRet;
471 }
472
SetupFilePoint(HCtxForward ctxPoint)473 bool HdcForwardBase::SetupFilePoint(HCtxForward ctxPoint)
474 {
475 string &sNodeCfg = ctxPoint->localArgs[1];
476 ctxPoint->pipe.data = ctxPoint;
477 uv_pipe_init(loopTask, &ctxPoint->pipe, 0);
478 if (ctxPoint->masterSlave) {
479 if (ctxPoint->type == FORWARD_RESERVED || ctxPoint->type == FORWARD_FILESYSTEM) {
480 unlink(sNodeCfg.c_str());
481 }
482 if (uv_pipe_bind(&ctxPoint->pipe, sNodeCfg.c_str())) {
483 ctxPoint->lastError = "Unix pipe bind failed";
484 return false;
485 }
486 if (uv_listen((uv_stream_t *)&ctxPoint->pipe, UV_LISTEN_LBACKOG, ListenCallback)) {
487 ctxPoint->lastError = "Unix pipe listen failed";
488 return false;
489 }
490 } else {
491 if (ctxPoint->type == FORWARD_ABSTRACT) {
492 bool abstractRet = LocalAbstractConnect(&ctxPoint->pipe, sNodeCfg);
493 SetupPointContinue(ctxPoint, abstractRet ? 0 : -1);
494 if (!abstractRet) {
495 ctxPoint->lastError = "LocalAbstractConnect failed";
496 return false;
497 }
498 } else {
499 uv_connect_t *connect = new(std::nothrow) uv_connect_t();
500 if (connect == nullptr) {
501 WRITE_LOG(LOG_FATAL, "SetupFilePoint new connect failed");
502 return false;
503 }
504 connect->data = ctxPoint;
505 uv_pipe_connect(connect, &ctxPoint->pipe, sNodeCfg.c_str(), ConnectTarget);
506 }
507 }
508 return true;
509 }
510
SetupPoint(HCtxForward ctxPoint)511 bool HdcForwardBase::SetupPoint(HCtxForward ctxPoint)
512 {
513 bool ret = true;
514 if (!DetechForwardType(ctxPoint)) {
515 return false;
516 }
517 switch (ctxPoint->type) {
518 case FORWARD_TCP:
519 if (!SetupTCPPoint(ctxPoint)) {
520 ret = false;
521 };
522 break;
523 #ifndef _WIN32
524 case FORWARD_DEVICE:
525 if (!SetupDevicePoint(ctxPoint)) {
526 ret = false;
527 };
528 break;
529 case FORWARD_JDWP:
530 if (!SetupJdwpPoint(ctxPoint)) {
531 ret = false;
532 };
533 break;
534 case FORWARD_ABSTRACT:
535 case FORWARD_RESERVED:
536 case FORWARD_FILESYSTEM:
537 if (!SetupFilePoint(ctxPoint)) {
538 ret = false;
539 };
540 break;
541 #else
542 case FORWARD_DEVICE:
543 case FORWARD_JDWP:
544 case FORWARD_ABSTRACT:
545 case FORWARD_RESERVED:
546 case FORWARD_FILESYSTEM:
547 ctxPoint->lastError = "Not supoort forward-type";
548 ret = false;
549 break;
550 #endif
551 default:
552 ctxPoint->lastError = "Not supoort forward-type";
553 ret = false;
554 break;
555 }
556 return ret;
557 }
558
BeginForward(const string & command,string & sError)559 bool HdcForwardBase::BeginForward(const string &command, string &sError)
560 {
561 bool ret = false;
562 int argc = 0;
563 char bufString[BUF_SIZE_SMALL] = "";
564 HCtxForward ctxPoint = (HCtxForward)MallocContext(true);
565 if (!ctxPoint) {
566 WRITE_LOG(LOG_FATAL, "MallocContext failed");
567 return false;
568 }
569 char **argv = Base::SplitCommandToArgs(command.c_str(), &argc);
570 if (argv == nullptr) {
571 WRITE_LOG(LOG_FATAL, "SplitCommandToArgs failed");
572 return false;
573 }
574 while (true) {
575 if (argc < CMD_ARG1_COUNT) {
576 break;
577 }
578 if (strlen(argv[0]) > BUF_SIZE_SMALL || strlen(argv[1]) > BUF_SIZE_SMALL) {
579 break;
580 }
581 if (!CheckNodeInfo(argv[0], ctxPoint->localArgs)) {
582 break;
583 }
584 if (!CheckNodeInfo(argv[1], ctxPoint->remoteArgs)) {
585 break;
586 }
587 ctxPoint->remoteParamenters = argv[1];
588 if (!SetupPoint(ctxPoint)) {
589 break;
590 }
591
592 ret = true;
593 break;
594 }
595 sError = ctxPoint->lastError;
596 if (ret) {
597 // First 8-byte parameter bit
598 int maxBufSize = sizeof(bufString) - forwardParameterBufSize;
599 if (snprintf_s(bufString + forwardParameterBufSize, maxBufSize, maxBufSize - 1, "%s", argv[1]) > 0) {
600 SendToTask(ctxPoint->id, CMD_FORWARD_CHECK, reinterpret_cast<uint8_t *>(bufString),
601 forwardParameterBufSize + strlen(bufString + forwardParameterBufSize) + 1);
602 taskCommand = command;
603 }
604 }
605 delete[](reinterpret_cast<char *>(argv));
606 return ret;
607 }
608
FilterCommand(uint8_t * bufCmdIn,uint32_t * idOut,uint8_t ** pContentBuf)609 inline bool HdcForwardBase::FilterCommand(uint8_t *bufCmdIn, uint32_t *idOut, uint8_t **pContentBuf)
610 {
611 *pContentBuf = bufCmdIn + DWORD_SERIALIZE_SIZE;
612 *idOut = ntohl(*reinterpret_cast<uint32_t *>(bufCmdIn));
613 return true;
614 }
615
SlaveConnect(uint8_t * bufCmd,const int bufSize,bool bCheckPoint,string & sError)616 bool HdcForwardBase::SlaveConnect(uint8_t *bufCmd, const int bufSize, bool bCheckPoint, string &sError)
617 {
618 if (bufSize <= DWORD_SERIALIZE_SIZE + forwardParameterBufSize) {
619 WRITE_LOG(LOG_FATAL, "Illegal payloadSize, shorter than forward header");
620 return false;
621 }
622 bool ret = false;
623 char *content = nullptr;
624 uint32_t idSlaveOld = 0;
625 HCtxForward ctxPoint = (HCtxForward)MallocContext(false);
626 if (!ctxPoint) {
627 WRITE_LOG(LOG_FATAL, "MallocContext failed");
628 return false;
629 }
630 idSlaveOld = ctxPoint->id;
631 ctxPoint->checkPoint = bCheckPoint;
632 // refresh another id,8byte param
633 FilterCommand(bufCmd, &ctxPoint->id, reinterpret_cast<uint8_t **>(&content));
634 AdminContext(OP_UPDATE, idSlaveOld, ctxPoint);
635 content += forwardParameterBufSize;
636 if (!CheckNodeInfo(content, ctxPoint->localArgs)) {
637 WRITE_LOG(LOG_FATAL, "SlaveConnect CheckNodeInfo failed content:%s", content);
638 goto Finish;
639 }
640 if (!DetechForwardType(ctxPoint)) {
641 WRITE_LOG(LOG_FATAL, "SlaveConnect DetechForwardType failed content:%s", content);
642 goto Finish;
643 }
644 WRITE_LOG(LOG_DEBUG, "id:%u type:%d", ctxPoint->id, ctxPoint->type);
645 if (ctxPoint->type == FORWARD_ARK) {
646 if (ctxPoint->checkPoint) {
647 if (!SetupArkPoint(ctxPoint)) {
648 sError = ctxPoint->lastError;
649 WRITE_LOG(LOG_FATAL, "SlaveConnect SetupArkPoint failed content:%s", content);
650 goto Finish;
651 }
652 } else {
653 SetupPointContinue(ctxPoint, 0);
654 }
655 ret = true;
656 } else {
657 if (!ctxPoint->checkPoint) {
658 if (!SetupPoint(ctxPoint)) {
659 sError = ctxPoint->lastError;
660 WRITE_LOG(LOG_FATAL, "SlaveConnect SetupPoint failed content:%s", content);
661 goto Finish;
662 }
663 } else {
664 SetupPointContinue(ctxPoint, 0);
665 }
666 ret = true;
667 }
668 Finish:
669 if (!ret) {
670 FreeContext(ctxPoint, 0, true);
671 }
672 return ret;
673 }
674
DoForwardBegin(HCtxForward ctx)675 bool HdcForwardBase::DoForwardBegin(HCtxForward ctx)
676 {
677 switch (ctx->type) {
678 case FORWARD_TCP:
679 case FORWARD_JDWP: // jdwp use tcp ->socketpair->jvm
680 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1);
681 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf);
682 break;
683 case FORWARD_ARK:
684 WRITE_LOG(LOG_DEBUG, "DoForwardBegin ark socketpair id:%u fds[0]:%d", ctx->id, fds[0]);
685 uv_tcp_init(loopTask, &ctx->tcp);
686 uv_tcp_open(&ctx->tcp, fds[0]);
687 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1);
688 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf);
689 break;
690 case FORWARD_ABSTRACT:
691 case FORWARD_RESERVED:
692 case FORWARD_FILESYSTEM:
693 uv_read_start((uv_stream_t *)&ctx->pipe, AllocForwardBuf, ReadForwardBuf);
694 break;
695 case FORWARD_DEVICE: {
696 ctx->fdClass->StartWorkOnThread();
697 break;
698 }
699 default:
700 break;
701 }
702 ctx->ready = true;
703 return true;
704 }
705
AdminContext(const uint8_t op,const uint32_t id,HCtxForward hInput)706 void *HdcForwardBase::AdminContext(const uint8_t op, const uint32_t id, HCtxForward hInput)
707 {
708 ctxPointMutex.lock();
709 void *hRet = nullptr;
710 map<uint32_t, HCtxForward> &mapCtx = mapCtxPoint;
711 switch (op) {
712 case OP_ADD:
713 mapCtx[id] = hInput;
714 break;
715 case OP_REMOVE:
716 mapCtx.erase(id);
717 break;
718 case OP_QUERY:
719 if (mapCtx.count(id)) {
720 hRet = mapCtx[id];
721 }
722 break;
723 case OP_UPDATE:
724 mapCtx.erase(id);
725 mapCtx[hInput->id] = hInput;
726 break;
727 default:
728 break;
729 }
730 ctxPointMutex.unlock();
731 return hRet;
732 }
733
SendCallbackForwardBuf(uv_write_t * req,int status)734 void HdcForwardBase::SendCallbackForwardBuf(uv_write_t *req, int status)
735 {
736 ContextForwardIO *ctxIO = (ContextForwardIO *)req->data;
737 HCtxForward ctx = reinterpret_cast<HCtxForward>(ctxIO->ctxForward);
738 if (status < 0 && !ctx->finish) {
739 WRITE_LOG(LOG_DEBUG, "SendCallbackForwardBuf ctx->type:%d, status:%d finish", ctx->type, status);
740 ctx->thisClass->FreeContext(ctx, 0, true);
741 }
742 delete[] ctxIO->bufIO;
743 delete ctxIO;
744 delete req;
745 }
746
SendForwardBuf(HCtxForward ctx,uint8_t * bufPtr,const int size)747 int HdcForwardBase::SendForwardBuf(HCtxForward ctx, uint8_t *bufPtr, const int size)
748 {
749 int nRet = 0;
750 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
751 WRITE_LOG(LOG_WARN, "SendForwardBuf size:%d > HDC_BUF_MAX_BYTES, ctxId:%u", size, ctx->id);
752 return -1;
753 }
754 if (size <= 0) {
755 WRITE_LOG(LOG_WARN, "SendForwardBuf failed size:%d, ctxId:%u", size, ctx->id);
756 return -1;
757 }
758 auto pDynBuf = new(std::nothrow) uint8_t[size];
759 if (!pDynBuf) {
760 WRITE_LOG(LOG_WARN, "SendForwardBuf new DynBuf failed size:%d, ctxId:%u", size, ctx->id);
761 return -1;
762 }
763 (void)memcpy_s(pDynBuf, size, bufPtr, size);
764 if (ctx->type == FORWARD_DEVICE) {
765 nRet = ctx->fdClass->WriteWithMem(pDynBuf, size);
766 } else {
767 auto ctxIO = new ContextForwardIO();
768 if (!ctxIO) {
769 WRITE_LOG(LOG_WARN, "SendForwardBuf new ContextForwardIO failed, ctxId:%u", ctx->id);
770 delete[] pDynBuf;
771 return -1;
772 }
773 ctxIO->ctxForward = ctx;
774 ctxIO->bufIO = pDynBuf;
775 if (ctx->type == FORWARD_TCP || ctx->type == FORWARD_JDWP || ctx->type == FORWARD_ARK) {
776 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->tcp, pDynBuf, size, nullptr,
777 (void *)SendCallbackForwardBuf, (void *)ctxIO);
778 } else {
779 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
780 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->pipe, pDynBuf, size, nullptr,
781 (void *)SendCallbackForwardBuf, (void *)ctxIO);
782 }
783 if (nRet < 0) {
784 WRITE_LOG(LOG_WARN, "SendForwardBuf SendToStreamEx ret:%d, size:%d ctxId:%u type:%d",
785 nRet, size, ctx->id, ctx->type);
786 }
787 }
788 return nRet;
789 }
790
CommandForwardCheckResult(HCtxForward ctx,uint8_t * payload)791 bool HdcForwardBase::CommandForwardCheckResult(HCtxForward ctx, uint8_t *payload)
792 {
793 bool ret = true;
794 bool bCheck = static_cast<bool>(payload);
795 LogMsg(bCheck ? MSG_OK : MSG_FAIL, "Forwardport result:%s", bCheck ? "OK" : "Failed");
796 if (bCheck) {
797 string mapInfo = taskInfo->serverOrDaemon ? "1|" : "0|";
798 mapInfo += taskCommand;
799 ctx->ready = true;
800 ServerCommand(CMD_FORWARD_SUCCESS, reinterpret_cast<uint8_t *>(const_cast<char *>(mapInfo.c_str())),
801 mapInfo.size() + 1);
802 } else {
803 ret = false;
804 FreeContext(ctx, 0, false);
805 }
806 return ret;
807 }
808
ForwardCommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)809 bool HdcForwardBase::ForwardCommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
810 {
811 if (payloadSize <= DWORD_SERIALIZE_SIZE && command != CMD_FORWARD_FREE_CONTEXT
812 && command != CMD_FORWARD_ACTIVE_MASTER) {
813 WRITE_LOG(LOG_FATAL, "Illegal payloadSize, shorter than forward command header");
814 return false;
815 }
816 bool ret = true;
817 uint8_t *pContent = nullptr;
818 int sizeContent = 0;
819 uint32_t id = 0;
820 HCtxForward ctx = nullptr;
821 FilterCommand(payload, &id, &pContent);
822 sizeContent = payloadSize - DWORD_SERIALIZE_SIZE;
823 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
824 WRITE_LOG(LOG_WARN, "Query id:%u failed", id);
825 return true;
826 }
827 switch (command) {
828 case CMD_FORWARD_CHECK_RESULT: {
829 ret = CommandForwardCheckResult(ctx, pContent);
830 break;
831 }
832 case CMD_FORWARD_ACTIVE_MASTER: {
833 ret = DoForwardBegin(ctx);
834 break;
835 }
836 case CMD_FORWARD_DATA: {
837 if (ctx->finish) {
838 break;
839 }
840 if (SendForwardBuf(ctx, pContent, sizeContent) < 0) {
841 WRITE_LOG(LOG_WARN, "ForwardCommandDispatch SendForwardBuf rc < 0, ctxid:%u", ctx->id);
842 FreeContext(ctx, 0, true);
843 }
844 break;
845 }
846 case CMD_FORWARD_FREE_CONTEXT: {
847 FreeContext(ctx, 0, false);
848 break;
849 }
850 default:
851 ret = false;
852 break;
853 }
854 if (!ret) {
855 if (ctx) {
856 FreeContext(ctx, 0, true);
857 } else {
858 WRITE_LOG(LOG_DEBUG, "ctx==nullptr raw free");
859 TaskFinish();
860 }
861 }
862 return ret;
863 }
864
CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)865 bool HdcForwardBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
866 {
867 if (command != CMD_FORWARD_DATA) {
868 WRITE_LOG(LOG_WARN, "CommandDispatch command:%d payloadSize:%d", command, payloadSize);
869 }
870 bool ret = true;
871 string sError;
872 // prepare
873 if (command == CMD_FORWARD_INIT) {
874 string strCommand(reinterpret_cast<char *>(payload), payloadSize);
875 if (!BeginForward(strCommand, sError)) {
876 ret = false;
877 goto Finish;
878 }
879 return true;
880 } else if (command == CMD_FORWARD_CHECK) {
881 // Detect remote if it's reachable
882 if (!SlaveConnect(payload, payloadSize, true, sError)) {
883 ret = false;
884 goto Finish;
885 }
886 return true;
887 } else if (command == CMD_FORWARD_ACTIVE_SLAVE) {
888 // slave connect target port when activating
889 if (!SlaveConnect(payload, payloadSize, false, sError)) {
890 ret = false;
891 goto Finish;
892 }
893 return true;
894 }
895 if (!ForwardCommandDispatch(command, payload, payloadSize)) {
896 ret = false;
897 goto Finish;
898 }
899 Finish:
900 if (!ret) {
901 if (!sError.size()) {
902 LogMsg(MSG_FAIL, "Forward parament failed");
903 } else {
904 LogMsg(MSG_FAIL, const_cast<char *>(sError.c_str()));
905 WRITE_LOG(LOG_WARN, const_cast<char *>(sError.c_str()));
906 }
907 }
908 return ret;
909 }
910 } // namespace Hdc
911