• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "arkts_plugin.h"
17 
18 #include <arpa/inet.h>
19 #include <cstdlib>
20 #include <regex>
21 #include <sys/un.h>
22 #include <unistd.h>
23 
24 #include "arkts_plugin_result.pb.h"
25 #include "logging.h"
26 #include "securec.h"
27 
28 namespace {
29 const std::string PANDA = "PandaDebugger";
30 const std::string SNAPSHOT_HEAD =
31     R"({"id":1,"method":"HeapProfiler.takeHeapSnapshot","params":{"reportProgress":true,"captureNumericValue":)";
32 const std::string SNAPSHOT_TAIL = R"(,"exposeInternals":false}})";
33 const std::string TIMELINE_HEAD =
34     R"({"id":1,"method":"HeapProfiler.startTrackingHeapObjects","params":{"trackAllocations":)";
35 const std::string TIMELINE_TAIL = "}}";
36 const std::string TIMELINE_STOP =
37     R"({"id":2,"method":"HeapProfiler.stopTrackingHeapObjects","params":{"reportProgress":true}})";
38 const std::string CPU_PROFILER_INTERVAL_HEAD =
39     R"({"id":3,"method":"Profiler.setSamplingInterval","params":{"interval":)";
40 const std::string CPU_PROFILER_INTERVAL_TAIL = R"(}})";
41 const std::string CPU_PROFILER_START = R"({"id":3,"method":"Profiler.start","params":{}})";
42 const std::string CPU_PROFILER_STOP = R"({"id":3,"method":"Profiler.stop","params":{}})";
43 constexpr uint8_t TIMELINE_START_SUCCESS = 0x1;
44 constexpr uint8_t CPU_PROFILER_START_SUCCESS = 0x2;
45 const std::string RESPONSE_FLAG_HEAD = R"({"id":)";
46 const std::string RESPONSE_FLAG_TAIL = R"(,"result":{}})";
47 const std::string REGEX_PATTERN = R"("id":(\d+))";
48 const std::string ARKTS_SCHEDULE = R"(ArkTS_Snapshot)";
49 enum class HeapType : int32_t {
50     INVALID = -1,
51     SNAPSHOT,
52     TIMELINE,
53 };
54 constexpr char CLIENT_WEBSOCKET_UPGRADE_REQ[] =
55     "GET / HTTP/1.1\r\n"
56     "Connection: Upgrade\r\n"
57     "Pragma: no-cache\r\n"
58     "Cache-Control: no-cache\r\n"
59     "Upgrade: websocket\r\n"
60     "Sec-WebSocket-Version: 13\r\n"
61     "Accept-Encoding: gzip, deflate, br\r\n"
62     "Sec-WebSocket-Key: 64b4B+s5JDlgkdg7NekJ+g==\r\n"
63     "Sec-WebSocket-Extensions: permessage-deflate\r\n";
64 constexpr int32_t CLIENT_WEBSOCKET_UPGRADE_RSP_LEN = 129;
65 constexpr int32_t SOCKET_MASK_LEN = 4;
66 constexpr char MASK_KEY[SOCKET_MASK_LEN + 1] = "abcd";
67 constexpr uint32_t TIME_OUT = 5;
68 constexpr uint32_t TIME_BASE = 1000;
69 constexpr int32_t SOCKET_SUCCESS = 0;
70 constexpr int32_t SOCKET_HEADER_LEN = 2;
71 constexpr int32_t PAYLOAD_LEN = 2;
72 constexpr int32_t EXTEND_PAYLOAD_LEN = 8;
73 constexpr uint32_t CPU_PROFILER_INTERVAL_DEFAULT = 1000;
74 } // namespace
75 
Start(const uint8_t * configData,uint32_t configSize)76 int32_t ArkTSPlugin::Start(const uint8_t* configData, uint32_t configSize)
77 {
78     if (protoConfig_.ParseFromArray(configData, configSize) <= 0) {
79         PROFILER_LOG_ERROR(LOG_CORE, "%s:parseFromArray failed!", __func__);
80         return -1;
81     }
82 
83     if (!protoConfig_.split_outfile_name().empty()) {
84         splitTraceWriter_ = std::make_shared<TraceFileWriter>(protoConfig_.split_outfile_name());
85         splitTraceWriter_->WriteStandalonePluginData(
86             std::string(g_pluginModule.name) + "_config",
87             std::string(reinterpret_cast<const char *>(configData),
88                         configSize));
89         splitTraceWriter_->SetTimeSource();
90     }
91 
92     pid_ = protoConfig_.pid();
93     if (pid_ <= 0) {
94         PROFILER_LOG_ERROR(LOG_CORE, "%s: pid is less than or equal to 0", __func__);
95         return -1;
96     }
97 
98     if (!ClientConnectUnixWebSocket(std::to_string(pid_) + PANDA, TIME_OUT)) {
99         return -1;
100     }
101 
102     if (!ClientSendWSUpgradeReq()) {
103         return -1;
104     }
105 
106     if (!ClientRecvWSUpgradeRsp()) {
107         return -1;
108     }
109 
110     if (protoConfig_.enable_cpu_profiler()) {
111         if (EnableCpuProfiler() != 0) {
112             PROFILER_LOG_ERROR(LOG_CORE, "arkts plugin cpu profiler start failed");
113             return -1;
114         }
115     }
116 
117     switch (static_cast<int32_t>(protoConfig_.type())) {
118         case static_cast<int32_t>(HeapType::SNAPSHOT): {
119             return EnableSnapshot();
120         }
121         case static_cast<int32_t>(HeapType::TIMELINE): {
122             return EnableTimeline();
123         }
124         case static_cast<int32_t>(HeapType::INVALID): {
125             PROFILER_LOG_INFO(LOG_CORE, "arkts plugin memory type is INVALID");
126             return 0;
127         }
128         default: {
129             PROFILER_LOG_ERROR(LOG_CORE, "arkts plugin start type error");
130             return -1;
131         }
132     }
133 }
134 
EnableTimeline()135 int32_t ArkTSPlugin::EnableTimeline()
136 {
137     std::string timelineCmd = TIMELINE_HEAD + (protoConfig_.track_allocations() ? "true" : "false") + TIMELINE_TAIL;
138     if (!ClientSendReq(timelineCmd)) {
139         return -1;
140     }
141     FlushData(timelineCmd);
142     commandResult_ |= TIMELINE_START_SUCCESS;
143     return 0;
144 }
145 
EnableSnapshot()146 int32_t ArkTSPlugin::EnableSnapshot()
147 {
148     if (protoConfig_.interval() == 0) {
149         PROFILER_LOG_ERROR(LOG_CORE, "%s:scheduleTask interval == 0 error!", __func__);
150         return -1;
151     }
152     snapshotCmd_ = SNAPSHOT_HEAD + (protoConfig_.capture_numeric_value() ? "true" : "false") + SNAPSHOT_TAIL;
153     auto callback = std::bind(&ArkTSPlugin::Snapshot, this);
154     snapshotScheduleTaskFd_ = scheduleTaskManager_.ScheduleTask(callback, protoConfig_.interval() * TIME_BASE);
155     if (snapshotScheduleTaskFd_ == -1) {
156         PROFILER_LOG_ERROR(LOG_CORE, "%s:scheduleTask failed!", __func__);
157         return -1;
158     }
159     return 0;
160 }
161 
EnableCpuProfiler()162 int32_t ArkTSPlugin::EnableCpuProfiler()
163 {
164     std::string interval = CPU_PROFILER_INTERVAL_HEAD
165         + (protoConfig_.cpu_profiler_interval() == 0 ?
166         std::to_string(CPU_PROFILER_INTERVAL_DEFAULT) : std::to_string(protoConfig_.cpu_profiler_interval()))
167         + CPU_PROFILER_INTERVAL_TAIL;
168     if (!ClientSendReq(interval)) {
169         return -1;
170     }
171     FlushData(interval);
172 
173     if (!ClientSendReq(CPU_PROFILER_START)) {
174         return -1;
175     }
176     FlushData(CPU_PROFILER_START);
177     commandResult_ |= CPU_PROFILER_START_SUCCESS;
178     return 0;
179 }
180 
Stop()181 int32_t ArkTSPlugin::Stop()
182 {
183     switch (static_cast<int32_t>(protoConfig_.type())) {
184         case static_cast<int32_t>(HeapType::SNAPSHOT): {
185             scheduleTaskManager_.UnscheduleTask(snapshotScheduleTaskFd_);
186             break;
187         }
188         case static_cast<int32_t>(HeapType::TIMELINE): {
189             if (commandResult_ & TIMELINE_START_SUCCESS) {
190                 if (!ClientSendReq(TIMELINE_STOP)) {
191                     break;
192                 }
193                 FlushData(TIMELINE_STOP);
194             }
195             break;
196         }
197         case static_cast<int32_t>(HeapType::INVALID): {
198             break;
199         }
200         default: {
201             PROFILER_LOG_ERROR(LOG_CORE, "arkts plugin stop type error");
202             break;
203         }
204     }
205 
206     if (protoConfig_.enable_cpu_profiler() && (commandResult_ & CPU_PROFILER_START_SUCCESS)) {
207         if (ClientSendReq(CPU_PROFILER_STOP)) {
208             FlushData();
209         }
210     }
211     Close();
212 
213     if (!protoConfig_.split_outfile_name().empty()) { // write split file.
214         CHECK_NOTNULL(splitTraceWriter_, -1, "%s: writer is nullptr, WriteStandaloneFile failed", __func__);
215         splitTraceWriter_->SetDurationTime();
216         splitTraceWriter_->Finish();
217         splitTraceWriter_.reset();
218         splitTraceWriter_ = nullptr;
219     }
220     return 0;
221 }
222 
SetWriter(WriterStruct * writer)223 void ArkTSPlugin::SetWriter(WriterStruct* writer)
224 {
225     resultWriter_ = writer;
226 }
227 
Snapshot()228 void ArkTSPlugin::Snapshot()
229 {
230     CHECK_NOTNULL(resultWriter_, NO_RETVAL, "%s: resultWriter_ nullptr", __func__);
231     if (!ClientSendReq(snapshotCmd_)) {
232         return;
233     }
234     FlushData(snapshotCmd_);
235 }
236 
FlushData(const std::string & command)237 void ArkTSPlugin::FlushData(const std::string& command)
238 {
239     std::string endFlag;
240     if (!command.empty()) {
241         std::regex pattern(REGEX_PATTERN);
242         std::smatch match;
243         if (std::regex_search(command, match, pattern)) {
244             endFlag = RESPONSE_FLAG_HEAD + match[1].str() + RESPONSE_FLAG_TAIL;
245         }
246     }
247     if (!protoConfig_.split_outfile_name().empty()) {
248         CHECK_NOTNULL(splitTraceWriter_, NO_RETVAL, "%s: writer is nullptr, WriteStandaloneFile failed", __func__);
249     }
250 
251     while (true) {
252         std::string recv = Decode();
253         if (recv.empty()) {
254             PROFILER_LOG_ERROR(LOG_CORE, "%s: recv is empty", __func__);
255             break;
256         }
257         ArkTSResult data;
258         data.set_result(recv.c_str(), recv.size());
259         buffer_.resize(data.ByteSizeLong());
260         data.SerializeToArray(buffer_.data(), buffer_.size());
261 
262         if (protoConfig_.split_outfile_name().empty()) {
263             resultWriter_->write(resultWriter_, buffer_.data(), buffer_.size());
264             resultWriter_->flush(resultWriter_);
265         } else { // write split file.
266             splitTraceWriter_->WriteStandalonePluginData(
267                 std::string(g_pluginModule.name),
268                 std::string(buffer_.data(), buffer_.size()),
269                 std::string(g_pluginModule.version));
270         }
271 
272         if (endFlag.empty() || recv == endFlag) {
273             break;
274         }
275     }
276 
277     if (!protoConfig_.split_outfile_name().empty()) {
278         splitTraceWriter_->Flush();
279     }
280 }
281 
ClientConnectUnixWebSocket(const std::string & sockName,uint32_t timeoutLimit)282 bool ArkTSPlugin::ClientConnectUnixWebSocket(const std::string& sockName, uint32_t timeoutLimit)
283 {
284     if (socketState_ != SocketState::UNINITED) {
285         PROFILER_LOG_ERROR(LOG_CORE, "client has inited");
286         return true;
287     }
288 
289     client_ = socket(AF_UNIX, SOCK_STREAM, 0);
290     if (client_ < SOCKET_SUCCESS) {
291         PROFILER_LOG_ERROR(LOG_CORE, "client socket failed, error = %d, , desc = %s", errno, strerror(errno));
292         return false;
293     }
294 
295     // set send and recv timeout limit
296     if (!SetWebSocketTimeOut(client_, timeoutLimit)) {
297         PROFILER_LOG_ERROR(LOG_CORE, "client SetWebSocketTimeOut failed, error = %d, desc = %s",
298                            errno, strerror(errno));
299         close(client_);
300         client_ = -1;
301         return false;
302     }
303 
304     struct sockaddr_un serverAddr;
305     if (memset_s(&serverAddr, sizeof(serverAddr), 0, sizeof(serverAddr)) != EOK) {
306         PROFILER_LOG_ERROR(LOG_CORE, "client memset_s serverAddr failed, error = %d, desc = %s",
307                            errno, strerror(errno));
308         close(client_);
309         client_ = -1;
310         return false;
311     }
312     serverAddr.sun_family = AF_UNIX;
313     if (strcpy_s(serverAddr.sun_path + 1, sizeof(serverAddr.sun_path) - 1, sockName.c_str()) != EOK) {
314         PROFILER_LOG_ERROR(LOG_CORE, "client strcpy_s serverAddr.sun_path failed, error = %d, , desc = %s",
315                            errno, strerror(errno));
316         close(client_);
317         client_ = -1;
318         return false;
319     }
320     serverAddr.sun_path[0] = '\0';
321 
322     uint32_t len = offsetof(struct sockaddr_un, sun_path) + strlen(sockName.c_str()) + 1;
323     int ret = connect(client_, reinterpret_cast<struct sockaddr*>(&serverAddr), static_cast<int32_t>(len));
324     if (ret != SOCKET_SUCCESS) {
325         PROFILER_LOG_ERROR(LOG_CORE, "client connect failed, error, error = %d, , desc = %s", errno, strerror(errno));
326         close(client_);
327         client_ = -1;
328         return false;
329     }
330     socketState_ = SocketState::INITED;
331     PROFILER_LOG_INFO(LOG_CORE, "client connect success...");
332     return true;
333 }
334 
ClientSendWSUpgradeReq()335 bool ArkTSPlugin::ClientSendWSUpgradeReq()
336 {
337     if (socketState_ == SocketState::UNINITED) {
338         PROFILER_LOG_ERROR(LOG_CORE, "client has not inited");
339         return false;
340     }
341     if (socketState_ == SocketState::CONNECTED) {
342         PROFILER_LOG_ERROR(LOG_CORE, "client has connected");
343         return true;
344     }
345 
346     int msgLen = strlen(CLIENT_WEBSOCKET_UPGRADE_REQ);
347     int32_t sendLen = send(client_, CLIENT_WEBSOCKET_UPGRADE_REQ, msgLen, 0);
348     if (sendLen != msgLen) {
349         PROFILER_LOG_ERROR(LOG_CORE, "client send wsupgrade req failed, error = %d, desc = %s", errno, strerror(errno));
350         socketState_ = SocketState::UNINITED;
351         shutdown(client_, SHUT_RDWR);
352         close(client_);
353         client_ = -1;
354         return false;
355     }
356     PROFILER_LOG_INFO(LOG_CORE, "client send wsupgrade req success");
357     return true;
358 }
359 
ClientRecvWSUpgradeRsp()360 bool ArkTSPlugin::ClientRecvWSUpgradeRsp()
361 {
362     if (socketState_ == SocketState::UNINITED) {
363         PROFILER_LOG_ERROR(LOG_CORE, "client has not inited");
364         return false;
365     }
366     if (socketState_ == SocketState::CONNECTED) {
367         PROFILER_LOG_ERROR(LOG_CORE, "ClientRecvWSUpgradeRsp::client has connected");
368         return true;
369     }
370 
371     char recvBuf[CLIENT_WEBSOCKET_UPGRADE_RSP_LEN + 1] = {0};
372     int32_t bufLen = recv(client_, recvBuf, CLIENT_WEBSOCKET_UPGRADE_RSP_LEN, 0);
373     if (bufLen != CLIENT_WEBSOCKET_UPGRADE_RSP_LEN) {
374         PROFILER_LOG_ERROR(LOG_CORE, "client recv wsupgrade rsp failed, error = %d, desc = %s", errno, strerror(errno));
375         socketState_ = SocketState::UNINITED;
376         shutdown(client_, SHUT_RDWR);
377         close(client_);
378         client_ = -1;
379         return false;
380     }
381     socketState_ = SocketState::CONNECTED;
382     PROFILER_LOG_INFO(LOG_CORE, "client recv wsupgrade rsp success");
383     return true;
384 }
385 
ClientSendReq(const std::string & message)386 bool ArkTSPlugin::ClientSendReq(const std::string& message)
387 {
388     if (socketState_ != SocketState::CONNECTED) {
389         PROFILER_LOG_ERROR(LOG_CORE, "client has not connected");
390         return false;
391     }
392 
393     uint32_t msgLen = message.length();
394     std::unique_ptr<char[]> msgBuf = std::make_unique<char[]>(msgLen + 15); // 15: the maximum expand length
395     char* sendBuf = msgBuf.get();
396     uint32_t sendMsgLen = 0;
397     sendBuf[0] = 0x81; // 0x81: the text message sent by the server should start with '0x81'.
398     uint32_t mask = 1;
399     // Depending on the length of the messages, client will use shift operation to get the res
400     // and store them in the buffer.
401     if (msgLen <= 125) {                     // 125: situation 1 when message's length <= 125
402         sendBuf[1] = msgLen | (mask << 7);   // 7: mask need shift left by 7 bits
403         sendMsgLen = 2;                      // 2: the length of header frame is 2;
404     } else if (msgLen < 65536) {             // 65536: message's length
405         sendBuf[1] = 126 | (mask << 7);      // 126: payloadLen according to the spec; 7: mask shift left by 7 bits
406         sendBuf[2] = ((msgLen >> 8) & 0xff); // 8: shift right by 8 bits => res * (256^1)
407         sendBuf[3] = (msgLen & 0xff);        // 3: store len's data => res * (256^0)
408         sendMsgLen = 4;                      // 4: the length of header frame is 4
409     } else {
410         sendBuf[1] = 127 | (mask << 7);    // 127: payloadLen according to the spec; 7: mask shift left by 7 bits
411         for (int32_t i = 2; i <= 5; i++) { // 2 ~ 5: unused bits
412             sendBuf[i] = 0;
413         }
414         sendBuf[6] = ((msgLen & 0xff000000) >> 24); // 6: shift 24 bits => res * (256^3)
415         sendBuf[7] = ((msgLen & 0x00ff0000) >> 16); // 7: shift 16 bits => res * (256^2)
416         sendBuf[8] = ((msgLen & 0x0000ff00) >> 8);  // 8: shift 8 bits => res * (256^1)
417         sendBuf[9] = (msgLen & 0x000000ff);         // 9: res * (256^0)
418         sendMsgLen = 10;                            // 10: the length of header frame is 10
419     }
420 
421     if (memcpy_s(sendBuf + sendMsgLen, SOCKET_MASK_LEN, MASK_KEY, SOCKET_MASK_LEN) != EOK) {
422         PROFILER_LOG_ERROR(LOG_CORE, "client memcpy_s MASK_KEY failed, error = %d, desc = %s", errno, strerror(errno));
423         return false;
424     }
425     sendMsgLen += SOCKET_MASK_LEN;
426 
427     std::string maskMessage;
428     for (uint64_t i = 0; i < msgLen; i++) {
429         uint64_t j = i % SOCKET_MASK_LEN;
430         maskMessage.push_back(message[i] ^ MASK_KEY[j]);
431     }
432     if (memcpy_s(sendBuf + sendMsgLen, msgLen, maskMessage.c_str(), msgLen) != EOK) {
433         PROFILER_LOG_ERROR(LOG_CORE, "client memcpy_s maskMessage failed, error = %d, desc = %s",
434                            errno, strerror(errno));
435         return false;
436     }
437     msgBuf[sendMsgLen + msgLen] = '\0';
438 
439     if (send(client_, sendBuf, sendMsgLen + msgLen, 0) != static_cast<int>(sendMsgLen + msgLen)) {
440         PROFILER_LOG_ERROR(LOG_CORE, "client send msg req failed, error = %d, desc = %s", errno, strerror(errno));
441         return false;
442     }
443     PROFILER_LOG_INFO(LOG_CORE, "ClientRecvWSUpgradeRsp::client send msg req success...");
444     return true;
445 }
446 
Close()447 void ArkTSPlugin::Close()
448 {
449     if (socketState_ == SocketState::UNINITED) {
450         PROFILER_LOG_ERROR(LOG_CORE, "client has not inited");
451         return;
452     }
453     shutdown(client_, SHUT_RDWR);
454     close(client_);
455     client_ = -1;
456     socketState_ = SocketState::UNINITED;
457 }
458 
SetWebSocketTimeOut(int32_t fd,uint32_t timeoutLimit)459 bool ArkTSPlugin::SetWebSocketTimeOut(int32_t fd, uint32_t timeoutLimit)
460 {
461     if (timeoutLimit > 0) {
462         struct timeval timeout = {timeoutLimit, 0};
463         if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) != SOCKET_SUCCESS) {
464             return false;
465         }
466         if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != SOCKET_SUCCESS) {
467             return false;
468         }
469     }
470     return true;
471 }
472 
Decode()473 std::string ArkTSPlugin::Decode()
474 {
475     if (socketState_ != SocketState::CONNECTED) {
476         PROFILER_LOG_ERROR(LOG_CORE, "client has not connected");
477         return {};
478     }
479     char recvbuf[SOCKET_HEADER_LEN + 1];
480     if (!Recv(client_, recvbuf, SOCKET_HEADER_LEN, 0)) {
481         PROFILER_LOG_ERROR(LOG_CORE, "Decode failed, client websocket disconnect");
482         socketState_ = SocketState::INITED;
483         shutdown(client_, SHUT_RDWR);
484         close(client_);
485         client_ = -1;
486         return {};
487     }
488     recvbuf[SOCKET_HEADER_LEN] = '\0';
489     WebSocketFrame wsFrame;
490     int32_t index = 0;
491     wsFrame.fin = static_cast<uint8_t>(recvbuf[index] >> 7); // 7: shift right by 7 bits to get the fin
492     wsFrame.opCode = static_cast<uint8_t>(recvbuf[index] & 0xf);
493     if (wsFrame.opCode == 0x1) { // 0x1: 0x1 means a text frame
494         index++;
495         wsFrame.mask = static_cast<uint8_t>((recvbuf[index] >> 7) & 0x1); // 7: to get the mask
496         wsFrame.payloadLen = recvbuf[index] & 0x7f;
497         HandleFrame(wsFrame);
498         return wsFrame.payload.get();
499     }
500     return std::string();
501 }
502 
NetToHostLongLong(char * buf,uint32_t len)503 uint64_t ArkTSPlugin::NetToHostLongLong(char* buf, uint32_t len)
504 {
505     uint64_t result = 0;
506     for (uint32_t i = 0; i < len; i++) {
507         result |= static_cast<unsigned char>(buf[i]);
508         if ((i + 1) < len) {
509             result <<= 8; // 8: result need shift left 8 bits in order to big endian convert to int
510         }
511     }
512     return result;
513 }
514 
HandleFrame(WebSocketFrame & wsFrame)515 bool ArkTSPlugin::HandleFrame(WebSocketFrame& wsFrame)
516 {
517     if (wsFrame.payloadLen == 126) { // 126: the payloadLen read from frame
518         char recvbuf[PAYLOAD_LEN + 1] = {0};
519         if (!Recv(client_, recvbuf, PAYLOAD_LEN, 0)) {
520             PROFILER_LOG_ERROR(LOG_CORE, "HandleFrame: Recv payloadLen == 126 failed");
521             return false;
522         }
523         recvbuf[PAYLOAD_LEN] = '\0';
524         uint16_t msgLen = 0;
525         if (memcpy_s(&msgLen, sizeof(recvbuf), recvbuf, sizeof(recvbuf) - 1) != EOK) {
526             return false;
527         }
528         wsFrame.payloadLen = ntohs(msgLen);
529     } else if (wsFrame.payloadLen > 126) { // 126: the payloadLen read from frame
530         char recvbuf[EXTEND_PAYLOAD_LEN + 1] = {0};
531         if (!Recv(client_, recvbuf, EXTEND_PAYLOAD_LEN, 0)) {
532             PROFILER_LOG_ERROR(LOG_CORE, "HandleFrame: Recv payloadLen > 127 failed");
533             return false;
534         }
535         recvbuf[EXTEND_PAYLOAD_LEN] = '\0';
536         wsFrame.payloadLen = NetToHostLongLong(recvbuf, EXTEND_PAYLOAD_LEN);
537     }
538     return DecodeMessage(wsFrame);
539 }
540 
DecodeMessage(WebSocketFrame & wsFrame)541 bool ArkTSPlugin::DecodeMessage(WebSocketFrame& wsFrame)
542 {
543     if (wsFrame.payloadLen == 0 || wsFrame.payloadLen > UINT64_MAX) {
544         return false;
545     }
546     wsFrame.payload = std::make_unique<char[]>(wsFrame.payloadLen + 1);
547     if (wsFrame.mask == 1) {
548         CHECK_TRUE(Recv(client_, wsFrame.maskingKey, SOCKET_MASK_LEN, 0), false,
549                    "DecodeMessage: Recv maskingKey failed");
550         wsFrame.maskingKey[SOCKET_MASK_LEN] = '\0';
551 
552         char buf[wsFrame.payloadLen + 1];
553         CHECK_TRUE(Recv(client_, buf, wsFrame.payloadLen, 0), false, "DecodeMessage: Recv message with mask failed");
554         buf[wsFrame.payloadLen] = '\0';
555 
556         for (uint64_t i = 0; i < wsFrame.payloadLen; i++) {
557             uint64_t j = i % SOCKET_MASK_LEN;
558             wsFrame.payload.get()[i] = buf[i] ^ wsFrame.maskingKey[j];
559         }
560     } else {
561         if (!Recv(client_, wsFrame.payload.get(), wsFrame.payloadLen, 0)) {
562             return false;
563         }
564     }
565     wsFrame.payload.get()[wsFrame.payloadLen] = '\0';
566     return true;
567 }
568 
Recv(int32_t client,char * buf,size_t totalLen,int32_t flags) const569 bool ArkTSPlugin::Recv(int32_t client, char* buf, size_t totalLen, int32_t flags) const
570 {
571     size_t recvLen = 0;
572     while (recvLen < totalLen) {
573         ssize_t len = recv(client, buf + recvLen, totalLen - recvLen, flags);
574         CHECK_TRUE(len > 0, false, "Recv payload in while failed, websocket disconnect");
575         recvLen += static_cast<size_t>(len);
576     }
577     buf[totalLen] = '\0';
578     return true;
579 }