/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "perfetto/base/build_config.h"

#if PERFETTO_BUILDFLAG(PERFETTO_TP_HTTPD)

#include "src/trace_processor/rpc/httpd.h"

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/ext/base/paged_memory.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/string_view.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/unix_task_runner.h"
#include "perfetto/protozero/scattered_heap_buffer.h"
#include "perfetto/trace_processor/trace_processor.h"
#include "src/trace_processor/rpc/rpc.h"

#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"

namespace perfetto {
namespace trace_processor {

namespace {

// Port used when RunHttpRPCServer() is given an empty port string.
constexpr char kBindPort[] = "9001";

// Sentinel for HttpReply(): do not emit a Content-Length header and do not
// send a body (used to start a chunked reply).
constexpr size_t kOmitContentLength = static_cast<size_t>(-1);

// 32 MiB payload + 128K for HTTP headers.
constexpr size_t kMaxRequestSize = (32 * 1024 + 128) * 1024;

// Owns the socket and data for one HTTP client connection.
struct Client {
  explicit Client(std::unique_ptr<base::UnixSocket> s)
      : sock(std::move(s)),
        rxbuf(base::PagedMemory::Allocate(kMaxRequestSize)) {}

  // Number of bytes that can still be received into |rxbuf|.
  size_t rxbuf_avail() { return rxbuf.size() - rxbuf_used; }

  std::unique_ptr<base::UnixSocket> sock;
  base::PagedMemory rxbuf;
  size_t rxbuf_used = 0;  // Number of valid bytes at the start of |rxbuf|.
};

// A parsed HTTP request. All the views point into the client's |rxbuf| and
// are valid only for the duration of the HandleRequest() call.
struct HttpRequest {
  base::StringView method;
  base::StringView uri;
  base::StringView origin;
  base::StringView body;
  int id = 0;  // Value of the "x-seq-id" header, 0 if absent.
};

// Listens on an IPv4 and an IPv6 socket and dispatches the HTTP requests it
// receives to the Rpc instance.
class HttpServer : public base::UnixSocket::EventListener {
 public:
  explicit HttpServer(std::unique_ptr<TraceProcessor>);
  ~HttpServer() override;
  void Run(const char*, const char*);

 private:
  size_t ParseOneHttpRequest(Client* client);
  void HandleRequest(Client*, const HttpRequest&);

  void OnNewIncomingConnection(base::UnixSocket*,
                               std::unique_ptr<base::UnixSocket>) override;
  void OnConnect(base::UnixSocket* self, bool connected) override;
  void OnDisconnect(base::UnixSocket* self) override;
  void OnDataAvailable(base::UnixSocket* self) override;

  Rpc trace_processor_rpc_;
  base::UnixTaskRunner task_runner_;
  std::unique_ptr<base::UnixSocket> sock4_;
  std::unique_ptr<base::UnixSocket> sock6_;
  std::vector<Client> clients_;
};

// Appends the NUL-terminated |str| (without the terminator) to |buf|.
void Append(std::vector<char>& buf, const char* str) {
  buf.insert(buf.end(), str, str + strlen(str));
}

// Appends the contents of |str| to |buf|.
void Append(std::vector<char>& buf, const std::string& str) {
  buf.insert(buf.end(), str.begin(), str.end());
}

// Returns |str| without leading and trailing spaces and horizontal tabs.
base::StringView TrimWhitespace(base::StringView str) {
  const char* begin = str.data();
  const char* end = begin + str.size();
  while (begin < end && (*begin == ' ' || *begin == '\t'))
    ++begin;
  while (end > begin && (end[-1] == ' ' || end[-1] == '\t'))
    --end;
  return base::StringView(begin, static_cast<size_t>(end - begin));
}

// Parses |str| as a non-negative base-10 integer made only of ASCII digits.
// Returns false, leaving |out| untouched, if |str| is empty, contains any
// other character (including a sign) or does not fit in a size_t.
bool ParseContentLength(base::StringView str, size_t* out) {
  if (str.size() == 0)
    return false;
  size_t value = 0;
  for (size_t i = 0; i < str.size(); ++i) {
    const char c = str.data()[i];
    if (c < '0' || c > '9')
      return false;
    const size_t digit = static_cast<size_t>(c - '0');
    if (value > (SIZE_MAX - digit) / 10)
      return false;  // value * 10 + digit would overflow.
    value = value * 10 + digit;
  }
  *out = value;
  return true;
}

// Sends an HTTP/1.1 response on |sock|.
// |http_code| is the status line without the protocol (e.g. "200 OK").
// |headers| are sent verbatim, one per line.
// If |content_length| is kOmitContentLength no Content-Length header and no
// body are sent; otherwise the header is emitted and, when the length is
// non-zero, |content_length| bytes from |content| follow the headers.
void HttpReply(base::UnixSocket* sock,
               const char* http_code,
               std::initializer_list<const char*> headers = {},
               const uint8_t* content = nullptr,
               size_t content_length = 0) {
  std::vector<char> response;
  response.reserve(4096);
  Append(response, "HTTP/1.1 ");
  Append(response, http_code);
  Append(response, "\r\n");
  for (const char* hdr : headers) {
    Append(response, hdr);
    Append(response, "\r\n");
  }
  if (content_length != kOmitContentLength) {
    Append(response, "Content-Length: ");
    Append(response, std::to_string(content_length));
    Append(response, "\r\n");
  }
  Append(response, "\r\n");                      // End-of-headers marker.
  sock->Send(response.data(), response.size());  // Send response headers.
  if (content_length > 0 && content_length != kOmitContentLength)
    sock->Send(content, content_length);  // Send response payload.
}

// Replies with a client-error status whose body is |reason| and then shuts
// the socket down.
void ShutdownBadRequest(base::UnixSocket* sock, const char* reason) {
  // 400 is the status code that matches the "Bad Request" reason phrase; 500
  // would tell the client that the server itself failed.
  HttpReply(sock, "400 Bad Request", {},
            reinterpret_cast<const uint8_t*>(reason), strlen(reason));
  sock->Shutdown(/*notify=*/true);
}

HttpServer::HttpServer(std::unique_ptr<TraceProcessor> preloaded_instance)
    : trace_processor_rpc_(std::move(preloaded_instance)) {}
HttpServer::~HttpServer() = default;

// Starts listening on |bind_addr4| (IPv4) and |bind_addr6| (IPv6) and runs the
// task runner. It is a fatal error if neither socket can listen.
void HttpServer::Run(const char* bind_addr4, const char* bind_addr6) {
  PERFETTO_ILOG("[HTTP] Starting RPC server on %s and %s", bind_addr4,
                bind_addr6);

  sock4_ = base::UnixSocket::Listen(bind_addr4, this, &task_runner_,
                                    base::SockFamily::kInet,
                                    base::SockType::kStream);
  bool ipv4_listening = sock4_ && sock4_->is_listening();
  if (!ipv4_listening) {
    PERFETTO_ILOG("Failed to listen on IPv4 socket");
  }

  sock6_ = base::UnixSocket::Listen(bind_addr6, this, &task_runner_,
                                    base::SockFamily::kInet6,
                                    base::SockType::kStream);
  bool ipv6_listening = sock6_ && sock6_->is_listening();
  if (!ipv6_listening) {
    PERFETTO_ILOG("Failed to listen on IPv6 socket");
  }

  PERFETTO_CHECK(ipv4_listening || ipv6_listening);

  task_runner_.Run();
}

void HttpServer::OnNewIncomingConnection(
    base::UnixSocket*,
    std::unique_ptr<base::UnixSocket> sock) {
  PERFETTO_LOG("[HTTP] New connection");
  clients_.emplace_back(std::move(sock));
}

void HttpServer::OnConnect(base::UnixSocket*, bool) {}

// Drops the Client entry (and hence its receive buffer) that owns |sock|.
void HttpServer::OnDisconnect(base::UnixSocket* sock) {
  PERFETTO_LOG("[HTTP] Client disconnected");
  for (auto it = clients_.begin(); it != clients_.end(); ++it) {
    if (it->sock.get() == sock) {
      clients_.erase(it);
      return;
    }
  }
  PERFETTO_DFATAL("[HTTP] untracked client in OnDisconnect()");
}

// Drains |sock| into the client's receive buffer and then handles every
// complete HTTP request found in it.
void HttpServer::OnDataAvailable(base::UnixSocket* sock) {
  Client* client = nullptr;
  for (Client& candidate : clients_) {
    if (candidate.sock.get() == sock) {
      client = &candidate;
      break;
    }
  }
  PERFETTO_CHECK(client);

  char* rxbuf = reinterpret_cast<char*>(client->rxbuf.Get());
  for (;;) {
    size_t avail = client->rxbuf_avail();
    PERFETTO_CHECK(avail <= kMaxRequestSize);
    // The buffer is full and still does not hold a complete request.
    if (avail == 0)
      return ShutdownBadRequest(sock, "Request body too big");
    size_t rsize = sock->Receive(&rxbuf[client->rxbuf_used], avail);
    client->rxbuf_used += rsize;
    if (rsize == 0 || client->rxbuf_avail() == 0)
      break;
  }

  // At this point |rxbuf| can contain a partial HTTP request, a full one or
  // more (in case of HTTP Keepalive pipelining).
  for (;;) {
    size_t bytes_consumed = ParseOneHttpRequest(client);
    if (bytes_consumed == 0)
      break;
    // Shift the unconsumed tail to the start of the buffer.
    memmove(rxbuf, &rxbuf[bytes_consumed], client->rxbuf_used - bytes_consumed);
    client->rxbuf_used -= bytes_consumed;
  }
}

// Parses the HTTP request and invokes HandleRequest(). It returns the size of
// the HTTP header + body that has been processed or 0 if there isn't enough
// data for a full HTTP request in the buffer. 0 is also returned after a
// malformed or oversized request has been rejected via ShutdownBadRequest().
size_t HttpServer::ParseOneHttpRequest(Client* client) {
  auto* rxbuf = reinterpret_cast<char*>(client->rxbuf.Get());
  base::StringView buf_view(rxbuf, client->rxbuf_used);
  size_t pos = 0;
  size_t body_offset = 0;
  size_t body_size = 0;
  bool has_parsed_first_line = false;
  HttpRequest http_req;

  // This loop parses the HTTP request headers and sets the |body_offset|.
  for (;;) {
    const size_t next = buf_view.find("\r\n", pos);
    if (next == std::string::npos)
      break;

    if (!has_parsed_first_line) {
      // Parse the "GET /xxx HTTP/1.1" line. Both separators must lie inside
      // the request line itself, not in whatever follows it in the buffer.
      has_parsed_first_line = true;
      const size_t method_end = buf_view.find(' ');
      const size_t uri_end = (method_end == std::string::npos)
                                 ? std::string::npos
                                 : buf_view.find(' ', method_end + 1);
      if (method_end == std::string::npos || uri_end == std::string::npos ||
          uri_end >= next) {
        ShutdownBadRequest(client->sock.get(), "Malformed HTTP request");
        return 0;
      }
      http_req.method = buf_view.substr(0, method_end);
      http_req.uri =
          buf_view.substr(method_end + 1, uri_end - method_end - 1);
    } else if (next == pos) {
      // The CR-LF marker that separates headers from body.
      body_offset = next + 2;
      break;
    } else {
      // Parse HTTP headers. They look like: "Content-Length: 1234".
      // Lines without a colon are ignored (find() returns npos, which is
      // never < |next|).
      const size_t col = buf_view.find(':', pos);
      if (col < next) {
        auto hdr_name = buf_view.substr(pos, col - pos);
        // The whitespace around the value is optional, so slice right after
        // the colon and trim instead of assuming exactly one space.
        auto hdr_value =
            TrimWhitespace(buf_view.substr(col + 1, next - col - 1));
        if (hdr_name.CaseInsensitiveEq("content-length")) {
          // Negative or non-numeric lengths must be rejected: converting them
          // to size_t would yield a huge body size and an out-of-bounds body
          // view.
          if (!ParseContentLength(hdr_value, &body_size)) {
            ShutdownBadRequest(client->sock.get(), "Malformed HTTP request");
            return 0;
          }
        } else if (hdr_name.CaseInsensitiveEq("origin")) {
          http_req.origin = hdr_value;
        } else if (hdr_name.CaseInsensitiveEq("x-seq-id")) {
          http_req.id = atoi(hdr_value.ToStdString().c_str());
        }
      }
    }
    pos = next + 2;
  }

  // The end of the headers has not been received yet.
  if (!body_offset)
    return 0;

  // A request that cannot fit in the receive buffer can never be completed.
  // |body_offset| <= |rxbuf_used| <= |rxbuf.size()|, so the subtraction cannot
  // underflow and the sum below cannot overflow.
  if (body_size > client->rxbuf.size() - body_offset) {
    ShutdownBadRequest(client->sock.get(), "Request body too big");
    return 0;
  }

  // If we have a full header but not yet the full body, return and try again
  // next time we receive some more data.
  const size_t http_req_size = body_offset + body_size;
  if (client->rxbuf_used < http_req_size)
    return 0;

  http_req.body = base::StringView(&rxbuf[body_offset], body_size);
  HandleRequest(client, http_req);
  return http_req_size;
}

// Dispatches |req| to the Rpc method that matches its URI and sends the reply
// on the client's socket. Unknown URIs get a 404.
void HttpServer::HandleRequest(Client* client, const HttpRequest& req) {
  // Log (but otherwise tolerate) requests whose x-seq-id is neither the
  // successor of the previous one nor 1.
  static int last_req_id = 0;
  if (req.id) {
    if (last_req_id && req.id != last_req_id + 1 && req.id != 1)
      PERFETTO_ELOG("HTTP Request out of order");
    last_req_id = req.id;
  }

  PERFETTO_LOG("[HTTP] %04d %s %s (body: %zu bytes)", req.id,
               req.method.ToStdString().c_str(), req.uri.ToStdString().c_str(),
               req.body.size());

  // NOTE(review): this echoes back whatever Origin the client sent, i.e. any
  // web page is allowed to talk to this server. Consider an allow-list.
  std::string allow_origin_hdr =
      "Access-Control-Allow-Origin: " + req.origin.ToStdString();

  // This is the default. Overridden by the /query handler for chunked replies.
  // |headers| stores a pointer to this array, so rewriting the array contents
  // later changes the header that gets sent.
  char transfer_encoding_hdr[255] = "Transfer-Encoding: identity";
  std::initializer_list<const char*> headers = {
      "Connection: Keep-Alive",                //
      "Cache-Control: no-cache",               //
      "Keep-Alive: timeout=5, max=1000",       //
      "Content-Type: application/x-protobuf",  //
      transfer_encoding_hdr,                   //
      allow_origin_hdr.c_str()};

  base::UnixSocket* cli_sock = client->sock.get();
  const uint8_t* body = reinterpret_cast<const uint8_t*>(req.body.data());
  const size_t body_len = req.body.size();

  if (req.method == "OPTIONS") {
    // CORS headers.
    return HttpReply(cli_sock, "204 No Content",
                     {
                         "Access-Control-Allow-Methods: POST, GET, OPTIONS",
                         "Access-Control-Allow-Headers: *",
                         "Access-Control-Max-Age: 86400",
                         allow_origin_hdr.c_str(),
                     });
  }

  if (req.uri == "/parse") {
    trace_processor_rpc_.Parse(body, body_len);
    return HttpReply(cli_sock, "200 OK", headers);
  }

  if (req.uri == "/notify_eof") {
    trace_processor_rpc_.NotifyEndOfFile();
    return HttpReply(cli_sock, "200 OK", headers);
  }

  if (req.uri == "/restore_initial_tables") {
    trace_processor_rpc_.RestoreInitialTables();
    return HttpReply(cli_sock, "200 OK", headers);
  }

  // New endpoint, returns data in batches using chunked transfer encoding.
  // The batch size is determined by |cells_per_batch_| and
  // |batch_split_threshold_| in query_result_serializer.h.
  // This is temporary, it will be switched to WebSockets soon.
  if (req.uri == "/query") {
    // Start the chunked reply.
    strncpy(transfer_encoding_hdr, "Transfer-Encoding: chunked",
            sizeof(transfer_encoding_hdr));
    HttpReply(cli_sock, "200 OK", headers, nullptr, kOmitContentLength);

    // |on_result_chunk| will be called nested within the same callstack of the
    // rpc.Query() call. No further calls will be made once Query() returns.
    auto on_result_chunk = [&](const uint8_t* buf, size_t len, bool has_more) {
      PERFETTO_DLOG("Sending response chunk, len=%zu eof=%d", len, !has_more);
      // Each chunk is "<hex length>\r\n<payload>\r\n".
      char chunk_hdr[32];
      auto hdr_len = static_cast<size_t>(
          snprintf(chunk_hdr, sizeof(chunk_hdr), "%zx\r\n", len));
      cli_sock->Send(chunk_hdr, hdr_len);
      cli_sock->Send(buf, len);
      cli_sock->Send("\r\n", 2);
      if (!has_more) {
        // Zero-length chunk: end of the chunked body.
        hdr_len = static_cast<size_t>(
            snprintf(chunk_hdr, sizeof(chunk_hdr), "0\r\n\r\n"));
        cli_sock->Send(chunk_hdr, hdr_len);
      }
    };
    trace_processor_rpc_.Query(body, body_len, on_result_chunk);
    return;
  }

  // Legacy endpoint.
  // Returns a columnar-oriented one-shot result. Very inefficient for large
  // result sets. Very inefficient in general too.
  if (req.uri == "/raw_query") {
    std::vector<uint8_t> response =
        trace_processor_rpc_.RawQuery(body, body_len);
    return HttpReply(cli_sock, "200 OK", headers, response.data(),
                     response.size());
  }

  if (req.uri == "/status") {
    protozero::HeapBuffered<protos::pbzero::StatusResult> res;
    res->set_loaded_trace_name(
        trace_processor_rpc_.GetCurrentTraceName().c_str());
    std::vector<uint8_t> buf = res.SerializeAsArray();
    return HttpReply(cli_sock, "200 OK", headers, buf.data(), buf.size());
  }

  if (req.uri == "/compute_metric") {
    std::vector<uint8_t> res =
        trace_processor_rpc_.ComputeMetric(body, body_len);
    return HttpReply(cli_sock, "200 OK", headers, res.data(), res.size());
  }

  if (req.uri == "/get_metric_descriptors") {
    std::vector<uint8_t> res =
        trace_processor_rpc_.GetMetricDescriptors(body, body_len);
    return HttpReply(cli_sock, "200 OK", headers, res.data(), res.size());
  }

  if (req.uri == "/enable_metatrace") {
    trace_processor_rpc_.EnableMetatrace();
    return HttpReply(cli_sock, "200 OK", headers);
  }

  if (req.uri == "/disable_and_read_metatrace") {
    std::vector<uint8_t> res = trace_processor_rpc_.DisableAndReadMetatrace();
    return HttpReply(cli_sock, "200 OK", headers, res.data(), res.size());
  }

  return HttpReply(cli_sock, "404 Not Found", headers);
}

}  // namespace

// Runs the HTTP RPC server on the loopback interfaces (IPv4 and IPv6) until
// the task runner exits. |port_number| selects the TCP port; when empty,
// kBindPort is used.
void RunHttpRPCServer(std::unique_ptr<TraceProcessor> preloaded_instance,
                      std::string port_number) {
  HttpServer srv(std::move(preloaded_instance));
  std::string port = port_number.empty() ? kBindPort : port_number;
  // Both addresses are "host:port"; the IPv4 one mirrors the IPv6 loopback.
  std::string ipv4_addr = "127.0.0.1:" + port;
  std::string ipv6_addr = "[::1]:" + port;
  srv.Run(ipv4_addr.c_str(), ipv6_addr.c_str());
}

}  // namespace trace_processor
}  // namespace perfetto

#endif  // PERFETTO_TP_HTTPD