1 /* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #include <sstream> 16 #include <fstream> 17 #include <climits> 18 #include <cstring> 19 #include <cstdio> 20 #include <algorithm> 21 #include <iostream> 22 #include <thread> 23 #include <unistd.h> 24 #include <string> 25 #include <regex> 26 #include <cstdarg> 27 #include <sys/time.h> 28 #include <sys/select.h> 29 #include <netinet/in.h> 30 #include "include/sp_utils.h" 31 #include "include/startup_delay.h" 32 #include "include/sp_log.h" 33 #include "include/sdk_data_recv.h" 34 #include "memory_collector.h" 35 #include "collect_result.h" 36 #include "include/sp_task.h" 37 #include "include/sp_utils.h" 38 #include "securec.h" 39 namespace OHOS { 40 namespace SmartPerf { SdkDataRecv()41 SdkDataRecv::SdkDataRecv() 42 { 43 FD_ZERO(&readFds); 44 } 45 ItemData()46 std::map<std::string, std::string> SdkDataRecv::ItemData() 47 { 48 return std::map<std::string, std::string>(); 49 } 50 CreateOhSocketServer(int basePort)51 int SdkDataRecv::CreateOhSocketServer(int basePort) 52 { 53 int i = 0; 54 int socketFd = 0; 55 struct sockaddr_in address; 56 const int reuse = 1; 57 58 LOGD("Creating socket server on base port: %d", basePort); 59 60 socketFd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); 61 if (socketFd < 0) { 62 LOGE("Failed to create socket. Error: %d", errno); 63 return -1; 64 } 65 setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); 66 67 std::fill_n(reinterpret_cast<char*>(&address), sizeof(address), 0); 68 address.sin_family = AF_INET; 69 address.sin_addr.s_addr = inet_addr("127.0.0.1"); 70 71 for (i = 0; i < SOCKET_PORT_NUM_PER_TYPE; i++) { 72 address.sin_port = htons(basePort + i); 73 if (::bind(socketFd, reinterpret_cast<struct sockaddr *>(&address), sizeof(address)) == 0) { 74 LOGD("Socket bound successfully to port: %d", basePort + i); 75 break; 76 } 77 } 78 79 if (i >= SOCKET_PORT_NUM_PER_TYPE) { 80 LOGE("Failed to bind socket after trying all ports starting from: %d", basePort); 81 return -1; 82 } 83 84 if (listen(socketFd, OH_SOCKET_MAX) < 0) { 85 LOGE("Failed to listen on socket. Error: %d", errno); 86 close(socketFd); 87 return -1; 88 } 89 90 LOGD("Listening on port %d, socket fd: %d", basePort + i, socketFd); 91 return socketFd; 92 } ProcessData(std::string message,ServerParams & params)93 std::string SdkDataRecv::ProcessData(std::string message, ServerParams ¶ms) 94 { 95 std::stringstream ss(message); 96 std::string item; 97 std::string source; 98 std::string timestamp; 99 std::string eventName; 100 std::string enable; 101 std::string value; 102 std::string realTimestamp; 103 while (std::getline(ss, item, ',')) { 104 std::stringstream itemSS(item); 105 std::string first; 106 std::string second; 107 std::getline(itemSS, first, ':'); 108 std::getline(itemSS, second, ':'); 109 if (first == "src") { 110 source = second; 111 } else if (first == "para0") { 112 eventName = second; 113 } else if (first == "time") { 114 realTimestamp = std::to_string(SPUtilesTye::StringToSometype<long long>(second) - 115 SPTask::GetInstance().GetRealStartTime()); 116 timestamp = std::to_string(SPUtilesTye::StringToSometype<long long>(second) - params.startTime); 117 } else if (first == "enable") { 118 enable = second; 119 } else if (first == "value") { 120 value = second; 121 } 122 } 123 item = source + "," + timestamp + "," + eventName + "," + enable + "," + value + "\r\n"; 124 sdkDataRealtimeData += source + "_" + realTimestamp + "_" + eventName + "_" + enable + "_" + value + ";"; 125 return item; 126 } 127 OhDataReceive(int index,ServerParams & params)128 std::string SdkDataRecv::OhDataReceive(int index, ServerParams ¶ms) 129 { 130 char receiveBuf[MSG_MAX_LEN]; 131 std::string resStr; 132 int readLen = 0; 133 if ((readLen = read(params.receiveFd[index], receiveBuf, MSG_MAX_LEN)) <= 0) { 134 close(params.receiveFd[index]); 135 params.receiveFd[index] = -1; 136 LOGE("Failed to read data from socket fd[%d]. Read length: %d, Error: %d", index, readLen, errno); 137 return ""; 138 } 139 if (readLen < MSG_MAX_LEN) { 140 receiveBuf[readLen] = '\0'; 141 } else { 142 receiveBuf[MSG_MAX_LEN - 1] = '\0'; 143 } 144 receiveBuffer = receiveBuf; 145 SocketCommandVerification(resStr, params); 146 147 if (!resStr.empty() && resStr.back() == '\n') { 148 resStr.pop_back(); 149 } 150 return resStr; 151 } 152 SetRunningState(bool state)153 void SdkDataRecv::SetRunningState(bool state) 154 { 155 collectRunring = state; 156 } 157 ServerThread(std::vector<std::string> & dataVec)158 void SdkDataRecv::ServerThread(std::vector<std::string> &dataVec) 159 { 160 LOGD("Starting SdkDataRecv server thread"); 161 for (int i = 0; i < OH_SOCKET_MAX; i++) { 162 sdkParams.receiveFd[i] = -1; 163 } 164 sdkParams.startTime = SPUtils::GetCurTime(); 165 sdkParams.serverFd = CreateOhSocketServer(OH_DATA_PORT); 166 if (sdkParams.serverFd < 0) { 167 LOGE("Failed to create sdk data server, exiting..."); 168 return; 169 } 170 171 if (pipe(sdkParams.pipFd) == -1) { 172 LOGE("Failed to create sdk data pipe."); 173 close(sdkParams.serverFd); 174 return; 175 } 176 listenFd = sdkParams.pipFd[1]; 177 LOGD("Sdk data server listening on pipe fd: %d", listenFd); 178 RunServerThread(dataVec, sdkParams); 179 LOGD("Sdk Data server thread exit."); 180 } 181 RunServerThread(std::vector<std::string> & dataVec,ServerParams & params)182 void SdkDataRecv::RunServerThread(std::vector<std::string> &dataVec, ServerParams ¶ms) 183 { 184 while (collectRunring) { 185 SetUpFdSet(params); 186 if (select(maxFd + 1, &readFds, nullptr, nullptr, nullptr) <= 0) { 187 continue; 188 } 189 for (int i = 0; i < OH_SOCKET_MAX; i++) { 190 HandleReceiveFd(dataVec, i, params); 191 } 192 HandleServerFd(params); 193 } 194 CleanUpResources(params); 195 } 196 SetUpFdSet(ServerParams & params)197 void SdkDataRecv::SetUpFdSet(ServerParams ¶ms) 198 { 199 FD_ZERO(&readFds); 200 FD_SET(params.serverFd, &readFds); 201 FD_SET(params.pipFd[0], &readFds); 202 203 maxFd = std::max(params.serverFd, params.pipFd[0]); 204 for (int i = 0; i < OH_SOCKET_MAX; i++) { 205 if (params.receiveFd[i] >= 0) { 206 FD_SET(params.receiveFd[i], &readFds); 207 maxFd = std::max(maxFd, params.receiveFd[i]); 208 LOGD("Sdk data adding receiveFd[%d]: %d to FD set", i, params.receiveFd[i]); 209 } 210 } 211 } 212 HandleReceiveFd(std::vector<std::string> & dataVec,int i,ServerParams & params)213 void SdkDataRecv::HandleReceiveFd(std::vector<std::string> &dataVec, int i, ServerParams ¶ms) 214 { 215 if (params.receiveFd[i] >= 0 && FD_ISSET(params.receiveFd[i], &readFds)) { 216 std::string data = OhDataReceive(i, params); 217 if (SPTask::GetInstance().GetRecordState()) { 218 dataVec.push_back(data); 219 } 220 } 221 } 222 HandleServerFd(ServerParams & params)223 void SdkDataRecv::HandleServerFd(ServerParams ¶ms) 224 { 225 if (!FD_ISSET(params.serverFd, &readFds)) { 226 return; 227 } 228 229 int fd = accept(params.serverFd, nullptr, nullptr); 230 if (fd < 0) { 231 return; 232 } 233 234 for (int i = 0; i < OH_SOCKET_MAX; i++) { 235 if (params.receiveFd[i] < 0) { 236 params.receiveFd[i] = fd; 237 if (fd > maxFd) { 238 maxFd = fd; 239 } 240 break; 241 } 242 } 243 } 244 CleanUpResources(ServerParams & params)245 void SdkDataRecv::CleanUpResources(ServerParams ¶ms) 246 { 247 if (params.serverFd != -1) { 248 LOGD("Closing sdk data server socket fd: %d", params.serverFd); 249 close(params.serverFd); 250 params.serverFd = -1; 251 } 252 if (params.pipFd[0] != -1) { 253 close(params.pipFd[0]); 254 params.pipFd[0] = -1; 255 } 256 for (int i = 0; i < OH_SOCKET_MAX; i++) { 257 if (params.receiveFd[i] != -1) { 258 close(params.receiveFd[i]); 259 params.receiveFd[i] = -1; 260 } 261 } 262 } 263 GetListenFd()264 int SdkDataRecv::GetListenFd() 265 { 266 return listenFd; 267 } SetListenFd(int fd)268 void SdkDataRecv::SetListenFd(int fd) 269 { 270 listenFd = fd; 271 } 272 GetSdkDataRealtimeData(std::map<std::string,std::string> & dataMap)273 void SdkDataRecv::GetSdkDataRealtimeData(std::map<std::string, std::string> &dataMap) 274 { 275 if (sdkDataRealtimeData.size() > 0) { 276 std::map<std::string, std::string> sdkDataRealtimeDataMap; 277 sdkDataRealtimeDataMap["sdkData"] = sdkDataRealtimeData; 278 realtimeDataLock.lock(); 279 dataMap.insert(sdkDataRealtimeDataMap.begin(), sdkDataRealtimeDataMap.end()); 280 realtimeDataLock.unlock(); 281 sdkDataRealtimeData.clear(); 282 } 283 } 284 SetStartRecordTime()285 void SdkDataRecv::SetStartRecordTime() 286 { 287 sdkParams.startTime = SPUtils::GetCurTime(); 288 } SocketCommandVerification(std::string & resStr,ServerParams & params)289 void SdkDataRecv::SocketCommandVerification(std::string &resStr, ServerParams ¶ms) 290 { 291 bool processFlag = true; 292 while (processFlag) { 293 size_t start = receiveBuffer.find('{'); 294 if (start == std::string::npos) { 295 processFlag = false; 296 break; 297 } 298 299 size_t end = receiveBuffer.find('}', start); 300 if (end == std::string::npos) { 301 processFlag = false; 302 break; 303 } 304 305 std::size_t startPosition = start + 1; 306 std::size_t length = end > start ? end - start - 1 : 0; 307 if (startPosition >= receiveBuffer.size() || length > receiveBuffer.size() - startPosition) { 308 processFlag = false; 309 break; 310 } 311 312 std::string message = receiveBuffer.substr(startPosition, length); 313 resStr += ProcessData(message, params); 314 315 receiveBuffer.erase(0, end + 1); 316 const int bufferSizeCheck = 2; 317 if (receiveBuffer.size() <= bufferSizeCheck) { 318 processFlag = false; 319 } 320 } 321 } 322 } 323 }