1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "arkts_plugin.h"
17
18 #include <arpa/inet.h>
19 #include <cstdlib>
20 #include <regex>
21 #include <sys/un.h>
22 #include <unistd.h>
23
24 #include "arkts_plugin_result.pb.h"
25 #include "logging.h"
26 #include "securec.h"
27
28 namespace {
29 const std::string PANDA = "PandaDebugger";
30 const std::string SNAPSHOT_HEAD =
31 R"({"id":1,"method":"HeapProfiler.takeHeapSnapshot","params":{"reportProgress":true,"captureNumericValue":)";
32 const std::string SNAPSHOT_TAIL = R"(,"exposeInternals":false}})";
33 const std::string TIMELINE_HEAD =
34 R"({"id":1,"method":"HeapProfiler.startTrackingHeapObjects","params":{"trackAllocations":)";
35 const std::string TIMELINE_TAIL = "}}";
36 const std::string TIMELINE_STOP =
37 R"({"id":2,"method":"HeapProfiler.stopTrackingHeapObjects","params":{"reportProgress":true}})";
38 const std::string CPU_PROFILER_INTERVAL_HEAD =
39 R"({"id":3,"method":"Profiler.setSamplingInterval","params":{"interval":)";
40 const std::string CPU_PROFILER_INTERVAL_TAIL = R"(}})";
41 const std::string CPU_PROFILER_START = R"({"id":3,"method":"Profiler.start","params":{}})";
42 const std::string CPU_PROFILER_STOP = R"({"id":3,"method":"Profiler.stop","params":{}})";
43 constexpr uint8_t TIMELINE_START_SUCCESS = 0x1;
44 constexpr uint8_t CPU_PROFILER_START_SUCCESS = 0x2;
45 const std::string RESPONSE_FLAG_HEAD = R"({"id":)";
46 const std::string RESPONSE_FLAG_TAIL = R"(,"result":{}})";
47 const std::string REGEX_PATTERN = R"("id":(\d+))";
48 const std::string ARKTS_SCHEDULE = R"(ArkTS_Snapshot)";
49 enum class HeapType : int32_t {
50 INVALID = -1,
51 SNAPSHOT,
52 TIMELINE,
53 };
54 constexpr char CLIENT_WEBSOCKET_UPGRADE_REQ[] =
55 "GET / HTTP/1.1\r\n"
56 "Connection: Upgrade\r\n"
57 "Pragma: no-cache\r\n"
58 "Cache-Control: no-cache\r\n"
59 "Upgrade: websocket\r\n"
60 "Sec-WebSocket-Version: 13\r\n"
61 "Accept-Encoding: gzip, deflate, br\r\n"
62 "Sec-WebSocket-Key: 64b4B+s5JDlgkdg7NekJ+g==\r\n"
63 "Sec-WebSocket-Extensions: permessage-deflate\r\n";
64 constexpr int32_t CLIENT_WEBSOCKET_UPGRADE_RSP_LEN = 129;
65 constexpr int32_t SOCKET_MASK_LEN = 4;
66 constexpr char MASK_KEY[SOCKET_MASK_LEN + 1] = "abcd";
67 constexpr uint32_t TIME_OUT = 5;
68 constexpr uint32_t TIME_BASE = 1000;
69 constexpr int32_t SOCKET_SUCCESS = 0;
70 constexpr int32_t SOCKET_HEADER_LEN = 2;
71 constexpr int32_t PAYLOAD_LEN = 2;
72 constexpr int32_t EXTEND_PAYLOAD_LEN = 8;
73 constexpr uint32_t CPU_PROFILER_INTERVAL_DEFAULT = 1000;
74 } // namespace
75
Start(const uint8_t * configData,uint32_t configSize)76 int32_t ArkTSPlugin::Start(const uint8_t* configData, uint32_t configSize)
77 {
78 if (protoConfig_.ParseFromArray(configData, configSize) <= 0) {
79 PROFILER_LOG_ERROR(LOG_CORE, "%s:parseFromArray failed!", __func__);
80 return -1;
81 }
82
83 if (!protoConfig_.split_outfile_name().empty()) {
84 splitTraceWriter_ = std::make_shared<TraceFileWriter>(protoConfig_.split_outfile_name());
85 splitTraceWriter_->WriteStandalonePluginData(
86 std::string(g_pluginModule.name) + "_config",
87 std::string(reinterpret_cast<const char *>(configData),
88 configSize));
89 splitTraceWriter_->SetTimeSource();
90 }
91
92 pid_ = protoConfig_.pid();
93 if (pid_ <= 0) {
94 PROFILER_LOG_ERROR(LOG_CORE, "%s: pid is less than or equal to 0", __func__);
95 return -1;
96 }
97
98 if (!ClientConnectUnixWebSocket(std::to_string(pid_) + PANDA, TIME_OUT)) {
99 return -1;
100 }
101
102 if (!ClientSendWSUpgradeReq()) {
103 return -1;
104 }
105
106 if (!ClientRecvWSUpgradeRsp()) {
107 return -1;
108 }
109
110 if (protoConfig_.enable_cpu_profiler()) {
111 if (EnableCpuProfiler() != 0) {
112 PROFILER_LOG_ERROR(LOG_CORE, "arkts plugin cpu profiler start failed");
113 return -1;
114 }
115 }
116
117 switch (static_cast<int32_t>(protoConfig_.type())) {
118 case static_cast<int32_t>(HeapType::SNAPSHOT): {
119 return EnableSnapshot();
120 }
121 case static_cast<int32_t>(HeapType::TIMELINE): {
122 return EnableTimeline();
123 }
124 case static_cast<int32_t>(HeapType::INVALID): {
125 PROFILER_LOG_INFO(LOG_CORE, "arkts plugin memory type is INVALID");
126 return 0;
127 }
128 default: {
129 PROFILER_LOG_ERROR(LOG_CORE, "arkts plugin start type error");
130 return -1;
131 }
132 }
133 }
134
EnableTimeline()135 int32_t ArkTSPlugin::EnableTimeline()
136 {
137 std::string timelineCmd = TIMELINE_HEAD + (protoConfig_.track_allocations() ? "true" : "false") + TIMELINE_TAIL;
138 if (!ClientSendReq(timelineCmd)) {
139 return -1;
140 }
141 FlushData(timelineCmd);
142 commandResult_ |= TIMELINE_START_SUCCESS;
143 return 0;
144 }
145
EnableSnapshot()146 int32_t ArkTSPlugin::EnableSnapshot()
147 {
148 if (protoConfig_.interval() == 0) {
149 PROFILER_LOG_ERROR(LOG_CORE, "%s:scheduleTask interval == 0 error!", __func__);
150 return -1;
151 }
152 snapshotCmd_ = SNAPSHOT_HEAD + (protoConfig_.capture_numeric_value() ? "true" : "false") + SNAPSHOT_TAIL;
153 auto callback = std::bind(&ArkTSPlugin::Snapshot, this);
154 snapshotScheduleTaskFd_ = scheduleTaskManager_.ScheduleTask(callback, protoConfig_.interval() * TIME_BASE);
155 if (snapshotScheduleTaskFd_ == -1) {
156 PROFILER_LOG_ERROR(LOG_CORE, "%s:scheduleTask failed!", __func__);
157 return -1;
158 }
159 return 0;
160 }
161
EnableCpuProfiler()162 int32_t ArkTSPlugin::EnableCpuProfiler()
163 {
164 std::string interval = CPU_PROFILER_INTERVAL_HEAD
165 + (protoConfig_.cpu_profiler_interval() == 0 ?
166 std::to_string(CPU_PROFILER_INTERVAL_DEFAULT) : std::to_string(protoConfig_.cpu_profiler_interval()))
167 + CPU_PROFILER_INTERVAL_TAIL;
168 if (!ClientSendReq(interval)) {
169 return -1;
170 }
171 FlushData(interval);
172
173 if (!ClientSendReq(CPU_PROFILER_START)) {
174 return -1;
175 }
176 FlushData(CPU_PROFILER_START);
177 commandResult_ |= CPU_PROFILER_START_SUCCESS;
178 return 0;
179 }
180
Stop()181 int32_t ArkTSPlugin::Stop()
182 {
183 switch (static_cast<int32_t>(protoConfig_.type())) {
184 case static_cast<int32_t>(HeapType::SNAPSHOT): {
185 scheduleTaskManager_.UnscheduleTask(snapshotScheduleTaskFd_);
186 break;
187 }
188 case static_cast<int32_t>(HeapType::TIMELINE): {
189 if (commandResult_ & TIMELINE_START_SUCCESS) {
190 if (!ClientSendReq(TIMELINE_STOP)) {
191 break;
192 }
193 FlushData(TIMELINE_STOP);
194 }
195 break;
196 }
197 case static_cast<int32_t>(HeapType::INVALID): {
198 break;
199 }
200 default: {
201 PROFILER_LOG_ERROR(LOG_CORE, "arkts plugin stop type error");
202 break;
203 }
204 }
205
206 if (protoConfig_.enable_cpu_profiler() && (commandResult_ & CPU_PROFILER_START_SUCCESS)) {
207 if (ClientSendReq(CPU_PROFILER_STOP)) {
208 FlushData();
209 }
210 }
211 Close();
212
213 if (!protoConfig_.split_outfile_name().empty()) { // write split file.
214 CHECK_NOTNULL(splitTraceWriter_, -1, "%s: writer is nullptr, WriteStandaloneFile failed", __func__);
215 splitTraceWriter_->SetDurationTime();
216 splitTraceWriter_->Finish();
217 splitTraceWriter_.reset();
218 splitTraceWriter_ = nullptr;
219 }
220 return 0;
221 }
222
SetWriter(WriterStruct * writer)223 void ArkTSPlugin::SetWriter(WriterStruct* writer)
224 {
225 resultWriter_ = writer;
226 }
227
Snapshot()228 void ArkTSPlugin::Snapshot()
229 {
230 CHECK_NOTNULL(resultWriter_, NO_RETVAL, "%s: resultWriter_ nullptr", __func__);
231 if (!ClientSendReq(snapshotCmd_)) {
232 return;
233 }
234 FlushData(snapshotCmd_);
235 }
236
FlushData(const std::string & command)237 void ArkTSPlugin::FlushData(const std::string& command)
238 {
239 std::string endFlag;
240 if (!command.empty()) {
241 std::regex pattern(REGEX_PATTERN);
242 std::smatch match;
243 if (std::regex_search(command, match, pattern)) {
244 endFlag = RESPONSE_FLAG_HEAD + match[1].str() + RESPONSE_FLAG_TAIL;
245 }
246 }
247 if (!protoConfig_.split_outfile_name().empty()) {
248 CHECK_NOTNULL(splitTraceWriter_, NO_RETVAL, "%s: writer is nullptr, WriteStandaloneFile failed", __func__);
249 }
250
251 while (true) {
252 std::string recv = Decode();
253 if (recv.empty()) {
254 PROFILER_LOG_ERROR(LOG_CORE, "%s: recv is empty", __func__);
255 break;
256 }
257 ArkTSResult data;
258 data.set_result(recv.c_str(), recv.size());
259 buffer_.resize(data.ByteSizeLong());
260 data.SerializeToArray(buffer_.data(), buffer_.size());
261
262 if (protoConfig_.split_outfile_name().empty()) {
263 resultWriter_->write(resultWriter_, buffer_.data(), buffer_.size());
264 resultWriter_->flush(resultWriter_);
265 } else { // write split file.
266 splitTraceWriter_->WriteStandalonePluginData(
267 std::string(g_pluginModule.name),
268 std::string(buffer_.data(), buffer_.size()),
269 std::string(g_pluginModule.version));
270 }
271
272 if (endFlag.empty() || recv == endFlag) {
273 break;
274 }
275 }
276
277 if (!protoConfig_.split_outfile_name().empty()) {
278 splitTraceWriter_->Flush();
279 }
280 }
281
ClientConnectUnixWebSocket(const std::string & sockName,uint32_t timeoutLimit)282 bool ArkTSPlugin::ClientConnectUnixWebSocket(const std::string& sockName, uint32_t timeoutLimit)
283 {
284 if (socketState_ != SocketState::UNINITED) {
285 PROFILER_LOG_ERROR(LOG_CORE, "client has inited");
286 return true;
287 }
288
289 client_ = socket(AF_UNIX, SOCK_STREAM, 0);
290 if (client_ < SOCKET_SUCCESS) {
291 PROFILER_LOG_ERROR(LOG_CORE, "client socket failed, error = %d, , desc = %s", errno, strerror(errno));
292 return false;
293 }
294
295 // set send and recv timeout limit
296 if (!SetWebSocketTimeOut(client_, timeoutLimit)) {
297 PROFILER_LOG_ERROR(LOG_CORE, "client SetWebSocketTimeOut failed, error = %d, desc = %s",
298 errno, strerror(errno));
299 close(client_);
300 client_ = -1;
301 return false;
302 }
303
304 struct sockaddr_un serverAddr;
305 if (memset_s(&serverAddr, sizeof(serverAddr), 0, sizeof(serverAddr)) != EOK) {
306 PROFILER_LOG_ERROR(LOG_CORE, "client memset_s serverAddr failed, error = %d, desc = %s",
307 errno, strerror(errno));
308 close(client_);
309 client_ = -1;
310 return false;
311 }
312 serverAddr.sun_family = AF_UNIX;
313 if (strcpy_s(serverAddr.sun_path + 1, sizeof(serverAddr.sun_path) - 1, sockName.c_str()) != EOK) {
314 PROFILER_LOG_ERROR(LOG_CORE, "client strcpy_s serverAddr.sun_path failed, error = %d, , desc = %s",
315 errno, strerror(errno));
316 close(client_);
317 client_ = -1;
318 return false;
319 }
320 serverAddr.sun_path[0] = '\0';
321
322 uint32_t len = offsetof(struct sockaddr_un, sun_path) + strlen(sockName.c_str()) + 1;
323 int ret = connect(client_, reinterpret_cast<struct sockaddr*>(&serverAddr), static_cast<int32_t>(len));
324 if (ret != SOCKET_SUCCESS) {
325 PROFILER_LOG_ERROR(LOG_CORE, "client connect failed, error, error = %d, , desc = %s", errno, strerror(errno));
326 close(client_);
327 client_ = -1;
328 return false;
329 }
330 socketState_ = SocketState::INITED;
331 PROFILER_LOG_INFO(LOG_CORE, "client connect success...");
332 return true;
333 }
334
ClientSendWSUpgradeReq()335 bool ArkTSPlugin::ClientSendWSUpgradeReq()
336 {
337 if (socketState_ == SocketState::UNINITED) {
338 PROFILER_LOG_ERROR(LOG_CORE, "client has not inited");
339 return false;
340 }
341 if (socketState_ == SocketState::CONNECTED) {
342 PROFILER_LOG_ERROR(LOG_CORE, "client has connected");
343 return true;
344 }
345
346 int msgLen = strlen(CLIENT_WEBSOCKET_UPGRADE_REQ);
347 int32_t sendLen = send(client_, CLIENT_WEBSOCKET_UPGRADE_REQ, msgLen, 0);
348 if (sendLen != msgLen) {
349 PROFILER_LOG_ERROR(LOG_CORE, "client send wsupgrade req failed, error = %d, desc = %s", errno, strerror(errno));
350 socketState_ = SocketState::UNINITED;
351 shutdown(client_, SHUT_RDWR);
352 close(client_);
353 client_ = -1;
354 return false;
355 }
356 PROFILER_LOG_INFO(LOG_CORE, "client send wsupgrade req success");
357 return true;
358 }
359
ClientRecvWSUpgradeRsp()360 bool ArkTSPlugin::ClientRecvWSUpgradeRsp()
361 {
362 if (socketState_ == SocketState::UNINITED) {
363 PROFILER_LOG_ERROR(LOG_CORE, "client has not inited");
364 return false;
365 }
366 if (socketState_ == SocketState::CONNECTED) {
367 PROFILER_LOG_ERROR(LOG_CORE, "ClientRecvWSUpgradeRsp::client has connected");
368 return true;
369 }
370
371 char recvBuf[CLIENT_WEBSOCKET_UPGRADE_RSP_LEN + 1] = {0};
372 int32_t bufLen = recv(client_, recvBuf, CLIENT_WEBSOCKET_UPGRADE_RSP_LEN, 0);
373 if (bufLen != CLIENT_WEBSOCKET_UPGRADE_RSP_LEN) {
374 PROFILER_LOG_ERROR(LOG_CORE, "client recv wsupgrade rsp failed, error = %d, desc = %s", errno, strerror(errno));
375 socketState_ = SocketState::UNINITED;
376 shutdown(client_, SHUT_RDWR);
377 close(client_);
378 client_ = -1;
379 return false;
380 }
381 socketState_ = SocketState::CONNECTED;
382 PROFILER_LOG_INFO(LOG_CORE, "client recv wsupgrade rsp success");
383 return true;
384 }
385
ClientSendReq(const std::string & message)386 bool ArkTSPlugin::ClientSendReq(const std::string& message)
387 {
388 if (socketState_ != SocketState::CONNECTED) {
389 PROFILER_LOG_ERROR(LOG_CORE, "client has not connected");
390 return false;
391 }
392
393 uint32_t msgLen = message.length();
394 std::unique_ptr<char[]> msgBuf = std::make_unique<char[]>(msgLen + 15); // 15: the maximum expand length
395 char* sendBuf = msgBuf.get();
396 uint32_t sendMsgLen = 0;
397 sendBuf[0] = 0x81; // 0x81: the text message sent by the server should start with '0x81'.
398 uint32_t mask = 1;
399 // Depending on the length of the messages, client will use shift operation to get the res
400 // and store them in the buffer.
401 if (msgLen <= 125) { // 125: situation 1 when message's length <= 125
402 sendBuf[1] = msgLen | (mask << 7); // 7: mask need shift left by 7 bits
403 sendMsgLen = 2; // 2: the length of header frame is 2;
404 } else if (msgLen < 65536) { // 65536: message's length
405 sendBuf[1] = 126 | (mask << 7); // 126: payloadLen according to the spec; 7: mask shift left by 7 bits
406 sendBuf[2] = ((msgLen >> 8) & 0xff); // 8: shift right by 8 bits => res * (256^1)
407 sendBuf[3] = (msgLen & 0xff); // 3: store len's data => res * (256^0)
408 sendMsgLen = 4; // 4: the length of header frame is 4
409 } else {
410 sendBuf[1] = 127 | (mask << 7); // 127: payloadLen according to the spec; 7: mask shift left by 7 bits
411 for (int32_t i = 2; i <= 5; i++) { // 2 ~ 5: unused bits
412 sendBuf[i] = 0;
413 }
414 sendBuf[6] = ((msgLen & 0xff000000) >> 24); // 6: shift 24 bits => res * (256^3)
415 sendBuf[7] = ((msgLen & 0x00ff0000) >> 16); // 7: shift 16 bits => res * (256^2)
416 sendBuf[8] = ((msgLen & 0x0000ff00) >> 8); // 8: shift 8 bits => res * (256^1)
417 sendBuf[9] = (msgLen & 0x000000ff); // 9: res * (256^0)
418 sendMsgLen = 10; // 10: the length of header frame is 10
419 }
420
421 if (memcpy_s(sendBuf + sendMsgLen, SOCKET_MASK_LEN, MASK_KEY, SOCKET_MASK_LEN) != EOK) {
422 PROFILER_LOG_ERROR(LOG_CORE, "client memcpy_s MASK_KEY failed, error = %d, desc = %s", errno, strerror(errno));
423 return false;
424 }
425 sendMsgLen += SOCKET_MASK_LEN;
426
427 std::string maskMessage;
428 for (uint64_t i = 0; i < msgLen; i++) {
429 uint64_t j = i % SOCKET_MASK_LEN;
430 maskMessage.push_back(message[i] ^ MASK_KEY[j]);
431 }
432 if (memcpy_s(sendBuf + sendMsgLen, msgLen, maskMessage.c_str(), msgLen) != EOK) {
433 PROFILER_LOG_ERROR(LOG_CORE, "client memcpy_s maskMessage failed, error = %d, desc = %s",
434 errno, strerror(errno));
435 return false;
436 }
437 msgBuf[sendMsgLen + msgLen] = '\0';
438
439 if (send(client_, sendBuf, sendMsgLen + msgLen, 0) != static_cast<int>(sendMsgLen + msgLen)) {
440 PROFILER_LOG_ERROR(LOG_CORE, "client send msg req failed, error = %d, desc = %s", errno, strerror(errno));
441 return false;
442 }
443 PROFILER_LOG_INFO(LOG_CORE, "ClientRecvWSUpgradeRsp::client send msg req success...");
444 return true;
445 }
446
Close()447 void ArkTSPlugin::Close()
448 {
449 if (socketState_ == SocketState::UNINITED) {
450 PROFILER_LOG_ERROR(LOG_CORE, "client has not inited");
451 return;
452 }
453 shutdown(client_, SHUT_RDWR);
454 close(client_);
455 client_ = -1;
456 socketState_ = SocketState::UNINITED;
457 }
458
SetWebSocketTimeOut(int32_t fd,uint32_t timeoutLimit)459 bool ArkTSPlugin::SetWebSocketTimeOut(int32_t fd, uint32_t timeoutLimit)
460 {
461 if (timeoutLimit > 0) {
462 struct timeval timeout = {timeoutLimit, 0};
463 if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) != SOCKET_SUCCESS) {
464 return false;
465 }
466 if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != SOCKET_SUCCESS) {
467 return false;
468 }
469 }
470 return true;
471 }
472
Decode()473 std::string ArkTSPlugin::Decode()
474 {
475 if (socketState_ != SocketState::CONNECTED) {
476 PROFILER_LOG_ERROR(LOG_CORE, "client has not connected");
477 return {};
478 }
479 char recvbuf[SOCKET_HEADER_LEN + 1];
480 if (!Recv(client_, recvbuf, SOCKET_HEADER_LEN, 0)) {
481 PROFILER_LOG_ERROR(LOG_CORE, "Decode failed, client websocket disconnect");
482 socketState_ = SocketState::INITED;
483 shutdown(client_, SHUT_RDWR);
484 close(client_);
485 client_ = -1;
486 return {};
487 }
488 recvbuf[SOCKET_HEADER_LEN] = '\0';
489 WebSocketFrame wsFrame;
490 int32_t index = 0;
491 wsFrame.fin = static_cast<uint8_t>(recvbuf[index] >> 7); // 7: shift right by 7 bits to get the fin
492 wsFrame.opCode = static_cast<uint8_t>(recvbuf[index] & 0xf);
493 if (wsFrame.opCode == 0x1) { // 0x1: 0x1 means a text frame
494 index++;
495 wsFrame.mask = static_cast<uint8_t>((recvbuf[index] >> 7) & 0x1); // 7: to get the mask
496 wsFrame.payloadLen = recvbuf[index] & 0x7f;
497 HandleFrame(wsFrame);
498 return wsFrame.payload.get();
499 }
500 return std::string();
501 }
502
NetToHostLongLong(char * buf,uint32_t len)503 uint64_t ArkTSPlugin::NetToHostLongLong(char* buf, uint32_t len)
504 {
505 uint64_t result = 0;
506 for (uint32_t i = 0; i < len; i++) {
507 result |= static_cast<unsigned char>(buf[i]);
508 if ((i + 1) < len) {
509 result <<= 8; // 8: result need shift left 8 bits in order to big endian convert to int
510 }
511 }
512 return result;
513 }
514
HandleFrame(WebSocketFrame & wsFrame)515 bool ArkTSPlugin::HandleFrame(WebSocketFrame& wsFrame)
516 {
517 if (wsFrame.payloadLen == 126) { // 126: the payloadLen read from frame
518 char recvbuf[PAYLOAD_LEN + 1] = {0};
519 if (!Recv(client_, recvbuf, PAYLOAD_LEN, 0)) {
520 PROFILER_LOG_ERROR(LOG_CORE, "HandleFrame: Recv payloadLen == 126 failed");
521 return false;
522 }
523 recvbuf[PAYLOAD_LEN] = '\0';
524 uint16_t msgLen = 0;
525 if (memcpy_s(&msgLen, sizeof(recvbuf), recvbuf, sizeof(recvbuf) - 1) != EOK) {
526 return false;
527 }
528 wsFrame.payloadLen = ntohs(msgLen);
529 } else if (wsFrame.payloadLen > 126) { // 126: the payloadLen read from frame
530 char recvbuf[EXTEND_PAYLOAD_LEN + 1] = {0};
531 if (!Recv(client_, recvbuf, EXTEND_PAYLOAD_LEN, 0)) {
532 PROFILER_LOG_ERROR(LOG_CORE, "HandleFrame: Recv payloadLen > 127 failed");
533 return false;
534 }
535 recvbuf[EXTEND_PAYLOAD_LEN] = '\0';
536 wsFrame.payloadLen = NetToHostLongLong(recvbuf, EXTEND_PAYLOAD_LEN);
537 }
538 return DecodeMessage(wsFrame);
539 }
540
DecodeMessage(WebSocketFrame & wsFrame)541 bool ArkTSPlugin::DecodeMessage(WebSocketFrame& wsFrame)
542 {
543 if (wsFrame.payloadLen == 0 || wsFrame.payloadLen > UINT64_MAX) {
544 return false;
545 }
546 wsFrame.payload = std::make_unique<char[]>(wsFrame.payloadLen + 1);
547 if (wsFrame.mask == 1) {
548 CHECK_TRUE(Recv(client_, wsFrame.maskingKey, SOCKET_MASK_LEN, 0), false,
549 "DecodeMessage: Recv maskingKey failed");
550 wsFrame.maskingKey[SOCKET_MASK_LEN] = '\0';
551
552 char buf[wsFrame.payloadLen + 1];
553 CHECK_TRUE(Recv(client_, buf, wsFrame.payloadLen, 0), false, "DecodeMessage: Recv message with mask failed");
554 buf[wsFrame.payloadLen] = '\0';
555
556 for (uint64_t i = 0; i < wsFrame.payloadLen; i++) {
557 uint64_t j = i % SOCKET_MASK_LEN;
558 wsFrame.payload.get()[i] = buf[i] ^ wsFrame.maskingKey[j];
559 }
560 } else {
561 if (!Recv(client_, wsFrame.payload.get(), wsFrame.payloadLen, 0)) {
562 return false;
563 }
564 }
565 wsFrame.payload.get()[wsFrame.payloadLen] = '\0';
566 return true;
567 }
568
Recv(int32_t client,char * buf,size_t totalLen,int32_t flags) const569 bool ArkTSPlugin::Recv(int32_t client, char* buf, size_t totalLen, int32_t flags) const
570 {
571 size_t recvLen = 0;
572 while (recvLen < totalLen) {
573 ssize_t len = recv(client, buf + recvLen, totalLen - recvLen, flags);
574 CHECK_TRUE(len > 0, false, "Recv payload in while failed, websocket disconnect");
575 recvLen += static_cast<size_t>(len);
576 }
577 buf[totalLen] = '\0';
578 return true;
579 }