1 /* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #include "forward.h" 16 #include "base.h" 17 18 namespace Hdc { HdcForwardBase(HTaskInfo hTaskInfo)19 HdcForwardBase::HdcForwardBase(HTaskInfo hTaskInfo) 20 : HdcTaskBase(hTaskInfo) 21 { 22 } 23 ~HdcForwardBase()24 HdcForwardBase::~HdcForwardBase() 25 { 26 WRITE_LOG(LOG_DEBUG, "~HdcForwardBase"); 27 }; 28 ReadyForRelease()29 bool HdcForwardBase::ReadyForRelease() 30 { 31 if (!HdcTaskBase::ReadyForRelease()) { 32 return false; 33 } 34 return true; 35 } 36 StopTask()37 void HdcForwardBase::StopTask() 38 { 39 ctxPointMutex.lock(); 40 vector<HCtxForward> ctxs; 41 map<uint32_t, HCtxForward>::iterator iter; 42 for (iter = mapCtxPoint.begin(); iter != mapCtxPoint.end(); ++iter) { 43 HCtxForward ctx = iter->second; 44 ctxs.push_back(ctx); 45 } 46 // FREECONTEXT in the STOP is triggered by the other party sector, no longer notifying each other. 47 mapCtxPoint.clear(); 48 ctxPointMutex.unlock(); 49 for (auto ctx: ctxs) { 50 FreeContext(ctx, 0, false); 51 } 52 } 53 OnAccept(uv_stream_t * server,HCtxForward ctxClient,uv_stream_t * client)54 void HdcForwardBase::OnAccept(uv_stream_t *server, HCtxForward ctxClient, uv_stream_t *client) 55 { 56 HCtxForward ctxListen = (HCtxForward)server->data; 57 char buf[BUF_SIZE_DEFAULT] = { 0 }; 58 bool ret = false; 59 while (true) { 60 if (uv_accept(server, client)) { 61 break; 62 } 63 ctxClient->type = ctxListen->type; 64 ctxClient->remoteParamenters = ctxListen->remoteParamenters; 65 int maxSize = sizeof(buf) - forwardParameterBufSize; 66 // clang-format off 67 if (snprintf_s(buf + forwardParameterBufSize, maxSize, maxSize - 1, "%s", 68 ctxClient->remoteParamenters.c_str()) < 0) { 69 break; 70 } 71 // clang-format on 72 // pre 8bytes preserve for param bits 73 SendToTask(ctxClient->id, CMD_FORWARD_ACTIVE_SLAVE, reinterpret_cast<uint8_t *>(buf), 74 strlen(buf + forwardParameterBufSize) + 9); 75 ret = true; 76 break; 77 } 78 if (!ret) { 79 FreeContext(ctxClient, 0, false); 80 } 81 } 82 ListenCallback(uv_stream_t * server,const int status)83 void HdcForwardBase::ListenCallback(uv_stream_t *server, const int status) 84 { 85 HCtxForward ctxListen = (HCtxForward)server->data; 86 HdcForwardBase *thisClass = ctxListen->thisClass; 87 uv_stream_t *client = nullptr; 88 89 if (status == -1 || !ctxListen->ready) { 90 thisClass->FreeContext(ctxListen, 0, false); 91 thisClass->TaskFinish(); 92 return; 93 } 94 HCtxForward ctxClient = (HCtxForward)thisClass->MallocContext(true); 95 if (!ctxClient) { 96 return; 97 } 98 if (ctxListen->type == FORWARD_TCP) { 99 uv_tcp_init(ctxClient->thisClass->loopTask, &ctxClient->tcp); 100 client = (uv_stream_t *)&ctxClient->tcp; 101 } else { 102 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM, 103 uv_pipe_init(ctxClient->thisClass->loopTask, &ctxClient->pipe, 0); 104 client = (uv_stream_t *)&ctxClient->pipe; 105 } 106 thisClass->OnAccept(server, ctxClient, client); 107 } 108 MallocContext(bool masterSlave)109 void *HdcForwardBase::MallocContext(bool masterSlave) 110 { 111 HCtxForward ctx = nullptr; 112 if ((ctx = new ContextForward()) == nullptr) { 113 return nullptr; 114 } 115 ctx->id = Base::GetRuntimeMSec(); 116 ctx->masterSlave = masterSlave; 117 ctx->thisClass = this; 118 ctx->fdClass = nullptr; 119 ctx->tcp.data = ctx; 120 ctx->pipe.data = ctx; 121 AdminContext(OP_ADD, ctx->id, ctx); 122 refCount++; 123 return ctx; 124 } 125 FreeContextCallBack(HCtxForward ctx)126 void HdcForwardBase::FreeContextCallBack(HCtxForward ctx) 127 { 128 Base::DoNextLoop(loopTask, ctx, [this](const uint8_t flag, string &msg, const void *data) { 129 HCtxForward ctx = (HCtxForward)data; 130 AdminContext(OP_REMOVE, ctx->id, nullptr); 131 if (ctx != nullptr) { 132 delete ctx; 133 ctx = nullptr; 134 } 135 if (refCount > 0) { 136 --refCount; 137 } 138 }); 139 } 140 FreeJDWP(HCtxForward ctx)141 void HdcForwardBase::FreeJDWP(HCtxForward ctx) 142 { 143 Base::CloseFd(ctx->fd); 144 if (ctx->fdClass) { 145 ctx->fdClass->StopWork(false, nullptr); 146 147 auto funcReqClose = [](uv_idle_t *handle) -> void { 148 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void { 149 HCtxForward ctx = (HCtxForward)handle->data; 150 ctx->thisClass->FreeContextCallBack(ctx); 151 delete (uv_idle_t *)handle; 152 }; 153 HCtxForward context = (HCtxForward)handle->data; 154 if (context->fdClass->ReadyForRelease()) { 155 delete context->fdClass; 156 context->fdClass = nullptr; 157 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose); 158 } 159 }; 160 Base::IdleUvTask(loopTask, ctx, funcReqClose); 161 } 162 } 163 FreeContext(HCtxForward ctxIn,const uint32_t id,bool bNotifyRemote)164 void HdcForwardBase::FreeContext(HCtxForward ctxIn, const uint32_t id, bool bNotifyRemote) 165 { 166 WRITE_LOG(LOG_DEBUG, "FreeContext id:%u, bNotifyRemote:%d", id, bNotifyRemote); 167 HCtxForward ctx = nullptr; 168 if (!ctxIn) { 169 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) { 170 WRITE_LOG(LOG_DEBUG, "Query id failed"); 171 return; 172 } 173 } else { 174 ctx = ctxIn; 175 } 176 if (ctx->finish) { 177 return; 178 } 179 if (bNotifyRemote) { 180 SendToTask(ctx->id, CMD_FORWARD_FREE_CONTEXT, nullptr, 0); 181 } 182 uv_close_cb funcHandleClose = [](uv_handle_t *handle) -> void { 183 HCtxForward ctx = (HCtxForward)handle->data; 184 ctx->thisClass->FreeContextCallBack(ctx); 185 }; 186 switch (ctx->type) { 187 case FORWARD_TCP: 188 case FORWARD_JDWP: 189 Base::TryCloseHandle((uv_handle_t *)&ctx->tcp, true, funcHandleClose); 190 break; 191 case FORWARD_ABSTRACT: 192 case FORWARD_RESERVED: 193 case FORWARD_FILESYSTEM: 194 Base::TryCloseHandle((uv_handle_t *)&ctx->pipe, true, funcHandleClose); 195 break; 196 case FORWARD_DEVICE: { 197 FreeJDWP(ctx); 198 break; 199 } 200 default: 201 break; 202 } 203 ctx->finish = true; 204 } 205 SendToTask(const uint32_t cid,const uint16_t command,uint8_t * bufPtr,const int bufSize)206 bool HdcForwardBase::SendToTask(const uint32_t cid, const uint16_t command, uint8_t *bufPtr, const int bufSize) 207 { 208 bool ret = false; 209 // usually MAX_SIZE_IOBUF*2 from HdcFileDescriptor maxIO 210 if (bufSize > Base::GetMaxBufSize() * 2) { 211 return false; 212 } 213 auto newBuf = new uint8_t[bufSize + 4]; 214 if (!newBuf) { 215 return false; 216 } 217 *reinterpret_cast<uint32_t *>(newBuf) = htonl(cid); 218 if (bufSize > 0 && bufPtr != nullptr && memcpy_s(newBuf + 4, bufSize, bufPtr, bufSize) != EOK) { 219 delete[] newBuf; 220 return false; 221 } 222 ret = SendToAnother(command, newBuf, bufSize + 4); 223 delete[] newBuf; 224 return ret; 225 } 226 227 // Forward flow is small and frequency is fast AllocForwardBuf(uv_handle_t * handle,size_t sizeSuggested,uv_buf_t * buf)228 void HdcForwardBase::AllocForwardBuf(uv_handle_t *handle, size_t sizeSuggested, uv_buf_t *buf) 229 { 230 const uint16_t size = 1492 - 256; // For layer 3, the default MTU is 1492 bytes. reserve hdc header 256 bytes 231 buf->base = (char *)new char[size]; 232 if (buf->base) { 233 buf->len = size - 1; 234 } else { 235 WRITE_LOG(LOG_WARN, "AllocForwardBuf == null"); 236 } 237 } 238 ReadForwardBuf(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)239 void HdcForwardBase::ReadForwardBuf(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) 240 { 241 HCtxForward ctx = (HCtxForward)stream->data; 242 if (nread < 0) { 243 ctx->thisClass->FreeContext(ctx, 0, true); 244 delete[] buf->base; 245 return; 246 } 247 if (nread == 0) { 248 delete[] buf->base; 249 return; 250 } 251 ctx->thisClass->SendToTask(ctx->id, CMD_FORWARD_DATA, (uint8_t *)buf->base, nread); 252 // clear 253 delete[] buf->base; 254 } 255 ConnectTarget(uv_connect_t * connection,int status)256 void HdcForwardBase::ConnectTarget(uv_connect_t *connection, int status) 257 { 258 HCtxForward ctx = (HCtxForward)connection->data; 259 HdcForwardBase *thisClass = ctx->thisClass; 260 delete connection; 261 if (status < 0) { 262 constexpr int bufSize = 1024; 263 char buf[bufSize] = { 0 }; 264 uv_err_name_r(status, buf, bufSize); 265 WRITE_LOG(LOG_WARN, "Forward connect result:%d error:%s", status, buf); 266 } 267 thisClass->SetupPointContinue(ctx, status); 268 } 269 CheckNodeInfo(const char * nodeInfo,string as[2])270 bool HdcForwardBase::CheckNodeInfo(const char *nodeInfo, string as[2]) 271 { 272 char bufString[BUF_SIZE_MEDIUM]; 273 if (!strchr(nodeInfo, ':')) { 274 return false; 275 } 276 if (EOK != strcpy_s(bufString, sizeof(bufString), nodeInfo)) { 277 return false; 278 } 279 if (*strchr(bufString, ':')) { 280 *strchr(bufString, ':') = '\0'; 281 } else { 282 return false; 283 } 284 as[0] = bufString; 285 as[1] = bufString + strlen(bufString) + 1; 286 if (as[0].size() > BUF_SIZE_SMALL || as[1].size() > BUF_SIZE_SMALL) { 287 return false; 288 } 289 if (as[0] == "tcp") { 290 int port = atoi(as[1].c_str()); 291 if (port <= 0 || port > MAX_IP_PORT) { 292 return false; 293 } 294 } 295 return true; 296 } 297 SetupPointContinue(HCtxForward ctx,int status)298 bool HdcForwardBase::SetupPointContinue(HCtxForward ctx, int status) 299 { 300 if (ctx->checkPoint) { 301 // send to active 302 uint8_t flag = status > 0; 303 SendToTask(ctx->id, CMD_FORWARD_CHECK_RESULT, &flag, 1); 304 FreeContext(ctx, 0, false); 305 return true; 306 } 307 if (status < 0) { 308 FreeContext(ctx, 0, true); 309 return false; 310 } 311 // send to active 312 if (!SendToTask(ctx->id, CMD_FORWARD_ACTIVE_MASTER, nullptr, 0)) { 313 FreeContext(ctx, 0, true); 314 return false; 315 } 316 return DoForwardBegin(ctx); 317 } 318 DetechForwardType(HCtxForward ctxPoint)319 bool HdcForwardBase::DetechForwardType(HCtxForward ctxPoint) 320 { 321 string &sFType = ctxPoint->localArgs[0]; 322 string &sNodeCfg = ctxPoint->localArgs[1]; 323 // string to enum 324 if (sFType == "tcp") { 325 ctxPoint->type = FORWARD_TCP; 326 } else if (sFType == "dev") { 327 ctxPoint->type = FORWARD_DEVICE; 328 } else if (sFType == "localabstract") { 329 // daemon shell: /system/bin/socat abstract-listen:linux-abstract - 330 // daemon shell: /system/bin/socat - abstract-connect:linux-abstract 331 // host: hdc fport tcp:8080 localabstract:linux-abstract 332 ctxPoint->type = FORWARD_ABSTRACT; 333 } else if (sFType == "localreserved") { 334 sNodeCfg = harmonyReservedSocketPrefix + sNodeCfg; 335 ctxPoint->type = FORWARD_RESERVED; 336 } else if (sFType == "localfilesystem") { 337 sNodeCfg = filesystemSocketPrefix + sNodeCfg; 338 ctxPoint->type = FORWARD_FILESYSTEM; 339 } else if (sFType == "jdwp") { 340 ctxPoint->type = FORWARD_JDWP; 341 } else { 342 return false; 343 } 344 return true; 345 } 346 SetupTCPPoint(HCtxForward ctxPoint)347 bool HdcForwardBase::SetupTCPPoint(HCtxForward ctxPoint) 348 { 349 string &sNodeCfg = ctxPoint->localArgs[1]; 350 int port = atoi(sNodeCfg.c_str()); 351 ctxPoint->tcp.data = ctxPoint; 352 uv_tcp_init(loopTask, &ctxPoint->tcp); 353 struct sockaddr_in addr; 354 if (ctxPoint->masterSlave) { 355 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface 356 uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addr, 0); 357 if (uv_listen((uv_stream_t *)&ctxPoint->tcp, 4, ListenCallback)) { 358 ctxPoint->lastError = "TCP Port listen failed at " + sNodeCfg; 359 return false; 360 } 361 } else { 362 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface 363 uv_connect_t *conn = new(std::nothrow) uv_connect_t(); 364 if (conn == nullptr) { 365 WRITE_LOG(LOG_FATAL, "SetupTCPPoint new conn failed"); 366 return false; 367 } 368 conn->data = ctxPoint; 369 uv_tcp_connect(conn, (uv_tcp_t *)&ctxPoint->tcp, (const struct sockaddr *)&addr, ConnectTarget); 370 } 371 return true; 372 } 373 SetupDevicePoint(HCtxForward ctxPoint)374 bool HdcForwardBase::SetupDevicePoint(HCtxForward ctxPoint) 375 { 376 uint8_t flag = 1; 377 string &sNodeCfg = ctxPoint->localArgs[1]; 378 string resolvedPath = Base::CanonicalizeSpecPath(sNodeCfg); 379 if ((ctxPoint->fd = open(resolvedPath.c_str(), O_RDWR)) < 0) { 380 ctxPoint->lastError = "Open unix-dev failed"; 381 flag = -1; 382 } 383 auto funcRead = [&](const void *a, uint8_t *b, const int c) -> bool { 384 HCtxForward ctx = (HCtxForward)a; 385 return SendToTask(ctx->id, CMD_FORWARD_DATA, b, c); 386 }; 387 auto funcFinish = [&](const void *a, const bool b, const string c) -> bool { 388 HCtxForward ctx = (HCtxForward)a; 389 WRITE_LOG(LOG_DEBUG, "Error ReadForwardBuf dev,ret:%d reason:%s", b, c.c_str()); 390 FreeContext(ctx, 0, true); 391 return false; 392 }; 393 ctxPoint->fdClass = new(std::nothrow) HdcFileDescriptor(loopTask, ctxPoint->fd, ctxPoint, funcRead, funcFinish); 394 if (ctxPoint->fdClass == nullptr) { 395 WRITE_LOG(LOG_FATAL, "SetupDevicePoint new ctxPoint->fdClass failed"); 396 return false; 397 } 398 SetupPointContinue(ctxPoint, flag); 399 return true; 400 } 401 LocalAbstractConnect(uv_pipe_t * pipe,string & sNodeCfg)402 bool HdcForwardBase::LocalAbstractConnect(uv_pipe_t *pipe, string &sNodeCfg) 403 { 404 bool abstractRet = false; 405 #ifndef _WIN32 406 int s = 0; 407 do { 408 if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) { 409 break; 410 } 411 fcntl(s, F_SETFD, FD_CLOEXEC); 412 struct sockaddr_un addr; 413 Base::ZeroStruct(addr); 414 int addrLen = sNodeCfg.size() + offsetof(struct sockaddr_un, sun_path) + 1; 415 addr.sun_family = AF_LOCAL; 416 addr.sun_path[0] = 0; 417 418 if (memcpy_s(addr.sun_path + 1, sizeof(addr.sun_path) - 1, sNodeCfg.c_str(), sNodeCfg.size()) != EOK) { 419 break; 420 }; 421 // local connect, ignore timeout 422 if (connect(s, reinterpret_cast<struct sockaddr *>(&addr), addrLen) < 0) { 423 break; 424 } 425 if (uv_pipe_open(pipe, s)) { 426 break; 427 } 428 abstractRet = true; 429 } while (false); 430 if (!abstractRet) { 431 Base::CloseFd(s); 432 } 433 #endif 434 return abstractRet; 435 } 436 SetupFilePoint(HCtxForward ctxPoint)437 bool HdcForwardBase::SetupFilePoint(HCtxForward ctxPoint) 438 { 439 string &sNodeCfg = ctxPoint->localArgs[1]; 440 ctxPoint->pipe.data = ctxPoint; 441 uv_pipe_init(loopTask, &ctxPoint->pipe, 0); 442 if (ctxPoint->masterSlave) { 443 if (ctxPoint->type == FORWARD_RESERVED || ctxPoint->type == FORWARD_FILESYSTEM) { 444 unlink(sNodeCfg.c_str()); 445 } 446 if (uv_pipe_bind(&ctxPoint->pipe, sNodeCfg.c_str())) { 447 ctxPoint->lastError = "Unix pipe bind failed"; 448 return false; 449 } 450 if (uv_listen((uv_stream_t *)&ctxPoint->pipe, 4, ListenCallback)) { 451 ctxPoint->lastError = "Unix pipe listen failed"; 452 return false; 453 } 454 } else { 455 if (ctxPoint->type == FORWARD_ABSTRACT) { 456 bool abstractRet = LocalAbstractConnect(&ctxPoint->pipe, sNodeCfg); 457 SetupPointContinue(ctxPoint, abstractRet ? 0 : -1); 458 if (!abstractRet) { 459 ctxPoint->lastError = "LocalAbstractConnect failed"; 460 return false; 461 } 462 } else { 463 uv_connect_t *connect = new(std::nothrow) uv_connect_t(); 464 if (connect == nullptr) { 465 WRITE_LOG(LOG_FATAL, "SetupFilePoint new connect failed"); 466 return false; 467 } 468 connect->data = ctxPoint; 469 uv_pipe_connect(connect, &ctxPoint->pipe, sNodeCfg.c_str(), ConnectTarget); 470 } 471 } 472 return true; 473 } 474 SetupPoint(HCtxForward ctxPoint)475 bool HdcForwardBase::SetupPoint(HCtxForward ctxPoint) 476 { 477 bool ret = true; 478 if (!DetechForwardType(ctxPoint)) { 479 return false; 480 } 481 switch (ctxPoint->type) { 482 case FORWARD_TCP: 483 if (!SetupTCPPoint(ctxPoint)) { 484 ret = false; 485 }; 486 break; 487 #ifndef _WIN32 488 case FORWARD_DEVICE: 489 if (!SetupDevicePoint(ctxPoint)) { 490 ret = false; 491 }; 492 break; 493 case FORWARD_JDWP: 494 if (!SetupJdwpPoint(ctxPoint)) { 495 ret = false; 496 }; 497 break; 498 case FORWARD_ABSTRACT: 499 case FORWARD_RESERVED: 500 case FORWARD_FILESYSTEM: 501 if (!SetupFilePoint(ctxPoint)) { 502 ret = false; 503 }; 504 break; 505 #else 506 case FORWARD_DEVICE: 507 case FORWARD_JDWP: 508 case FORWARD_ABSTRACT: 509 case FORWARD_RESERVED: 510 case FORWARD_FILESYSTEM: 511 ctxPoint->lastError = "Not supoort forward-type"; 512 ret = false; 513 break; 514 #endif 515 default: 516 ctxPoint->lastError = "Not supoort forward-type"; 517 ret = false; 518 break; 519 } 520 return ret; 521 } 522 BeginForward(const string & command,string & sError)523 bool HdcForwardBase::BeginForward(const string &command, string &sError) 524 { 525 bool ret = false; 526 int argc = 0; 527 char bufString[BUF_SIZE_SMALL] = ""; 528 HCtxForward ctxPoint = (HCtxForward)MallocContext(true); 529 if (!ctxPoint) { 530 WRITE_LOG(LOG_FATAL, "MallocContext failed"); 531 return false; 532 } 533 char **argv = Base::SplitCommandToArgs(command.c_str(), &argc); 534 if (argv == nullptr) { 535 WRITE_LOG(LOG_FATAL, "SplitCommandToArgs failed"); 536 return false; 537 } 538 while (true) { 539 if (argc < CMD_ARG1_COUNT) { 540 break; 541 } 542 if (strlen(argv[0]) > BUF_SIZE_SMALL || strlen(argv[1]) > BUF_SIZE_SMALL) { 543 break; 544 } 545 if (!CheckNodeInfo(argv[0], ctxPoint->localArgs)) { 546 break; 547 } 548 if (!CheckNodeInfo(argv[1], ctxPoint->remoteArgs)) { 549 break; 550 } 551 ctxPoint->remoteParamenters = argv[1]; 552 if (!SetupPoint(ctxPoint)) { 553 break; 554 } 555 556 ret = true; 557 break; 558 } 559 sError = ctxPoint->lastError; 560 if (ret) { 561 // First 8-byte parameter bit 562 int maxBufSize = sizeof(bufString) - forwardParameterBufSize; 563 if (snprintf_s(bufString + forwardParameterBufSize, maxBufSize, maxBufSize - 1, "%s", argv[1]) > 0) { 564 SendToTask(ctxPoint->id, CMD_FORWARD_CHECK, reinterpret_cast<uint8_t *>(bufString), 565 forwardParameterBufSize + strlen(bufString + forwardParameterBufSize) + 1); 566 taskCommand = command; 567 } 568 } 569 delete[](reinterpret_cast<char *>(argv)); 570 return ret; 571 } 572 FilterCommand(uint8_t * bufCmdIn,uint32_t * idOut,uint8_t ** pContentBuf)573 inline bool HdcForwardBase::FilterCommand(uint8_t *bufCmdIn, uint32_t *idOut, uint8_t **pContentBuf) 574 { 575 *pContentBuf = bufCmdIn + DWORD_SERIALIZE_SIZE; 576 *idOut = ntohl(*reinterpret_cast<uint32_t *>(bufCmdIn)); 577 return true; 578 } 579 SlaveConnect(uint8_t * bufCmd,bool bCheckPoint,string & sError)580 bool HdcForwardBase::SlaveConnect(uint8_t *bufCmd, bool bCheckPoint, string &sError) 581 { 582 bool ret = false; 583 char *content = nullptr; 584 uint32_t idSlaveOld = 0; 585 HCtxForward ctxPoint = (HCtxForward)MallocContext(false); 586 if (!ctxPoint) { 587 WRITE_LOG(LOG_FATAL, "MallocContext failed"); 588 return false; 589 } 590 idSlaveOld = ctxPoint->id; 591 ctxPoint->checkPoint = bCheckPoint; 592 // refresh another id,8byte param 593 FilterCommand(bufCmd, &ctxPoint->id, reinterpret_cast<uint8_t **>(&content)); 594 AdminContext(OP_UPDATE, idSlaveOld, ctxPoint); 595 content += forwardParameterBufSize; 596 if (!CheckNodeInfo(content, ctxPoint->localArgs)) { 597 return false; 598 } 599 if ((ctxPoint->checkPoint && slaveCheckWhenBegin) || !ctxPoint->checkPoint) { 600 if (!SetupPoint(ctxPoint)) { 601 WRITE_LOG(LOG_FATAL, "SetupPoint failed"); 602 goto Finish; 603 } 604 sError = ctxPoint->lastError; 605 } else { 606 SetupPointContinue(ctxPoint, 0); 607 } 608 ret = true; 609 Finish: 610 if (!ret) { 611 FreeContext(ctxPoint, 0, true); 612 } 613 return ret; 614 } 615 DoForwardBegin(HCtxForward ctx)616 bool HdcForwardBase::DoForwardBegin(HCtxForward ctx) 617 { 618 switch (ctx->type) { 619 case FORWARD_TCP: 620 case FORWARD_JDWP: // jdwp use tcp ->socketpair->jvm 621 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1); 622 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf); 623 break; 624 case FORWARD_ABSTRACT: 625 case FORWARD_RESERVED: 626 case FORWARD_FILESYSTEM: 627 uv_read_start((uv_stream_t *)&ctx->pipe, AllocForwardBuf, ReadForwardBuf); 628 break; 629 case FORWARD_DEVICE: { 630 ctx->fdClass->StartWork(); 631 break; 632 } 633 default: 634 break; 635 } 636 ctx->ready = true; 637 return true; 638 } 639 AdminContext(const uint8_t op,const uint32_t id,HCtxForward hInput)640 void *HdcForwardBase::AdminContext(const uint8_t op, const uint32_t id, HCtxForward hInput) 641 { 642 ctxPointMutex.lock(); 643 void *hRet = nullptr; 644 map<uint32_t, HCtxForward> &mapCtx = mapCtxPoint; 645 switch (op) { 646 case OP_ADD: 647 mapCtx[id] = hInput; 648 break; 649 case OP_REMOVE: 650 mapCtx.erase(id); 651 break; 652 case OP_QUERY: 653 if (mapCtx.count(id)) { 654 hRet = mapCtx[id]; 655 } 656 break; 657 case OP_UPDATE: 658 mapCtx.erase(id); 659 mapCtx[hInput->id] = hInput; 660 break; 661 default: 662 break; 663 } 664 ctxPointMutex.unlock(); 665 return hRet; 666 } 667 SendCallbackForwardBuf(uv_write_t * req,int status)668 void HdcForwardBase::SendCallbackForwardBuf(uv_write_t *req, int status) 669 { 670 ContextForwardIO *ctxIO = (ContextForwardIO *)req->data; 671 HCtxForward ctx = reinterpret_cast<HCtxForward>(ctxIO->ctxForward); 672 if (status < 0 && !ctx->finish) { 673 WRITE_LOG(LOG_DEBUG, "SendCallbackForwardBuf ctx->type:%d, status:%d finish", ctx->type, status); 674 ctx->thisClass->FreeContext(ctx, 0, true); 675 } 676 delete[] ctxIO->bufIO; 677 delete ctxIO; 678 delete req; 679 } 680 SendForwardBuf(HCtxForward ctx,uint8_t * bufPtr,const int size)681 int HdcForwardBase::SendForwardBuf(HCtxForward ctx, uint8_t *bufPtr, const int size) 682 { 683 int nRet = 0; 684 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) { 685 return -1; 686 } 687 if (size <= 0) { 688 WRITE_LOG(LOG_WARN, "SendForwardBuf failed size:%d", size); 689 return -1; 690 } 691 auto pDynBuf = new(std::nothrow) uint8_t[size]; 692 if (!pDynBuf) { 693 return -1; 694 } 695 (void)memcpy_s(pDynBuf, size, bufPtr, size); 696 if (ctx->type == FORWARD_DEVICE) { 697 nRet = ctx->fdClass->WriteWithMem(pDynBuf, size); 698 } else { 699 auto ctxIO = new ContextForwardIO(); 700 if (!ctxIO) { 701 delete[] pDynBuf; 702 return -1; 703 } 704 ctxIO->ctxForward = ctx; 705 ctxIO->bufIO = pDynBuf; 706 if (ctx->type == FORWARD_TCP || ctx->type == FORWARD_JDWP) { 707 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->tcp, pDynBuf, size, nullptr, 708 (void *)SendCallbackForwardBuf, (void *)ctxIO); 709 } else { 710 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM, 711 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->pipe, pDynBuf, size, nullptr, 712 (void *)SendCallbackForwardBuf, (void *)ctxIO); 713 } 714 } 715 return nRet; 716 } 717 CommandForwardCheckResult(HCtxForward ctx,uint8_t * payload)718 bool HdcForwardBase::CommandForwardCheckResult(HCtxForward ctx, uint8_t *payload) 719 { 720 bool ret = true; 721 bool bCheck = static_cast<bool>(payload); 722 LogMsg(bCheck ? MSG_OK : MSG_FAIL, "Forwardport result:%s", bCheck ? "OK" : "Failed"); 723 if (bCheck) { 724 string mapInfo = taskInfo->serverOrDaemon ? "1|" : "0|"; 725 mapInfo += taskCommand; 726 ctx->ready = true; 727 ServerCommand(CMD_FORWARD_SUCCESS, reinterpret_cast<uint8_t *>(const_cast<char *>(mapInfo.c_str())), 728 mapInfo.size() + 1); 729 } else { 730 ret = false; 731 FreeContext(ctx, 0, false); 732 } 733 return ret; 734 } 735 ForwardCommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)736 bool HdcForwardBase::ForwardCommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize) 737 { 738 bool ret = true; 739 uint8_t *pContent = nullptr; 740 int sizeContent = 0; 741 uint32_t id = 0; 742 HCtxForward ctx = nullptr; 743 FilterCommand(payload, &id, &pContent); 744 sizeContent = payloadSize - DWORD_SERIALIZE_SIZE; 745 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) { 746 WRITE_LOG(LOG_WARN, "Query id failed"); 747 return false; 748 } 749 switch (command) { 750 case CMD_FORWARD_CHECK_RESULT: { 751 ret = CommandForwardCheckResult(ctx, payload); 752 break; 753 } 754 case CMD_FORWARD_ACTIVE_MASTER: { 755 ret = DoForwardBegin(ctx); 756 break; 757 } 758 case CMD_FORWARD_DATA: { 759 if (ctx->finish) { 760 break; 761 } 762 if (SendForwardBuf(ctx, pContent, sizeContent) < 0) { 763 FreeContext(ctx, 0, true); 764 } 765 break; 766 } 767 case CMD_FORWARD_FREE_CONTEXT: { 768 FreeContext(ctx, 0, false); 769 break; 770 } 771 default: 772 ret = false; 773 break; 774 } 775 if (!ret) { 776 if (ctx) { 777 FreeContext(ctx, 0, true); 778 } else { 779 WRITE_LOG(LOG_DEBUG, "ctx==nullptr raw free"); 780 TaskFinish(); 781 } 782 } 783 return ret; 784 } 785 CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)786 bool HdcForwardBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize) 787 { 788 bool ret = true; 789 string sError; 790 // prepare 791 if (command == CMD_FORWARD_INIT) { 792 string strCommand(reinterpret_cast<char *>(payload), payloadSize); 793 if (!BeginForward(strCommand, sError)) { 794 ret = false; 795 goto Finish; 796 } 797 return true; 798 } else if (command == CMD_FORWARD_CHECK) { 799 // Detect remote if it's reachable 800 if (!SlaveConnect(payload, true, sError)) { 801 ret = false; 802 goto Finish; 803 } 804 return true; 805 } else if (command == CMD_FORWARD_ACTIVE_SLAVE) { 806 // slave connect target port when activating 807 if (!SlaveConnect(payload, false, sError)) { 808 ret = false; 809 goto Finish; 810 } 811 return true; 812 } 813 if (!ForwardCommandDispatch(command, payload, payloadSize)) { 814 ret = false; 815 goto Finish; 816 } 817 Finish: 818 if (!ret) { 819 if (!sError.size()) { 820 LogMsg(MSG_FAIL, "Forward parament failed"); 821 } else { 822 LogMsg(MSG_FAIL, const_cast<char *>(sError.c_str())); 823 WRITE_LOG(LOG_WARN, const_cast<char *>(sError.c_str())); 824 } 825 } 826 return ret; 827 } 828 } // namespace Hdc 829