1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "http_server.h"
#include <chrono>
#include <csignal>
#include <cstdint>
#include <cstring>
#include <strings.h> // for macx
#include <thread>
#ifdef _WIN32
#include <winsock2.h>
#else
#include <poll.h>
#include <sys/socket.h>
#endif
#include "log.h"
#include "string_to_numerical.h"
30 namespace SysTuning {
31 namespace TraceStreamer {
RegisterRpcFunction(RpcServer * rpc)32 void HttpServer::RegisterRpcFunction(RpcServer* rpc)
33 {
34 rpcFunctions_.clear();
35
36 using std::placeholders::_1;
37 using std::placeholders::_2;
38 using std::placeholders::_3;
39
40 auto parsedata = std::bind(&RpcServer::ParseData, rpc, _1, _2, _3);
41 rpcFunctions_["/parsedata"] = parsedata;
42
43 auto parsedataover = std::bind(&RpcServer::ParseDataOver, rpc, _1, _2, _3);
44 rpcFunctions_["/parsedataover"] = parsedataover;
45
46 auto sqlquery = std::bind(&RpcServer::SqlQuery, rpc, _1, _2, _3);
47 rpcFunctions_["/sqlquery"] = sqlquery;
48
49 auto sqloperate = std::bind(&RpcServer::SqlOperate, rpc, _1, _2, _3);
50 rpcFunctions_["/sqloperate"] = sqloperate;
51
52 auto reset = std::bind(&RpcServer::Reset, rpc, _1, _2, _3);
53 rpcFunctions_["/reset"] = reset;
54 }
55
CloseAllThreads()56 void HttpServer::CloseAllThreads()
57 {
58 for (const auto& it : clientThreads_) {
59 if (it->thread_.joinable()) {
60 it->thread_.join();
61 it->sock_.Close();
62 }
63 }
64 }
65
66 #ifdef _WIN32
Run(int32_t port)67 void HttpServer::Run(int32_t port)
68 {
69 WSADATA ws{};
70 if (WSAStartup(MAKEWORD(WS_VERSION_FIRST, WS_VERSION_SEC), &ws) != 0) {
71 return;
72 }
73 if (!CreateSocket(port)) {
74 return;
75 }
76 WSAEVENT events[COUNT_SOCKET];
77 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
78 if ((events[i] = WSACreateEvent()) == WSA_INVALID_EVENT) {
79 TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
80 return;
81 }
82 WSAEventSelect(sockets_[i].GetFd(), events[i], FD_ACCEPT | FD_CLOSE);
83 }
84
85 while (!isExit_) {
86 ClearDeadClientThread();
87
88 int32_t index = WSAWaitForMultipleEvents(COUNT_SOCKET, events, false, pollTimeOut_, false);
89 if (index == WSA_WAIT_FAILED) {
90 TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
91 break;
92 } else if (index == WSA_WAIT_TIMEOUT) {
93 continue;
94 }
95
96 index = index - WSA_WAIT_EVENT_0;
97 WSANETWORKEVENTS event;
98 WSAEnumNetworkEvents(sockets_[index].GetFd(), events[index], &event);
99 if (event.lNetworkEvents & FD_ACCEPT) {
100 if (event.iErrorCode[FD_ACCEPT_BIT] != 0) {
101 continue;
102 }
103
104 std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
105 if (sockets_[index].Accept(client->sock_)) {
106 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
107 clientThreads_.push_back(std::move(client));
108 } else {
109 TS_LOGE("http socket accept error");
110 std::this_thread::sleep_for(std::chrono::seconds(1));
111 }
112 }
113 }
114 CloseAllThreads();
115 clientThreads_.clear();
116
117 WSACleanup();
118 }
119 #else
Run(int32_t port)120 void HttpServer::Run(int32_t port)
121 {
122 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
123 return;
124 }
125
126 if (!CreateSocket(port)) {
127 return;
128 }
129 TS_LOGI("http server running");
130 struct pollfd fds[COUNT_SOCKET];
131 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
132 fds[i] = {sockets_[i].GetFd(), POLLIN, 0};
133 }
134 while (!isExit_) {
135 ClearDeadClientThread();
136 if (poll(fds, sizeof(fds) / sizeof(pollfd), pollTimeOut_) <= 0) {
137 continue; // try again
138 }
139
140 for (int32_t i = 0; i < 1; i++) {
141 if (fds[i].revents != POLLIN) {
142 continue;
143 }
144 std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
145 if (sockets_[i].Accept(client->sock_)) {
146 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
147 clientThreads_.push_back(std::move(client));
148 } else {
149 std::this_thread::sleep_for(std::chrono::seconds(1));
150 }
151 }
152 }
153 CloseAllThreads();
154 clientThreads_.clear();
155
156 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
157 sockets_[i].Close();
158 }
159 TS_LOGI("http server exit");
160 }
161 #endif
162
Exit()163 void HttpServer::Exit()
164 {
165 isExit_ = true;
166 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
167 sockets_[i].Close();
168 }
169 }
170
CreateSocket(int32_t port)171 bool HttpServer::CreateSocket(int32_t port)
172 {
173 for (int32_t i = 0; i < COUNT_SOCKET; i++) {
174 if (!sockets_[i].CreateSocket(i == 0 ? AF_INET : AF_INET6)) {
175 TS_LOGE("Create http socket error");
176 return false;
177 }
178 if (!sockets_[i].Bind(port)) {
179 TS_LOGE("bind http socket error");
180 return false;
181 }
182 if (!sockets_[i].Listen(SOMAXCONN)) {
183 TS_LOGE("listen http socket error");
184 return false;
185 }
186 }
187
188 return true;
189 }
190
ClearDeadClientThread()191 void HttpServer::ClearDeadClientThread()
192 {
193 for (auto it = clientThreads_.begin(); it != clientThreads_.end();) {
194 if (it->get()->sock_.GetFd() != -1) {
195 it++;
196 continue;
197 }
198 if (it->get()->thread_.joinable()) {
199 it->get()->thread_.join();
200 }
201 it = clientThreads_.erase(it);
202 }
203 }
204
ProcessAndParseReq(size_t & recvPos,size_t & recvLen,std::vector<uint8_t> & recvBuf,RequestST & reqST,HttpSocket & client)205 bool HttpServer::ProcessAndParseReq(size_t& recvPos,
206 size_t& recvLen,
207 std::vector<uint8_t>& recvBuf,
208 RequestST& reqST,
209 HttpSocket& client)
210 {
211 if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
212 return false;
213 }
214 recvPos += recvLen;
215 ParseRequest(recvBuf.data(), recvPos, reqST);
216 recvLen = recvBuf.size() - recvPos;
217 if (reqST.stat == RequstParseStat::RECVING) {
218 return true;
219 }
220 ProcessRequest(client, reqST);
221 reqST.stat = RequstParseStat::INIT;
222 return true;
223 }
224
225 #ifdef _WIN32
ProcessClient(HttpSocket & client)226 void HttpServer::ProcessClient(HttpSocket& client)
227 {
228 std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
229 size_t recvLen = recvBuf.size();
230 size_t recvPos = 0;
231 RequestST reqST;
232 WSAEVENT recvEvent = WSACreateEvent();
233 if (recvEvent == WSA_INVALID_EVENT) {
234 TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
235 return;
236 }
237 WSAEventSelect(client.GetFd(), recvEvent, FD_READ | FD_CLOSE);
238 while (!isExit_) {
239 int32_t index = WSAWaitForMultipleEvents(1, &recvEvent, false, pollTimeOut_, false);
240 if (index == WSA_WAIT_FAILED) {
241 TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
242 break;
243 } else if (index == WSA_WAIT_TIMEOUT) {
244 if (reqST.stat != RequstParseStat::INIT) {
245 ProcessRequest(client, reqST);
246 reqST.stat = RequstParseStat::INIT;
247 recvPos = 0;
248 recvLen = recvBuf.size();
249 }
250 continue;
251 }
252
253 WSANETWORKEVENTS event;
254 WSAEnumNetworkEvents(client.GetFd(), recvEvent, &event);
255 if (event.lNetworkEvents & FD_READ) {
256 if (event.iErrorCode[FD_READ_BIT] != 0) {
257 continue;
258 }
259 if (!ProcessAndParseReq(recvPos, recvLen, recvBuf, reqST, client)) {
260 break;
261 }
262 } else if (event.lNetworkEvents & FD_CLOSE) {
263 TS_LOGI("client close socket(%d)", client.GetFd());
264 break;
265 }
266 }
267 TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
268
269 client.Close();
270 }
271 #else
ProcessClient(HttpSocket & client)272 void HttpServer::ProcessClient(HttpSocket& client)
273 {
274 std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
275 size_t recvLen = recvBuf.size();
276 size_t recvPos = 0;
277 RequestST reqST;
278
279 struct pollfd fd = {client.GetFd(), POLLIN, 0};
280 while (!isExit_) {
281 int32_t pollRet = poll(&fd, sizeof(fd) / sizeof(pollfd), pollTimeOut_);
282 if (pollRet < 0) {
283 TS_LOGE("poll client socket(%d) error: %d:%s", client.GetFd(), errno, strerror(errno));
284 break;
285 }
286 if (pollRet == 0) {
287 if (reqST.stat != RequstParseStat::INIT) {
288 ProcessRequest(client, reqST);
289 reqST.stat = RequstParseStat::INIT;
290 recvPos = 0;
291 recvLen = recvBuf.size();
292 }
293 continue;
294 }
295 if (!ProcessAndParseReq(recvPos, recvLen, recvBuf, reqST, client)) {
296 break;
297 }
298 }
299 TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
300
301 client.Close();
302 TS_LOGI("thread exit");
303 }
304 #endif
305
ProcessRequest(HttpSocket & client,RequestST & request)306 void HttpServer::ProcessRequest(HttpSocket& client, RequestST& request)
307 {
308 if (request.stat == RequstParseStat::RECVING) {
309 TS_LOGE("http request data missing, client %d\n", client.GetFd());
310 HttpResponse(client, "408 Request Time-out\r\n");
311 return;
312 } else if (request.stat != RequstParseStat::OK) {
313 TS_LOGE("bad http request, client %d\n", client.GetFd());
314 HttpResponse(client, "400 Bad Request\r\n");
315 return;
316 }
317 if (request.method == "OPTIONS") {
318 HttpResponse(client,
319 "204 No Content\r\n"
320 "Access-Control-Allow-Methods: POST, GET, OPTIONS\r\n"
321 "Access-Control-Allow-Headers: *\r\n"
322 "Access-Control-Max-Age: 86400\r\n");
323 return;
324 } else if (request.method != "POST" && request.method != "GET") {
325 TS_LOGE("method(%s) not allowed, client %d", request.method.c_str(), client.GetFd());
326 HttpResponse(client, "405 Method Not Allowed\r\n");
327 return;
328 }
329 auto it = rpcFunctions_.find(request.uri);
330 if (it == rpcFunctions_.end()) {
331 TS_LOGE("http uri(%s) not found, client %d", request.uri.c_str(), client.GetFd());
332 HttpResponse(client, "404 Not Found\r\n");
333 return;
334 }
335 HttpResponse(client, "200 OK\r\n", true);
336 auto resultCallback = [&client](const std::string& result, int32_t) {
337 std::stringstream chunkLenbuff;
338 chunkLenbuff << std::hex << result.size() << "\r\n";
339 if (!client.Send(chunkLenbuff.str().data(), chunkLenbuff.str().size())) {
340 TS_LOGE("send client socket(%d) error", client.GetFd());
341 return;
342 }
343 if (!client.Send(result.data(), result.size())) {
344 TS_LOGE("send client socket(%d) error", client.GetFd());
345 return;
346 }
347 if (!client.Send("\r\n", strlen("\r\n"))) {
348 TS_LOGE("send client socket(%d) error", client.GetFd());
349 return;
350 }
351 };
352 it->second(request.body, request.bodyLen, resultCallback);
353 if (!client.Send("0\r\n\r\n", strlen("0\r\n\r\n"))) { // chunk tail
354 TS_LOGE("send client socket(%d) error", client.GetFd());
355 }
356 }
357
ParseRequest(const uint8_t * requst,size_t & len,RequestST & httpReq)358 void HttpServer::ParseRequest(const uint8_t* requst, size_t& len, RequestST& httpReq)
359 {
360 std::string_view reqStr(reinterpret_cast<const char*>(requst), len);
361 size_t bodyPos = reqStr.find("\r\n\r\n");
362 if (bodyPos == 0) {
363 len = 0;
364 httpReq.stat = RequstParseStat::BAD;
365 return;
366 } else if (bodyPos == std::string_view::npos) {
367 httpReq.stat = RequstParseStat::RECVING;
368 return;
369 }
370 std::string_view header = reqStr.substr(0, bodyPos);
371 bodyPos += strlen("\r\n\r\n");
372 httpReq.bodyLen = reqStr.size() - bodyPos;
373
374 std::vector<std::string_view> headerlines = StringSplit(header, "\r\n");
375 // at least 1 line in headerlines, such as "GET /parsedata HTTP/1.1"
376 std::vector<std::string_view> requestItems = StringSplit(headerlines[0], " ");
377 const size_t indexHttpMethod = 0;
378 const size_t indexHttpUri = 1;
379 const size_t indexHttpVersion = 2;
380 const size_t countRequestItems = 3;
381 if (requestItems.size() != countRequestItems || requestItems[indexHttpVersion] != "HTTP/1.1") {
382 len = 0;
383 httpReq.stat = RequstParseStat::BAD;
384 return;
385 }
386 httpReq.method = requestItems[indexHttpMethod];
387 httpReq.uri = requestItems[indexHttpUri];
388
389 for (size_t i = 1; i < headerlines.size(); i++) {
390 size_t tagPos = headerlines[i].find(":");
391 if (tagPos == std::string_view::npos) {
392 len = 0;
393 httpReq.stat = RequstParseStat::BAD;
394 return;
395 }
396 std::string_view tag = headerlines[i].substr(0, tagPos);
397 if (strncasecmp(tag.data(), "Content-Length", tag.size()) == 0) {
398 std::string value(headerlines[i].data() + tagPos + strlen(":"),
399 headerlines[i].size() - tagPos - strlen(":"));
400 size_t conterntLen = atoi(value.c_str());
401 if (conterntLen > httpReq.bodyLen) {
402 httpReq.stat = RequstParseStat::RECVING;
403 return;
404 } else if (conterntLen < httpReq.bodyLen) {
405 httpReq.bodyLen = conterntLen;
406 }
407 }
408 }
409
410 if (httpReq.bodyLen > 0) {
411 httpReq.body = (requst + bodyPos);
412 }
413 httpReq.stat = RequstParseStat::OK;
414 len -= (bodyPos + httpReq.bodyLen);
415 return;
416 }
417
HttpResponse(HttpSocket & client,const std::string & status,bool hasBody)418 void HttpServer::HttpResponse(HttpSocket& client, const std::string& status, bool hasBody)
419 {
420 std::string res;
421 const size_t maxLenResponse = 1024;
422 res.reserve(maxLenResponse);
423 res += "HTTP/1.1 ";
424 res += status;
425
426 res += "Connection: Keep-Alive\r\n";
427 if (hasBody) {
428 res += "Content-Type: application/json\r\n";
429 res += "Transfer-Encoding: chunked\r\n";
430 }
431 res += "\r\n";
432 if (!client.Send(res.data(), res.size())) {
433 TS_LOGE("send client socket(%d) error", client.GetFd());
434 }
435 }
436
StringSplit(std::string_view source,std::string_view split)437 std::vector<std::string_view> HttpServer::StringSplit(std::string_view source, std::string_view split)
438 {
439 std::vector<std::string_view> result;
440 if (!split.empty()) {
441 size_t pos = 0;
442 while ((pos = source.find(split)) != std::string_view::npos) {
443 // split
444 std::string_view token = source.substr(0, pos);
445 if (!token.empty()) {
446 result.push_back(token);
447 }
448 source = source.substr(pos + split.size(), source.size() - token.size() - split.size());
449 }
450 }
451 // add last token
452 if (!source.empty()) {
453 result.push_back(source);
454 }
455 return result;
456 }
457 } // namespace TraceStreamer
458 } // namespace SysTuning
459