1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "http_server.h"
17 #include <chrono>
18 #include <csignal>
19 #include <cstdint>
20 #include <strings.h> // for macx
21 #include <thread>
22 #ifdef _WIN32
23 #include <WinSock2.h>
24 #else
25 #include <poll.h>
26 #include <sys/socket.h>
27 #endif
28 #include "log.h"
29 #include "string_to_numerical.h"
30 namespace SysTuning {
31 namespace TraceStreamer {
RegisterRpcFunction(RpcServer * rpc)32 void HttpServer::RegisterRpcFunction(RpcServer* rpc)
33 {
34 rpcFunctions_.clear();
35
36 using std::placeholders::_1;
37 using std::placeholders::_2;
38 using std::placeholders::_3;
39
40 auto parsedata = std::bind(&RpcServer::ParseData, rpc, _1, _2, _3);
41 rpcFunctions_["/parsedata"] = parsedata;
42
43 auto parsedataover = std::bind(&RpcServer::ParseDataOver, rpc, _1, _2, _3);
44 rpcFunctions_["/parsedataover"] = parsedataover;
45
46 auto sqlquery = std::bind(&RpcServer::SqlQuery, rpc, _1, _2, _3);
47 rpcFunctions_["/sqlquery"] = sqlquery;
48
49 auto sqloperate = std::bind(&RpcServer::SqlOperate, rpc, _1, _2, _3);
50 rpcFunctions_["/sqloperate"] = sqloperate;
51
52 auto reset = std::bind(&RpcServer::Reset, rpc, _1, _2, _3);
53 rpcFunctions_["/reset"] = reset;
54 }
55
56 #ifdef _WIN32
Run(int port)57 void HttpServer::Run(int port)
58 {
59 WSADATA ws{};
60 if (WSAStartup(MAKEWORD(WS_VERSION_FIRST, WS_VERSION_SEC), &ws) != 0) {
61 return;
62 }
63 if (!CreateSocket(port)) {
64 return;
65 }
66 WSAEVENT events[COUNT_SOCKET];
67 for (int i = 0; i < COUNT_SOCKET; i++) {
68 if ((events[i] = WSACreateEvent()) == WSA_INVALID_EVENT) {
69 TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
70 return;
71 }
72 WSAEventSelect(sockets_[i].GetFd(), events[i], FD_ACCEPT | FD_CLOSE);
73 }
74
75 while (!isExit_) {
76 ClearDeadClientThread();
77
78 int index = WSAWaitForMultipleEvents(COUNT_SOCKET, events, false, pollTimeOut_, false);
79 if (index == WSA_WAIT_FAILED) {
80 TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
81 break;
82 } else if (index == WSA_WAIT_TIMEOUT) {
83 continue;
84 }
85
86 index = index - WSA_WAIT_EVENT_0;
87 WSANETWORKEVENTS event;
88 WSAEnumNetworkEvents(sockets_[index].GetFd(), events[index], &event);
89 if (event.lNetworkEvents & FD_ACCEPT) {
90 if (event.iErrorCode[FD_ACCEPT_BIT] != 0) {
91 continue;
92 }
93
94 std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
95 if (sockets_[index].Accept(client->sock_)) {
96 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
97 clientThreads_.push_back(std::move(client));
98 } else {
99 TS_LOGE("http socket accept error");
100 std::this_thread::sleep_for(std::chrono::seconds(1));
101 }
102 }
103 }
104
105 for (const auto& it : clientThreads_) {
106 if (it->thread_.joinable()) {
107 it->sock_.Close();
108 it->thread_.join();
109 }
110 }
111 clientThreads_.clear();
112
113 WSACleanup();
114 }
115 #else
Run(int port)116 void HttpServer::Run(int port)
117 {
118 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
119 return;
120 }
121
122 if (!CreateSocket(port)) {
123 return;
124 }
125 TS_LOGI("http server running");
126 struct pollfd fds[COUNT_SOCKET];
127 for (int i = 0; i < COUNT_SOCKET; i++) {
128 fds[i] = {sockets_[i].GetFd(), POLLIN, 0};
129 }
130 while (!isExit_) {
131 ClearDeadClientThread();
132 if (poll(fds, sizeof(fds) / sizeof(pollfd), pollTimeOut_) <= 0) {
133 continue; // try again
134 }
135
136 for (int i = 0; i < 1; i++) {
137 if (fds[i].revents != POLLIN) {
138 continue;
139 }
140 std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
141 if (sockets_[i].Accept(client->sock_)) {
142 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
143 clientThreads_.push_back(std::move(client));
144 } else {
145 std::this_thread::sleep_for(std::chrono::seconds(1));
146 }
147 }
148 }
149
150 for (const auto& it : clientThreads_) {
151 if (it->thread_.joinable()) {
152 it->sock_.Close();
153 it->thread_.join();
154 }
155 }
156 clientThreads_.clear();
157
158 for (int i = 0; i < COUNT_SOCKET; i++) {
159 sockets_[i].Close();
160 }
161 TS_LOGI("http server exit");
162 }
163 #endif
164
Exit()165 void HttpServer::Exit()
166 {
167 isExit_ = true;
168 for (int i = 0; i < COUNT_SOCKET; i++) {
169 sockets_[i].Close();
170 }
171 }
172
CreateSocket(int port)173 bool HttpServer::CreateSocket(int port)
174 {
175 for (int i = 0; i < COUNT_SOCKET; i++) {
176 if (!sockets_[i].CreateSocket(i == 0 ? AF_INET : AF_INET6)) {
177 TS_LOGE("Create http socket error");
178 return false;
179 }
180 if (!sockets_[i].Bind(port)) {
181 TS_LOGE("bind http socket error");
182 return false;
183 }
184 if (!sockets_[i].Listen(SOMAXCONN)) {
185 TS_LOGE("listen http socket error");
186 return false;
187 }
188 }
189
190 return true;
191 }
192
ClearDeadClientThread()193 void HttpServer::ClearDeadClientThread()
194 {
195 for (auto it = clientThreads_.begin(); it != clientThreads_.end();) {
196 if (it->get()->sock_.GetFd() != -1) {
197 it++;
198 continue;
199 }
200 if (it->get()->thread_.joinable()) {
201 it->get()->thread_.join();
202 }
203 it = clientThreads_.erase(it);
204 }
205 }
206
207 #ifdef _WIN32
ProcessClient(HttpSocket & client)208 void HttpServer::ProcessClient(HttpSocket& client)
209 {
210 std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
211 size_t recvLen = recvBuf.size();
212 size_t recvPos = 0;
213 RequestST reqST;
214 WSAEVENT recvEvent = WSACreateEvent();
215 if (recvEvent == WSA_INVALID_EVENT) {
216 TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
217 return;
218 }
219 WSAEventSelect(client.GetFd(), recvEvent, FD_READ | FD_CLOSE);
220 while (!isExit_) {
221 int index = WSAWaitForMultipleEvents(1, &recvEvent, false, pollTimeOut_, false);
222 if (index == WSA_WAIT_FAILED) {
223 TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
224 break;
225 } else if (index == WSA_WAIT_TIMEOUT) {
226 if (reqST.stat != RequstParseStat::INIT) {
227 ProcessRequest(client, reqST);
228 reqST.stat = RequstParseStat::INIT;
229 recvPos = 0;
230 recvLen = recvBuf.size();
231 }
232 continue;
233 }
234
235 WSANETWORKEVENTS event;
236 WSAEnumNetworkEvents(client.GetFd(), recvEvent, &event);
237 if (event.lNetworkEvents & FD_READ) {
238 if (event.iErrorCode[FD_READ_BIT] != 0) {
239 continue;
240 }
241 if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
242 break;
243 }
244 recvPos += recvLen;
245 ParseRequest(recvBuf.data(), recvPos, reqST);
246 recvLen = recvBuf.size() - recvPos;
247 if (reqST.stat == RequstParseStat::RECVING) {
248 continue;
249 }
250 ProcessRequest(client, reqST);
251 reqST.stat = RequstParseStat::INIT;
252 } else if (event.lNetworkEvents & FD_CLOSE) {
253 TS_LOGI("client close socket(%d)", client.GetFd());
254 break;
255 }
256 }
257 TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
258
259 client.Close();
260 }
261 #else
ProcessClient(HttpSocket & client)262 void HttpServer::ProcessClient(HttpSocket& client)
263 {
264 std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
265 size_t recvLen = recvBuf.size();
266 size_t recvPos = 0;
267 RequestST reqST;
268
269 struct pollfd fd = {client.GetFd(), POLLIN, 0};
270 while (!isExit_) {
271 int pollRet = poll(&fd, sizeof(fd) / sizeof(pollfd), pollTimeOut_);
272 if (pollRet < 0) {
273 TS_LOGE("poll client socket(%d) error: %d:%s", client.GetFd(), errno, strerror(errno));
274 break;
275 }
276 if (pollRet == 0) {
277 if (reqST.stat != RequstParseStat::INIT) {
278 ProcessRequest(client, reqST);
279 reqST.stat = RequstParseStat::INIT;
280 recvPos = 0;
281 recvLen = recvBuf.size();
282 }
283 continue;
284 }
285 if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
286 TS_LOGI("client exit");
287 break;
288 }
289 recvPos += recvLen;
290 ParseRequest(recvBuf.data(), recvPos, reqST);
291 recvLen = recvBuf.size() - recvPos;
292 if (reqST.stat == RequstParseStat::RECVING) {
293 continue;
294 }
295 ProcessRequest(client, reqST);
296 reqST.stat = RequstParseStat::INIT;
297 }
298 TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
299
300 client.Close();
301 TS_LOGI("thread exit");
302 }
303 #endif
304
ProcessRequest(HttpSocket & client,RequestST & request)305 void HttpServer::ProcessRequest(HttpSocket& client, RequestST& request)
306 {
307 if (request.stat == RequstParseStat::RECVING) {
308 TS_LOGE("http request data missing, client %d\n", client.GetFd());
309 HttpResponse(client, "408 Request Time-out\r\n");
310 return;
311 } else if (request.stat != RequstParseStat::OK) {
312 TS_LOGE("bad http request, client %d\n", client.GetFd());
313 HttpResponse(client, "400 Bad Request\r\n");
314 return;
315 }
316 if (request.method == "OPTIONS") {
317 HttpResponse(client,
318 "204 No Content\r\n"
319 "Access-Control-Allow-Methods: POST, GET, OPTIONS\r\n"
320 "Access-Control-Allow-Headers: *\r\n"
321 "Access-Control-Max-Age: 86400\r\n");
322 return;
323 } else if (request.method != "POST" && request.method != "GET") {
324 TS_LOGE("method(%s) not allowed, client %d", request.method.c_str(), client.GetFd());
325 HttpResponse(client, "405 Method Not Allowed\r\n");
326 return;
327 }
328 auto it = rpcFunctions_.find(request.uri);
329 if (it == rpcFunctions_.end()) {
330 TS_LOGE("http uri(%s) not found, client %d", request.uri.c_str(), client.GetFd());
331 HttpResponse(client, "404 Not Found\r\n");
332 return;
333 }
334 HttpResponse(client, "200 OK\r\n", true);
335 auto resultCallback = [&client](const std::string& result, int) {
336 std::stringstream chunkLenbuff;
337 chunkLenbuff << std::hex << result.size() << "\r\n";
338 if (!client.Send(chunkLenbuff.str().data(), chunkLenbuff.str().size())) {
339 TS_LOGE("send client socket(%d) error", client.GetFd());
340 return;
341 }
342 if (!client.Send(result.data(), result.size())) {
343 TS_LOGE("send client socket(%d) error", client.GetFd());
344 return;
345 }
346 if (!client.Send("\r\n", strlen("\r\n"))) {
347 TS_LOGE("send client socket(%d) error", client.GetFd());
348 return;
349 }
350 };
351 it->second(request.body, request.bodyLen, resultCallback);
352 if (!client.Send("0\r\n\r\n", strlen("0\r\n\r\n"))) { // chunk tail
353 TS_LOGE("send client socket(%d) error", client.GetFd());
354 }
355 }
356
ParseRequest(const uint8_t * requst,size_t & len,RequestST & httpReq)357 void HttpServer::ParseRequest(const uint8_t* requst, size_t& len, RequestST& httpReq)
358 {
359 std::string_view reqStr(reinterpret_cast<const char*>(requst), len);
360 size_t bodyPos = reqStr.find("\r\n\r\n");
361 if (bodyPos == 0) {
362 len = 0;
363 httpReq.stat = RequstParseStat::BAD;
364 return;
365 } else if (bodyPos == std::string_view::npos) {
366 httpReq.stat = RequstParseStat::RECVING;
367 return;
368 }
369 std::string_view header = reqStr.substr(0, bodyPos);
370 bodyPos += strlen("\r\n\r\n");
371 httpReq.bodyLen = reqStr.size() - bodyPos;
372
373 std::vector<std::string_view> headerlines = StringSplit(header, "\r\n");
374 // at least 1 line in headerlines, such as "GET /parsedata HTTP/1.1"
375 std::vector<std::string_view> requestItems = StringSplit(headerlines[0], " ");
376 const size_t indexHttpMethod = 0;
377 const size_t indexHttpUri = 1;
378 const size_t indexHttpVersion = 2;
379 const size_t countRequestItems = 3;
380 if (requestItems.size() != countRequestItems || requestItems[indexHttpVersion] != "HTTP/1.1") {
381 len = 0;
382 httpReq.stat = RequstParseStat::BAD;
383 return;
384 }
385 httpReq.method = requestItems[indexHttpMethod];
386 httpReq.uri = requestItems[indexHttpUri];
387
388 for (size_t i = 1; i < headerlines.size(); i++) {
389 size_t tagPos = headerlines[i].find(":");
390 if (tagPos == std::string_view::npos) {
391 len = 0;
392 httpReq.stat = RequstParseStat::BAD;
393 return;
394 }
395 std::string_view tag = headerlines[i].substr(0, tagPos);
396 if (strncasecmp(tag.data(), "Content-Length", tag.size()) == 0) {
397 std::string value(headerlines[i].data() + tagPos + strlen(":"),
398 headerlines[i].size() - tagPos - strlen(":"));
399 size_t conterntLen = atoi(value.c_str());
400 if (conterntLen > httpReq.bodyLen) {
401 httpReq.stat = RequstParseStat::RECVING;
402 return;
403 } else if (conterntLen < httpReq.bodyLen) {
404 httpReq.bodyLen = conterntLen;
405 }
406 }
407 }
408
409 if (httpReq.bodyLen > 0) {
410 httpReq.body = (requst + bodyPos);
411 }
412 httpReq.stat = RequstParseStat::OK;
413 len -= (bodyPos + httpReq.bodyLen);
414 return;
415 }
416
HttpResponse(HttpSocket & client,const std::string & status,bool hasBody)417 void HttpServer::HttpResponse(HttpSocket& client, const std::string& status, bool hasBody)
418 {
419 std::string res;
420 const size_t maxLenResponse = 1024;
421 res.reserve(maxLenResponse);
422 res += "HTTP/1.1 ";
423 res += status;
424
425 res += "Connection: Keep-Alive\r\n";
426 if (hasBody) {
427 res += "Content-Type: application/json\r\n";
428 res += "Transfer-Encoding: chunked\r\n";
429 }
430 res += "\r\n";
431 if (!client.Send(res.data(), res.size())) {
432 TS_LOGE("send client socket(%d) error", client.GetFd());
433 }
434 }
435
StringSplit(std::string_view source,std::string_view split)436 std::vector<std::string_view> HttpServer::StringSplit(std::string_view source, std::string_view split)
437 {
438 std::vector<std::string_view> result;
439 if (!split.empty()) {
440 size_t pos = 0;
441 while ((pos = source.find(split)) != std::string_view::npos) {
442 // split
443 std::string_view token = source.substr(0, pos);
444 if (!token.empty()) {
445 result.push_back(token);
446 }
447 source = source.substr(pos + split.size(), source.size() - token.size() - split.size());
448 }
449 }
450 // add last token
451 if (!source.empty()) {
452 result.push_back(source);
453 }
454 return result;
455 }
456 } // namespace TraceStreamer
457 } // namespace SysTuning
458