1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "forward.h"
16 #include "base.h"
17
18 namespace Hdc {
HdcForwardBase(HTaskInfo hTaskInfo)19 HdcForwardBase::HdcForwardBase(HTaskInfo hTaskInfo)
20 : HdcTaskBase(hTaskInfo)
21 {
22 }
23
~HdcForwardBase()24 HdcForwardBase::~HdcForwardBase()
25 {
26 WRITE_LOG(LOG_DEBUG, "~HdcForwardBase");
27 };
28
ReadyForRelease()29 bool HdcForwardBase::ReadyForRelease()
30 {
31 if (!HdcTaskBase::ReadyForRelease()) {
32 return false;
33 }
34 return true;
35 }
36
StopTask()37 void HdcForwardBase::StopTask()
38 {
39 ctxPointMutex.lock();
40 vector<HCtxForward> ctxs;
41 map<uint32_t, HCtxForward>::iterator iter;
42 for (iter = mapCtxPoint.begin(); iter != mapCtxPoint.end(); ++iter) {
43 HCtxForward ctx = iter->second;
44 ctxs.push_back(ctx);
45 }
46 // FREECONTEXT in the STOP is triggered by the other party sector, no longer notifying each other.
47 mapCtxPoint.clear();
48 ctxPointMutex.unlock();
49 for (auto ctx: ctxs) {
50 FreeContext(ctx, 0, false);
51 }
52 }
53
OnAccept(uv_stream_t * server,HCtxForward ctxClient,uv_stream_t * client)54 void HdcForwardBase::OnAccept(uv_stream_t *server, HCtxForward ctxClient, uv_stream_t *client)
55 {
56 HCtxForward ctxListen = (HCtxForward)server->data;
57 char buf[BUF_SIZE_DEFAULT] = { 0 };
58 bool ret = false;
59 while (true) {
60 if (uv_accept(server, client)) {
61 break;
62 }
63 ctxClient->type = ctxListen->type;
64 ctxClient->remoteParamenters = ctxListen->remoteParamenters;
65 int maxSize = sizeof(buf) - forwardParameterBufSize;
66 // clang-format off
67 if (snprintf_s(buf + forwardParameterBufSize, maxSize, maxSize - 1, "%s",
68 ctxClient->remoteParamenters.c_str()) < 0) {
69 break;
70 }
71 // clang-format on
72 // pre 8bytes preserve for param bits
73 SendToTask(ctxClient->id, CMD_FORWARD_ACTIVE_SLAVE, reinterpret_cast<uint8_t *>(buf),
74 strlen(buf + forwardParameterBufSize) + 9);
75 ret = true;
76 break;
77 }
78 if (!ret) {
79 FreeContext(ctxClient, 0, false);
80 }
81 }
82
ListenCallback(uv_stream_t * server,const int status)83 void HdcForwardBase::ListenCallback(uv_stream_t *server, const int status)
84 {
85 HCtxForward ctxListen = (HCtxForward)server->data;
86 HdcForwardBase *thisClass = ctxListen->thisClass;
87 uv_stream_t *client = nullptr;
88
89 if (status == -1 || !ctxListen->ready) {
90 thisClass->FreeContext(ctxListen, 0, false);
91 thisClass->TaskFinish();
92 return;
93 }
94 HCtxForward ctxClient = (HCtxForward)thisClass->MallocContext(true);
95 if (!ctxClient) {
96 return;
97 }
98 if (ctxListen->type == FORWARD_TCP) {
99 uv_tcp_init(ctxClient->thisClass->loopTask, &ctxClient->tcp);
100 client = (uv_stream_t *)&ctxClient->tcp;
101 } else {
102 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
103 uv_pipe_init(ctxClient->thisClass->loopTask, &ctxClient->pipe, 0);
104 client = (uv_stream_t *)&ctxClient->pipe;
105 }
106 thisClass->OnAccept(server, ctxClient, client);
107 }
108
MallocContext(bool masterSlave)109 void *HdcForwardBase::MallocContext(bool masterSlave)
110 {
111 HCtxForward ctx = nullptr;
112 if ((ctx = new ContextForward()) == nullptr) {
113 return nullptr;
114 }
115 ctx->id = Base::GetRuntimeMSec();
116 ctx->masterSlave = masterSlave;
117 ctx->thisClass = this;
118 ctx->fdClass = nullptr;
119 ctx->tcp.data = ctx;
120 ctx->pipe.data = ctx;
121 AdminContext(OP_ADD, ctx->id, ctx);
122 refCount++;
123 return ctx;
124 }
125
FreeContextCallBack(HCtxForward ctx)126 void HdcForwardBase::FreeContextCallBack(HCtxForward ctx)
127 {
128 Base::DoNextLoop(loopTask, ctx, [this](const uint8_t flag, string &msg, const void *data) {
129 HCtxForward ctx = (HCtxForward)data;
130 AdminContext(OP_REMOVE, ctx->id, nullptr);
131 if (ctx != nullptr) {
132 delete ctx;
133 ctx = nullptr;
134 }
135 if (refCount > 0) {
136 --refCount;
137 }
138 });
139 }
140
FreeJDWP(HCtxForward ctx)141 void HdcForwardBase::FreeJDWP(HCtxForward ctx)
142 {
143 Base::CloseFd(ctx->fd);
144 if (ctx->fdClass) {
145 ctx->fdClass->StopWorkOnThread(false, nullptr);
146
147 auto funcReqClose = [](uv_idle_t *handle) -> void {
148 uv_close_cb funcIdleHandleClose = [](uv_handle_t *handle) -> void {
149 HCtxForward ctx = (HCtxForward)handle->data;
150 ctx->thisClass->FreeContextCallBack(ctx);
151 delete (uv_idle_t *)handle;
152 };
153 HCtxForward context = (HCtxForward)handle->data;
154 if (context->fdClass->ReadyForRelease()) {
155 delete context->fdClass;
156 context->fdClass = nullptr;
157 Base::TryCloseHandle((uv_handle_t *)handle, funcIdleHandleClose);
158 }
159 };
160 Base::IdleUvTask(loopTask, ctx, funcReqClose);
161 }
162 }
163
FreeContext(HCtxForward ctxIn,const uint32_t id,bool bNotifyRemote)164 void HdcForwardBase::FreeContext(HCtxForward ctxIn, const uint32_t id, bool bNotifyRemote)
165 {
166 WRITE_LOG(LOG_DEBUG, "FreeContext id:%u, bNotifyRemote:%d", id, bNotifyRemote);
167 std::lock_guard<std::mutex> lock(ctxFreeMutex);
168 HCtxForward ctx = nullptr;
169 if (!ctxIn) {
170 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
171 WRITE_LOG(LOG_DEBUG, "Query id failed");
172 return;
173 }
174 } else {
175 ctx = ctxIn;
176 }
177 if (ctx->finish) {
178 return;
179 }
180 if (bNotifyRemote) {
181 SendToTask(ctx->id, CMD_FORWARD_FREE_CONTEXT, nullptr, 0);
182 }
183 uv_close_cb funcHandleClose = [](uv_handle_t *handle) -> void {
184 HCtxForward ctx = (HCtxForward)handle->data;
185 ctx->thisClass->FreeContextCallBack(ctx);
186 };
187 switch (ctx->type) {
188 case FORWARD_TCP:
189 case FORWARD_JDWP:
190 Base::TryCloseHandle((uv_handle_t *)&ctx->tcp, true, funcHandleClose);
191 break;
192 case FORWARD_ABSTRACT:
193 case FORWARD_RESERVED:
194 case FORWARD_FILESYSTEM:
195 Base::TryCloseHandle((uv_handle_t *)&ctx->pipe, true, funcHandleClose);
196 break;
197 case FORWARD_DEVICE: {
198 FreeJDWP(ctx);
199 break;
200 }
201 default:
202 break;
203 }
204 ctx->finish = true;
205 }
206
SendToTask(const uint32_t cid,const uint16_t command,uint8_t * bufPtr,const int bufSize)207 bool HdcForwardBase::SendToTask(const uint32_t cid, const uint16_t command, uint8_t *bufPtr, const int bufSize)
208 {
209 StartTraceScope("HdcForwardBase::SendToTask");
210 bool ret = false;
211 // usually MAX_SIZE_IOBUF*2 from HdcFileDescriptor maxIO
212 if (bufSize > Base::GetMaxBufSize() * 2) {
213 return false;
214 }
215 auto newBuf = new uint8_t[bufSize + 4];
216 if (!newBuf) {
217 return false;
218 }
219 *reinterpret_cast<uint32_t *>(newBuf) = htonl(cid);
220 if (bufSize > 0 && bufPtr != nullptr && memcpy_s(newBuf + 4, bufSize, bufPtr, bufSize) != EOK) {
221 delete[] newBuf;
222 return false;
223 }
224 ret = SendToAnother(command, newBuf, bufSize + 4);
225 delete[] newBuf;
226 return ret;
227 }
228
229 // Forward flow is small and frequency is fast
AllocForwardBuf(uv_handle_t * handle,size_t sizeSuggested,uv_buf_t * buf)230 void HdcForwardBase::AllocForwardBuf(uv_handle_t *handle, size_t sizeSuggested, uv_buf_t *buf)
231 {
232 size_t size = sizeSuggested;
233 if (size > MAX_USBFFS_BULK) {
234 size = MAX_USBFFS_BULK;
235 }
236 buf->base = (char *)new char[size];
237 if (buf->base) {
238 buf->len = size - 1;
239 } else {
240 WRITE_LOG(LOG_WARN, "AllocForwardBuf == null");
241 }
242 }
243
ReadForwardBuf(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)244 void HdcForwardBase::ReadForwardBuf(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
245 {
246 HCtxForward ctx = (HCtxForward)stream->data;
247 if (nread < 0) {
248 ctx->thisClass->FreeContext(ctx, 0, true);
249 delete[] buf->base;
250 return;
251 }
252 if (nread == 0) {
253 delete[] buf->base;
254 return;
255 }
256 ctx->thisClass->SendToTask(ctx->id, CMD_FORWARD_DATA, (uint8_t *)buf->base, nread);
257 // clear
258 delete[] buf->base;
259 }
260
ConnectTarget(uv_connect_t * connection,int status)261 void HdcForwardBase::ConnectTarget(uv_connect_t *connection, int status)
262 {
263 HCtxForward ctx = (HCtxForward)connection->data;
264 HdcForwardBase *thisClass = ctx->thisClass;
265 delete connection;
266 if (status < 0) {
267 constexpr int bufSize = 1024;
268 char buf[bufSize] = { 0 };
269 uv_err_name_r(status, buf, bufSize);
270 WRITE_LOG(LOG_WARN, "Forward connect result:%d error:%s", status, buf);
271 }
272 thisClass->SetupPointContinue(ctx, status);
273 }
274
CheckNodeInfo(const char * nodeInfo,string as[2])275 bool HdcForwardBase::CheckNodeInfo(const char *nodeInfo, string as[2])
276 {
277 string str = nodeInfo;
278 size_t strLen = str.size();
279 size_t pos = str.find(':');
280 if (pos != string::npos) {
281 if (pos == 0 || pos == strLen - 1) {
282 return false;
283 }
284 as[0] = str.substr(0, pos);
285 as[1] = str.substr(pos + 1);
286 } else {
287 return false;
288 }
289 if (as[0] == "tcp") {
290 if (as[1].size() > std::to_string(MAX_IP_PORT).size()) {
291 return false;
292 }
293 int port = atoi(as[1].c_str());
294 if (port <= 0 || port > MAX_IP_PORT) {
295 return false;
296 }
297 }
298 return true;
299 }
300
SetupPointContinue(HCtxForward ctx,int status)301 bool HdcForwardBase::SetupPointContinue(HCtxForward ctx, int status)
302 {
303 if (ctx->checkPoint) {
304 // send to active
305 uint8_t flag = status > 0;
306 SendToTask(ctx->id, CMD_FORWARD_CHECK_RESULT, &flag, 1);
307 FreeContext(ctx, 0, false);
308 return true;
309 }
310 if (status < 0) {
311 FreeContext(ctx, 0, true);
312 return false;
313 }
314 // send to active
315 if (!SendToTask(ctx->id, CMD_FORWARD_ACTIVE_MASTER, nullptr, 0)) {
316 FreeContext(ctx, 0, true);
317 return false;
318 }
319 return DoForwardBegin(ctx);
320 }
321
DetechForwardType(HCtxForward ctxPoint)322 bool HdcForwardBase::DetechForwardType(HCtxForward ctxPoint)
323 {
324 string &sFType = ctxPoint->localArgs[0];
325 string &sNodeCfg = ctxPoint->localArgs[1];
326 // string to enum
327 if (sFType == "tcp") {
328 ctxPoint->type = FORWARD_TCP;
329 } else if (sFType == "dev") {
330 ctxPoint->type = FORWARD_DEVICE;
331 } else if (sFType == "localabstract") {
332 // daemon shell: /system/bin/socat abstract-listen:linux-abstract -
333 // daemon shell: /system/bin/socat - abstract-connect:linux-abstract
334 // host: hdc fport tcp:8080 localabstract:linux-abstract
335 ctxPoint->type = FORWARD_ABSTRACT;
336 } else if (sFType == "localreserved") {
337 sNodeCfg = harmonyReservedSocketPrefix + sNodeCfg;
338 ctxPoint->type = FORWARD_RESERVED;
339 } else if (sFType == "localfilesystem") {
340 sNodeCfg = filesystemSocketPrefix + sNodeCfg;
341 ctxPoint->type = FORWARD_FILESYSTEM;
342 } else if (sFType == "jdwp") {
343 ctxPoint->type = FORWARD_JDWP;
344 } else {
345 return false;
346 }
347 return true;
348 }
349
SetupTCPPoint(HCtxForward ctxPoint)350 bool HdcForwardBase::SetupTCPPoint(HCtxForward ctxPoint)
351 {
352 string &sNodeCfg = ctxPoint->localArgs[1];
353 int port = atoi(sNodeCfg.c_str());
354 ctxPoint->tcp.data = ctxPoint;
355 uv_tcp_init(loopTask, &ctxPoint->tcp);
356 struct sockaddr_in addr;
357 if (ctxPoint->masterSlave) {
358 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
359 uv_tcp_bind(&ctxPoint->tcp, (const struct sockaddr *)&addr, 0);
360 if (uv_listen((uv_stream_t *)&ctxPoint->tcp, 4, ListenCallback)) {
361 ctxPoint->lastError = "TCP Port listen failed at " + sNodeCfg;
362 return false;
363 }
364 } else {
365 uv_ip4_addr("127.0.0.1", port, &addr); // loop interface
366 uv_connect_t *conn = new(std::nothrow) uv_connect_t();
367 if (conn == nullptr) {
368 WRITE_LOG(LOG_FATAL, "SetupTCPPoint new conn failed");
369 return false;
370 }
371 conn->data = ctxPoint;
372 uv_tcp_connect(conn, (uv_tcp_t *)&ctxPoint->tcp, (const struct sockaddr *)&addr, ConnectTarget);
373 }
374 return true;
375 }
376
SetupDevicePoint(HCtxForward ctxPoint)377 bool HdcForwardBase::SetupDevicePoint(HCtxForward ctxPoint)
378 {
379 uint8_t flag = 1;
380 string &sNodeCfg = ctxPoint->localArgs[1];
381 string resolvedPath = Base::CanonicalizeSpecPath(sNodeCfg);
382 if ((ctxPoint->fd = open(resolvedPath.c_str(), O_RDWR)) < 0) {
383 ctxPoint->lastError = "Open unix-dev failed";
384 flag = -1;
385 }
386 auto funcRead = [&](const void *a, uint8_t *b, const int c) -> bool {
387 HCtxForward ctx = (HCtxForward)a;
388 return SendToTask(ctx->id, CMD_FORWARD_DATA, b, c);
389 };
390 auto funcFinish = [&](const void *a, const bool b, const string c) -> bool {
391 HCtxForward ctx = (HCtxForward)a;
392 WRITE_LOG(LOG_DEBUG, "Error ReadForwardBuf dev,ret:%d reason:%s", b, c.c_str());
393 FreeContext(ctx, 0, true);
394 return false;
395 };
396 ctxPoint->fdClass = new(std::nothrow) HdcFileDescriptor(loopTask, ctxPoint->fd, ctxPoint, funcRead, funcFinish);
397 if (ctxPoint->fdClass == nullptr) {
398 WRITE_LOG(LOG_FATAL, "SetupDevicePoint new ctxPoint->fdClass failed");
399 return false;
400 }
401 SetupPointContinue(ctxPoint, flag);
402 return true;
403 }
404
LocalAbstractConnect(uv_pipe_t * pipe,string & sNodeCfg)405 bool HdcForwardBase::LocalAbstractConnect(uv_pipe_t *pipe, string &sNodeCfg)
406 {
407 bool abstractRet = false;
408 #ifndef _WIN32
409 int s = 0;
410 do {
411 if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
412 break;
413 }
414 fcntl(s, F_SETFD, FD_CLOEXEC);
415 struct sockaddr_un addr;
416 Base::ZeroStruct(addr);
417 int addrLen = sNodeCfg.size() + offsetof(struct sockaddr_un, sun_path) + 1;
418 addr.sun_family = AF_LOCAL;
419 addr.sun_path[0] = 0;
420
421 if (memcpy_s(addr.sun_path + 1, sizeof(addr.sun_path) - 1, sNodeCfg.c_str(), sNodeCfg.size()) != EOK) {
422 break;
423 };
424 // local connect, ignore timeout
425 if (connect(s, reinterpret_cast<struct sockaddr *>(&addr), addrLen) < 0) {
426 break;
427 }
428 if (uv_pipe_open(pipe, s)) {
429 break;
430 }
431 abstractRet = true;
432 } while (false);
433 if (!abstractRet) {
434 Base::CloseFd(s);
435 }
436 #endif
437 return abstractRet;
438 }
439
SetupFilePoint(HCtxForward ctxPoint)440 bool HdcForwardBase::SetupFilePoint(HCtxForward ctxPoint)
441 {
442 string &sNodeCfg = ctxPoint->localArgs[1];
443 ctxPoint->pipe.data = ctxPoint;
444 uv_pipe_init(loopTask, &ctxPoint->pipe, 0);
445 if (ctxPoint->masterSlave) {
446 if (ctxPoint->type == FORWARD_RESERVED || ctxPoint->type == FORWARD_FILESYSTEM) {
447 unlink(sNodeCfg.c_str());
448 }
449 if (uv_pipe_bind(&ctxPoint->pipe, sNodeCfg.c_str())) {
450 ctxPoint->lastError = "Unix pipe bind failed";
451 return false;
452 }
453 if (uv_listen((uv_stream_t *)&ctxPoint->pipe, 4, ListenCallback)) {
454 ctxPoint->lastError = "Unix pipe listen failed";
455 return false;
456 }
457 } else {
458 if (ctxPoint->type == FORWARD_ABSTRACT) {
459 bool abstractRet = LocalAbstractConnect(&ctxPoint->pipe, sNodeCfg);
460 SetupPointContinue(ctxPoint, abstractRet ? 0 : -1);
461 if (!abstractRet) {
462 ctxPoint->lastError = "LocalAbstractConnect failed";
463 return false;
464 }
465 } else {
466 uv_connect_t *connect = new(std::nothrow) uv_connect_t();
467 if (connect == nullptr) {
468 WRITE_LOG(LOG_FATAL, "SetupFilePoint new connect failed");
469 return false;
470 }
471 connect->data = ctxPoint;
472 uv_pipe_connect(connect, &ctxPoint->pipe, sNodeCfg.c_str(), ConnectTarget);
473 }
474 }
475 return true;
476 }
477
SetupPoint(HCtxForward ctxPoint)478 bool HdcForwardBase::SetupPoint(HCtxForward ctxPoint)
479 {
480 bool ret = true;
481 if (!DetechForwardType(ctxPoint)) {
482 return false;
483 }
484 switch (ctxPoint->type) {
485 case FORWARD_TCP:
486 if (!SetupTCPPoint(ctxPoint)) {
487 ret = false;
488 };
489 break;
490 #ifndef _WIN32
491 case FORWARD_DEVICE:
492 if (!SetupDevicePoint(ctxPoint)) {
493 ret = false;
494 };
495 break;
496 case FORWARD_JDWP:
497 if (!SetupJdwpPoint(ctxPoint)) {
498 ret = false;
499 };
500 break;
501 case FORWARD_ABSTRACT:
502 case FORWARD_RESERVED:
503 case FORWARD_FILESYSTEM:
504 if (!SetupFilePoint(ctxPoint)) {
505 ret = false;
506 };
507 break;
508 #else
509 case FORWARD_DEVICE:
510 case FORWARD_JDWP:
511 case FORWARD_ABSTRACT:
512 case FORWARD_RESERVED:
513 case FORWARD_FILESYSTEM:
514 ctxPoint->lastError = "Not supoort forward-type";
515 ret = false;
516 break;
517 #endif
518 default:
519 ctxPoint->lastError = "Not supoort forward-type";
520 ret = false;
521 break;
522 }
523 return ret;
524 }
525
BeginForward(const string & command,string & sError)526 bool HdcForwardBase::BeginForward(const string &command, string &sError)
527 {
528 bool ret = false;
529 int argc = 0;
530 char bufString[BUF_SIZE_SMALL] = "";
531 HCtxForward ctxPoint = (HCtxForward)MallocContext(true);
532 if (!ctxPoint) {
533 WRITE_LOG(LOG_FATAL, "MallocContext failed");
534 return false;
535 }
536 char **argv = Base::SplitCommandToArgs(command.c_str(), &argc);
537 if (argv == nullptr) {
538 WRITE_LOG(LOG_FATAL, "SplitCommandToArgs failed");
539 return false;
540 }
541 while (true) {
542 if (argc < CMD_ARG1_COUNT) {
543 break;
544 }
545 if (strlen(argv[0]) > BUF_SIZE_SMALL || strlen(argv[1]) > BUF_SIZE_SMALL) {
546 break;
547 }
548 if (!CheckNodeInfo(argv[0], ctxPoint->localArgs)) {
549 break;
550 }
551 if (!CheckNodeInfo(argv[1], ctxPoint->remoteArgs)) {
552 break;
553 }
554 ctxPoint->remoteParamenters = argv[1];
555 if (!SetupPoint(ctxPoint)) {
556 break;
557 }
558
559 ret = true;
560 break;
561 }
562 sError = ctxPoint->lastError;
563 if (ret) {
564 // First 8-byte parameter bit
565 int maxBufSize = sizeof(bufString) - forwardParameterBufSize;
566 if (snprintf_s(bufString + forwardParameterBufSize, maxBufSize, maxBufSize - 1, "%s", argv[1]) > 0) {
567 SendToTask(ctxPoint->id, CMD_FORWARD_CHECK, reinterpret_cast<uint8_t *>(bufString),
568 forwardParameterBufSize + strlen(bufString + forwardParameterBufSize) + 1);
569 taskCommand = command;
570 }
571 }
572 delete[](reinterpret_cast<char *>(argv));
573 return ret;
574 }
575
FilterCommand(uint8_t * bufCmdIn,uint32_t * idOut,uint8_t ** pContentBuf)576 inline bool HdcForwardBase::FilterCommand(uint8_t *bufCmdIn, uint32_t *idOut, uint8_t **pContentBuf)
577 {
578 *pContentBuf = bufCmdIn + DWORD_SERIALIZE_SIZE;
579 *idOut = ntohl(*reinterpret_cast<uint32_t *>(bufCmdIn));
580 return true;
581 }
582
SlaveConnect(uint8_t * bufCmd,bool bCheckPoint,string & sError)583 bool HdcForwardBase::SlaveConnect(uint8_t *bufCmd, bool bCheckPoint, string &sError)
584 {
585 bool ret = false;
586 char *content = nullptr;
587 uint32_t idSlaveOld = 0;
588 HCtxForward ctxPoint = (HCtxForward)MallocContext(false);
589 if (!ctxPoint) {
590 WRITE_LOG(LOG_FATAL, "MallocContext failed");
591 return false;
592 }
593 idSlaveOld = ctxPoint->id;
594 ctxPoint->checkPoint = bCheckPoint;
595 // refresh another id,8byte param
596 FilterCommand(bufCmd, &ctxPoint->id, reinterpret_cast<uint8_t **>(&content));
597 AdminContext(OP_UPDATE, idSlaveOld, ctxPoint);
598 content += forwardParameterBufSize;
599 if (!CheckNodeInfo(content, ctxPoint->localArgs)) {
600 return false;
601 }
602 if ((ctxPoint->checkPoint && slaveCheckWhenBegin) || !ctxPoint->checkPoint) {
603 if (!SetupPoint(ctxPoint)) {
604 WRITE_LOG(LOG_FATAL, "SetupPoint failed");
605 goto Finish;
606 }
607 sError = ctxPoint->lastError;
608 } else {
609 SetupPointContinue(ctxPoint, 0);
610 }
611 ret = true;
612 Finish:
613 if (!ret) {
614 FreeContext(ctxPoint, 0, true);
615 }
616 return ret;
617 }
618
DoForwardBegin(HCtxForward ctx)619 bool HdcForwardBase::DoForwardBegin(HCtxForward ctx)
620 {
621 switch (ctx->type) {
622 case FORWARD_TCP:
623 case FORWARD_JDWP: // jdwp use tcp ->socketpair->jvm
624 uv_tcp_nodelay((uv_tcp_t *)&ctx->tcp, 1);
625 uv_read_start((uv_stream_t *)&ctx->tcp, AllocForwardBuf, ReadForwardBuf);
626 break;
627 case FORWARD_ABSTRACT:
628 case FORWARD_RESERVED:
629 case FORWARD_FILESYSTEM:
630 uv_read_start((uv_stream_t *)&ctx->pipe, AllocForwardBuf, ReadForwardBuf);
631 break;
632 case FORWARD_DEVICE: {
633 ctx->fdClass->StartWorkOnThread();
634 break;
635 }
636 default:
637 break;
638 }
639 ctx->ready = true;
640 return true;
641 }
642
AdminContext(const uint8_t op,const uint32_t id,HCtxForward hInput)643 void *HdcForwardBase::AdminContext(const uint8_t op, const uint32_t id, HCtxForward hInput)
644 {
645 ctxPointMutex.lock();
646 void *hRet = nullptr;
647 map<uint32_t, HCtxForward> &mapCtx = mapCtxPoint;
648 switch (op) {
649 case OP_ADD:
650 mapCtx[id] = hInput;
651 break;
652 case OP_REMOVE:
653 mapCtx.erase(id);
654 break;
655 case OP_QUERY:
656 if (mapCtx.count(id)) {
657 hRet = mapCtx[id];
658 }
659 break;
660 case OP_UPDATE:
661 mapCtx.erase(id);
662 mapCtx[hInput->id] = hInput;
663 break;
664 default:
665 break;
666 }
667 ctxPointMutex.unlock();
668 return hRet;
669 }
670
SendCallbackForwardBuf(uv_write_t * req,int status)671 void HdcForwardBase::SendCallbackForwardBuf(uv_write_t *req, int status)
672 {
673 ContextForwardIO *ctxIO = (ContextForwardIO *)req->data;
674 HCtxForward ctx = reinterpret_cast<HCtxForward>(ctxIO->ctxForward);
675 if (status < 0 && !ctx->finish) {
676 WRITE_LOG(LOG_DEBUG, "SendCallbackForwardBuf ctx->type:%d, status:%d finish", ctx->type, status);
677 ctx->thisClass->FreeContext(ctx, 0, true);
678 }
679 delete[] ctxIO->bufIO;
680 delete ctxIO;
681 delete req;
682 }
683
SendForwardBuf(HCtxForward ctx,uint8_t * bufPtr,const int size)684 int HdcForwardBase::SendForwardBuf(HCtxForward ctx, uint8_t *bufPtr, const int size)
685 {
686 int nRet = 0;
687 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
688 return -1;
689 }
690 if (size <= 0) {
691 WRITE_LOG(LOG_WARN, "SendForwardBuf failed size:%d", size);
692 return -1;
693 }
694 auto pDynBuf = new(std::nothrow) uint8_t[size];
695 if (!pDynBuf) {
696 return -1;
697 }
698 (void)memcpy_s(pDynBuf, size, bufPtr, size);
699 if (ctx->type == FORWARD_DEVICE) {
700 nRet = ctx->fdClass->WriteWithMem(pDynBuf, size);
701 } else {
702 auto ctxIO = new ContextForwardIO();
703 if (!ctxIO) {
704 delete[] pDynBuf;
705 return -1;
706 }
707 ctxIO->ctxForward = ctx;
708 ctxIO->bufIO = pDynBuf;
709 if (ctx->type == FORWARD_TCP || ctx->type == FORWARD_JDWP) {
710 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->tcp, pDynBuf, size, nullptr,
711 (void *)SendCallbackForwardBuf, (void *)ctxIO);
712 } else {
713 // FORWARD_ABSTRACT, FORWARD_RESERVED, FORWARD_FILESYSTEM,
714 nRet = Base::SendToStreamEx((uv_stream_t *)&ctx->pipe, pDynBuf, size, nullptr,
715 (void *)SendCallbackForwardBuf, (void *)ctxIO);
716 }
717 }
718 return nRet;
719 }
720
CommandForwardCheckResult(HCtxForward ctx,uint8_t * payload)721 bool HdcForwardBase::CommandForwardCheckResult(HCtxForward ctx, uint8_t *payload)
722 {
723 bool ret = true;
724 bool bCheck = static_cast<bool>(payload);
725 LogMsg(bCheck ? MSG_OK : MSG_FAIL, "Forwardport result:%s", bCheck ? "OK" : "Failed");
726 if (bCheck) {
727 string mapInfo = taskInfo->serverOrDaemon ? "1|" : "0|";
728 mapInfo += taskCommand;
729 ctx->ready = true;
730 ServerCommand(CMD_FORWARD_SUCCESS, reinterpret_cast<uint8_t *>(const_cast<char *>(mapInfo.c_str())),
731 mapInfo.size() + 1);
732 } else {
733 ret = false;
734 FreeContext(ctx, 0, false);
735 }
736 return ret;
737 }
738
ForwardCommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)739 bool HdcForwardBase::ForwardCommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
740 {
741 bool ret = true;
742 uint8_t *pContent = nullptr;
743 int sizeContent = 0;
744 uint32_t id = 0;
745 HCtxForward ctx = nullptr;
746 FilterCommand(payload, &id, &pContent);
747 sizeContent = payloadSize - DWORD_SERIALIZE_SIZE;
748 if (!(ctx = (HCtxForward)AdminContext(OP_QUERY, id, nullptr))) {
749 WRITE_LOG(LOG_WARN, "Query id failed");
750 return false;
751 }
752 switch (command) {
753 case CMD_FORWARD_CHECK_RESULT: {
754 ret = CommandForwardCheckResult(ctx, payload);
755 break;
756 }
757 case CMD_FORWARD_ACTIVE_MASTER: {
758 ret = DoForwardBegin(ctx);
759 break;
760 }
761 case CMD_FORWARD_DATA: {
762 if (ctx->finish) {
763 break;
764 }
765 if (SendForwardBuf(ctx, pContent, sizeContent) < 0) {
766 FreeContext(ctx, 0, true);
767 }
768 break;
769 }
770 case CMD_FORWARD_FREE_CONTEXT: {
771 FreeContext(ctx, 0, false);
772 break;
773 }
774 default:
775 ret = false;
776 break;
777 }
778 if (!ret) {
779 if (ctx) {
780 FreeContext(ctx, 0, true);
781 } else {
782 WRITE_LOG(LOG_DEBUG, "ctx==nullptr raw free");
783 TaskFinish();
784 }
785 }
786 return ret;
787 }
788
CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)789 bool HdcForwardBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
790 {
791 bool ret = true;
792 string sError;
793 // prepare
794 if (command == CMD_FORWARD_INIT) {
795 string strCommand(reinterpret_cast<char *>(payload), payloadSize);
796 if (!BeginForward(strCommand, sError)) {
797 ret = false;
798 goto Finish;
799 }
800 return true;
801 } else if (command == CMD_FORWARD_CHECK) {
802 // Detect remote if it's reachable
803 if (!SlaveConnect(payload, true, sError)) {
804 ret = false;
805 goto Finish;
806 }
807 return true;
808 } else if (command == CMD_FORWARD_ACTIVE_SLAVE) {
809 // slave connect target port when activating
810 if (!SlaveConnect(payload, false, sError)) {
811 ret = false;
812 goto Finish;
813 }
814 return true;
815 }
816 if (!ForwardCommandDispatch(command, payload, payloadSize)) {
817 ret = false;
818 goto Finish;
819 }
820 Finish:
821 if (!ret) {
822 if (!sError.size()) {
823 LogMsg(MSG_FAIL, "Forward parament failed");
824 } else {
825 LogMsg(MSG_FAIL, const_cast<char *>(sError.c_str()));
826 WRITE_LOG(LOG_WARN, const_cast<char *>(sError.c_str()));
827 }
828 }
829 return ret;
830 }
831 } // namespace Hdc
832