• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "http_server.h"
17 #include <chrono>
18 #include <csignal>
19 #include <cstdint>
20 #include <thread>
21 #ifdef _WIN32
22 #include <WinSock2.h>
23 #else
24 #include <poll.h>
25 #include <sys/socket.h>
26 #endif
27 #include "log.h"
28 #include "string_to_numerical.h"
29 namespace SysTuning {
30 namespace TraceStreamer {
RegisterRpcFunction(RpcServer * rpc)31 void HttpServer::RegisterRpcFunction(RpcServer* rpc)
32 {
33     rpcFunctions_.clear();
34 
35     using std::placeholders::_1;
36     using std::placeholders::_2;
37     using std::placeholders::_3;
38 
39     auto parsedata = std::bind(&RpcServer::ParseData, rpc, _1, _2, _3);
40     rpcFunctions_["/parsedata"] = parsedata;
41 
42     auto parsedataover = std::bind(&RpcServer::ParseDataOver, rpc, _1, _2, _3);
43     rpcFunctions_["/parsedataover"] = parsedataover;
44 
45     auto sqlquery = std::bind(&RpcServer::SqlQuery, rpc, _1, _2, _3);
46     rpcFunctions_["/sqlquery"] = sqlquery;
47 
48     auto sqloperate = std::bind(&RpcServer::SqlOperate, rpc, _1, _2, _3);
49     rpcFunctions_["/sqloperate"] = sqloperate;
50 
51     auto reset = std::bind(&RpcServer::Reset, rpc, _1, _2, _3);
52     rpcFunctions_["/reset"] = reset;
53 }
54 
55 #ifdef _WIN32
Run(int32_t port)56 void HttpServer::Run(int32_t port)
57 {
58     WSADATA ws{};
59     if (WSAStartup(MAKEWORD(WS_VERSION_FIRST, WS_VERSION_SEC), &ws) != 0) {
60         return;
61     }
62     if (!CreateSocket(port)) {
63         return;
64     }
65     WSAEVENT events[COUNT_SOCKET];
66     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
67         if ((events[i] = WSACreateEvent()) == WSA_INVALID_EVENT) {
68             TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
69             return;
70         }
71         WSAEventSelect(sockets_[i].GetFd(), events[i], FD_ACCEPT | FD_CLOSE);
72     }
73 
74     while (!isExit_) {
75         ClearDeadClientThread();
76 
77         int32_t index = WSAWaitForMultipleEvents(COUNT_SOCKET, events, false, pollTimeOut_, false);
78         if (index == WSA_WAIT_FAILED) {
79             TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
80             break;
81         } else if (index == WSA_WAIT_TIMEOUT) {
82             continue;
83         }
84 
85         index = index - WSA_WAIT_EVENT_0;
86         WSANETWORKEVENTS event;
87         WSAEnumNetworkEvents(sockets_[index].GetFd(), events[index], &event);
88         if (event.lNetworkEvents & FD_ACCEPT) {
89             if (event.iErrorCode[FD_ACCEPT_BIT] != 0) {
90                 continue;
91             }
92 
93             std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
94             if (sockets_[index].Accept(client->sock_)) {
95                 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
96                 clientThreads_.push_back(std::move(client));
97             } else {
98                 TS_LOGE("http socket accept error");
99                 std::this_thread::sleep_for(std::chrono::seconds(1));
100             }
101         }
102     }
103 
104     for (const auto& it : clientThreads_) {
105         if (it->thread_.joinable()) {
106             it->sock_.Close();
107             it->thread_.join();
108         }
109     }
110     clientThreads_.clear();
111 
112     WSACleanup();
113 }
114 #else
Run(int32_t port)115 void HttpServer::Run(int32_t port)
116 {
117     if (SIG_ERR == signal(SIGPIPE, SIG_IGN)) {
118         return;
119     }
120 
121     if (!CreateSocket(port)) {
122         return;
123     }
124     TS_LOGI("http server running");
125     struct pollfd fds[COUNT_SOCKET];
126     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
127         fds[i] = {sockets_[i].GetFd(), POLLIN, 0};
128     }
129     while (!isExit_) {
130         ClearDeadClientThread();
131         if (poll(fds, sizeof(fds) / sizeof(pollfd), pollTimeOut_) <= 0) {
132             continue; // try again
133         }
134 
135         for (int32_t i = 0; i < 1; i++) {
136             if (fds[i].revents != POLLIN) {
137                 continue;
138             }
139             std::unique_ptr<ClientThread> client = std::make_unique<ClientThread>();
140             if (sockets_[i].Accept(client->sock_)) {
141                 client->thread_ = std::thread(&HttpServer::ProcessClient, this, std::ref(client->sock_));
142                 clientThreads_.push_back(std::move(client));
143             } else {
144                 std::this_thread::sleep_for(std::chrono::seconds(1));
145             }
146         }
147     }
148 
149     for (const auto& it : clientThreads_) {
150         if (it->thread_.joinable()) {
151             it->sock_.Close();
152             it->thread_.join();
153         }
154     }
155     clientThreads_.clear();
156 
157     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
158         sockets_[i].Close();
159     }
160     TS_LOGI("http server exit");
161 }
162 #endif
163 
Exit()164 void HttpServer::Exit()
165 {
166     isExit_ = true;
167     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
168         sockets_[i].Close();
169     }
170 }
171 
CreateSocket(int32_t port)172 bool HttpServer::CreateSocket(int32_t port)
173 {
174     for (int32_t i = 0; i < COUNT_SOCKET; i++) {
175         if (!sockets_[i].CreateSocket(i == 0 ? AF_INET : AF_INET6)) {
176             TS_LOGE("Create http socket error");
177             return false;
178         }
179         if (!sockets_[i].Bind(port)) {
180             TS_LOGE("bind http socket error");
181             return false;
182         }
183         if (!sockets_[i].Listen(SOMAXCONN)) {
184             TS_LOGE("listen http socket error");
185             return false;
186         }
187     }
188 
189     return true;
190 }
191 
ClearDeadClientThread()192 void HttpServer::ClearDeadClientThread()
193 {
194     for (const auto it = clientThreads_.begin(); it != clientThreads_.end();) {
195         if (it->get()->sock_.GetFd() != -1) {
196             it++;
197             continue;
198         }
199         if (it->get()->thread_.joinable()) {
200             it->get()->thread_.join();
201         }
202         it = clientThreads_.erase(it);
203     }
204 }
205 
206 #ifdef _WIN32
ProcessClient(HttpSocket & client)207 void HttpServer::ProcessClient(HttpSocket& client)
208 {
209     std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
210     size_t recvLen = recvBuf.size();
211     size_t recvPos = 0;
212     RequestST reqST;
213     WSAEVENT recvEvent = WSACreateEvent();
214     if (recvEvent == WSA_INVALID_EVENT) {
215         TS_LOGE("WSACreateEvent error %d", WSAGetLastError());
216         return;
217     }
218     WSAEventSelect(client.GetFd(), recvEvent, FD_READ | FD_CLOSE);
219     while (!isExit_) {
220         int32_t index = WSAWaitForMultipleEvents(1, &recvEvent, false, pollTimeOut_, false);
221         if (index == WSA_WAIT_FAILED) {
222             TS_LOGE("WSAWaitForMultipleEvents error %d", WSAGetLastError());
223             break;
224         } else if (index == WSA_WAIT_TIMEOUT) {
225             if (reqST.stat != RequstParseStat::INIT) {
226                 reqST.stat = RequstParseStat::INIT;
227                 recvPos = 0;
228                 recvLen = recvBuf.size();
229             }
230             continue;
231         }
232 
233         WSANETWORKEVENTS event;
234         WSAEnumNetworkEvents(client.GetFd(), recvEvent, &event);
235         if (event.lNetworkEvents & FD_READ) {
236             if (event.iErrorCode[FD_READ_BIT] != 0) {
237                 continue;
238             }
239             if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
240                 break;
241             }
242             recvPos += recvLen;
243             ParseRequest(recvBuf.data(), recvPos, reqST);
244             recvLen = recvBuf.size() - recvPos;
245             if (reqST.stat == RequstParseStat::RECVING) {
246                 continue;
247             }
248             reqST.stat = RequstParseStat::INIT;
249         } else if (event.lNetworkEvents & FD_CLOSE) {
250             TS_LOGI("client close socket(%d)", client.GetFd());
251             break;
252         }
253     }
254     TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
255 
256     client.Close();
257 }
258 #else
ProcessClient(HttpSocket & client)259 void HttpServer::ProcessClient(HttpSocket& client)
260 {
261     std::vector<uint8_t> recvBuf(MAXLEN_REQUEST);
262     size_t recvLen = recvBuf.size();
263     size_t recvPos = 0;
264     RequestST reqST;
265 
266     struct pollfd fd = {client.GetFd(), POLLIN, 0};
267     while (!isExit_) {
268         int32_t pollRet = poll(&fd, sizeof(fd) / sizeof(pollfd), pollTimeOut_);
269         if (pollRet < 0) {
270             TS_LOGE("poll client socket(%d) error: %d:%s", client.GetFd(), errno, strerror(errno));
271             break;
272         }
273         if (pollRet == 0) {
274             if (reqST.stat != RequstParseStat::INIT) {
275                 reqST.stat = RequstParseStat::INIT;
276                 recvPos = 0;
277                 recvLen = recvBuf.size();
278             }
279             continue;
280         }
281         if (!client.Recv(recvBuf.data() + recvPos, recvLen)) {
282             TS_LOGI("client exit");
283             break;
284         }
285         recvPos += recvLen;
286         ParseRequest(recvBuf.data(), recvPos, reqST);
287         recvLen = recvBuf.size() - recvPos;
288         if (reqST.stat == RequstParseStat::RECVING) {
289             continue;
290         }
291         reqST.stat = RequstParseStat::INIT;
292     }
293     TS_LOGI("recive client thread exit. socket(%d)", client.GetFd());
294 
295     client.Close();
296     TS_LOGI("thread exit");
297 }
298 #endif
299 
ParseRequest(const uint8_t * requst,size_t & len,RequestST & httpReq)300 void HttpServer::ParseRequest(const uint8_t* requst, size_t& len, RequestST& httpReq)
301 {
302     std::string_view reqStr(reinterpret_cast<const char*>(requst), len);
303     size_t bodyPos = reqStr.find("\r\n\r\n");
304     if (bodyPos == 0) {
305         len = 0;
306         httpReq.stat = RequstParseStat::BAD;
307         return;
308     } else if (bodyPos == std::string_view::npos) {
309         httpReq.stat = RequstParseStat::RECVING;
310         return;
311     }
312     std::string_view header = reqStr.substr(0, bodyPos);
313     bodyPos += strlen("\r\n\r\n");
314     httpReq.bodyLen = reqStr.size() - bodyPos;
315 
316     std::vector<std::string_view> headerlines = StringSplit(header, "\r\n");
317     // at least 1 line in headerlines, such as "GET /parsedata HTTP/1.1"
318     std::vector<std::string_view> requestItems = StringSplit(headerlines[0], " ");
319     const size_t indexHttpMethod = 0;
320     const size_t indexHttpUri = 1;
321     const size_t indexHttpVersion = 2;
322     const size_t countRequestItems = 3;
323     if (requestItems.size() != countRequestItems || requestItems[indexHttpVersion] != "HTTP/1.1") {
324         len = 0;
325         httpReq.stat = RequstParseStat::BAD;
326         return;
327     }
328     httpReq.method = requestItems[indexHttpMethod];
329     httpReq.uri = requestItems[indexHttpUri];
330 
331     for (size_t i = 1; i < headerlines.size(); i++) {
332         size_t tagPos = headerlines[i].find(":");
333         if (tagPos == std::string_view::npos) {
334             len = 0;
335             httpReq.stat = RequstParseStat::BAD;
336             return;
337         }
338         std::string_view tag = headerlines[i].substr(0, tagPos);
339         if (strncasecmp(tag.data(), "Content-Length", tag.size()) == 0) {
340             std::string value(headerlines[i].data() + tagPos + strlen(":"),
341                               headerlines[i].size() - tagPos - strlen(":"));
342             size_t conterntLen = atoi(value.c_str());
343             if (conterntLen > httpReq.bodyLen) {
344                 httpReq.stat = RequstParseStat::RECVING;
345                 return;
346             } else if (conterntLen < httpReq.bodyLen) {
347                 httpReq.bodyLen = conterntLen;
348             }
349         }
350     }
351 
352     if (httpReq.bodyLen > 0) {
353         httpReq.body = (requst + bodyPos);
354     }
355     httpReq.stat = RequstParseStat::OK;
356     len -= (bodyPos + httpReq.bodyLen);
357     return;
358 }
359 
StringSplit(std::string_view source,std::string_view split)360 std::vector<std::string_view> HttpServer::StringSplit(std::string_view source, std::string_view split)
361 {
362     std::vector<std::string_view> result;
363     if (!split.empty()) {
364         size_t pos = 0;
365         while ((pos = source.find(split)) != std::string_view::npos) {
366             // split
367             std::string_view token = source.substr(0, pos);
368             if (!token.empty()) {
369                 result.push_back(token);
370             }
371             source = source.substr(pos + split.size(), source.size() - token.size() - split.size());
372         }
373     }
374     // add last token
375     if (!source.empty()) {
376         result.push_back(source);
377     }
378     return result;
379 }
380 } // namespace TraceStreamer
381 } // namespace SysTuning
382