1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "http_server.h"
17 #include <chrono>
18 #include <csignal>
19 #include <cstdint>
20 #include <thread>
21 #ifdef _WIN32
22 #include <WinSock2.h>
23 #else
24 #include <poll.h>
25 #include <sys/socket.h>
26 #endif
27 #include "log.h"
28 #include "string_to_numerical.h"
29 namespace SysTuning {
30 namespace TraceStreamer {
RegisterRpcFunction(RpcServer * rpc)31 void HttpServer::RegisterRpcFunction(RpcServer* rpc)
32 {
33 rpcFunctions_.clear();
34
35 using std::placeholders::_1;
36 using std::placeholders::_2;
37 using std::placeholders::_3;
38
39 auto parsedata = std::bind(&RpcServer::ParseData, rpc, _1, _2, _3);
40 rpcFunctions_["/parsedata"] = parsedata;
41
42 auto parsedataover = std::bind(&RpcServer::ParseDataOver, rpc, _1, _2, _3);
43 rpcFunctions_["/parsedataover"] = parsedataover;
44
45 auto sqlquery = std::bind(&RpcServer::SqlQuery, rpc, _1, _2, _3);
46 rpcFunctions_["/sqlquery"] = sqlquery;
47
48 auto sqloperate = std::bind(&RpcServer::SqlOperate, rpc, _1, _2, _3);
49 rpcFunctions_["/sqloperate"] = sqloperate;
50
51 auto reset = std::bind(&RpcServer::Reset, rpc, _1, _2, _3);
52 rpcFunctions_["/reset"] = reset;
53 }
54
55 #ifdef _WIN32
Run(int32_t port)56 void HttpServer::Run(int32_t port)
57 {
58 WSADATA ws{};
59 if (WSAStartup(MAKEWORD(WS_VERSION_FIRST, WS_VERSION_SEC), &ws) != 0) {
60 return;
61 }
62 if (!CreateSocket(port)) {
63 return;
64 }
65 WSAEVENT events[COUNT_SOCKET];
66 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
67 if ((events[i] = WSACreateEvent()) == WSA_INVALID_EVENT) {
68 TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
69 return;
70 }
71 WSAEventSelect(sockets_[i].GetFd(), events[i], FD_ACCEPT | FD_CLOSE);
72 }
73
74 while (!isExit_) {
75 ClearDeadClientThread();
76
77 int32_t index = WSAWaitForMultipleEvents(COUNT_SOCKET, events, false, pollTimeOut_, false);
78 if (index == WSA_WAIT_FAILED) {
79 TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
80 break;
81 } else if (index == WSA_WAIT_TIMEOUT) {
82 continue;
83 }
84
85 index = index - WSA_WAIT_EVENT_0;
86 WSANETWORKEVENTS event;
87 WSAEnumNetworkEvents(sockets_[index].GetFd(), events[index], &event);
88 if (event.lNetworkEvents & FD_ACCEPT) {
89 if (event.iErrorCode[FD_ACCEPT_BIT] != 0) {
90 continue;
91 }
92
93 std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
94 if (sockets_[index].Accept(client->sock_)) {
95 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
96 clientThreads_.push_back(std::move(client));
97 } else {
98 TS_LOGE("http socket accept error");
99 std::this_thread::sleep_for(std::chrono::seconds(1));
100 }
101 }
102 }
103
104 for (const auto& it : clientThreads_) {
105 if (it->thread_.joinable()) {
106 it->sock_.Close();
107 it->thread_.join();
108 }
109 }
110 clientThreads_.clear();
111
112 WSACleanup();
113 }
114 #else
Run(int32_t port)115 void HttpServer::Run(int32_t port)
116 {
117 if (SIG_ERR == signal(SIGPIPE, SIG_IGN)) {
118 return;
119 }
120
121 if (!CreateSocket(port)) {
122 return;
123 }
124 TS_LOGI("http server running");
125 struct pollfd fds[COUNT_SOCKET];
126 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
127 fds[i] = {sockets_[i].GetFd(), POLLIN, 0};
128 }
129 while (!isExit_) {
130 ClearDeadClientThread();
131 if (poll(fds, sizeof(fds) / sizeof(pollfd), pollTimeOut_) <= 0) {
132 continue; // try again
133 }
134
135 for (int32_t i = 0; i < 1; i++) {
136 if (fds[i].revents != POLLIN) {
137 continue;
138 }
139 std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
140 if (sockets_[i].Accept(client->sock_)) {
141 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
142 clientThreads_.push_back(std::move(client));
143 } else {
144 std::this_thread::sleep_for(std::chrono::seconds(1));
145 }
146 }
147 }
148
149 for (const auto& it : clientThreads_) {
150 if (it->thread_.joinable()) {
151 it->sock_.Close();
152 it->thread_.join();
153 }
154 }
155 clientThreads_.clear();
156
157 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
158 sockets_[i].Close();
159 }
160 TS_LOGI("http server exit");
161 }
162 #endif
163
Exit()164 void HttpServer::Exit()
165 {
166 isExit_ = true;
167 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
168 sockets_[i].Close();
169 }
170 }
171
CreateSocket(int32_t port)172 bool HttpServer::CreateSocket(int32_t port)
173 {
174 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
175 if (!sockets_[i].CreateSocket(i == 0 ? AF_INET : AF_INET6)) {
176 TS_LOGE("Create http socket error");
177 return false;
178 }
179 if (!sockets_[i].Bind(port)) {
180 TS_LOGE("bind http socket error");
181 return false;
182 }
183 if (!sockets_[i].Listen(SOMAXCONN)) {
184 TS_LOGE("listen http socket error");
185 return false;
186 }
187 }
188
189 return true;
190 }
191
ClearDeadClientThread()192 void HttpServer::ClearDeadClientThread()
193 {
194 for (const auto it = clientThreads_.begin(); it != clientThreads_.end();) {
195 if (it->get()->sock_.GetFd() != -1) {
196 it++;
197 continue;
198 }
199 if (it->get()->thread_.joinable()) {
200 it->get()->thread_.join();
201 }
202 it = clientThreads_.erase(it);
203 }
204 }
205
206 #ifdef _WIN32
ProcessClient(HttpSocket & client)207 void HttpServer::ProcessClient(HttpSocket& client)
208 {
209 std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
210 size_t recvLen = recvBuf.size();
211 size_t recvPos = 0;
212 RequestST reqST;
213 WSAEVENT recvEvent = WSACreateEvent();
214 if (recvEvent == WSA_INVALID_EVENT) {
215 TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
216 return;
217 }
218 WSAEventSelect(client.GetFd(), recvEvent, FD_READ | FD_CLOSE);
219 while (!isExit_) {
220 int32_t index = WSAWaitForMultipleEvents(1, &recvEvent, false, pollTimeOut_, false);
221 if (index == WSA_WAIT_FAILED) {
222 TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
223 break;
224 } else if (index == WSA_WAIT_TIMEOUT) {
225 if (reqST.stat != RequstParseStat::INIT) {
226 reqST.stat = RequstParseStat::INIT;
227 recvPos = 0;
228 recvLen = recvBuf.size();
229 }
230 continue;
231 }
232
233 WSANETWORKEVENTS event;
234 WSAEnumNetworkEvents(client.GetFd(), recvEvent, &event);
235 if (event.lNetworkEvents & FD_READ) {
236 if (event.iErrorCode[FD_READ_BIT] != 0) {
237 continue;
238 }
239 if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
240 break;
241 }
242 recvPos += recvLen;
243 ParseRequest(recvBuf.data(), recvPos, reqST);
244 recvLen = recvBuf.size() - recvPos;
245 if (reqST.stat == RequstParseStat::RECVING) {
246 continue;
247 }
248 reqST.stat = RequstParseStat::INIT;
249 } else if (event.lNetworkEvents & FD_CLOSE) {
250 TS_LOGI("client close socket(%d)", client.GetFd());
251 break;
252 }
253 }
254 TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
255
256 client.Close();
257 }
258 #else
ProcessClient(HttpSocket & client)259 void HttpServer::ProcessClient(HttpSocket& client)
260 {
261 std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
262 size_t recvLen = recvBuf.size();
263 size_t recvPos = 0;
264 RequestST reqST;
265
266 struct pollfd fd = {client.GetFd(), POLLIN, 0};
267 while (!isExit_) {
268 int32_t pollRet = poll(&fd, sizeof(fd) / sizeof(pollfd), pollTimeOut_);
269 if (pollRet < 0) {
270 TS_LOGE("poll client socket(%d) error: %d:%s", client.GetFd(), errno, strerror(errno));
271 break;
272 }
273 if (pollRet == 0) {
274 if (reqST.stat != RequstParseStat::INIT) {
275 reqST.stat = RequstParseStat::INIT;
276 recvPos = 0;
277 recvLen = recvBuf.size();
278 }
279 continue;
280 }
281 if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
282 TS_LOGI("client exit");
283 break;
284 }
285 recvPos += recvLen;
286 ParseRequest(recvBuf.data(), recvPos, reqST);
287 recvLen = recvBuf.size() - recvPos;
288 if (reqST.stat == RequstParseStat::RECVING) {
289 continue;
290 }
291 reqST.stat = RequstParseStat::INIT;
292 }
293 TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
294
295 client.Close();
296 TS_LOGI("thread exit");
297 }
298 #endif
299
ParseRequest(const uint8_t * requst,size_t & len,RequestST & httpReq)300 void HttpServer::ParseRequest(const uint8_t* requst, size_t& len, RequestST& httpReq)
301 {
302 std::string_view reqStr(reinterpret_cast<const char*>(requst), len);
303 size_t bodyPos = reqStr.find("\r\n\r\n");
304 if (bodyPos == 0) {
305 len = 0;
306 httpReq.stat = RequstParseStat::BAD;
307 return;
308 } else if (bodyPos == std::string_view::npos) {
309 httpReq.stat = RequstParseStat::RECVING;
310 return;
311 }
312 std::string_view header = reqStr.substr(0, bodyPos);
313 bodyPos += strlen("\r\n\r\n");
314 httpReq.bodyLen = reqStr.size() - bodyPos;
315
316 std::vector<std::string_view> headerlines = StringSplit(header, "\r\n");
317 // at least 1 line in headerlines, such as "GET /parsedata HTTP/1.1"
318 std::vector<std::string_view> requestItems = StringSplit(headerlines[0], " ");
319 const size_t indexHttpMethod = 0;
320 const size_t indexHttpUri = 1;
321 const size_t indexHttpVersion = 2;
322 const size_t countRequestItems = 3;
323 if (requestItems.size() != countRequestItems || requestItems[indexHttpVersion] != "HTTP/1.1") {
324 len = 0;
325 httpReq.stat = RequstParseStat::BAD;
326 return;
327 }
328 httpReq.method = requestItems[indexHttpMethod];
329 httpReq.uri = requestItems[indexHttpUri];
330
331 for (size_t i = 1; i < headerlines.size(); i++) {
332 size_t tagPos = headerlines[i].find(":");
333 if (tagPos == std::string_view::npos) {
334 len = 0;
335 httpReq.stat = RequstParseStat::BAD;
336 return;
337 }
338 std::string_view tag = headerlines[i].substr(0, tagPos);
339 if (strncasecmp(tag.data(), "Content-Length", tag.size()) == 0) {
340 std::string value(headerlines[i].data() + tagPos + strlen(":"),
341 headerlines[i].size() - tagPos - strlen(":"));
342 size_t conterntLen = atoi(value.c_str());
343 if (conterntLen > httpReq.bodyLen) {
344 httpReq.stat = RequstParseStat::RECVING;
345 return;
346 } else if (conterntLen < httpReq.bodyLen) {
347 httpReq.bodyLen = conterntLen;
348 }
349 }
350 }
351
352 if (httpReq.bodyLen > 0) {
353 httpReq.body = (requst + bodyPos);
354 }
355 httpReq.stat = RequstParseStat::OK;
356 len -= (bodyPos + httpReq.bodyLen);
357 return;
358 }
359
StringSplit(std::string_view source,std::string_view split)360 std::vector<std::string_view> HttpServer::StringSplit(std::string_view source, std::string_view split)
361 {
362 std::vector<std::string_view> result;
363 if (!split.empty()) {
364 size_t pos = 0;
365 while ((pos = source.find(split)) != std::string_view::npos) {
366 // split
367 std::string_view token = source.substr(0, pos);
368 if (!token.empty()) {
369 result.push_back(token);
370 }
371 source = source.substr(pos + split.size(), source.size() - token.size() - split.size());
372 }
373 }
374 // add last token
375 if (!source.empty()) {
376 result.push_back(source);
377 }
378 return result;
379 }
380 } // namespace TraceStreamer
381 } // namespace SysTuning
382