• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <sstream>
16 #include <fstream>
17 #include <climits>
18 #include <cstring>
19 #include <cstdio>
20 #include <algorithm>
21 #include <iostream>
22 #include <thread>
23 #include <unistd.h>
24 #include <string>
25 #include <regex>
26 #include <cstdarg>
27 #include <sys/time.h>
28 #include <sys/select.h>
29 #include <netinet/in.h>
30 #include "include/sp_utils.h"
31 #include "include/startup_delay.h"
32 #include "include/sp_log.h"
33 #include "include/sdk_data_recv.h"
34 #include "memory_collector.h"
35 #include "collect_result.h"
36 #include "include/sp_task.h"
37 #include "include/sp_utils.h"
38 #include "securec.h"
39 namespace OHOS {
40     namespace SmartPerf {
SdkDataRecv()41         SdkDataRecv::SdkDataRecv()
42         {
43             FD_ZERO(&readFds);
44         }
45 
ItemData()46         std::map<std::string, std::string> SdkDataRecv::ItemData()
47         {
48             return std::map<std::string, std::string>();
49         }
50 
CreateOhSocketServer(int basePort)51         int SdkDataRecv::CreateOhSocketServer(int basePort)
52         {
53             int i = 0;
54             int socketFd = 0;
55             struct sockaddr_in address;
56             const int reuse = 1;
57 
58             LOGD("Creating socket server on base port: %d", basePort);
59 
60             socketFd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
61             if (socketFd < 0) {
62                 LOGE("Failed to create socket. Error: %d", errno);
63                 return -1;
64             }
65             setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
66 
67             std::fill_n(reinterpret_cast<char*>(&address), sizeof(address), 0);
68             address.sin_family = AF_INET;
69             address.sin_addr.s_addr = inet_addr("127.0.0.1");
70 
71             for (i = 0; i < SOCKET_PORT_NUM_PER_TYPE; i++) {
72                 address.sin_port = htons(basePort + i);
73                 if (::bind(socketFd, reinterpret_cast<struct sockaddr *>(&address), sizeof(address)) == 0) {
74                     LOGD("Socket bound successfully to port: %d", basePort + i);
75                     break;
76                 }
77             }
78 
79             if (i >= SOCKET_PORT_NUM_PER_TYPE) {
80                 LOGE("Failed to bind socket after trying all ports starting from: %d", basePort);
81                 return -1;
82             }
83 
84             if (listen(socketFd, OH_SOCKET_MAX) < 0) {
85                 LOGE("Failed to listen on socket. Error: %d", errno);
86                 close(socketFd);
87                 return -1;
88             }
89 
90             LOGD("Listening on port %d, socket fd: %d", basePort + i, socketFd);
91             return socketFd;
92         }
ProcessData(std::string message,ServerParams & params)93         std::string SdkDataRecv::ProcessData(std::string message, ServerParams &params)
94         {
95             std::stringstream ss(message);
96             std::string item;
97             std::string source;
98             std::string timestamp;
99             std::string eventName;
100             std::string enable;
101             std::string value;
102             std::string realTimestamp;
103             while (std::getline(ss, item, ',')) {
104                 std::stringstream itemSS(item);
105                 std::string first;
106                 std::string second;
107                 std::getline(itemSS, first, ':');
108                 std::getline(itemSS, second, ':');
109                 if (first == "src") {
110                     source = second;
111                 } else if (first == "para0") {
112                     eventName = second;
113                 } else if (first == "time") {
114                     realTimestamp = std::to_string(SPUtilesTye::StringToSometype<long long>(second) -
115                                     SPTask::GetInstance().GetRealStartTime());
116                     timestamp = std::to_string(SPUtilesTye::StringToSometype<long long>(second) - params.startTime);
117                 } else if (first == "enable") {
118                     enable = second;
119                 } else if (first == "value") {
120                     value = second;
121                 }
122             }
123             item = source + "," + timestamp + "," + eventName + "," + enable + "," + value + "\r\n";
124             sdkDataRealtimeData += source + "_" + realTimestamp + "_" + eventName + "_" + enable + "_" + value + ";";
125             return item;
126         }
127 
OhDataReceive(int index,ServerParams & params)128         std::string SdkDataRecv::OhDataReceive(int index, ServerParams &params)
129         {
130             char receiveBuf[MSG_MAX_LEN];
131             std::string resStr;
132             int readLen = 0;
133             if ((readLen = read(params.receiveFd[index], receiveBuf, MSG_MAX_LEN)) <= 0) {
134                 close(params.receiveFd[index]);
135                 params.receiveFd[index] = -1;
136                 LOGE("Failed to read data from socket fd[%d]. Read length: %d, Error: %d", index, readLen, errno);
137                 return "";
138             }
139             if (readLen < MSG_MAX_LEN) {
140                 receiveBuf[readLen] = '\0';
141             } else {
142                 receiveBuf[MSG_MAX_LEN - 1] = '\0';
143             }
144             receiveBuffer = receiveBuf;
145             SocketCommandVerification(resStr, params);
146 
147             if (!resStr.empty() && resStr.back() == '\n') {
148                 resStr.pop_back();
149             }
150             return resStr;
151         }
152 
SetRunningState(bool state)153         void SdkDataRecv::SetRunningState(bool state)
154         {
155             collectRunring = state;
156         }
157 
ServerThread(std::vector<std::string> & dataVec)158         void SdkDataRecv::ServerThread(std::vector<std::string> &dataVec)
159         {
160             LOGD("Starting SdkDataRecv server thread");
161             for (int i = 0; i < OH_SOCKET_MAX; i++) {
162                 sdkParams.receiveFd[i] = -1;
163             }
164             sdkParams.startTime = SPUtils::GetCurTime();
165             sdkParams.serverFd = CreateOhSocketServer(OH_DATA_PORT);
166             if (sdkParams.serverFd < 0) {
167                 LOGE("Failed to create sdk data server, exiting...");
168                 return;
169             }
170 
171             if (pipe(sdkParams.pipFd) == -1) {
172                 LOGE("Failed to create sdk data pipe.");
173                 close(sdkParams.serverFd);
174                 return;
175             }
176             listenFd = sdkParams.pipFd[1];
177             LOGD("Sdk data server listening on pipe fd: %d", listenFd);
178             RunServerThread(dataVec, sdkParams);
179             LOGD("Sdk Data server thread exit.");
180         }
181 
RunServerThread(std::vector<std::string> & dataVec,ServerParams & params)182         void SdkDataRecv::RunServerThread(std::vector<std::string> &dataVec, ServerParams &params)
183         {
184             while (collectRunring) {
185                 SetUpFdSet(params);
186                 if (select(maxFd + 1, &readFds, nullptr, nullptr, nullptr) <= 0) {
187                     continue;
188                 }
189                 for (int i = 0; i < OH_SOCKET_MAX; i++) {
190                     HandleReceiveFd(dataVec, i, params);
191                 }
192                 HandleServerFd(params);
193             }
194             CleanUpResources(params);
195         }
196 
SetUpFdSet(ServerParams & params)197         void SdkDataRecv::SetUpFdSet(ServerParams &params)
198         {
199             FD_ZERO(&readFds);
200             FD_SET(params.serverFd, &readFds);
201             FD_SET(params.pipFd[0], &readFds);
202 
203             maxFd = std::max(params.serverFd, params.pipFd[0]);
204             for (int i = 0; i < OH_SOCKET_MAX; i++) {
205                 if (params.receiveFd[i] >= 0) {
206                     FD_SET(params.receiveFd[i], &readFds);
207                     maxFd = std::max(maxFd, params.receiveFd[i]);
208                     LOGD("Sdk data adding receiveFd[%d]: %d to FD set", i, params.receiveFd[i]);
209                 }
210             }
211         }
212 
HandleReceiveFd(std::vector<std::string> & dataVec,int i,ServerParams & params)213         void SdkDataRecv::HandleReceiveFd(std::vector<std::string> &dataVec, int i, ServerParams &params)
214         {
215             if (params.receiveFd[i] >= 0 && FD_ISSET(params.receiveFd[i], &readFds)) {
216                 std::string data = OhDataReceive(i, params);
217                 if (SPTask::GetInstance().GetRecordState()) {
218                     dataVec.push_back(data);
219                 }
220             }
221         }
222 
HandleServerFd(ServerParams & params)223         void SdkDataRecv::HandleServerFd(ServerParams &params)
224         {
225             if (!FD_ISSET(params.serverFd, &readFds)) {
226                 return;
227             }
228 
229             int fd = accept(params.serverFd, nullptr, nullptr);
230             if (fd < 0) {
231                 return;
232             }
233 
234             for (int i = 0; i < OH_SOCKET_MAX; i++) {
235                 if (params.receiveFd[i] < 0) {
236                     params.receiveFd[i] = fd;
237                     if (fd > maxFd) {
238                         maxFd = fd;
239                     }
240                     break;
241                 }
242             }
243         }
244 
CleanUpResources(ServerParams & params)245         void SdkDataRecv::CleanUpResources(ServerParams &params)
246         {
247             if (params.serverFd != -1) {
248                 LOGD("Closing sdk data server socket fd: %d", params.serverFd);
249                 close(params.serverFd);
250                 params.serverFd = -1;
251             }
252             if (params.pipFd[0] != -1) {
253                 close(params.pipFd[0]);
254                 params.pipFd[0] = -1;
255             }
256             for (int i = 0; i < OH_SOCKET_MAX; i++) {
257                 if (params.receiveFd[i] != -1) {
258                     close(params.receiveFd[i]);
259                     params.receiveFd[i] = -1;
260                 }
261             }
262         }
263 
GetListenFd()264         int SdkDataRecv::GetListenFd()
265         {
266             return listenFd;
267         }
SetListenFd(int fd)268         void SdkDataRecv::SetListenFd(int fd)
269         {
270             listenFd = fd;
271         }
272 
GetSdkDataRealtimeData(std::map<std::string,std::string> & dataMap)273         void SdkDataRecv::GetSdkDataRealtimeData(std::map<std::string, std::string> &dataMap)
274         {
275             if (sdkDataRealtimeData.size() > 0) {
276                 std::map<std::string, std::string> sdkDataRealtimeDataMap;
277                 sdkDataRealtimeDataMap["sdkData"] = sdkDataRealtimeData;
278                 realtimeDataLock.lock();
279                 dataMap.insert(sdkDataRealtimeDataMap.begin(), sdkDataRealtimeDataMap.end());
280                 realtimeDataLock.unlock();
281                 sdkDataRealtimeData.clear();
282             }
283         }
284 
SetStartRecordTime()285         void SdkDataRecv::SetStartRecordTime()
286         {
287             sdkParams.startTime = SPUtils::GetCurTime();
288         }
SocketCommandVerification(std::string & resStr,ServerParams & params)289         void SdkDataRecv::SocketCommandVerification(std::string &resStr, ServerParams &params)
290         {
291             bool processFlag = true;
292             while (processFlag) {
293                 size_t start = receiveBuffer.find('{');
294                 if (start == std::string::npos) {
295                     processFlag = false;
296                     break;
297                 }
298 
299                 size_t end = receiveBuffer.find('}', start);
300                 if (end == std::string::npos) {
301                     processFlag = false;
302                     break;
303                 }
304 
305                 std::size_t startPosition = start + 1;
306                 std::size_t length = end > start ? end - start - 1 : 0;
307                 if (startPosition >= receiveBuffer.size() || length > receiveBuffer.size() - startPosition) {
308                     processFlag = false;
309                     break;
310                 }
311 
312                 std::string message = receiveBuffer.substr(startPosition, length);
313                 resStr += ProcessData(message, params);
314 
315                 receiveBuffer.erase(0, end + 1);
316                 const int bufferSizeCheck = 2;
317                 if (receiveBuffer.size() <= bufferSizeCheck) {
318                     processFlag = false;
319             }
320         }
321     }
322     }
323 }