• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/base/build_config.h"
18 
19 #if PERFETTO_BUILDFLAG(PERFETTO_TP_HTTPD)
20 
21 #include "src/trace_processor/rpc/httpd.h"
22 
23 #include <map>
24 #include <string>
25 
26 #include "perfetto/ext/base/paged_memory.h"
27 #include "perfetto/ext/base/string_utils.h"
28 #include "perfetto/ext/base/string_view.h"
29 #include "perfetto/ext/base/unix_socket.h"
30 #include "perfetto/ext/base/unix_task_runner.h"
31 #include "perfetto/protozero/scattered_heap_buffer.h"
32 #include "perfetto/trace_processor/trace_processor.h"
33 #include "src/trace_processor/rpc/rpc.h"
34 
35 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
36 
37 namespace perfetto {
38 namespace trace_processor {
39 
40 namespace {
41 
// TCP port used when the caller of RunHttpRPCServer() passes an empty port.
constexpr char kBindPort[] = "9001";

// Sentinel for HttpReply()'s |content_length| argument: when passed, no
// Content-Length header is emitted.
constexpr size_t kOmitContentLength = ~static_cast<size_t>(0);

// Size of each client's receive buffer: 32 MiB of payload + 128 KiB for the
// HTTP headers.
constexpr size_t kMaxRequestSize = 32 * 1024 * 1024 + 128 * 1024;
47 
48 // Owns the socket and data for one HTTP client connection.
49 struct Client {
Clientperfetto::trace_processor::__anonad5fa8100111::Client50   Client(std::unique_ptr<base::UnixSocket> s)
51       : sock(std::move(s)),
52         rxbuf(base::PagedMemory::Allocate(kMaxRequestSize)) {}
rxbuf_availperfetto::trace_processor::__anonad5fa8100111::Client53   size_t rxbuf_avail() { return rxbuf.size() - rxbuf_used; }
54 
55   std::unique_ptr<base::UnixSocket> sock;
56   base::PagedMemory rxbuf;
57   size_t rxbuf_used = 0;
58 };
59 
60 struct HttpRequest {
61   base::StringView method;
62   base::StringView uri;
63   base::StringView origin;
64   base::StringView body;
65   int id = 0;
66 };
67 
68 class HttpServer : public base::UnixSocket::EventListener {
69  public:
70   explicit HttpServer(std::unique_ptr<TraceProcessor>);
71   ~HttpServer() override;
72   void Run(const char*, const char*);
73 
74  private:
75   size_t ParseOneHttpRequest(Client* client);
76   void HandleRequest(Client*, const HttpRequest&);
77 
78   void OnNewIncomingConnection(base::UnixSocket*,
79                                std::unique_ptr<base::UnixSocket>) override;
80   void OnConnect(base::UnixSocket* self, bool connected) override;
81   void OnDisconnect(base::UnixSocket* self) override;
82   void OnDataAvailable(base::UnixSocket* self) override;
83 
84   Rpc trace_processor_rpc_;
85   base::UnixTaskRunner task_runner_;
86   std::unique_ptr<base::UnixSocket> sock4_;
87   std::unique_ptr<base::UnixSocket> sock6_;
88   std::vector<Client> clients_;
89 };
90 
// Appends the characters of the NUL-terminated |str| (terminator excluded) to
// the end of |buf|.
void Append(std::vector<char>& buf, const char* str) {
  const char* const str_end = str + strlen(str);
  buf.insert(buf.end(), str, str_end);
}
94 
// Appends all |str.size()| characters of |str| to the end of |buf|.
void Append(std::vector<char>& buf, const std::string& str) {
  const char* const first = str.data();
  buf.insert(buf.end(), first, first + str.size());
}
98 
HttpReply(base::UnixSocket * sock,const char * http_code,std::initializer_list<const char * > headers={},const uint8_t * content=nullptr,size_t content_length=0)99 void HttpReply(base::UnixSocket* sock,
100                const char* http_code,
101                std::initializer_list<const char*> headers = {},
102                const uint8_t* content = nullptr,
103                size_t content_length = 0) {
104   std::vector<char> response;
105   response.reserve(4096);
106   Append(response, "HTTP/1.1 ");
107   Append(response, http_code);
108   Append(response, "\r\n");
109   for (const char* hdr : headers) {
110     Append(response, hdr);
111     Append(response, "\r\n");
112   }
113   if (content_length != kOmitContentLength) {
114     Append(response, "Content-Length: ");
115     Append(response, std::to_string(content_length));
116     Append(response, "\r\n");
117   }
118   Append(response, "\r\n");                      // End-of-headers marker.
119   sock->Send(response.data(), response.size());  // Send response headers.
120   if (content_length > 0 && content_length != kOmitContentLength)
121     sock->Send(content, content_length);  // Send response payload.
122 }
123 
ShutdownBadRequest(base::UnixSocket * sock,const char * reason)124 void ShutdownBadRequest(base::UnixSocket* sock, const char* reason) {
125   HttpReply(sock, "500 Bad Request", {},
126             reinterpret_cast<const uint8_t*>(reason), strlen(reason));
127   sock->Shutdown(/*notify=*/true);
128 }
129 
HttpServer(std::unique_ptr<TraceProcessor> preloaded_instance)130 HttpServer::HttpServer(std::unique_ptr<TraceProcessor> preloaded_instance)
131     : trace_processor_rpc_(std::move(preloaded_instance)) {}
132 HttpServer::~HttpServer() = default;
133 
Run(const char * kBindAddr4,const char * kBindAddr6)134 void HttpServer::Run(const char* kBindAddr4, const char* kBindAddr6) {
135   PERFETTO_ILOG("[HTTP] Starting RPC server on %s and %s", kBindAddr4,
136                 kBindAddr6);
137 
138   sock4_ = base::UnixSocket::Listen(kBindAddr4, this, &task_runner_,
139                                     base::SockFamily::kInet,
140                                     base::SockType::kStream);
141   bool ipv4_listening = sock4_ && sock4_->is_listening();
142   if (!ipv4_listening) {
143     PERFETTO_ILOG("Failed to listen on IPv4 socket");
144   }
145 
146   sock6_ = base::UnixSocket::Listen(kBindAddr6, this, &task_runner_,
147                                     base::SockFamily::kInet6,
148                                     base::SockType::kStream);
149   bool ipv6_listening = sock6_ && sock6_->is_listening();
150   if (!ipv6_listening) {
151     PERFETTO_ILOG("Failed to listen on IPv6 socket");
152   }
153 
154   PERFETTO_CHECK(ipv4_listening || ipv6_listening);
155 
156   task_runner_.Run();
157 }
158 
OnNewIncomingConnection(base::UnixSocket *,std::unique_ptr<base::UnixSocket> sock)159 void HttpServer::OnNewIncomingConnection(
160     base::UnixSocket*,
161     std::unique_ptr<base::UnixSocket> sock) {
162   PERFETTO_LOG("[HTTP] New connection");
163   clients_.emplace_back(std::move(sock));
164 }
165 
OnConnect(base::UnixSocket *,bool)166 void HttpServer::OnConnect(base::UnixSocket*, bool) {}
167 
OnDisconnect(base::UnixSocket * sock)168 void HttpServer::OnDisconnect(base::UnixSocket* sock) {
169   PERFETTO_LOG("[HTTP] Client disconnected");
170   for (auto it = clients_.begin(); it != clients_.end(); ++it) {
171     if (it->sock.get() == sock) {
172       clients_.erase(it);
173       return;
174     }
175   }
176   PERFETTO_DFATAL("[HTTP] untracked client in OnDisconnect()");
177 }
178 
OnDataAvailable(base::UnixSocket * sock)179 void HttpServer::OnDataAvailable(base::UnixSocket* sock) {
180   Client* client = nullptr;
181   for (auto it = clients_.begin(); it != clients_.end() && !client; ++it)
182     client = (it->sock.get() == sock) ? &*it : nullptr;
183   PERFETTO_CHECK(client);
184 
185   char* rxbuf = reinterpret_cast<char*>(client->rxbuf.Get());
186   for (;;) {
187     size_t avail = client->rxbuf_avail();
188     PERFETTO_CHECK(avail <= kMaxRequestSize);
189     if (avail == 0)
190       return ShutdownBadRequest(sock, "Request body too big");
191     size_t rsize = sock->Receive(&rxbuf[client->rxbuf_used], avail);
192     client->rxbuf_used += rsize;
193     if (rsize == 0 || client->rxbuf_avail() == 0)
194       break;
195   }
196 
197   // At this point |rxbuf| can contain a partial HTTP request, a full one or
198   // more (in case of HTTP Keepalive pipelining).
199   for (;;) {
200     size_t bytes_consumed = ParseOneHttpRequest(client);
201     if (bytes_consumed == 0)
202       break;
203     memmove(rxbuf, &rxbuf[bytes_consumed], client->rxbuf_used - bytes_consumed);
204     client->rxbuf_used -= bytes_consumed;
205   }
206 }
207 
208 // Parses the HTTP request and invokes HandleRequest(). It returns the size of
209 // the HTTP header + body that has been processed or 0 if there isn't enough
210 // data for a full HTTP request in the buffer.
ParseOneHttpRequest(Client * client)211 size_t HttpServer::ParseOneHttpRequest(Client* client) {
212   auto* rxbuf = reinterpret_cast<char*>(client->rxbuf.Get());
213   base::StringView buf_view(rxbuf, client->rxbuf_used);
214   size_t pos = 0;
215   size_t body_offset = 0;
216   size_t body_size = 0;
217   bool has_parsed_first_line = false;
218   HttpRequest http_req;
219 
220   // This loop parses the HTTP request headers and sets the |body_offset|.
221   for (;;) {
222     size_t next = buf_view.find("\r\n", pos);
223     size_t col;
224     if (next == std::string::npos)
225       break;
226 
227     if (!has_parsed_first_line) {
228       // Parse the "GET /xxx HTTP/1.1" line.
229       has_parsed_first_line = true;
230       size_t space = buf_view.find(' ');
231       if (space == std::string::npos || space + 2 >= client->rxbuf_used) {
232         ShutdownBadRequest(client->sock.get(), "Malformed HTTP request");
233         return 0;
234       }
235       http_req.method = buf_view.substr(0, space);
236       size_t uri_size = buf_view.find(' ', space + 1) - space - 1;
237       http_req.uri = buf_view.substr(space + 1, uri_size);
238     } else if (next == pos) {
239       // The CR-LF marker that separates headers from body.
240       body_offset = next + 2;
241       break;
242     } else if ((col = buf_view.find(':', pos)) < next) {
243       // Parse HTTP headers. They look like: "Content-Length: 1234".
244       auto hdr_name = buf_view.substr(pos, col - pos);
245       auto hdr_value = buf_view.substr(col + 2, next - col - 2);
246       if (hdr_name.CaseInsensitiveEq("content-length")) {
247         body_size = static_cast<size_t>(atoi(hdr_value.ToStdString().c_str()));
248       } else if (hdr_name.CaseInsensitiveEq("origin")) {
249         http_req.origin = hdr_value;
250       } else if (hdr_name.CaseInsensitiveEq("x-seq-id")) {
251         http_req.id = atoi(hdr_value.ToStdString().c_str());
252       }
253     }
254     pos = next + 2;
255   }
256 
257   // If we have a full header but not yet the full body, return and try again
258   // next time we receive some more data.
259   size_t http_req_size = body_offset + body_size;
260   if (!body_offset || client->rxbuf_used < http_req_size)
261     return 0;
262 
263   http_req.body = base::StringView(&rxbuf[body_offset], body_size);
264   HandleRequest(client, http_req);
265   return http_req_size;
266 }
267 
HandleRequest(Client * client,const HttpRequest & req)268 void HttpServer::HandleRequest(Client* client, const HttpRequest& req) {
269   static int last_req_id = 0;
270   if (req.id) {
271     if (last_req_id && req.id != last_req_id + 1 && req.id != 1)
272       PERFETTO_ELOG("HTTP Request out of order");
273     last_req_id = req.id;
274   }
275 
276   PERFETTO_LOG("[HTTP] %04d %s %s (body: %zu bytes)", req.id,
277                req.method.ToStdString().c_str(), req.uri.ToStdString().c_str(),
278                req.body.size());
279   std::string allow_origin_hdr =
280       "Access-Control-Allow-Origin: " + req.origin.ToStdString();
281 
282   // This is the default. Overridden by the /query handler for chunked replies.
283   char transfer_encoding_hdr[255] = "Transfer-Encoding: identity";
284   std::initializer_list<const char*> headers = {
285       "Connection: Keep-Alive",                //
286       "Cache-Control: no-cache",               //
287       "Keep-Alive: timeout=5, max=1000",       //
288       "Content-Type: application/x-protobuf",  //
289       transfer_encoding_hdr,                   //
290       allow_origin_hdr.c_str()};
291 
292   if (req.method == "OPTIONS") {
293     // CORS headers.
294     return HttpReply(client->sock.get(), "204 No Content",
295                      {
296                          "Access-Control-Allow-Methods: POST, GET, OPTIONS",
297                          "Access-Control-Allow-Headers: *",
298                          "Access-Control-Max-Age: 86400",
299                          allow_origin_hdr.c_str(),
300                      });
301   }
302 
303   if (req.uri == "/parse") {
304     trace_processor_rpc_.Parse(
305         reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
306     return HttpReply(client->sock.get(), "200 OK", headers);
307   }
308 
309   if (req.uri == "/notify_eof") {
310     trace_processor_rpc_.NotifyEndOfFile();
311     return HttpReply(client->sock.get(), "200 OK", headers);
312   }
313 
314   if (req.uri == "/restore_initial_tables") {
315     trace_processor_rpc_.RestoreInitialTables();
316     return HttpReply(client->sock.get(), "200 OK", headers);
317   }
318 
319   // New endpoint, returns data in batches using chunked transfer encoding.
320   // The batch size is determined by |cells_per_batch_| and
321   // |batch_split_threshold_| in query_result_serializer.h.
322   // This is temporary, it will be switched to WebSockets soon.
323   if (req.uri == "/query") {
324     std::vector<uint8_t> response;
325 
326     // Start the chunked reply.
327     strncpy(transfer_encoding_hdr, "Transfer-Encoding: chunked",
328             sizeof(transfer_encoding_hdr));
329     base::UnixSocket* cli_sock = client->sock.get();
330     HttpReply(cli_sock, "200 OK", headers, nullptr, kOmitContentLength);
331 
332     // |on_result_chunk| will be called nested within the same callstack of the
333     // rpc.Query() call. No further calls will be made once Query() returns.
334     auto on_result_chunk = [&](const uint8_t* buf, size_t len, bool has_more) {
335       PERFETTO_DLOG("Sending response chunk, len=%zu eof=%d", len, !has_more);
336       char chunk_hdr[32];
337       auto hdr_len = static_cast<size_t>(sprintf(chunk_hdr, "%zx\r\n", len));
338       cli_sock->Send(chunk_hdr, hdr_len);
339       cli_sock->Send(buf, len);
340       cli_sock->Send("\r\n", 2);
341       if (!has_more) {
342         hdr_len = static_cast<size_t>(sprintf(chunk_hdr, "0\r\n\r\n"));
343         cli_sock->Send(chunk_hdr, hdr_len);
344       }
345     };
346     trace_processor_rpc_.Query(
347         reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size(),
348         on_result_chunk);
349     return;
350   }
351 
352   // Legacy endpoint.
353   // Returns a columnar-oriented one-shot result. Very inefficient for large
354   // result sets. Very inefficient in general too.
355   if (req.uri == "/raw_query") {
356     std::vector<uint8_t> response = trace_processor_rpc_.RawQuery(
357         reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
358     return HttpReply(client->sock.get(), "200 OK", headers, response.data(),
359                      response.size());
360   }
361 
362   if (req.uri == "/status") {
363     protozero::HeapBuffered<protos::pbzero::StatusResult> res;
364     res->set_loaded_trace_name(
365         trace_processor_rpc_.GetCurrentTraceName().c_str());
366     std::vector<uint8_t> buf = res.SerializeAsArray();
367     return HttpReply(client->sock.get(), "200 OK", headers, buf.data(),
368                      buf.size());
369   }
370 
371   if (req.uri == "/compute_metric") {
372     std::vector<uint8_t> res = trace_processor_rpc_.ComputeMetric(
373         reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
374     return HttpReply(client->sock.get(), "200 OK", headers, res.data(),
375                      res.size());
376   }
377 
378   if (req.uri == "/get_metric_descriptors") {
379     std::vector<uint8_t> res = trace_processor_rpc_.GetMetricDescriptors(
380         reinterpret_cast<const uint8_t*>(req.body.data()), req.body.size());
381     return HttpReply(client->sock.get(), "200 OK", headers, res.data(),
382                      res.size());
383   }
384 
385   if (req.uri == "/enable_metatrace") {
386     trace_processor_rpc_.EnableMetatrace();
387     return HttpReply(client->sock.get(), "200 OK", headers);
388   }
389 
390   if (req.uri == "/disable_and_read_metatrace") {
391     std::vector<uint8_t> res = trace_processor_rpc_.DisableAndReadMetatrace();
392     return HttpReply(client->sock.get(), "200 OK", headers, res.data(),
393                      res.size());
394   }
395 
396   return HttpReply(client->sock.get(), "404 Not Found", headers);
397 }
398 
399 }  // namespace
400 
RunHttpRPCServer(std::unique_ptr<TraceProcessor> preloaded_instance,std::string port_number)401 void RunHttpRPCServer(std::unique_ptr<TraceProcessor> preloaded_instance,
402                       std::string port_number) {
403   HttpServer srv(std::move(preloaded_instance));
404   std::string port = port_number.empty() ? kBindPort : port_number;
405   std::string ipv4_addr = "127.0.0.1:" + port;
406   std::string ipv6_addr = "[::1]:" + port;
407   srv.Run(ipv4_addr.c_str(), ipv6_addr.c_str());
408 }
409 
410 }  // namespace trace_processor
411 }  // namespace perfetto
412 
413 #endif  // PERFETTO_TP_HTTPD
414