• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "http_server.h"
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <strings.h> // for macx
#include <thread>
#ifdef _WIN32
#include <WinSock2.h>
#else
#include <poll.h>
#include <sys/socket.h>
#endif
#include "log.h"
#include "string_to_numerical.h"
30 namespace SysTuning {
31 namespace TraceStreamer {
RegisterRpcFunction(RpcServer * rpc)32 void HttpServer::RegisterRpcFunction(RpcServer* rpc)
33 {
34     rpcFunctions_.clear();
35 
36     using std::placeholders::_1;
37     using std::placeholders::_2;
38     using std::placeholders::_3;
39 
40     auto parsedata = std::bind(&RpcServer::ParseData, rpc, _1, _2, _3);
41     rpcFunctions_["/parsedata"] = parsedata;
42 
43     auto parsedataover = std::bind(&RpcServer::ParseDataOver, rpc, _1, _2, _3);
44     rpcFunctions_["/parsedataover"] = parsedataover;
45 
46     auto sqlquery = std::bind(&RpcServer::SqlQuery, rpc, _1, _2, _3);
47     rpcFunctions_["/sqlquery"] = sqlquery;
48 
49     auto sqloperate = std::bind(&RpcServer::SqlOperate, rpc, _1, _2, _3);
50     rpcFunctions_["/sqloperate"] = sqloperate;
51 
52     auto reset = std::bind(&RpcServer::Reset, rpc, _1, _2, _3);
53     rpcFunctions_["/reset"] = reset;
54 }
55 
56 #ifdef _WIN32
Run(int port)57 void HttpServer::Run(int port)
58 {
59     WSADATA ws{};
60     if (WSAStartup(MAKEWORD(WS_VERSION_FIRST, WS_VERSION_SEC), &ws) != 0) {
61         return;
62     }
63     if (!CreateSocket(port)) {
64         return;
65     }
66     WSAEVENT events[COUNT_SOCKET];
67     for (int i = 0; i < COUNT_SOCKET; i++) {
68         if ((events[i] = WSACreateEvent()) == WSA_INVALID_EVENT) {
69             TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
70             return;
71         }
72         WSAEventSelect(sockets_[i].GetFd(), events[i], FD_ACCEPT | FD_CLOSE);
73     }
74 
75     while (!isExit_) {
76         ClearDeadClientThread();
77 
78         int index = WSAWaitForMultipleEvents(COUNT_SOCKET, events, false, pollTimeOut_, false);
79         if (index == WSA_WAIT_FAILED) {
80             TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
81             break;
82         } else if (index == WSA_WAIT_TIMEOUT) {
83             continue;
84         }
85 
86         index = index - WSA_WAIT_EVENT_0;
87         WSANETWORKEVENTS event;
88         WSAEnumNetworkEvents(sockets_[index].GetFd(), events[index], &event);
89         if (event.lNetworkEvents & FD_ACCEPT) {
90             if (event.iErrorCode[FD_ACCEPT_BIT] != 0) {
91                 continue;
92             }
93 
94             std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
95             if (sockets_[index].Accept(client->sock_)) {
96                 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
97                 clientThreads_.push_back(std::move(client));
98             } else {
99                 TS_LOGE("http socket accept error");
100                 std::this_thread::sleep_for(std::chrono::seconds(1));
101             }
102         }
103     }
104 
105     for (const auto& it : clientThreads_) {
106         if (it->thread_.joinable()) {
107             it->sock_.Close();
108             it->thread_.join();
109         }
110     }
111     clientThreads_.clear();
112 
113     WSACleanup();
114 }
115 #else
Run(int port)116 void HttpServer::Run(int port)
117 {
118     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
119         return;
120     }
121 
122     if (!CreateSocket(port)) {
123         return;
124     }
125     TS_LOGI("http server running");
126     struct pollfd fds[COUNT_SOCKET];
127     for (int i = 0; i < COUNT_SOCKET; i++) {
128         fds[i] = {sockets_[i].GetFd(), POLLIN, 0};
129     }
130     while (!isExit_) {
131         ClearDeadClientThread();
132         if (poll(fds, sizeof(fds) / sizeof(pollfd), pollTimeOut_) <= 0) {
133             continue; // try again
134         }
135 
136         for (int i = 0; i < 1; i++) {
137             if (fds[i].revents != POLLIN) {
138                 continue;
139             }
140             std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
141             if (sockets_[i].Accept(client->sock_)) {
142                 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
143                 clientThreads_.push_back(std::move(client));
144             } else {
145                 std::this_thread::sleep_for(std::chrono::seconds(1));
146             }
147         }
148     }
149 
150     for (const auto& it : clientThreads_) {
151         if (it->thread_.joinable()) {
152             it->sock_.Close();
153             it->thread_.join();
154         }
155     }
156     clientThreads_.clear();
157 
158     for (int i = 0; i < COUNT_SOCKET; i++) {
159         sockets_[i].Close();
160     }
161     TS_LOGI("http server exit");
162 }
163 #endif
164 
Exit()165 void HttpServer::Exit()
166 {
167     isExit_ = true;
168     for (int i = 0; i < COUNT_SOCKET; i++) {
169         sockets_[i].Close();
170     }
171 }
172 
CreateSocket(int port)173 bool HttpServer::CreateSocket(int port)
174 {
175     for (int i = 0; i < COUNT_SOCKET; i++) {
176         if (!sockets_[i].CreateSocket(i == 0 ? AF_INET : AF_INET6)) {
177             TS_LOGE("Create http socket error");
178             return false;
179         }
180         if (!sockets_[i].Bind(port)) {
181             TS_LOGE("bind http socket error");
182             return false;
183         }
184         if (!sockets_[i].Listen(SOMAXCONN)) {
185             TS_LOGE("listen http socket error");
186             return false;
187         }
188     }
189 
190     return true;
191 }
192 
ClearDeadClientThread()193 void HttpServer::ClearDeadClientThread()
194 {
195     for (auto it = clientThreads_.begin(); it != clientThreads_.end();) {
196         if (it->get()->sock_.GetFd() != -1) {
197             it++;
198             continue;
199         }
200         if (it->get()->thread_.joinable()) {
201             it->get()->thread_.join();
202         }
203         it = clientThreads_.erase(it);
204     }
205 }
206 
207 #ifdef _WIN32
ProcessClient(HttpSocket & client)208 void HttpServer::ProcessClient(HttpSocket& client)
209 {
210     std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
211     size_t recvLen = recvBuf.size();
212     size_t recvPos = 0;
213     RequestST reqST;
214     WSAEVENT recvEvent = WSACreateEvent();
215     if (recvEvent == WSA_INVALID_EVENT) {
216         TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
217         return;
218     }
219     WSAEventSelect(client.GetFd(), recvEvent, FD_READ | FD_CLOSE);
220     while (!isExit_) {
221         int index = WSAWaitForMultipleEvents(1, &recvEvent, false, pollTimeOut_, false);
222         if (index == WSA_WAIT_FAILED) {
223             TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
224             break;
225         } else if (index == WSA_WAIT_TIMEOUT) {
226             if (reqST.stat != RequstParseStat::INIT) {
227                 ProcessRequest(client, reqST);
228                 reqST.stat = RequstParseStat::INIT;
229                 recvPos = 0;
230                 recvLen = recvBuf.size();
231             }
232             continue;
233         }
234 
235         WSANETWORKEVENTS event;
236         WSAEnumNetworkEvents(client.GetFd(), recvEvent, &event);
237         if (event.lNetworkEvents & FD_READ) {
238             if (event.iErrorCode[FD_READ_BIT] != 0) {
239                 continue;
240             }
241             if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
242                 break;
243             }
244             recvPos += recvLen;
245             ParseRequest(recvBuf.data(), recvPos, reqST);
246             recvLen = recvBuf.size() - recvPos;
247             if (reqST.stat == RequstParseStat::RECVING) {
248                 continue;
249             }
250             ProcessRequest(client, reqST);
251             reqST.stat = RequstParseStat::INIT;
252         } else if (event.lNetworkEvents & FD_CLOSE) {
253             TS_LOGI("client close socket(%d)", client.GetFd());
254             break;
255         }
256     }
257     TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
258 
259     client.Close();
260 }
261 #else
ProcessClient(HttpSocket & client)262 void HttpServer::ProcessClient(HttpSocket& client)
263 {
264     std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
265     size_t recvLen = recvBuf.size();
266     size_t recvPos = 0;
267     RequestST reqST;
268 
269     struct pollfd fd = {client.GetFd(), POLLIN, 0};
270     while (!isExit_) {
271         int pollRet = poll(&fd, sizeof(fd) / sizeof(pollfd), pollTimeOut_);
272         if (pollRet < 0) {
273             TS_LOGE("poll client socket(%d) error: %d:%s", client.GetFd(), errno, strerror(errno));
274             break;
275         }
276         if (pollRet == 0) {
277             if (reqST.stat != RequstParseStat::INIT) {
278                 ProcessRequest(client, reqST);
279                 reqST.stat = RequstParseStat::INIT;
280                 recvPos = 0;
281                 recvLen = recvBuf.size();
282             }
283             continue;
284         }
285         if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
286             TS_LOGI("client exit");
287             break;
288         }
289         recvPos += recvLen;
290         ParseRequest(recvBuf.data(), recvPos, reqST);
291         recvLen = recvBuf.size() - recvPos;
292         if (reqST.stat == RequstParseStat::RECVING) {
293             continue;
294         }
295         ProcessRequest(client, reqST);
296         reqST.stat = RequstParseStat::INIT;
297     }
298     TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
299 
300     client.Close();
301     TS_LOGI("thread exit");
302 }
303 #endif
304 
ProcessRequest(HttpSocket & client,RequestST & request)305 void HttpServer::ProcessRequest(HttpSocket& client, RequestST& request)
306 {
307     if (request.stat == RequstParseStat::RECVING) {
308         TS_LOGE("http request data missing, client %d\n", client.GetFd());
309         HttpResponse(client, "408 Request Time-out\r\n");
310         return;
311     } else if (request.stat != RequstParseStat::OK) {
312         TS_LOGE("bad http request, client %d\n", client.GetFd());
313         HttpResponse(client, "400 Bad Request\r\n");
314         return;
315     }
316     if (request.method == "OPTIONS") {
317         HttpResponse(client,
318                      "204 No Content\r\n"
319                      "Access-Control-Allow-Methods: POST, GET, OPTIONS\r\n"
320                      "Access-Control-Allow-Headers: *\r\n"
321                      "Access-Control-Max-Age: 86400\r\n");
322         return;
323     } else if (request.method != "POST" && request.method != "GET") {
324         TS_LOGE("method(%s) not allowed, client %d", request.method.c_str(), client.GetFd());
325         HttpResponse(client, "405 Method Not Allowed\r\n");
326         return;
327     }
328     auto it = rpcFunctions_.find(request.uri);
329     if (it == rpcFunctions_.end()) {
330         TS_LOGE("http uri(%s) not found, client %d", request.uri.c_str(), client.GetFd());
331         HttpResponse(client, "404 Not Found\r\n");
332         return;
333     }
334     HttpResponse(client, "200 OK\r\n", true);
335     auto resultCallback = [&client](const std::string& result, int) {
336         std::stringstream chunkLenbuff;
337         chunkLenbuff << std::hex << result.size() << "\r\n";
338         if (!client.Send(chunkLenbuff.str().data(), chunkLenbuff.str().size())) {
339             TS_LOGE("send client socket(%d) error", client.GetFd());
340             return;
341         }
342         if (!client.Send(result.data(), result.size())) {
343             TS_LOGE("send client socket(%d) error", client.GetFd());
344             return;
345         }
346         if (!client.Send("\r\n", strlen("\r\n"))) {
347             TS_LOGE("send client socket(%d) error", client.GetFd());
348             return;
349         }
350     };
351     it->second(request.body, request.bodyLen, resultCallback);
352     if (!client.Send("0\r\n\r\n", strlen("0\r\n\r\n"))) { // chunk tail
353         TS_LOGE("send client socket(%d) error", client.GetFd());
354     }
355 }
356 
ParseRequest(const uint8_t * requst,size_t & len,RequestST & httpReq)357 void HttpServer::ParseRequest(const uint8_t* requst, size_t& len, RequestST& httpReq)
358 {
359     std::string_view reqStr(reinterpret_cast<const char*>(requst), len);
360     size_t bodyPos = reqStr.find("\r\n\r\n");
361     if (bodyPos == 0) {
362         len = 0;
363         httpReq.stat = RequstParseStat::BAD;
364         return;
365     } else if (bodyPos == std::string_view::npos) {
366         httpReq.stat = RequstParseStat::RECVING;
367         return;
368     }
369     std::string_view header = reqStr.substr(0, bodyPos);
370     bodyPos += strlen("\r\n\r\n");
371     httpReq.bodyLen = reqStr.size() - bodyPos;
372 
373     std::vector<std::string_view> headerlines = StringSplit(header, "\r\n");
374     // at least 1 line in headerlines, such as "GET /parsedata HTTP/1.1"
375     std::vector<std::string_view> requestItems = StringSplit(headerlines[0], " ");
376     const size_t indexHttpMethod = 0;
377     const size_t indexHttpUri = 1;
378     const size_t indexHttpVersion = 2;
379     const size_t countRequestItems = 3;
380     if (requestItems.size() != countRequestItems || requestItems[indexHttpVersion] != "HTTP/1.1") {
381         len = 0;
382         httpReq.stat = RequstParseStat::BAD;
383         return;
384     }
385     httpReq.method = requestItems[indexHttpMethod];
386     httpReq.uri = requestItems[indexHttpUri];
387 
388     for (size_t i = 1; i < headerlines.size(); i++) {
389         size_t tagPos = headerlines[i].find(":");
390         if (tagPos == std::string_view::npos) {
391             len = 0;
392             httpReq.stat = RequstParseStat::BAD;
393             return;
394         }
395         std::string_view tag = headerlines[i].substr(0, tagPos);
396         if (strncasecmp(tag.data(), "Content-Length", tag.size()) == 0) {
397             std::string value(headerlines[i].data() + tagPos + strlen(":"),
398                               headerlines[i].size() - tagPos - strlen(":"));
399             size_t conterntLen = atoi(value.c_str());
400             if (conterntLen > httpReq.bodyLen) {
401                 httpReq.stat = RequstParseStat::RECVING;
402                 return;
403             } else if (conterntLen < httpReq.bodyLen) {
404                 httpReq.bodyLen = conterntLen;
405             }
406         }
407     }
408 
409     if (httpReq.bodyLen > 0) {
410         httpReq.body = (requst + bodyPos);
411     }
412     httpReq.stat = RequstParseStat::OK;
413     len -= (bodyPos + httpReq.bodyLen);
414     return;
415 }
416 
HttpResponse(HttpSocket & client,const std::string & status,bool hasBody)417 void HttpServer::HttpResponse(HttpSocket& client, const std::string& status, bool hasBody)
418 {
419     std::string res;
420     const size_t maxLenResponse = 1024;
421     res.reserve(maxLenResponse);
422     res += "HTTP/1.1 ";
423     res += status;
424 
425     res += "Connection: Keep-Alive\r\n";
426     if (hasBody) {
427         res += "Content-Type: application/json\r\n";
428         res += "Transfer-Encoding: chunked\r\n";
429     }
430     res += "\r\n";
431     if (!client.Send(res.data(), res.size())) {
432         TS_LOGE("send client socket(%d) error", client.GetFd());
433     }
434 }
435 
StringSplit(std::string_view source,std::string_view split)436 std::vector<std::string_view> HttpServer::StringSplit(std::string_view source, std::string_view split)
437 {
438     std::vector<std::string_view> result;
439     if (!split.empty()) {
440         size_t pos = 0;
441         while ((pos = source.find(split)) != std::string_view::npos) {
442             // split
443             std::string_view token = source.substr(0, pos);
444             if (!token.empty()) {
445                 result.push_back(token);
446             }
447             source = source.substr(pos + split.size(), source.size() - token.size() - split.size());
448         }
449     }
450     // add last token
451     if (!source.empty()) {
452         result.push_back(source);
453     }
454     return result;
455 }
456 } // namespace TraceStreamer
457 } // namespace SysTuning
458