1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "forward.h"
16 #include "base.h"
17
18 namespace Hdc {
HdcForwardBase(HTaskInfo hTaskInfo)19 HdcForwardBase::HdcForwardBase(HTaskInfo hTaskInfo)
20 : HdcTaskBase(hTaskInfo)
21 {
22 }
23
~HdcForwardBase()24 HdcForwardBase::~HdcForwardBase()
25 {
26 WRITE_LOG(LOG_DEBUG, "~HdcForwardBase");
27 };
28
ReadyForRelease()29 bool HdcForwardBase::ReadyForRelease()
30 {
31 if (!HdcTaskBase::ReadyForRelease()) {
32 return false;
33 }
34 return true;
35 }
36
StopTask()37 void HdcForwardBase::StopTask()
38 {
39 ctxPointMutex.lock();
40 vector<HCtxForward> ctxs;
41 map<uint32_t, HCtxForward>::iterator iter;
42 for (iter = mapCtxPoint.begin(); iter != mapCtxPoint.end(); ++iter) {
43 HCtxForward ctx = iter->second;
44 ctxs.push_back(ctx);
45 }
46 // FREECONTEXT in the STOP is triggered by the other party sector, no longer notifying each other.
47 mapCtxPoint.clear();
48 ctxPointMutex.unlock();
49 for (auto ctx: ctxs) {
50 FreeContext(ctx, 0, false);
51 }
52 }
53
OnAccept(uv_stream_t * server,HCtxForward ctxClient,uv_stream_t * client)54 void HdcForwardBase::OnAccept(uv_stream_t *server, HCtxForward ctxClient, uv_stream_t *client)
55 {
56 HCtxForward ctxListen = (HCtxForward)server->data;
57 char buf[BUF_SIZE_DEFAULT] = { 0 };
58 bool ret = false;
59 while (true) {
60 if (uv_accept(server, client)) {
61 break;
62 }
63 ctxClient->type = ctxListen->type;
64 ctxClient->remoteParamenters = ctxListen->remoteParamenters;
65 int maxSize = sizeof(buf) - forwardParameterBufSize;
66 // clang-format off
67 if (snprintf_s(buf + forwardParameterBufSize, maxSize, maxSize - 1, "%s",
68 ctxClient->remoteParamenters.c_str()) < 0) {
69 break;
70 }
71 // clang-format on
72 // pre 8bytes preserve for param bits
73 SendToTask(ctxClient->id, CMD_FORWARD_ACTIVE_SLAVE, reinterpret_cast<uint8_t *>(buf),
74 strlen(buf + forwardParameterBufSize) + 9);
75 ret = true;
76 break;
77 }
78 if (!ret) {
79 FreeContext(ctxClient, 0, false);
80 }
81 }
82
ListenCallback(uv_stream_t * server,const int status)83 void HdcForwardBase::ListenCallback(uv_stream_t *server, const int status)
84 {
85 HCtxForward ctxListen = (HCtxForward)server->data;
86 HdcForwardBase *thisClass = ctxListen->thisClass;
87 uv_stream_t *client = nullptr;
88
89 if (status == -1 || !ctxListen->ready) {
90 thisClass->FreeContext(ctxListen, 0, false);
91 thisClass->TaskFinish();
92 return;
93 }
94 HCtxForward ctxClient = (HCtxForward)thisClass->MallocContext(true);
95 if (!ctxClient) {
96 return;
97 }
98 if (ctxListen->type == FORWARD_TCP) {
99 uv_tcp_init(ctxClient->thisClass->loopTask, &ctxClient->tcp);
100 client = (uv_stream_t *)&ctxClient->tcp;
101 } else {
102 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
103 uv_pipe_init(ctxClient->thisClass->loopTask, &ctxClient->pipe, 0);
104 client = (uv_stream_t *)&ctxClient->pipe;
105 }
106 thisClass->OnAccept(server, ctxClient, client);
107 }
108
MallocContext(bool masterSlave)109 void *HdcForwardBase::MallocContext(bool masterSlave)
110 {
111 HCtxForward ctx = nullptr;
112 if ((ctx = new ContextForward()) == nullptr) {
113 return nullptr;
114 }
115 ctx->id = Base::GetRuntimeMSec();
116 ctx->masterSlave = masterSlave;
117 ctx->thisClass = this;
118 ctx->fdClass = nullptr;
119 ctx->tcp.data = ctx;
120 ctx->pipe.data = ctx;
121 AdminContext(OP_ADD, ctx->id, ctx);
122 refCount++;
123 return ctx;
124 }
125
FreeContextCallBack(HCtxForward ctx)126 void HdcForwardBase::FreeContextCallBack(HCtxForward ctx)
127 {
128 Base::DoNextLoop(loopTask, ctx, [this](const uint8_t flag, string &msg, const void *data) {
129 HCtxForward ctx = (HCtxForward)data;
130 AdminContext(OP_REMOVE, ctx->id, nullptr);
131 if (ctx != nullptr) {
132 delete ctx;
133 ctx = nullptr;
134 }
135 if (refCount > 0) {
136 --refCount;
137 }
138 });
139 }
140
FreeJDWP(HCtxForward ctx)141 void HdcForwardBase::FreeJDWP(HCtxForward ctx)
142 {
143 Base::CloseFd(ctx->fd);
144 if (ctx->fdClass) {
145 ctx->fdClass->StopWork(false, nullptr);
146
147 auto funcReqClose = [](uv_idle_t *handle) -> void {
148 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void {
149 HCtxForward ctx = (HCtxForward)handle->data;
150 ctx->thisClass->FreeContextCallBack(ctx);
151 delete (uv_idle_t *)handle;
152 };
153 HCtxForward context = (HCtxForward)handle->data;
154 if (context->fdClass->ReadyForRelease()) {
155 delete context->fdClass;
156 context->fdClass = nullptr;
157 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose);
158 }
159 };
160 Base::IdleUvTask(loopTask, ctx, funcReqClose);
161 }
162 }
163
FreeContext(HCtxForward ctxIn,const uint32_t id,bool bNotifyRemote)164 void HdcForwardBase::FreeContext(HCtxForward ctxIn, const uint32_t id, bool bNotifyRemote)
165 {
166 WRITE_LOG(LOG_DEBUG, "FreeContext id:%u, bNotifyRemote:%d", id, bNotifyRemote);
167 HCtxForward ctx = nullptr;
168 if (!ctxIn) {
169 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
170 WRITE_LOG(LOG_DEBUG, "Query id failed");
171 return;
172 }
173 } else {
174 ctx = ctxIn;
175 }
176 if (ctx->finish) {
177 return;
178 }
179 if (bNotifyRemote) {
180 SendToTask(ctx->id, CMD_FORWARD_FREE_CONTEXT, nullptr, 0);
181 }
182 uv_close_cb funcHandleClose = [](uv_handle_t *handle) -> void {
183 HCtxForward ctx = (HCtxForward)handle->data;
184 ctx->thisClass->FreeContextCallBack(ctx);
185 };
186 switch (ctx->type) {
187 case FORWARD_TCP:
188 case FORWARD_JDWP:
189 Base::TryCloseHandle((uv_handle_t *)&ctx->tcp, true, funcHandleClose);
190 break;
191 case FORWARD_ABSTRACT:
192 case FORWARD_RESERVED:
193 case FORWARD_FILESYSTEM:
194 Base::TryCloseHandle((uv_handle_t *)&ctx->pipe, true, funcHandleClose);
195 break;
196 case FORWARD_DEVICE: {
197 FreeJDWP(ctx);
198 break;
199 }
200 default:
201 break;
202 }
203 ctx->finish = true;
204 }
205
SendToTask(const uint32_t cid,const uint16_t command,uint8_t * bufPtr,const int bufSize)206 bool HdcForwardBase::SendToTask(const uint32_t cid, const uint16_t command, uint8_t *bufPtr, const int bufSize)
207 {
208 bool ret = false;
209 // usually MAX_SIZE_IOBUF*2 from HdcFileDescriptor maxIO
210 if (bufSize > Base::GetMaxBufSize() * 2) {
211 return false;
212 }
213 auto newBuf = new uint8_t[bufSize + 4];
214 if (!newBuf) {
215 return false;
216 }
217 *reinterpret_cast<uint32_t *>(newBuf) = htonl(cid);
218 if (bufSize > 0 && bufPtr != nullptr && memcpy_s(newBuf + 4, bufSize, bufPtr, bufSize) != EOK) {
219 delete[] newBuf;
220 return false;
221 }
222 ret = SendToAnother(command, newBuf, bufSize + 4);
223 delete[] newBuf;
224 return ret;
225 }
226
227 // Forward flow is small and frequency is fast
AllocForwardBuf(uv_handle_t * handle,size_t sizeSuggested,uv_buf_t * buf)228 void HdcForwardBase::AllocForwardBuf(uv_handle_t *handle, size_t sizeSuggested, uv_buf_t *buf)
229 {
230 const uint16_t size = 1492 - 256; // For layer 3, the default MTU is 1492 bytes. reserve hdc header 256 bytes
231 buf->base = (char *)new char[size];
232 if (buf->base) {
233 buf->len = size - 1;
234 } else {
235 WRITE_LOG(LOG_WARN, "AllocForwardBuf == null");
236 }
237 }
238
ReadForwardBuf(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)239 void HdcForwardBase::ReadForwardBuf(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
240 {
241 HCtxForward ctx = (HCtxForward)stream->data;
242 if (nread < 0) {
243 ctx->thisClass->FreeContext(ctx, 0, true);
244 delete[] buf->base;
245 return;
246 }
247 if (nread == 0) {
248 delete[] buf->base;
249 return;
250 }
251 ctx->thisClass->SendToTask(ctx->id, CMD_FORWARD_DATA, (uint8_t *)buf->base, nread);
252 // clear
253 delete[] buf->base;
254 }
255
ConnectTarget(uv_connect_t * connection,int status)256 void HdcForwardBase::ConnectTarget(uv_connect_t *connection, int status)
257 {
258 HCtxForward ctx = (HCtxForward)connection->data;
259 HdcForwardBase *thisClass = ctx->thisClass;
260 delete connection;
261 if (status < 0) {
262 constexpr int bufSize = 1024;
263 char buf[bufSize] = { 0 };
264 uv_err_name_r(status, buf, bufSize);
265 WRITE_LOG(LOG_WARN, "Forward connect result:%d error:%s", status, buf);
266 }
267 thisClass->SetupPointContinue(ctx, status);
268 }
269
CheckNodeInfo(const char * nodeInfo,string as[2])270 bool HdcForwardBase::CheckNodeInfo(const char *nodeInfo, string as[2])
271 {
272 char bufString[BUF_SIZE_MEDIUM];
273 if (!strchr(nodeInfo, ':')) {
274 return false;
275 }
276 if (EOK != strcpy_s(bufString, sizeof(bufString), nodeInfo)) {
277 return false;
278 }
279 if (*strchr(bufString, ':')) {
280 *strchr(bufString, ':') = '\0';
281 } else {
282 return false;
283 }
284 as[0] = bufString;
285 as[1] = bufString + strlen(bufString) + 1;
286 if (as[0].size() > BUF_SIZE_SMALL || as[1].size() > BUF_SIZE_SMALL) {
287 return false;
288 }
289 if (as[0] == "tcp") {
290 int port = atoi(as[1].c_str());
291 if (port <= 0 || port > MAX_IP_PORT) {
292 return false;
293 }
294 }
295 return true;
296 }
297
SetupPointContinue(HCtxForward ctx,int status)298 bool HdcForwardBase::SetupPointContinue(HCtxForward ctx, int status)
299 {
300 if (ctx->checkPoint) {
301 // send to active
302 uint8_t flag = status > 0;
303 SendToTask(ctx->id, CMD_FORWARD_CHECK_RESULT, &flag, 1);
304 FreeContext(ctx, 0, false);
305 return true;
306 }
307 if (status < 0) {
308 FreeContext(ctx, 0, true);
309 return false;
310 }
311 // send to active
312 if (!SendToTask(ctx->id, CMD_FORWARD_ACTIVE_MASTER, nullptr, 0)) {
313 FreeContext(ctx, 0, true);
314 return false;
315 }
316 return DoForwardBegin(ctx);
317 }
318
DetechForwardType(HCtxForward ctxPoint)319 bool HdcForwardBase::DetechForwardType(HCtxForward ctxPoint)
320 {
321 string &sFType = ctxPoint->localArgs[0];
322 string &sNodeCfg = ctxPoint->localArgs[1];
323 // string to enum
324 if (sFType == "tcp") {
325 ctxPoint->type = FORWARD_TCP;
326 } else if (sFType == "dev") {
327 ctxPoint->type = FORWARD_DEVICE;
328 } else if (sFType == "localabstract") {
329 // daemon shell: /system/bin/socat abstract-listen:linux-abstract -
330 // daemon shell: /system/bin/socat - abstract-connect:linux-abstract
331 // host: hdc fport tcp:8080 localabstract:linux-abstract
332 ctxPoint->type = FORWARD_ABSTRACT;
333 } else if (sFType == "localreserved") {
334 sNodeCfg = harmonyReservedSocketPrefix + sNodeCfg;
335 ctxPoint->type = FORWARD_RESERVED;
336 } else if (sFType == "localfilesystem") {
337 sNodeCfg = filesystemSocketPrefix + sNodeCfg;
338 ctxPoint->type = FORWARD_FILESYSTEM;
339 } else if (sFType == "jdwp") {
340 ctxPoint->type = FORWARD_JDWP;
341 } else {
342 return false;
343 }
344 return true;
345 }
346
SetupTCPPoint(HCtxForward ctxPoint)347 bool HdcForwardBase::SetupTCPPoint(HCtxForward ctxPoint)
348 {
349 string &sNodeCfg = ctxPoint->localArgs[1];
350 int port = atoi(sNodeCfg.c_str());
351 ctxPoint->tcp.data = ctxPoint;
352 uv_tcp_init(loopTask, &ctxPoint->tcp);
353 struct sockaddr_in addr;
354 if (ctxPoint->masterSlave) {
355 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
356 uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addr, 0);
357 if (uv_listen((uv_stream_t *)&ctxPoint->tcp, 4, ListenCallback)) {
358 ctxPoint->lastError = "TCP Port listen failed at " + sNodeCfg;
359 return false;
360 }
361 } else {
362 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
363 uv_connect_t *conn = new(std::nothrow) uv_connect_t();
364 if (conn == nullptr) {
365 WRITE_LOG(LOG_FATAL, "SetupTCPPoint new conn failed");
366 return false;
367 }
368 conn->data = ctxPoint;
369 uv_tcp_connect(conn, (uv_tcp_t *)&ctxPoint->tcp, (const struct sockaddr *)&addr, ConnectTarget);
370 }
371 return true;
372 }
373
SetupDevicePoint(HCtxForward ctxPoint)374 bool HdcForwardBase::SetupDevicePoint(HCtxForward ctxPoint)
375 {
376 uint8_t flag = 1;
377 string &sNodeCfg = ctxPoint->localArgs[1];
378 string resolvedPath = Base::CanonicalizeSpecPath(sNodeCfg);
379 if ((ctxPoint->fd = open(resolvedPath.c_str(), O_RDWR)) < 0) {
380 ctxPoint->lastError = "Open unix-dev failed";
381 flag = -1;
382 }
383 auto funcRead = [&](const void *a, uint8_t *b, const int c) -> bool {
384 HCtxForward ctx = (HCtxForward)a;
385 return SendToTask(ctx->id, CMD_FORWARD_DATA, b, c);
386 };
387 auto funcFinish = [&](const void *a, const bool b, const string c) -> bool {
388 HCtxForward ctx = (HCtxForward)a;
389 WRITE_LOG(LOG_DEBUG, "Error ReadForwardBuf dev,ret:%d reason:%s", b, c.c_str());
390 FreeContext(ctx, 0, true);
391 return false;
392 };
393 ctxPoint->fdClass = new(std::nothrow) HdcFileDescriptor(loopTask, ctxPoint->fd, ctxPoint, funcRead, funcFinish);
394 if (ctxPoint->fdClass == nullptr) {
395 WRITE_LOG(LOG_FATAL, "SetupDevicePoint new ctxPoint->fdClass failed");
396 return false;
397 }
398 SetupPointContinue(ctxPoint, flag);
399 return true;
400 }
401
LocalAbstractConnect(uv_pipe_t * pipe,string & sNodeCfg)402 bool HdcForwardBase::LocalAbstractConnect(uv_pipe_t *pipe, string &sNodeCfg)
403 {
404 bool abstractRet = false;
405 #ifndef _WIN32
406 int s = 0;
407 do {
408 if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
409 break;
410 }
411 fcntl(s, F_SETFD, FD_CLOEXEC);
412 struct sockaddr_un addr;
413 Base::ZeroStruct(addr);
414 int addrLen = sNodeCfg.size() + offsetof(struct sockaddr_un, sun_path) + 1;
415 addr.sun_family = AF_LOCAL;
416 addr.sun_path[0] = 0;
417
418 if (memcpy_s(addr.sun_path + 1, sizeof(addr.sun_path) - 1, sNodeCfg.c_str(), sNodeCfg.size()) != EOK) {
419 break;
420 };
421 // local connect, ignore timeout
422 if (connect(s, reinterpret_cast<struct sockaddr *>(&addr), addrLen) < 0) {
423 break;
424 }
425 if (uv_pipe_open(pipe, s)) {
426 break;
427 }
428 abstractRet = true;
429 } while (false);
430 if (!abstractRet) {
431 Base::CloseFd(s);
432 }
433 #endif
434 return abstractRet;
435 }
436
SetupFilePoint(HCtxForward ctxPoint)437 bool HdcForwardBase::SetupFilePoint(HCtxForward ctxPoint)
438 {
439 string &sNodeCfg = ctxPoint->localArgs[1];
440 ctxPoint->pipe.data = ctxPoint;
441 uv_pipe_init(loopTask, &ctxPoint->pipe, 0);
442 if (ctxPoint->masterSlave) {
443 if (ctxPoint->type == FORWARD_RESERVED || ctxPoint->type == FORWARD_FILESYSTEM) {
444 unlink(sNodeCfg.c_str());
445 }
446 if (uv_pipe_bind(&ctxPoint->pipe, sNodeCfg.c_str())) {
447 ctxPoint->lastError = "Unix pipe bind failed";
448 return false;
449 }
450 if (uv_listen((uv_stream_t *)&ctxPoint->pipe, 4, ListenCallback)) {
451 ctxPoint->lastError = "Unix pipe listen failed";
452 return false;
453 }
454 } else {
455 if (ctxPoint->type == FORWARD_ABSTRACT) {
456 bool abstractRet = LocalAbstractConnect(&ctxPoint->pipe, sNodeCfg);
457 SetupPointContinue(ctxPoint, abstractRet ? 0 : -1);
458 if (!abstractRet) {
459 ctxPoint->lastError = "LocalAbstractConnect failed";
460 return false;
461 }
462 } else {
463 uv_connect_t *connect = new(std::nothrow) uv_connect_t();
464 if (connect == nullptr) {
465 WRITE_LOG(LOG_FATAL, "SetupFilePoint new connect failed");
466 return false;
467 }
468 connect->data = ctxPoint;
469 uv_pipe_connect(connect, &ctxPoint->pipe, sNodeCfg.c_str(), ConnectTarget);
470 }
471 }
472 return true;
473 }
474
SetupPoint(HCtxForward ctxPoint)475 bool HdcForwardBase::SetupPoint(HCtxForward ctxPoint)
476 {
477 bool ret = true;
478 if (!DetechForwardType(ctxPoint)) {
479 return false;
480 }
481 switch (ctxPoint->type) {
482 case FORWARD_TCP:
483 if (!SetupTCPPoint(ctxPoint)) {
484 ret = false;
485 };
486 break;
487 #ifndef _WIN32
488 case FORWARD_DEVICE:
489 if (!SetupDevicePoint(ctxPoint)) {
490 ret = false;
491 };
492 break;
493 case FORWARD_JDWP:
494 if (!SetupJdwpPoint(ctxPoint)) {
495 ret = false;
496 };
497 break;
498 case FORWARD_ABSTRACT:
499 case FORWARD_RESERVED:
500 case FORWARD_FILESYSTEM:
501 if (!SetupFilePoint(ctxPoint)) {
502 ret = false;
503 };
504 break;
505 #else
506 case FORWARD_DEVICE:
507 case FORWARD_JDWP:
508 case FORWARD_ABSTRACT:
509 case FORWARD_RESERVED:
510 case FORWARD_FILESYSTEM:
511 ctxPoint->lastError = "Not supoort forward-type";
512 ret = false;
513 break;
514 #endif
515 default:
516 ctxPoint->lastError = "Not supoort forward-type";
517 ret = false;
518 break;
519 }
520 return ret;
521 }
522
BeginForward(const string & command,string & sError)523 bool HdcForwardBase::BeginForward(const string &command, string &sError)
524 {
525 bool ret = false;
526 int argc = 0;
527 char bufString[BUF_SIZE_SMALL] = "";
528 HCtxForward ctxPoint = (HCtxForward)MallocContext(true);
529 if (!ctxPoint) {
530 WRITE_LOG(LOG_FATAL, "MallocContext failed");
531 return false;
532 }
533 char **argv = Base::SplitCommandToArgs(command.c_str(), &argc);
534 if (argv == nullptr) {
535 WRITE_LOG(LOG_FATAL, "SplitCommandToArgs failed");
536 return false;
537 }
538 while (true) {
539 if (argc < CMD_ARG1_COUNT) {
540 break;
541 }
542 if (strlen(argv[0]) > BUF_SIZE_SMALL || strlen(argv[1]) > BUF_SIZE_SMALL) {
543 break;
544 }
545 if (!CheckNodeInfo(argv[0], ctxPoint->localArgs)) {
546 break;
547 }
548 if (!CheckNodeInfo(argv[1], ctxPoint->remoteArgs)) {
549 break;
550 }
551 ctxPoint->remoteParamenters = argv[1];
552 if (!SetupPoint(ctxPoint)) {
553 break;
554 }
555
556 ret = true;
557 break;
558 }
559 sError = ctxPoint->lastError;
560 if (ret) {
561 // First 8-byte parameter bit
562 int maxBufSize = sizeof(bufString) - forwardParameterBufSize;
563 if (snprintf_s(bufString + forwardParameterBufSize, maxBufSize, maxBufSize - 1, "%s", argv[1]) > 0) {
564 SendToTask(ctxPoint->id, CMD_FORWARD_CHECK, reinterpret_cast<uint8_t *>(bufString),
565 forwardParameterBufSize + strlen(bufString + forwardParameterBufSize) + 1);
566 taskCommand = command;
567 }
568 }
569 delete[](reinterpret_cast<char *>(argv));
570 return ret;
571 }
572
FilterCommand(uint8_t * bufCmdIn,uint32_t * idOut,uint8_t ** pContentBuf)573 inline bool HdcForwardBase::FilterCommand(uint8_t *bufCmdIn, uint32_t *idOut, uint8_t **pContentBuf)
574 {
575 *pContentBuf = bufCmdIn + DWORD_SERIALIZE_SIZE;
576 *idOut = ntohl(*reinterpret_cast<uint32_t *>(bufCmdIn));
577 return true;
578 }
579
SlaveConnect(uint8_t * bufCmd,bool bCheckPoint,string & sError)580 bool HdcForwardBase::SlaveConnect(uint8_t *bufCmd, bool bCheckPoint, string &sError)
581 {
582 bool ret = false;
583 char *content = nullptr;
584 uint32_t idSlaveOld = 0;
585 HCtxForward ctxPoint = (HCtxForward)MallocContext(false);
586 if (!ctxPoint) {
587 WRITE_LOG(LOG_FATAL, "MallocContext failed");
588 return false;
589 }
590 idSlaveOld = ctxPoint->id;
591 ctxPoint->checkPoint = bCheckPoint;
592 // refresh another id,8byte param
593 FilterCommand(bufCmd, &ctxPoint->id, reinterpret_cast<uint8_t **>(&content));
594 AdminContext(OP_UPDATE, idSlaveOld, ctxPoint);
595 content += forwardParameterBufSize;
596 if (!CheckNodeInfo(content, ctxPoint->localArgs)) {
597 return false;
598 }
599 if ((ctxPoint->checkPoint && slaveCheckWhenBegin) || !ctxPoint->checkPoint) {
600 if (!SetupPoint(ctxPoint)) {
601 WRITE_LOG(LOG_FATAL, "SetupPoint failed");
602 goto Finish;
603 }
604 sError = ctxPoint->lastError;
605 } else {
606 SetupPointContinue(ctxPoint, 0);
607 }
608 ret = true;
609 Finish:
610 if (!ret) {
611 FreeContext(ctxPoint, 0, true);
612 }
613 return ret;
614 }
615
DoForwardBegin(HCtxForward ctx)616 bool HdcForwardBase::DoForwardBegin(HCtxForward ctx)
617 {
618 switch (ctx->type) {
619 case FORWARD_TCP:
620 case FORWARD_JDWP: // jdwp use tcp ->socketpair->jvm
621 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1);
622 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf);
623 break;
624 case FORWARD_ABSTRACT:
625 case FORWARD_RESERVED:
626 case FORWARD_FILESYSTEM:
627 uv_read_start((uv_stream_t *)&ctx->pipe, AllocForwardBuf, ReadForwardBuf);
628 break;
629 case FORWARD_DEVICE: {
630 ctx->fdClass->StartWork();
631 break;
632 }
633 default:
634 break;
635 }
636 ctx->ready = true;
637 return true;
638 }
639
AdminContext(const uint8_t op,const uint32_t id,HCtxForward hInput)640 void *HdcForwardBase::AdminContext(const uint8_t op, const uint32_t id, HCtxForward hInput)
641 {
642 ctxPointMutex.lock();
643 void *hRet = nullptr;
644 map<uint32_t, HCtxForward> &mapCtx = mapCtxPoint;
645 switch (op) {
646 case OP_ADD:
647 mapCtx[id] = hInput;
648 break;
649 case OP_REMOVE:
650 mapCtx.erase(id);
651 break;
652 case OP_QUERY:
653 if (mapCtx.count(id)) {
654 hRet = mapCtx[id];
655 }
656 break;
657 case OP_UPDATE:
658 mapCtx.erase(id);
659 mapCtx[hInput->id] = hInput;
660 break;
661 default:
662 break;
663 }
664 ctxPointMutex.unlock();
665 return hRet;
666 }
667
SendCallbackForwardBuf(uv_write_t * req,int status)668 void HdcForwardBase::SendCallbackForwardBuf(uv_write_t *req, int status)
669 {
670 ContextForwardIO *ctxIO = (ContextForwardIO *)req->data;
671 HCtxForward ctx = reinterpret_cast<HCtxForward>(ctxIO->ctxForward);
672 if (status < 0 && !ctx->finish) {
673 WRITE_LOG(LOG_DEBUG, "SendCallbackForwardBuf ctx->type:%d, status:%d finish", ctx->type, status);
674 ctx->thisClass->FreeContext(ctx, 0, true);
675 }
676 delete[] ctxIO->bufIO;
677 delete ctxIO;
678 delete req;
679 }
680
SendForwardBuf(HCtxForward ctx,uint8_t * bufPtr,const int size)681 int HdcForwardBase::SendForwardBuf(HCtxForward ctx, uint8_t *bufPtr, const int size)
682 {
683 int nRet = 0;
684 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
685 return -1;
686 }
687 if (size <= 0) {
688 WRITE_LOG(LOG_WARN, "SendForwardBuf failed size:%d", size);
689 return -1;
690 }
691 auto pDynBuf = new(std::nothrow) uint8_t[size];
692 if (!pDynBuf) {
693 return -1;
694 }
695 (void)memcpy_s(pDynBuf, size, bufPtr, size);
696 if (ctx->type == FORWARD_DEVICE) {
697 nRet = ctx->fdClass->WriteWithMem(pDynBuf, size);
698 } else {
699 auto ctxIO = new ContextForwardIO();
700 if (!ctxIO) {
701 delete[] pDynBuf;
702 return -1;
703 }
704 ctxIO->ctxForward = ctx;
705 ctxIO->bufIO = pDynBuf;
706 if (ctx->type == FORWARD_TCP || ctx->type == FORWARD_JDWP) {
707 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->tcp, pDynBuf, size, nullptr,
708 (void *)SendCallbackForwardBuf, (void *)ctxIO);
709 } else {
710 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
711 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->pipe, pDynBuf, size, nullptr,
712 (void *)SendCallbackForwardBuf, (void *)ctxIO);
713 }
714 }
715 return nRet;
716 }
717
CommandForwardCheckResult(HCtxForward ctx,uint8_t * payload)718 bool HdcForwardBase::CommandForwardCheckResult(HCtxForward ctx, uint8_t *payload)
719 {
720 bool ret = true;
721 bool bCheck = static_cast<bool>(payload);
722 LogMsg(bCheck ? MSG_OK : MSG_FAIL, "Forwardport result:%s", bCheck ? "OK" : "Failed");
723 if (bCheck) {
724 string mapInfo = taskInfo->serverOrDaemon ? "1|" : "0|";
725 mapInfo += taskCommand;
726 ctx->ready = true;
727 ServerCommand(CMD_FORWARD_SUCCESS, reinterpret_cast<uint8_t *>(const_cast<char *>(mapInfo.c_str())),
728 mapInfo.size() + 1);
729 } else {
730 ret = false;
731 FreeContext(ctx, 0, false);
732 }
733 return ret;
734 }
735
ForwardCommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)736 bool HdcForwardBase::ForwardCommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
737 {
738 bool ret = true;
739 uint8_t *pContent = nullptr;
740 int sizeContent = 0;
741 uint32_t id = 0;
742 HCtxForward ctx = nullptr;
743 FilterCommand(payload, &id, &pContent);
744 sizeContent = payloadSize - DWORD_SERIALIZE_SIZE;
745 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
746 WRITE_LOG(LOG_WARN, "Query id failed");
747 return false;
748 }
749 switch (command) {
750 case CMD_FORWARD_CHECK_RESULT: {
751 ret = CommandForwardCheckResult(ctx, payload);
752 break;
753 }
754 case CMD_FORWARD_ACTIVE_MASTER: {
755 ret = DoForwardBegin(ctx);
756 break;
757 }
758 case CMD_FORWARD_DATA: {
759 if (ctx->finish) {
760 break;
761 }
762 if (SendForwardBuf(ctx, pContent, sizeContent) < 0) {
763 FreeContext(ctx, 0, true);
764 }
765 break;
766 }
767 case CMD_FORWARD_FREE_CONTEXT: {
768 FreeContext(ctx, 0, false);
769 break;
770 }
771 default:
772 ret = false;
773 break;
774 }
775 if (!ret) {
776 if (ctx) {
777 FreeContext(ctx, 0, true);
778 } else {
779 WRITE_LOG(LOG_DEBUG, "ctx==nullptr raw free");
780 TaskFinish();
781 }
782 }
783 return ret;
784 }
785
CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)786 bool HdcForwardBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
787 {
788 bool ret = true;
789 string sError;
790 // prepare
791 if (command == CMD_FORWARD_INIT) {
792 string strCommand(reinterpret_cast<char *>(payload), payloadSize);
793 if (!BeginForward(strCommand, sError)) {
794 ret = false;
795 goto Finish;
796 }
797 return true;
798 } else if (command == CMD_FORWARD_CHECK) {
799 // Detect remote if it's reachable
800 if (!SlaveConnect(payload, true, sError)) {
801 ret = false;
802 goto Finish;
803 }
804 return true;
805 } else if (command == CMD_FORWARD_ACTIVE_SLAVE) {
806 // slave connect target port when activating
807 if (!SlaveConnect(payload, false, sError)) {
808 ret = false;
809 goto Finish;
810 }
811 return true;
812 }
813 if (!ForwardCommandDispatch(command, payload, payloadSize)) {
814 ret = false;
815 goto Finish;
816 }
817 Finish:
818 if (!ret) {
819 if (!sError.size()) {
820 LogMsg(MSG_FAIL, "Forward parament failed");
821 } else {
822 LogMsg(MSG_FAIL, const_cast<char *>(sError.c_str()));
823 WRITE_LOG(LOG_WARN, const_cast<char *>(sError.c_str()));
824 }
825 }
826 return ret;
827 }
828 } // namespace Hdc
829