1 /* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #include <sstream> 16 #include <fstream> 17 #include <climits> 18 #include <cstring> 19 #include <cstdio> 20 #include <algorithm> 21 #include <iostream> 22 #include <thread> 23 #include <unistd.h> 24 #include <string> 25 #include <regex> 26 #include <cstdarg> 27 #include <sys/time.h> 28 #include <sys/select.h> 29 #include <netinet/in.h> 30 #include "include/sp_utils.h" 31 #include "include/startup_delay.h" 32 #include "include/sp_log.h" 33 #include "include/sdk_data_recv.h" 34 #include "memory_collector.h" 35 #include "collect_result.h" 36 #include "include/sp_task.h" 37 #include "include/sp_utils.h" 38 #include "securec.h" 39 namespace OHOS { 40 namespace SmartPerf { SdkDataRecv()41 SdkDataRecv::SdkDataRecv() 42 { 43 FD_ZERO(&readFds); 44 } 45 ItemData()46 std::map<std::string, std::string> SdkDataRecv::ItemData() 47 { 48 return std::map<std::string, std::string>(); 49 } 50 CreateOhSocketServer(int basePort)51 int SdkDataRecv::CreateOhSocketServer(int basePort) 52 { 53 int i = 0; 54 int socketFd = 0; 55 struct sockaddr_in address; 56 const int reuse = 1; 57 58 socketFd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); 59 if (socketFd < 0) { 60 LOGE("Create socket error."); 61 return -1; 62 } 63 setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); 64 65 std::fill_n(reinterpret_cast<char*>(&address), sizeof(address), 0); 66 address.sin_family = AF_INET; 67 address.sin_addr.s_addr = inet_addr("127.0.0.1"); 68 69 for (i = 0; i < SOCKET_PORT_NUM_PER_TYPE; i++) { 70 address.sin_port = htons(basePort + i); 71 if (::bind(socketFd, reinterpret_cast<struct sockaddr *>(&address), sizeof(address)) == 0) { 72 break; 73 } 74 } 75 76 if (i >= SOCKET_PORT_NUM_PER_TYPE) { 77 LOGE("Bind socket error."); 78 return -1; 79 } 80 81 if (listen(socketFd, OH_SOCKET_MAX) < 0) { 82 LOGE("Listen socket error."); 83 close(socketFd); 84 return -1; 85 } 86 87 LOGD("Listen port %d success, fd is %d", socketFd); 88 return socketFd; 89 } ProcessData(std::string message,ServerParams & params)90 std::string SdkDataRecv::ProcessData(std::string message, ServerParams ¶ms) 91 { 92 std::stringstream ss(message); 93 std::string item; 94 std::string source; 95 std::string timestamp; 96 std::string eventName; 97 std::string enable; 98 std::string value; 99 std::string realTimestamp; 100 while (std::getline(ss, item, ',')) { 101 std::stringstream itemSS(item); 102 std::string first; 103 std::string second; 104 std::getline(itemSS, first, ':'); 105 std::getline(itemSS, second, ':'); 106 if (first == "src") { 107 source = second; 108 } else if (first == "para0") { 109 eventName = second; 110 } else if (first == "time") { 111 realTimestamp = std::to_string(std::stoll(second) - SPTask::GetInstance().GetRealStartTime()); 112 timestamp = std::to_string(std::stoll(second) - params.startTime); 113 } else if (first == "enable") { 114 enable = second; 115 } else if (first == "value") { 116 value = second; 117 } 118 } 119 item = source + "," + timestamp + "," + eventName + "," + enable + "," + value + "\r\n"; 120 sdkDataRealtimeData += source + "_" + realTimestamp + "_" + eventName + "_" + enable + "_" + value + ";"; 121 return item; 122 } 123 OhDataReceive(int index,ServerParams & params)124 std::string SdkDataRecv::OhDataReceive(int index, ServerParams ¶ms) 125 { 126 char receiveBuf[MSG_MAX_LEN]; 127 std::string resStr; 128 int readLen = 0; 129 if ((readLen = read(params.receiveFd[index], receiveBuf, MSG_MAX_LEN)) <= 0) { 130 close(params.receiveFd[index]); 131 params.receiveFd[index] = -1; 132 return ""; 133 } 134 LOGI("Fd %d,%d, receove %s", index, params.receiveFd[index], receiveBuf); 135 receiveBuffer = receiveBuf; 136 137 bool processFlag = true; 138 while (processFlag) { 139 size_t start = receiveBuffer.find('{'); 140 if (start == std::string::npos) { 141 processFlag = false; 142 break; 143 } 144 145 size_t end = receiveBuffer.find('}', start); 146 if (end == std::string::npos) { 147 processFlag = false; 148 break; 149 } 150 151 std::size_t startPosition = start + 1; 152 std::size_t length = end > start ? end - start - 1 : 0; 153 if (startPosition >= receiveBuffer.size() || length > receiveBuffer.size() - startPosition) { 154 processFlag = false; 155 break; 156 } 157 158 std::string message = receiveBuffer.substr(startPosition, length); 159 resStr += ProcessData(message, params); 160 161 receiveBuffer.erase(0, end + 1); 162 const int bufferSizeCheck = 2; 163 if (receiveBuffer.size() <= bufferSizeCheck) { 164 processFlag = false; 165 } 166 } 167 168 if (!resStr.empty() && resStr.back() == '\n') { 169 resStr.pop_back(); 170 } 171 return resStr; 172 } 173 SetRunningState(bool state)174 void SdkDataRecv::SetRunningState(bool state) 175 { 176 collectRunring = state; 177 } 178 ServerThread(std::vector<std::string> & dataVec)179 void SdkDataRecv::ServerThread(std::vector<std::string> &dataVec) 180 { 181 for (int i = 0; i < OH_SOCKET_MAX; i++) { 182 sdkParams.receiveFd[i] = -1; 183 } 184 sdkParams.startTime = SPUtils::GetCurTime(); 185 sdkParams.serverFd = CreateOhSocketServer(OH_DATA_PORT); 186 if (sdkParams.serverFd < 0) { 187 LOGE("Create server failed."); 188 return; 189 } 190 if (pipe(sdkParams.pipFd) == -1) { 191 LOGE("Create service failed."); 192 close(sdkParams.serverFd); 193 return; 194 } 195 listenFd = sdkParams.pipFd[1]; 196 RunServerThread(dataVec, sdkParams); 197 } 198 RunServerThread(std::vector<std::string> & dataVec,ServerParams & params)199 void SdkDataRecv::RunServerThread(std::vector<std::string> &dataVec, ServerParams ¶ms) 200 { 201 while (collectRunring) { 202 SetUpFdSet(params); 203 if (select(maxFd + 1, &readFds, nullptr, nullptr, nullptr) <= 0) { 204 continue; 205 } 206 for (int i = 0; i < OH_SOCKET_MAX; i++) { 207 HandleReceiveFd(dataVec, i, params); 208 } 209 HandleServerFd(params); 210 } 211 CleanUpResources(params); 212 } 213 SetUpFdSet(ServerParams & params)214 void SdkDataRecv::SetUpFdSet(ServerParams ¶ms) 215 { 216 FD_ZERO(&readFds); 217 FD_SET(params.serverFd, &readFds); 218 FD_SET(params.pipFd[0], &readFds); 219 220 maxFd = std::max(params.serverFd, params.pipFd[0]); 221 for (int i = 0; i < OH_SOCKET_MAX; i++) { 222 if (params.receiveFd[i] >= 0) { 223 FD_SET(params.receiveFd[i], &readFds); 224 maxFd = std::max(maxFd, params.receiveFd[i]); 225 } 226 } 227 } 228 HandleReceiveFd(std::vector<std::string> & dataVec,int i,ServerParams & params)229 void SdkDataRecv::HandleReceiveFd(std::vector<std::string> &dataVec, int i, ServerParams ¶ms) 230 { 231 if (params.receiveFd[i] >= 0 && FD_ISSET(params.receiveFd[i], &readFds)) { 232 std::string data = OhDataReceive(i, params); 233 if (SPTask::GetInstance().GetRecordState()) { 234 dataVec.push_back(data); 235 } 236 } 237 } 238 HandleServerFd(ServerParams & params)239 void SdkDataRecv::HandleServerFd(ServerParams ¶ms) 240 { 241 if (!FD_ISSET(params.serverFd, &readFds)) { 242 return; 243 } 244 245 int fd = accept(params.serverFd, nullptr, nullptr); 246 if (fd < 0) { 247 return; 248 } 249 250 for (int i = 0; i < OH_SOCKET_MAX; i++) { 251 if (params.receiveFd[i] < 0) { 252 params.receiveFd[i] = fd; 253 if (fd > maxFd) { 254 maxFd = fd; 255 } 256 break; 257 } 258 } 259 } 260 CleanUpResources(ServerParams & params)261 void SdkDataRecv::CleanUpResources(ServerParams ¶ms) 262 { 263 if (params.serverFd != -1) { 264 close(params.serverFd); 265 params.serverFd = -1; 266 } 267 if (params.pipFd[0] != -1) { 268 close(params.pipFd[0]); 269 params.pipFd[0] = -1; 270 } 271 for (int i = 0; i < OH_SOCKET_MAX; i++) { 272 if (params.receiveFd[i] != -1) { 273 close(params.receiveFd[i]); 274 params.receiveFd[i] = -1; 275 } 276 } 277 } 278 GetListenFd()279 int SdkDataRecv::GetListenFd() 280 { 281 return listenFd; 282 } SetListenFd(int fd)283 void SdkDataRecv::SetListenFd(int fd) 284 { 285 listenFd = fd; 286 } 287 GetSdkDataRealtimeData(std::map<std::string,std::string> & dataMap)288 void SdkDataRecv::GetSdkDataRealtimeData(std::map<std::string, std::string> &dataMap) 289 { 290 if (sdkDataRealtimeData.size() > 0) { 291 std::map<std::string, std::string> sdkDataRealtimeDataMap; 292 sdkDataRealtimeDataMap["sdkData"] = sdkDataRealtimeData; 293 realtimeDataLock.lock(); 294 dataMap.insert(sdkDataRealtimeDataMap.begin(), sdkDataRealtimeDataMap.end()); 295 realtimeDataLock.unlock(); 296 sdkDataRealtimeData.clear(); 297 } 298 } 299 SetStartRecordTime()300 void SdkDataRecv::SetStartRecordTime() 301 { 302 sdkParams.startTime = SPUtils::GetCurTime(); 303 } 304 } 305 }