1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "forward.h"
16 #include "base.h"
17
18 namespace Hdc {
HdcForwardBase(HTaskInfo hTaskInfo)19 HdcForwardBase::HdcForwardBase(HTaskInfo hTaskInfo)
20 : HdcTaskBase(hTaskInfo)
21 {
22 }
23
~HdcForwardBase()24 HdcForwardBase::~HdcForwardBase()
25 {
26 WRITE_LOG(LOG_DEBUG, "~HdcForwardBase");
27 };
28
ReadyForRelease()29 bool HdcForwardBase::ReadyForRelease()
30 {
31 if (!HdcTaskBase::ReadyForRelease()) {
32 return false;
33 }
34 return true;
35 }
36
StopTask()37 void HdcForwardBase::StopTask()
38 {
39 ctxPointMutex.lock();
40 vector<HCtxForward> ctxs;
41 map<uint32_t, HCtxForward>::iterator iter;
42 for (iter = mapCtxPoint.begin(); iter != mapCtxPoint.end(); ++iter) {
43 HCtxForward ctx = iter->second;
44 ctxs.push_back(ctx);
45 }
46 // FREECONTEXT in the STOP is triggered by the other party sector, no longer notifying each other.
47 mapCtxPoint.clear();
48 ctxPointMutex.unlock();
49 for (auto ctx: ctxs) {
50 FreeContext(ctx, 0, false);
51 }
52 }
53
OnAccept(uv_stream_t * server,HCtxForward ctxClient,uv_stream_t * client)54 void HdcForwardBase::OnAccept(uv_stream_t *server, HCtxForward ctxClient, uv_stream_t *client)
55 {
56 HCtxForward ctxListen = (HCtxForward)server->data;
57 char buf[BUF_SIZE_DEFAULT] = { 0 };
58 bool ret = false;
59 while (true) {
60 if (uv_accept(server, client)) {
61 break;
62 }
63 ctxClient->type = ctxListen->type;
64 ctxClient->remoteParamenters = ctxListen->remoteParamenters;
65 int maxSize = sizeof(buf) - FORWARD_PARAMENTER_BUFSIZE;
66 // clang-format off
67 if (snprintf_s(buf + FORWARD_PARAMENTER_BUFSIZE, maxSize, maxSize - 1, "%s",
68 ctxClient->remoteParamenters.c_str()) < 0) {
69 break;
70 }
71 // clang-format on
72 // pre 8bytes preserve for param bits
73 SendToTask(ctxClient->id, CMD_FORWARD_ACTIVE_SLAVE, (uint8_t *)buf,
74 strlen(buf + FORWARD_PARAMENTER_BUFSIZE) + 9);
75 ret = true;
76 break;
77 }
78 if (!ret) {
79 FreeContext(ctxClient, 0, false);
80 }
81 }
82
ListenCallback(uv_stream_t * server,const int status)83 void HdcForwardBase::ListenCallback(uv_stream_t *server, const int status)
84 {
85 HCtxForward ctxListen = (HCtxForward)server->data;
86 HdcForwardBase *thisClass = ctxListen->thisClass;
87 uv_stream_t *client = nullptr;
88
89 if (status == -1 || !ctxListen->ready) {
90 thisClass->FreeContext(ctxListen, 0, false);
91 thisClass->TaskFinish();
92 return;
93 }
94 HCtxForward ctxClient = (HCtxForward)thisClass->MallocContext(true);
95 if (!ctxClient) {
96 return;
97 }
98 if (FORWARD_TCP == ctxListen->type) {
99 uv_tcp_init(ctxClient->thisClass->loopTask, &ctxClient->tcp);
100 client = (uv_stream_t *)&ctxClient->tcp;
101 } else {
102 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
103 uv_pipe_init(ctxClient->thisClass->loopTask, &ctxClient->pipe, 0);
104 client = (uv_stream_t *)&ctxClient->pipe;
105 }
106 thisClass->OnAccept(server, ctxClient, client);
107 }
108
MallocContext(bool masterSlave)109 void *HdcForwardBase::MallocContext(bool masterSlave)
110 {
111 HCtxForward ctx = nullptr;
112 if ((ctx = new ContextForward()) == nullptr) {
113 return nullptr;
114 }
115 ctx->id = Base::GetRuntimeMSec();
116 ctx->masterSlave = masterSlave;
117 ctx->thisClass = this;
118 ctx->fdClass = nullptr;
119 ctx->tcp.data = ctx;
120 ctx->pipe.data = ctx;
121 AdminContext(OP_ADD, ctx->id, ctx);
122 refCount++;
123 return ctx;
124 }
125
FreeContextCallBack(HCtxForward ctx)126 void HdcForwardBase::FreeContextCallBack(HCtxForward ctx)
127 {
128 Base::DoNextLoop(loopTask, ctx, [this](const uint8_t flag, string &msg, const void *data) {
129 HCtxForward ctx = (HCtxForward)data;
130 AdminContext(OP_REMOVE, ctx->id, nullptr);
131 if (ctx != nullptr) {
132 delete ctx;
133 ctx = nullptr;
134 }
135 if (refCount > 0) {
136 --refCount;
137 }
138 });
139 }
140
FreeJDWP(HCtxForward ctx)141 void HdcForwardBase::FreeJDWP(HCtxForward ctx)
142 {
143 if (ctx->fd > 0) {
144 close(ctx->fd);
145 }
146 if (ctx->fdClass) {
147 ctx->fdClass->StopWork(false, nullptr);
148
149 auto funcReqClose = [](uv_idle_t *handle) -> void {
150 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void {
151 HCtxForward ctx = (HCtxForward)handle->data;
152 ctx->thisClass->FreeContextCallBack(ctx);
153 delete (uv_idle_t *)handle;
154 };
155 HCtxForward ctx = (HCtxForward)handle->data;
156 if (ctx->fdClass->ReadyForRelease()) {
157 delete ctx->fdClass;
158 ctx->fdClass = nullptr;
159 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose);
160 }
161 };
162 Base::IdleUvTask(loopTask, ctx, funcReqClose);
163 }
164 }
165
FreeContext(HCtxForward ctxIn,const uint32_t id,bool bNotifyRemote)166 void HdcForwardBase::FreeContext(HCtxForward ctxIn, const uint32_t id, bool bNotifyRemote)
167 {
168 WRITE_LOG(LOG_DEBUG, "FreeContext id:%u, bNotifyRemote:%d", id, bNotifyRemote);
169 HCtxForward ctx = nullptr;
170 if (!ctxIn) {
171 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
172 WRITE_LOG(LOG_DEBUG, "Query id failed");
173 return;
174 }
175 } else {
176 ctx = ctxIn;
177 }
178 if (ctx->finish) {
179 return;
180 }
181 if (bNotifyRemote) {
182 SendToTask(ctx->id, CMD_FORWARD_FREE_CONTEXT, nullptr, 0);
183 }
184 uv_close_cb funcHandleClose = [](uv_handle_t *handle) -> void {
185 HCtxForward ctx = (HCtxForward)handle->data;
186 ctx->thisClass->FreeContextCallBack(ctx);
187 };
188 switch (ctx->type) {
189 case FORWARD_TCP:
190 case FORWARD_JDWP:
191 Base::TryCloseHandle((uv_handle_t *)&ctx->tcp, true, funcHandleClose);
192 break;
193 case FORWARD_ABSTRACT:
194 case FORWARD_RESERVED:
195 case FORWARD_FILESYSTEM:
196 Base::TryCloseHandle((uv_handle_t *)&ctx->pipe, true, funcHandleClose);
197 break;
198 case FORWARD_DEVICE: {
199 FreeJDWP(ctx);
200 break;
201 }
202 default:
203 break;
204 }
205 ctx->finish = true;
206 }
207
SendToTask(const uint32_t cid,const uint16_t command,uint8_t * bufPtr,const int bufSize)208 bool HdcForwardBase::SendToTask(const uint32_t cid, const uint16_t command, uint8_t *bufPtr, const int bufSize)
209 {
210 bool ret = false;
211 // usually MAX_SIZE_IOBUF*2 from HdcFileDescriptor maxIO
212 if (bufSize > Base::GetMaxBufSize() * 2) {
213 return false;
214 }
215 auto newBuf = new uint8_t[bufSize + 4];
216 if (!newBuf) {
217 return false;
218 }
219 *(uint32_t *)(newBuf) = htonl(cid);
220 if (bufSize > 0 && bufPtr != nullptr && memcpy_s(newBuf + 4, bufSize, bufPtr, bufSize) != EOK) {
221 delete[] newBuf;
222 return false;
223 }
224 ret = SendToAnother(command, newBuf, bufSize + 4);
225 delete[] newBuf;
226 return ret;
227 }
228
229 // Forward flow is small and frequency is fast
AllocForwardBuf(uv_handle_t * handle,size_t sizeSuggested,uv_buf_t * buf)230 void HdcForwardBase::AllocForwardBuf(uv_handle_t *handle, size_t sizeSuggested, uv_buf_t *buf)
231 {
232 const uint16_t size = 1492 - 256; // For layer 3, the default MTU is 1492 bytes. reserve hdc header 256 bytes
233 buf->base = (char *)new char[size];
234 if (buf->base) {
235 buf->len = size - 1;
236 } else {
237 WRITE_LOG(LOG_WARN, "AllocForwardBuf == null");
238 }
239 }
240
ReadForwardBuf(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)241 void HdcForwardBase::ReadForwardBuf(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
242 {
243 HCtxForward ctx = (HCtxForward)stream->data;
244 if (nread < 0) {
245 ctx->thisClass->FreeContext(ctx, 0, true);
246 delete[] buf->base;
247 return;
248 }
249 if (nread == 0) {
250 delete[] buf->base;
251 return;
252 }
253 ctx->thisClass->SendToTask(ctx->id, CMD_FORWARD_DATA, (uint8_t *)buf->base, nread);
254 // clear
255 delete[] buf->base;
256 }
257
ConnectTarget(uv_connect_t * connection,int status)258 void HdcForwardBase::ConnectTarget(uv_connect_t *connection, int status)
259 {
260 HCtxForward ctx = (HCtxForward)connection->data;
261 HdcForwardBase *thisClass = ctx->thisClass;
262 delete connection;
263 if (status < 0) {
264 constexpr int bufSize = 1024;
265 char buf[bufSize] = { 0 };
266 uv_err_name_r(status, buf, bufSize);
267 WRITE_LOG(LOG_WARN, "Forward connect result:%d error:%s", status, buf);
268 }
269 thisClass->SetupPointContinue(ctx, status);
270 }
271
CheckNodeInfo(const char * nodeInfo,string as[2])272 bool HdcForwardBase::CheckNodeInfo(const char *nodeInfo, string as[2])
273 {
274 char bufString[BUF_SIZE_MEDIUM];
275 if (!strchr(nodeInfo, ':')) {
276 return false;
277 }
278 if (EOK != strcpy_s(bufString, sizeof(bufString), nodeInfo)) {
279 return false;
280 }
281 if (*strchr(bufString, ':')) {
282 *strchr(bufString, ':') = '\0';
283 } else {
284 return false;
285 }
286 as[0] = bufString;
287 as[1] = bufString + strlen(bufString) + 1;
288 if (as[0].size() > BUF_SIZE_SMALL || as[1].size() > BUF_SIZE_SMALL) {
289 return false;
290 }
291 if (as[0] == "tcp") {
292 int port = atoi(as[1].c_str());
293 if (port <= 0 || port > MAX_IP_PORT) {
294 return false;
295 }
296 }
297 return true;
298 }
299
SetupPointContinue(HCtxForward ctx,int status)300 bool HdcForwardBase::SetupPointContinue(HCtxForward ctx, int status)
301 {
302 if (ctx->checkPoint) {
303 // send to active
304 uint8_t flag = status > 0;
305 SendToTask(ctx->id, CMD_FORWARD_CHECK_RESULT, &flag, 1);
306 FreeContext(ctx, 0, false);
307 return true;
308 }
309 if (status < 0) {
310 FreeContext(ctx, 0, true);
311 return false;
312 }
313 // send to active
314 if (!SendToTask(ctx->id, CMD_FORWARD_ACTIVE_MASTER, nullptr, 0)) {
315 FreeContext(ctx, 0, true);
316 return false;
317 }
318 return DoForwardBegin(ctx);
319 }
320
DetechForwardType(HCtxForward ctxPoint)321 bool HdcForwardBase::DetechForwardType(HCtxForward ctxPoint)
322 {
323 string &sFType = ctxPoint->localArgs[0];
324 string &sNodeCfg = ctxPoint->localArgs[1];
325 // string to enum
326 if (sFType == "tcp") {
327 ctxPoint->type = FORWARD_TCP;
328 } else if (sFType == "dev") {
329 ctxPoint->type = FORWARD_DEVICE;
330 } else if (sFType == "localabstract") {
331 // daemon shell: /system/bin/socat abstract-listen:linux-abstract -
332 // daemon shell: /system/bin/socat - abstract-connect:linux-abstract
333 // host: hdc_std fport tcp:8080 localabstract:linux-abstract
334 ctxPoint->type = FORWARD_ABSTRACT;
335 } else if (sFType == "localreserved") {
336 sNodeCfg = HARMONY_RESERVED_SOCKET_PREFIX + sNodeCfg;
337 ctxPoint->type = FORWARD_RESERVED;
338 } else if (sFType == "localfilesystem") {
339 sNodeCfg = FILESYSTEM_SOCKET_PREFIX + sNodeCfg;
340 ctxPoint->type = FORWARD_FILESYSTEM;
341 } else if (sFType == "jdwp") {
342 ctxPoint->type = FORWARD_JDWP;
343 } else {
344 return false;
345 }
346 return true;
347 }
348
SetupTCPPoint(HCtxForward ctxPoint)349 bool HdcForwardBase::SetupTCPPoint(HCtxForward ctxPoint)
350 {
351 string &sNodeCfg = ctxPoint->localArgs[1];
352 int port = atoi(sNodeCfg.c_str());
353 ctxPoint->tcp.data = ctxPoint;
354 uv_tcp_init(loopTask, &ctxPoint->tcp);
355 struct sockaddr_in addr;
356 if (ctxPoint->masterSlave) {
357 uv_ip4_addr("0.0.0.0", port, &addr); // loop interface
358 uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addr, 0);
359 if (uv_listen((uv_stream_t *)&ctxPoint->tcp, 4, ListenCallback)) {
360 ctxPoint->lastError = "TCP Port listen failed at " + sNodeCfg;
361 return false;
362 }
363 } else {
364 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
365 uv_connect_t *conn = new(std::nothrow) uv_connect_t();
366 if (conn == nullptr) {
367 WRITE_LOG(LOG_FATAL, "SetupTCPPoint new conn failed");
368 return false;
369 }
370 conn->data = ctxPoint;
371 uv_tcp_connect(conn, (uv_tcp_t *)&ctxPoint->tcp, (const struct sockaddr *)&addr, ConnectTarget);
372 }
373 return true;
374 }
375
SetupDevicePoint(HCtxForward ctxPoint)376 bool HdcForwardBase::SetupDevicePoint(HCtxForward ctxPoint)
377 {
378 uint8_t flag = 1;
379 string &sNodeCfg = ctxPoint->localArgs[1];
380 string resolvedPath = Base::CanonicalizeSpecPath(sNodeCfg);
381 if ((ctxPoint->fd = open(resolvedPath.c_str(), O_RDWR)) < 0) {
382 ctxPoint->lastError = "Open unix-dev failed";
383 flag = -1;
384 }
385 auto funcRead = [&](const void *a, uint8_t *b, const int c) -> bool {
386 HCtxForward ctx = (HCtxForward)a;
387 return SendToTask(ctx->id, CMD_FORWARD_DATA, b, c);
388 };
389 auto funcFinish = [&](const void *a, const bool b, const string c) -> bool {
390 HCtxForward ctx = (HCtxForward)a;
391 WRITE_LOG(LOG_DEBUG, "Error ReadForwardBuf dev,ret:%d reson:%s", b, c.c_str());
392 FreeContext(ctx, 0, true);
393 return false;
394 };
395 ctxPoint->fdClass = new(std::nothrow) HdcFileDescriptor(loopTask, ctxPoint->fd, ctxPoint, funcRead, funcFinish);
396 if (ctxPoint->fdClass == nullptr) {
397 WRITE_LOG(LOG_FATAL, "SetupDevicePoint new ctxPoint->fdClass failed");
398 return false;
399 }
400 SetupPointContinue(ctxPoint, flag);
401 return true;
402 }
403
LocalAbstractConnect(uv_pipe_t * pipe,string & sNodeCfg)404 bool HdcForwardBase::LocalAbstractConnect(uv_pipe_t *pipe, string &sNodeCfg)
405 {
406 bool abstractRet = false;
407 #ifndef _WIN32
408 int s = 0;
409 do {
410 if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
411 break;
412 }
413 fcntl(s, F_SETFD, FD_CLOEXEC);
414 struct sockaddr_un addr;
415 Base::ZeroStruct(addr);
416 int addrLen = sNodeCfg.size() + offsetof(struct sockaddr_un, sun_path) + 1;
417 addr.sun_family = AF_LOCAL;
418 addr.sun_path[0] = 0;
419
420 if (memcpy_s(addr.sun_path + 1, sizeof(addr.sun_path) - 1, sNodeCfg.c_str(), sNodeCfg.size()) != EOK) {
421 break;
422 };
423 // local connect, ignore timeout
424 if (connect(s, (struct sockaddr *)&addr, addrLen) < 0) {
425 break;
426 }
427 if (uv_pipe_open(pipe, s)) {
428 break;
429 }
430 abstractRet = true;
431 } while (false);
432 if (!abstractRet && s > 0) {
433 close(s);
434 }
435 #endif
436 return abstractRet;
437 }
438
SetupFilePoint(HCtxForward ctxPoint)439 bool HdcForwardBase::SetupFilePoint(HCtxForward ctxPoint)
440 {
441 string &sNodeCfg = ctxPoint->localArgs[1];
442 ctxPoint->pipe.data = ctxPoint;
443 uv_pipe_init(loopTask, &ctxPoint->pipe, 0);
444 if (ctxPoint->masterSlave) {
445 if (ctxPoint->type == FORWARD_RESERVED || ctxPoint->type == FORWARD_FILESYSTEM) {
446 unlink(sNodeCfg.c_str());
447 }
448 if (uv_pipe_bind(&ctxPoint->pipe, sNodeCfg.c_str())) {
449 ctxPoint->lastError = "Unix pipe bind failed";
450 return false;
451 }
452 if (uv_listen((uv_stream_t *)&ctxPoint->pipe, 4, ListenCallback)) {
453 ctxPoint->lastError = "Unix pipe listen failed";
454 return false;
455 }
456 } else {
457 if (ctxPoint->type == FORWARD_ABSTRACT) {
458 bool abstractRet = LocalAbstractConnect(&ctxPoint->pipe, sNodeCfg);
459 SetupPointContinue(ctxPoint, abstractRet ? 0 : -1);
460 if (!abstractRet) {
461 ctxPoint->lastError = "LocalAbstractConnect failed";
462 return false;
463 }
464 } else {
465 uv_connect_t *connect = new(std::nothrow) uv_connect_t();
466 if (connect == nullptr) {
467 WRITE_LOG(LOG_FATAL, "SetupFilePoint new connect failed");
468 return false;
469 }
470 connect->data = ctxPoint;
471 uv_pipe_connect(connect, &ctxPoint->pipe, sNodeCfg.c_str(), ConnectTarget);
472 }
473 }
474 return true;
475 }
476
SetupPoint(HCtxForward ctxPoint)477 bool HdcForwardBase::SetupPoint(HCtxForward ctxPoint)
478 {
479 bool ret = true;
480 if (!DetechForwardType(ctxPoint)) {
481 return false;
482 }
483 switch (ctxPoint->type) {
484 case FORWARD_TCP:
485 if (!SetupTCPPoint(ctxPoint)) {
486 ret = false;
487 };
488 break;
489 #ifndef _WIN32
490 case FORWARD_DEVICE:
491 if (!SetupDevicePoint(ctxPoint)) {
492 ret = false;
493 };
494 break;
495 case FORWARD_JDWP:
496 if (!SetupJdwpPoint(ctxPoint)) {
497 ret = false;
498 };
499 break;
500 case FORWARD_ABSTRACT:
501 case FORWARD_RESERVED:
502 case FORWARD_FILESYSTEM:
503 if (!SetupFilePoint(ctxPoint)) {
504 ret = false;
505 };
506 break;
507 #else
508 case FORWARD_DEVICE:
509 case FORWARD_JDWP:
510 case FORWARD_ABSTRACT:
511 case FORWARD_RESERVED:
512 case FORWARD_FILESYSTEM:
513 ctxPoint->lastError = "Not supoort forward-type";
514 ret = false;
515 break;
516 #endif
517 default:
518 ctxPoint->lastError = "Not supoort forward-type";
519 ret = false;
520 break;
521 }
522 return ret;
523 }
524
BeginForward(const string & command,string & sError)525 bool HdcForwardBase::BeginForward(const string &command, string &sError)
526 {
527 bool ret = false;
528 int argc = 0;
529 char bufString[BUF_SIZE_SMALL] = "";
530 HCtxForward ctxPoint = (HCtxForward)MallocContext(true);
531 if (!ctxPoint) {
532 WRITE_LOG(LOG_FATAL, "MallocContext failed");
533 return false;
534 }
535 char **argv = Base::SplitCommandToArgs(command.c_str(), &argc);
536 if (argv == nullptr) {
537 WRITE_LOG(LOG_FATAL, "SplitCommandToArgs failed");
538 return false;
539 }
540 while (true) {
541 if (argc < CMD_ARG1_COUNT) {
542 break;
543 }
544 if (strlen(argv[0]) > BUF_SIZE_SMALL || strlen(argv[1]) > BUF_SIZE_SMALL) {
545 break;
546 }
547 if (!CheckNodeInfo(argv[0], ctxPoint->localArgs)) {
548 break;
549 }
550 if (!CheckNodeInfo(argv[1], ctxPoint->remoteArgs)) {
551 break;
552 }
553 ctxPoint->remoteParamenters = argv[1];
554 if (!SetupPoint(ctxPoint)) {
555 break;
556 }
557
558 ret = true;
559 break;
560 }
561 sError = ctxPoint->lastError;
562 if (ret) {
563 // First 8-byte parameter bit
564 int maxBufSize = sizeof(bufString) - FORWARD_PARAMENTER_BUFSIZE;
565 if (snprintf_s(bufString + FORWARD_PARAMENTER_BUFSIZE, maxBufSize, maxBufSize - 1, "%s", argv[1]) > 0) {
566 SendToTask(ctxPoint->id, CMD_FORWARD_CHECK, (uint8_t *)bufString,
567 FORWARD_PARAMENTER_BUFSIZE + strlen(bufString + FORWARD_PARAMENTER_BUFSIZE) + 1);
568 taskCommand = command;
569 }
570 }
571 delete[]((char *)argv);
572 return ret;
573 }
574
FilterCommand(uint8_t * bufCmdIn,uint32_t * idOut,uint8_t ** pContentBuf)575 inline bool HdcForwardBase::FilterCommand(uint8_t *bufCmdIn, uint32_t *idOut, uint8_t **pContentBuf)
576 {
577 *pContentBuf = bufCmdIn + DWORD_SERIALIZE_SIZE;
578 *idOut = ntohl(*(uint32_t *)bufCmdIn);
579 return true;
580 }
581
SlaveConnect(uint8_t * bufCmd,bool bCheckPoint,string & sError)582 bool HdcForwardBase::SlaveConnect(uint8_t *bufCmd, bool bCheckPoint, string &sError)
583 {
584 bool ret = false;
585 char *content = nullptr;
586 uint32_t idSlaveOld = 0;
587 HCtxForward ctxPoint = (HCtxForward)MallocContext(false);
588 if (!ctxPoint) {
589 WRITE_LOG(LOG_FATAL, "MallocContext failed");
590 return false;
591 }
592 idSlaveOld = ctxPoint->id;
593 ctxPoint->checkPoint = bCheckPoint;
594 // refresh another id,8byte param
595 FilterCommand(bufCmd, &ctxPoint->id, (uint8_t **)&content);
596 AdminContext(OP_UPDATE, idSlaveOld, ctxPoint);
597 content += FORWARD_PARAMENTER_BUFSIZE;
598 if (!CheckNodeInfo(content, ctxPoint->localArgs)) {
599 return false;
600 }
601 if ((ctxPoint->checkPoint && slaveCheckWhenBegin) || !ctxPoint->checkPoint) {
602 if (!SetupPoint(ctxPoint)) {
603 WRITE_LOG(LOG_FATAL, "SetupPoint failed");
604 goto Finish;
605 }
606 sError = ctxPoint->lastError;
607 } else {
608 SetupPointContinue(ctxPoint, 0);
609 }
610 ret = true;
611 Finish:
612 if (!ret) {
613 FreeContext(ctxPoint, 0, true);
614 }
615 return ret;
616 }
617
DoForwardBegin(HCtxForward ctx)618 bool HdcForwardBase::DoForwardBegin(HCtxForward ctx)
619 {
620 switch (ctx->type) {
621 case FORWARD_TCP:
622 case FORWARD_JDWP: // jdwp use tcp ->socketpair->jvm
623 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1);
624 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf);
625 break;
626 case FORWARD_ABSTRACT:
627 case FORWARD_RESERVED:
628 case FORWARD_FILESYSTEM:
629 uv_read_start((uv_stream_t *)&ctx->pipe, AllocForwardBuf, ReadForwardBuf);
630 break;
631 case FORWARD_DEVICE: {
632 ctx->fdClass->StartWork();
633 break;
634 }
635 default:
636 break;
637 }
638 ctx->ready = true;
639 return true;
640 }
641
AdminContext(const uint8_t op,const uint32_t id,HCtxForward hInput)642 void *HdcForwardBase::AdminContext(const uint8_t op, const uint32_t id, HCtxForward hInput)
643 {
644 ctxPointMutex.lock();
645 void *hRet = nullptr;
646 map<uint32_t, HCtxForward> &mapCtx = mapCtxPoint;
647 switch (op) {
648 case OP_ADD:
649 mapCtx[id] = hInput;
650 break;
651 case OP_REMOVE:
652 mapCtx.erase(id);
653 break;
654 case OP_QUERY:
655 if (mapCtx.count(id)) {
656 hRet = mapCtx[id];
657 }
658 break;
659 case OP_UPDATE:
660 mapCtx.erase(id);
661 mapCtx[hInput->id] = hInput;
662 break;
663 default:
664 break;
665 }
666 ctxPointMutex.unlock();
667 return hRet;
668 }
669
SendCallbackForwardBuf(uv_write_t * req,int status)670 void HdcForwardBase::SendCallbackForwardBuf(uv_write_t *req, int status)
671 {
672 ContextForwardIO *ctxIO = (ContextForwardIO *)req->data;
673 HCtxForward ctx = (HCtxForward)ctxIO->ctxForward;
674 if (status < 0 && !ctx->finish) {
675 WRITE_LOG(LOG_DEBUG, "SendCallbackForwardBuf ctx->type:%d, status:%d finish", ctx->type, status);
676 ctx->thisClass->FreeContext(ctx, 0, true);
677 }
678 delete[] ctxIO->bufIO;
679 delete ctxIO;
680 delete req;
681 }
682
SendForwardBuf(HCtxForward ctx,uint8_t * bufPtr,const int size)683 int HdcForwardBase::SendForwardBuf(HCtxForward ctx, uint8_t *bufPtr, const int size)
684 {
685 int nRet = 0;
686 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
687 return -1;
688 }
689 if (size <= 0) {
690 WRITE_LOG(LOG_WARN, "SendForwardBuf failed size:%d", size);
691 return -1;
692 }
693 auto pDynBuf = new uint8_t[size];
694 if (!pDynBuf) {
695 return -1;
696 }
697 (void)memcpy_s(pDynBuf, size, bufPtr, size);
698 if (FORWARD_DEVICE == ctx->type) {
699 nRet = ctx->fdClass->WriteWithMem(pDynBuf, size);
700 } else {
701 auto ctxIO = new ContextForwardIO();
702 if (!ctxIO) {
703 delete[] pDynBuf;
704 return -1;
705 }
706 ctxIO->ctxForward = ctx;
707 ctxIO->bufIO = pDynBuf;
708 if (FORWARD_TCP == ctx->type || FORWARD_JDWP == ctx->type) {
709 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->tcp, pDynBuf, size, nullptr,
710 (void *)SendCallbackForwardBuf, (void *)ctxIO);
711 } else {
712 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
713 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->pipe, pDynBuf, size, nullptr,
714 (void *)SendCallbackForwardBuf, (void *)ctxIO);
715 }
716 }
717 return nRet;
718 }
719
CommandForwardCheckResult(HCtxForward ctx,uint8_t * payload)720 bool HdcForwardBase::CommandForwardCheckResult(HCtxForward ctx, uint8_t *payload)
721 {
722 bool ret = true;
723 bool bCheck = (bool)payload;
724 LogMsg(bCheck ? MSG_OK : MSG_FAIL, "Forwardport result:%s", bCheck ? "OK" : "Failed");
725 if (bCheck) {
726 string mapInfo = taskInfo->serverOrDaemon ? "1|" : "0|";
727 mapInfo += taskCommand;
728 ctx->ready = true;
729 ServerCommand(CMD_FORWARD_SUCCESS, (uint8_t *)mapInfo.c_str(), mapInfo.size() + 1);
730 } else {
731 ret = false;
732 FreeContext(ctx, 0, false);
733 }
734 return ret;
735 }
736
ForwardCommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)737 bool HdcForwardBase::ForwardCommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
738 {
739 bool ret = true;
740 uint8_t *pContent = nullptr;
741 int sizeContent = 0;
742 uint32_t id = 0;
743 HCtxForward ctx = nullptr;
744 FilterCommand(payload, &id, &pContent);
745 sizeContent = payloadSize - DWORD_SERIALIZE_SIZE;
746 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
747 WRITE_LOG(LOG_WARN, "Query id failed");
748 return false;
749 }
750 switch (command) {
751 case CMD_FORWARD_CHECK_RESULT: {
752 ret = CommandForwardCheckResult(ctx, payload);
753 break;
754 }
755 case CMD_FORWARD_ACTIVE_MASTER: {
756 ret = DoForwardBegin(ctx);
757 break;
758 }
759 case CMD_FORWARD_DATA: {
760 if (ctx->finish) {
761 break;
762 }
763 if (SendForwardBuf(ctx, pContent, sizeContent) < 0) {
764 FreeContext(ctx, 0, true);
765 }
766 break;
767 }
768 case CMD_FORWARD_FREE_CONTEXT: {
769 FreeContext(ctx, 0, false);
770 break;
771 }
772 default:
773 ret = false;
774 break;
775 }
776 if (!ret) {
777 if (ctx) {
778 FreeContext(ctx, 0, true);
779 } else {
780 WRITE_LOG(LOG_DEBUG, "ctx==nullptr raw free");
781 TaskFinish();
782 }
783 }
784 return ret;
785 }
786
CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)787 bool HdcForwardBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
788 {
789 bool ret = true;
790 string sError;
791 // prepare
792 if (CMD_FORWARD_INIT == command) {
793 string strCommand(reinterpret_cast<char *>(payload), payloadSize);
794 if (!BeginForward(strCommand, sError)) {
795 ret = false;
796 goto Finish;
797 }
798 return true;
799 } else if (CMD_FORWARD_CHECK == command) {
800 // Detect remote if it's reachable
801 if (!SlaveConnect(payload, true, sError)) {
802 ret = false;
803 goto Finish;
804 }
805 return true;
806 } else if (CMD_FORWARD_ACTIVE_SLAVE == command) {
807 // slave connect target port when activating
808 if (!SlaveConnect(payload, false, sError)) {
809 ret = false;
810 goto Finish;
811 }
812 return true;
813 }
814 if (!ForwardCommandDispatch(command, payload, payloadSize)) {
815 ret = false;
816 goto Finish;
817 }
818 Finish:
819 if (!ret) {
820 if (!sError.size()) {
821 LogMsg(MSG_FAIL, "Forward parament failed");
822 } else {
823 LogMsg(MSG_FAIL, (char *)sError.c_str());
824 WRITE_LOG(LOG_WARN, (char *)sError.c_str());
825 }
826 }
827 return ret;
828 }
829 } // namespace Hdc
830