• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "http_server.h"
17 #include <chrono>
18 #include <csignal>
19 #include <cstdint>
20 #include <strings.h> // for macx
21 #include <thread>
22 #ifdef _WIN32
23 #include <winsock2.h>
24 #else
25 #include <poll.h>
26 #include <sys/socket.h>
27 #endif
28 #include "log.h"
29 #include "string_to_numerical.h"
30 namespace SysTuning {
31 namespace TraceStreamer {
RegisterRpcFunction(RpcServer * rpc)32 void HttpServer::RegisterRpcFunction(RpcServer* rpc)
33 {
34     rpcFunctions_.clear();
35 
36     using std::placeholders::_1;
37     using std::placeholders::_2;
38     using std::placeholders::_3;
39 
40     auto parsedata = std::bind(&RpcServer::ParseData, rpc, _1, _2, _3);
41     rpcFunctions_["/parsedata"] = parsedata;
42 
43     auto parsedataover = std::bind(&RpcServer::ParseDataOver, rpc, _1, _2, _3);
44     rpcFunctions_["/parsedataover"] = parsedataover;
45 
46     auto sqlquery = std::bind(&RpcServer::SqlQuery, rpc, _1, _2, _3);
47     rpcFunctions_["/sqlquery"] = sqlquery;
48 
49     auto sqloperate = std::bind(&RpcServer::SqlOperate, rpc, _1, _2, _3);
50     rpcFunctions_["/sqloperate"] = sqloperate;
51 
52     auto reset = std::bind(&RpcServer::Reset, rpc, _1, _2, _3);
53     rpcFunctions_["/reset"] = reset;
54 }
55 
CloseAllThreads()56 void HttpServer::CloseAllThreads()
57 {
58     for (const auto& it : clientThreads_) {
59         if (it->thread_.joinable()) {
60             it->thread_.join();
61             it->sock_.Close();
62         }
63     }
64 }
65 
66 #ifdef _WIN32
Run(int32_t port)67 void HttpServer::Run(int32_t port)
68 {
69     WSADATA ws{};
70     if (WSAStartup(MAKEWORD(WS_VERSION_FIRST, WS_VERSION_SEC), &ws) != 0) {
71         return;
72     }
73     if (!CreateSocket(port)) {
74         return;
75     }
76     WSAEVENT events[COUNT_SOCKET];
77     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
78         if ((events[i] = WSACreateEvent()) == WSA_INVALID_EVENT) {
79             TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
80             return;
81         }
82         WSAEventSelect(sockets_[i].GetFd(), events[i], FD_ACCEPT | FD_CLOSE);
83     }
84 
85     while (!isExit_) {
86         ClearDeadClientThread();
87 
88         int32_t index = WSAWaitForMultipleEvents(COUNT_SOCKET, events, false, pollTimeOut_, false);
89         if (index == WSA_WAIT_FAILED) {
90             TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
91             break;
92         } else if (index == WSA_WAIT_TIMEOUT) {
93             continue;
94         }
95 
96         index = index - WSA_WAIT_EVENT_0;
97         WSANETWORKEVENTS event;
98         WSAEnumNetworkEvents(sockets_[index].GetFd(), events[index], &event);
99         if (event.lNetworkEvents & FD_ACCEPT) {
100             if (event.iErrorCode[FD_ACCEPT_BIT] != 0) {
101                 continue;
102             }
103 
104             std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
105             if (sockets_[index].Accept(client->sock_)) {
106                 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
107                 clientThreads_.push_back(std::move(client));
108             } else {
109                 TS_LOGE("http socket accept error");
110                 std::this_thread::sleep_for(std::chrono::seconds(1));
111             }
112         }
113     }
114     CloseAllThreads();
115     clientThreads_.clear();
116 
117     WSACleanup();
118 }
119 #else
Run(int32_t port)120 void HttpServer::Run(int32_t port)
121 {
122     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
123         return;
124     }
125 
126     if (!CreateSocket(port)) {
127         return;
128     }
129     TS_LOGI("http server running");
130     struct pollfd fds[COUNT_SOCKET];
131     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
132         fds[i] = {sockets_[i].GetFd(), POLLIN, 0};
133     }
134     while (!isExit_) {
135         ClearDeadClientThread();
136         if (poll(fds, sizeof(fds) / sizeof(pollfd), pollTimeOut_) <= 0) {
137             continue; // try again
138         }
139 
140         for (int32_t i = 0; i < 1; i++) {
141             if (fds[i].revents != POLLIN) {
142                 continue;
143             }
144             std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
145             if (sockets_[i].Accept(client->sock_)) {
146                 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
147                 clientThreads_.push_back(std::move(client));
148             } else {
149                 std::this_thread::sleep_for(std::chrono::seconds(1));
150             }
151         }
152     }
153     CloseAllThreads();
154     clientThreads_.clear();
155 
156     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
157         sockets_[i].Close();
158     }
159     TS_LOGI("http server exit");
160 }
161 #endif
162 
Exit()163 void HttpServer::Exit()
164 {
165     isExit_ = true;
166     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
167         sockets_[i].Close();
168     }
169 }
170 
CreateSocket(int32_t port)171 bool HttpServer::CreateSocket(int32_t port)
172 {
173     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
174         if (!sockets_[i].CreateSocket(i == 0 ? AF_INET : AF_INET6)) {
175             TS_LOGE("Create http socket error");
176             return false;
177         }
178         if (!sockets_[i].Bind(port)) {
179             TS_LOGE("bind http socket error");
180             return false;
181         }
182         if (!sockets_[i].Listen(SOMAXCONN)) {
183             TS_LOGE("listen http socket error");
184             return false;
185         }
186     }
187 
188     return true;
189 }
190 
ClearDeadClientThread()191 void HttpServer::ClearDeadClientThread()
192 {
193     for (auto it = clientThreads_.begin(); it != clientThreads_.end();) {
194         if (it->get()->sock_.GetFd() != -1) {
195             it++;
196             continue;
197         }
198         if (it->get()->thread_.joinable()) {
199             it->get()->thread_.join();
200         }
201         it = clientThreads_.erase(it);
202     }
203 }
204 
ProcessAndParseReq(size_t & recvPos,size_t & recvLen,std::vector<uint8_t> & recvBuf,RequestST & reqST,HttpSocket & client)205 bool HttpServer::ProcessAndParseReq(size_t& recvPos,
206                                     size_t& recvLen,
207                                     std::vector<uint8_t>& recvBuf,
208                                     RequestST& reqST,
209                                     HttpSocket& client)
210 {
211     if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
212         return false;
213     }
214     recvPos += recvLen;
215     ParseRequest(recvBuf.data(), recvPos, reqST);
216     recvLen = recvBuf.size() - recvPos;
217     if (reqST.stat == RequstParseStat::RECVING) {
218         return true;
219     }
220     ProcessRequest(client, reqST);
221     reqST.stat = RequstParseStat::INIT;
222     return true;
223 }
224 
225 #ifdef _WIN32
ProcessClient(HttpSocket & client)226 void HttpServer::ProcessClient(HttpSocket& client)
227 {
228     std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
229     size_t recvLen = recvBuf.size();
230     size_t recvPos = 0;
231     RequestST reqST;
232     WSAEVENT recvEvent = WSACreateEvent();
233     if (recvEvent == WSA_INVALID_EVENT) {
234         TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
235         return;
236     }
237     WSAEventSelect(client.GetFd(), recvEvent, FD_READ | FD_CLOSE);
238     while (!isExit_) {
239         int32_t index = WSAWaitForMultipleEvents(1, &recvEvent, false, pollTimeOut_, false);
240         if (index == WSA_WAIT_FAILED) {
241             TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
242             break;
243         } else if (index == WSA_WAIT_TIMEOUT) {
244             if (reqST.stat != RequstParseStat::INIT) {
245                 ProcessRequest(client, reqST);
246                 reqST.stat = RequstParseStat::INIT;
247                 recvPos = 0;
248                 recvLen = recvBuf.size();
249             }
250             continue;
251         }
252 
253         WSANETWORKEVENTS event;
254         WSAEnumNetworkEvents(client.GetFd(), recvEvent, &event);
255         if (event.lNetworkEvents & FD_READ) {
256             if (event.iErrorCode[FD_READ_BIT] != 0) {
257                 continue;
258             }
259             if (!ProcessAndParseReq(recvPos, recvLen, recvBuf, reqST, client)) {
260                 break;
261             }
262         } else if (event.lNetworkEvents & FD_CLOSE) {
263             TS_LOGI("client close socket(%d)", client.GetFd());
264             break;
265         }
266     }
267     TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
268 
269     client.Close();
270 }
271 #else
ProcessClient(HttpSocket & client)272 void HttpServer::ProcessClient(HttpSocket& client)
273 {
274     std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
275     size_t recvLen = recvBuf.size();
276     size_t recvPos = 0;
277     RequestST reqST;
278 
279     struct pollfd fd = {client.GetFd(), POLLIN, 0};
280     while (!isExit_) {
281         int32_t pollRet = poll(&fd, sizeof(fd) / sizeof(pollfd), pollTimeOut_);
282         if (pollRet < 0) {
283             TS_LOGE("poll client socket(%d) error: %d:%s", client.GetFd(), errno, strerror(errno));
284             break;
285         }
286         if (pollRet == 0) {
287             if (reqST.stat != RequstParseStat::INIT) {
288                 ProcessRequest(client, reqST);
289                 reqST.stat = RequstParseStat::INIT;
290                 recvPos = 0;
291                 recvLen = recvBuf.size();
292             }
293             continue;
294         }
295         if (!ProcessAndParseReq(recvPos, recvLen, recvBuf, reqST, client)) {
296             break;
297         }
298     }
299     TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
300 
301     client.Close();
302     TS_LOGI("thread exit");
303 }
304 #endif
305 
ProcessRequest(HttpSocket & client,RequestST & request)306 void HttpServer::ProcessRequest(HttpSocket& client, RequestST& request)
307 {
308     if (request.stat == RequstParseStat::RECVING) {
309         TS_LOGE("http request data missing, client %d\n", client.GetFd());
310         HttpResponse(client, "408 Request Time-out\r\n");
311         return;
312     } else if (request.stat != RequstParseStat::OK) {
313         TS_LOGE("bad http request, client %d\n", client.GetFd());
314         HttpResponse(client, "400 Bad Request\r\n");
315         return;
316     }
317     if (request.method == "OPTIONS") {
318         HttpResponse(client,
319                      "204 No Content\r\n"
320                      "Access-Control-Allow-Methods: POST, GET, OPTIONS\r\n"
321                      "Access-Control-Allow-Headers: *\r\n"
322                      "Access-Control-Max-Age: 86400\r\n");
323         return;
324     } else if (request.method != "POST" && request.method != "GET") {
325         TS_LOGE("method(%s) not allowed, client %d", request.method.c_str(), client.GetFd());
326         HttpResponse(client, "405 Method Not Allowed\r\n");
327         return;
328     }
329     auto it = rpcFunctions_.find(request.uri);
330     if (it == rpcFunctions_.end()) {
331         TS_LOGE("http uri(%s) not found, client %d", request.uri.c_str(), client.GetFd());
332         HttpResponse(client, "404 Not Found\r\n");
333         return;
334     }
335     HttpResponse(client, "200 OK\r\n", true);
336     auto resultCallback = [&client](const std::string& result, int32_t) {
337         std::stringstream chunkLenbuff;
338         chunkLenbuff << std::hex << result.size() << "\r\n";
339         if (!client.Send(chunkLenbuff.str().data(), chunkLenbuff.str().size())) {
340             TS_LOGE("send client socket(%d) error", client.GetFd());
341             return;
342         }
343         if (!client.Send(result.data(), result.size())) {
344             TS_LOGE("send client socket(%d) error", client.GetFd());
345             return;
346         }
347         if (!client.Send("\r\n", strlen("\r\n"))) {
348             TS_LOGE("send client socket(%d) error", client.GetFd());
349             return;
350         }
351     };
352     it->second(request.body, request.bodyLen, resultCallback);
353     if (!client.Send("0\r\n\r\n", strlen("0\r\n\r\n"))) { // chunk tail
354         TS_LOGE("send client socket(%d) error", client.GetFd());
355     }
356 }
357 
ParseRequest(const uint8_t * requst,size_t & len,RequestST & httpReq)358 void HttpServer::ParseRequest(const uint8_t* requst, size_t& len, RequestST& httpReq)
359 {
360     std::string_view reqStr(reinterpret_cast<const char*>(requst), len);
361     size_t bodyPos = reqStr.find("\r\n\r\n");
362     if (bodyPos == 0) {
363         len = 0;
364         httpReq.stat = RequstParseStat::BAD;
365         return;
366     } else if (bodyPos == std::string_view::npos) {
367         httpReq.stat = RequstParseStat::RECVING;
368         return;
369     }
370     std::string_view header = reqStr.substr(0, bodyPos);
371     bodyPos += strlen("\r\n\r\n");
372     httpReq.bodyLen = reqStr.size() - bodyPos;
373 
374     std::vector<std::string_view> headerlines = StringSplit(header, "\r\n");
375     // at least 1 line in headerlines, such as "GET /parsedata HTTP/1.1"
376     std::vector<std::string_view> requestItems = StringSplit(headerlines[0], " ");
377     const size_t indexHttpMethod = 0;
378     const size_t indexHttpUri = 1;
379     const size_t indexHttpVersion = 2;
380     const size_t countRequestItems = 3;
381     if (requestItems.size() != countRequestItems || requestItems[indexHttpVersion] != "HTTP/1.1") {
382         len = 0;
383         httpReq.stat = RequstParseStat::BAD;
384         return;
385     }
386     httpReq.method = requestItems[indexHttpMethod];
387     httpReq.uri = requestItems[indexHttpUri];
388 
389     for (size_t i = 1; i < headerlines.size(); i++) {
390         size_t tagPos = headerlines[i].find(":");
391         if (tagPos == std::string_view::npos) {
392             len = 0;
393             httpReq.stat = RequstParseStat::BAD;
394             return;
395         }
396         std::string_view tag = headerlines[i].substr(0, tagPos);
397         if (strncasecmp(tag.data(), "Content-Length", tag.size()) == 0) {
398             std::string value(headerlines[i].data() + tagPos + strlen(":"),
399                               headerlines[i].size() - tagPos - strlen(":"));
400             size_t conterntLen = atoi(value.c_str());
401             if (conterntLen > httpReq.bodyLen) {
402                 httpReq.stat = RequstParseStat::RECVING;
403                 return;
404             } else if (conterntLen < httpReq.bodyLen) {
405                 httpReq.bodyLen = conterntLen;
406             }
407         }
408     }
409 
410     if (httpReq.bodyLen > 0) {
411         httpReq.body = (requst + bodyPos);
412     }
413     httpReq.stat = RequstParseStat::OK;
414     len -= (bodyPos + httpReq.bodyLen);
415     return;
416 }
417 
HttpResponse(HttpSocket & client,const std::string & status,bool hasBody)418 void HttpServer::HttpResponse(HttpSocket& client, const std::string& status, bool hasBody)
419 {
420     std::string res;
421     const size_t maxLenResponse = 1024;
422     res.reserve(maxLenResponse);
423     res += "HTTP/1.1 ";
424     res += status;
425 
426     res += "Connection: Keep-Alive\r\n";
427     if (hasBody) {
428         res += "Content-Type: application/json\r\n";
429         res += "Transfer-Encoding: chunked\r\n";
430     }
431     res += "\r\n";
432     if (!client.Send(res.data(), res.size())) {
433         TS_LOGE("send client socket(%d) error", client.GetFd());
434     }
435 }
436 
StringSplit(std::string_view source,std::string_view split)437 std::vector<std::string_view> HttpServer::StringSplit(std::string_view source, std::string_view split)
438 {
439     std::vector<std::string_view> result;
440     if (!split.empty()) {
441         size_t pos = 0;
442         while ((pos = source.find(split)) != std::string_view::npos) {
443             // split
444             std::string_view token = source.substr(0, pos);
445             if (!token.empty()) {
446                 result.push_back(token);
447             }
448             source = source.substr(pos + split.size(), source.size() - token.size() - split.size());
449         }
450     }
451     // add last token
452     if (!source.empty()) {
453         result.push_back(source);
454     }
455     return result;
456 }
457 } // namespace TraceStreamer
458 } // namespace SysTuning
459