• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/rpc/rpc.h"
18 
19 #include <string.h>
20 
21 #include <vector>
22 
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/time.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/ext/base/version.h"
27 #include "perfetto/protozero/scattered_heap_buffer.h"
28 #include "perfetto/protozero/scattered_stream_writer.h"
29 #include "perfetto/trace_processor/trace_processor.h"
30 #include "src/protozero/proto_ring_buffer.h"
31 #include "src/trace_processor/rpc/query_result_serializer.h"
32 #include "src/trace_processor/tp_metatrace.h"
33 
34 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
35 
36 namespace perfetto {
37 namespace trace_processor {
38 
39 namespace {
40 // Writes a "Loading trace ..." update every N bytes.
41 constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
42 using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
43 using RpcProto = protos::pbzero::TraceProcessorRpc;
44 
45 // Most RPC messages are either very small or a query results.
46 // QueryResultSerializer splits rows into batches of approximately 128KB. Try
47 // avoid extra heap allocations for the nominal case.
48 constexpr auto kSliceSize =
49     QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
50 
51 // Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
52 // copies by doing direct scattered calls from the fragmented heap buffer onto
53 // the RpcResponseFunction (the receiver is expected to deal with arbitrary
54 // fragmentation anyways). It also takes care of prefixing each message with
55 // the proto preamble and varint size.
56 class Response {
57  public:
58   Response(int64_t seq, int method);
59   Response(const Response&) = delete;
60   Response& operator=(const Response&) = delete;
operator ->()61   RpcProto* operator->() { return msg_; }
62   void Send(Rpc::RpcResponseFunction);
63 
64  private:
65   RpcProto* msg_ = nullptr;
66 
67   // The reason why we use TraceProcessorRpcStream as root message is because
68   // the RPC wire protocol expects each message to be prefixed with a proto
69   // preamble and varint size. This happens to be the same serialization of a
70   // repeated field (this is really the same trick we use between
71   // Trace and TracePacket in trace.proto)
72   protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
73 };
74 
Response(int64_t seq,int method)75 Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
76   msg_ = buf_->add_msg();
77   msg_->set_seq(seq);
78   msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
79 }
80 
Send(Rpc::RpcResponseFunction send_fn)81 void Response::Send(Rpc::RpcResponseFunction send_fn) {
82   buf_->Finalize();
83   for (const auto& slice : buf_.GetSlices()) {
84     auto range = slice.GetUsedRange();
85     send_fn(range.begin, static_cast<uint32_t>(range.size()));
86   }
87 }
88 
89 }  // namespace
90 
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)91 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
92     : trace_processor_(std::move(preloaded_instance)) {
93   if (!trace_processor_)
94     ResetTraceProcessorInternal(Config());
95 }
96 
Rpc()97 Rpc::Rpc() : Rpc(nullptr) {}
98 Rpc::~Rpc() = default;
99 
ResetTraceProcessorInternal(const Config & config)100 void Rpc::ResetTraceProcessorInternal(const Config& config) {
101   trace_processor_config_ = config;
102   trace_processor_ = TraceProcessor::CreateInstance(config);
103   bytes_parsed_ = bytes_last_progress_ = 0;
104   t_parse_started_ = base::GetWallTimeNs().count();
105   // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
106   // This is invoked from the same client to clear the current trace state
107   // before loading a new one. The IPC channel is orthogonal to that and the
108   // message numbering continues regardless of the reset.
109 }
110 
OnRpcRequest(const void * data,size_t len)111 void Rpc::OnRpcRequest(const void* data, size_t len) {
112   rxbuf_.Append(data, len);
113   for (;;) {
114     auto msg = rxbuf_.ReadMessage();
115     if (!msg.valid()) {
116       if (msg.fatal_framing_error) {
117         protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
118         err_msg->add_msg()->set_fatal_error("RPC framing error");
119         auto err = err_msg.SerializeAsArray();
120         rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
121         rpc_response_fn_(nullptr, 0);  // Disconnect.
122       }
123       break;
124     }
125     ParseRpcRequest(msg.start, msg.len);
126   }
127 }
128 
129 namespace {
130 
131 using ProtoEnum = protos::pbzero::MetatraceCategories;
MetatraceCategoriesToPublicEnum(ProtoEnum categories)132 TraceProcessor::MetatraceCategories MetatraceCategoriesToPublicEnum(
133     ProtoEnum categories) {
134   switch (categories) {
135     case ProtoEnum::TOPLEVEL:
136       return TraceProcessor::MetatraceCategories::TOPLEVEL;
137     case ProtoEnum::QUERY:
138       return TraceProcessor::MetatraceCategories::QUERY;
139     case ProtoEnum::FUNCTION:
140       return TraceProcessor::MetatraceCategories::FUNCTION;
141     case ProtoEnum::ALL:
142       return TraceProcessor::MetatraceCategories::ALL;
143     case ProtoEnum::NONE:
144       return TraceProcessor::MetatraceCategories::NONE;
145   }
146   return TraceProcessor::MetatraceCategories::NONE;
147 }
148 
149 }  // namespace
150 
151 // [data, len] here is a tokenized TraceProcessorRpc proto message, without the
152 // size header.
ParseRpcRequest(const uint8_t * data,size_t len)153 void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
154   RpcProto::Decoder req(data, len);
155 
156   // We allow restarting the sequence from 0. This happens when refreshing the
157   // browser while using the external trace_processor_shell --httpd.
158   if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
159     char err_str[255];
160     // "(ERR:rpc_seq)" is intercepted by error_dialog.ts in the UI.
161     snprintf(err_str, sizeof(err_str),
162              "RPC request out of order. Expected %" PRId64 ", got %" PRId64
163              " (ERR:rpc_seq)",
164              rx_seq_id_ + 1, req.seq());
165     PERFETTO_ELOG("%s", err_str);
166     protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
167     err_msg->add_msg()->set_fatal_error(err_str);
168     auto err = err_msg.SerializeAsArray();
169     rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
170     rpc_response_fn_(nullptr, 0);  // Disconnect.
171     return;
172   }
173   rx_seq_id_ = req.seq();
174 
175   // The static cast is to prevent that the compiler breaks future proofness.
176   const int req_type = static_cast<int>(req.request());
177   static const char kErrFieldNotSet[] = "RPC error: request field not set";
178   switch (req_type) {
179     case RpcProto::TPM_APPEND_TRACE_DATA: {
180       Response resp(tx_seq_id_++, req_type);
181       auto* result = resp->set_append_result();
182       if (!req.has_append_trace_data()) {
183         result->set_error(kErrFieldNotSet);
184       } else {
185         protozero::ConstBytes byte_range = req.append_trace_data();
186         util::Status res = Parse(byte_range.data, byte_range.size);
187         if (!res.ok()) {
188           result->set_error(res.message());
189         }
190       }
191       resp.Send(rpc_response_fn_);
192       break;
193     }
194     case RpcProto::TPM_FINALIZE_TRACE_DATA: {
195       Response resp(tx_seq_id_++, req_type);
196       NotifyEndOfFile();
197       resp.Send(rpc_response_fn_);
198       break;
199     }
200     case RpcProto::TPM_QUERY_STREAMING: {
201       if (!req.has_query_args()) {
202         Response resp(tx_seq_id_++, req_type);
203         auto* result = resp->set_query_result();
204         result->set_error(kErrFieldNotSet);
205         resp.Send(rpc_response_fn_);
206       } else {
207         protozero::ConstBytes args = req.query_args();
208         auto it = QueryInternal(args.data, args.size);
209         QueryResultSerializer serializer(std::move(it));
210         for (bool has_more = true; has_more;) {
211           Response resp(tx_seq_id_++, req_type);
212           has_more = serializer.Serialize(resp->set_query_result());
213           resp.Send(rpc_response_fn_);
214         }
215       }
216       break;
217     }
218     case RpcProto::TPM_COMPUTE_METRIC: {
219       Response resp(tx_seq_id_++, req_type);
220       auto* result = resp->set_metric_result();
221       if (!req.has_compute_metric_args()) {
222         result->set_error(kErrFieldNotSet);
223       } else {
224         protozero::ConstBytes args = req.compute_metric_args();
225         ComputeMetricInternal(args.data, args.size, result);
226       }
227       resp.Send(rpc_response_fn_);
228       break;
229     }
230     case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
231       Response resp(tx_seq_id_++, req_type);
232       auto descriptor_set = trace_processor_->GetMetricDescriptors();
233       auto* result = resp->set_metric_descriptors();
234       result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
235       resp.Send(rpc_response_fn_);
236       break;
237     }
238     case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
239       trace_processor_->RestoreInitialTables();
240       Response resp(tx_seq_id_++, req_type);
241       resp.Send(rpc_response_fn_);
242       break;
243     }
244     case RpcProto::TPM_ENABLE_METATRACE: {
245       using protos::pbzero::MetatraceCategories;
246       protozero::ConstBytes args = req.enable_metatrace_args();
247       EnableMetatrace(args.data, args.size);
248 
249       Response resp(tx_seq_id_++, req_type);
250       resp.Send(rpc_response_fn_);
251       break;
252     }
253     case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
254       Response resp(tx_seq_id_++, req_type);
255       DisableAndReadMetatraceInternal(resp->set_metatrace());
256       resp.Send(rpc_response_fn_);
257       break;
258     }
259     case RpcProto::TPM_GET_STATUS: {
260       Response resp(tx_seq_id_++, req_type);
261       std::vector<uint8_t> status = GetStatus();
262       resp->set_status()->AppendRawProtoBytes(status.data(), status.size());
263       resp.Send(rpc_response_fn_);
264       break;
265     }
266     case RpcProto::TPM_RESET_TRACE_PROCESSOR: {
267       Response resp(tx_seq_id_++, req_type);
268       protozero::ConstBytes args = req.reset_trace_processor_args();
269       ResetTraceProcessor(args.data, args.size);
270       resp.Send(rpc_response_fn_);
271       break;
272     }
273     default: {
274       // This can legitimately happen if the client is newer. We reply with a
275       // generic "unkown request" response, so the client can do feature
276       // detection
277       PERFETTO_DLOG("[RPC] Uknown request type (%d), size=%zu", req_type, len);
278       Response resp(tx_seq_id_++, req_type);
279       resp->set_invalid_request(
280           static_cast<RpcProto::TraceProcessorMethod>(req_type));
281       resp.Send(rpc_response_fn_);
282       break;
283     }
284   }  // switch(req_type)
285 }
286 
Parse(const uint8_t * data,size_t len)287 util::Status Rpc::Parse(const uint8_t* data, size_t len) {
288   PERFETTO_TP_TRACE(
289       metatrace::Category::TOPLEVEL, "RPC_PARSE",
290       [&](metatrace::Record* r) { r->AddArg("length", std::to_string(len)); });
291   if (eof_) {
292     // Reset the trace processor state if another trace has been previously
293     // loaded. Use the same TraceProcessor Config.
294     ResetTraceProcessorInternal(trace_processor_config_);
295   }
296 
297   eof_ = false;
298   bytes_parsed_ += len;
299   MaybePrintProgress();
300 
301   if (len == 0)
302     return util::OkStatus();
303 
304   // TraceProcessor needs take ownership of the memory chunk.
305   std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
306   memcpy(data_copy.get(), data, len);
307   return trace_processor_->Parse(std::move(data_copy), len);
308 }
309 
NotifyEndOfFile()310 void Rpc::NotifyEndOfFile() {
311   PERFETTO_TP_TRACE(metatrace::Category::TOPLEVEL, "RPC_NOTIFY_END_OF_FILE");
312 
313   trace_processor_->NotifyEndOfFile();
314   eof_ = true;
315   MaybePrintProgress();
316 }
317 
ResetTraceProcessor(const uint8_t * args,size_t len)318 void Rpc::ResetTraceProcessor(const uint8_t* args, size_t len) {
319   protos::pbzero::ResetTraceProcessorArgs::Decoder reset_trace_processor_args(
320       args, len);
321   Config config;
322   if (reset_trace_processor_args.has_drop_track_event_data_before()) {
323     config.drop_track_event_data_before =
324         reset_trace_processor_args.drop_track_event_data_before() ==
325                 protos::pbzero::ResetTraceProcessorArgs::
326                     TRACK_EVENT_RANGE_OF_INTEREST
327             ? DropTrackEventDataBefore::kTrackEventRangeOfInterest
328             : DropTrackEventDataBefore::kNoDrop;
329   }
330   if (reset_trace_processor_args.has_ingest_ftrace_in_raw_table()) {
331     config.ingest_ftrace_in_raw_table =
332         reset_trace_processor_args.ingest_ftrace_in_raw_table();
333   }
334   if (reset_trace_processor_args.has_analyze_trace_proto_content()) {
335     config.analyze_trace_proto_content =
336         reset_trace_processor_args.analyze_trace_proto_content();
337   }
338   ResetTraceProcessorInternal(config);
339 }
340 
MaybePrintProgress()341 void Rpc::MaybePrintProgress() {
342   if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
343     bytes_last_progress_ = bytes_parsed_;
344     auto t_load_s =
345         static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
346         1e9;
347     fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
348             static_cast<double>(bytes_parsed_) / 1e6,
349             static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
350             (eof_ ? "\n" : ""));
351     fflush(stderr);
352   }
353 }
354 
Query(const uint8_t * args,size_t len,QueryResultBatchCallback result_callback)355 void Rpc::Query(const uint8_t* args,
356                 size_t len,
357                 QueryResultBatchCallback result_callback) {
358   auto it = QueryInternal(args, len);
359   QueryResultSerializer serializer(std::move(it));
360 
361   std::vector<uint8_t> res;
362   for (bool has_more = true; has_more;) {
363     has_more = serializer.Serialize(&res);
364     result_callback(res.data(), res.size(), has_more);
365     res.clear();
366   }
367 }
368 
QueryInternal(const uint8_t * args,size_t len)369 Iterator Rpc::QueryInternal(const uint8_t* args, size_t len) {
370   protos::pbzero::QueryArgs::Decoder query(args, len);
371   std::string sql = query.sql_query().ToStdString();
372   PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
373   PERFETTO_TP_TRACE(metatrace::Category::TOPLEVEL, "RPC_QUERY",
374                     [&](metatrace::Record* r) {
375                       r->AddArg("SQL", sql);
376                       if (query.has_tag()) {
377                         r->AddArg("tag", query.tag());
378                       }
379                     });
380 
381   return trace_processor_->ExecuteQuery(sql.c_str());
382 }
383 
RestoreInitialTables()384 void Rpc::RestoreInitialTables() {
385   trace_processor_->RestoreInitialTables();
386 }
387 
ComputeMetric(const uint8_t * args,size_t len)388 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
389   protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
390   ComputeMetricInternal(args, len, result.get());
391   return result.SerializeAsArray();
392 }
393 
ComputeMetricInternal(const uint8_t * data,size_t len,protos::pbzero::ComputeMetricResult * result)394 void Rpc::ComputeMetricInternal(const uint8_t* data,
395                                 size_t len,
396                                 protos::pbzero::ComputeMetricResult* result) {
397   protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
398   std::vector<std::string> metric_names;
399   for (auto it = args.metric_names(); it; ++it) {
400     metric_names.emplace_back(it->as_std_string());
401   }
402 
403   PERFETTO_TP_TRACE(metatrace::Category::TOPLEVEL, "RPC_COMPUTE_METRIC",
404                     [&](metatrace::Record* r) {
405                       for (const auto& metric : metric_names) {
406                         r->AddArg("Metric", metric);
407                         r->AddArg("Format", std::to_string(args.format()));
408                       }
409                     });
410 
411   PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
412                 metric_names.empty() ? "" : metric_names.front().c_str(),
413                 args.format());
414   switch (args.format()) {
415     case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
416       std::vector<uint8_t> metrics_proto;
417       util::Status status =
418           trace_processor_->ComputeMetric(metric_names, &metrics_proto);
419       if (status.ok()) {
420         result->set_metrics(metrics_proto.data(), metrics_proto.size());
421       } else {
422         result->set_error(status.message());
423       }
424       break;
425     }
426     case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
427       std::string metrics_string;
428       util::Status status = trace_processor_->ComputeMetricText(
429           metric_names, TraceProcessor::MetricResultFormat::kProtoText,
430           &metrics_string);
431       if (status.ok()) {
432         result->set_metrics_as_prototext(metrics_string);
433       } else {
434         result->set_error(status.message());
435       }
436       break;
437     }
438   }
439 }
440 
EnableMetatrace(const uint8_t * data,size_t len)441 void Rpc::EnableMetatrace(const uint8_t* data, size_t len) {
442   using protos::pbzero::MetatraceCategories;
443   TraceProcessor::MetatraceConfig config;
444   protos::pbzero::EnableMetatraceArgs::Decoder args(data, len);
445   config.categories = MetatraceCategoriesToPublicEnum(
446       static_cast<MetatraceCategories>(args.categories()));
447   trace_processor_->EnableMetatrace(config);
448 }
449 
DisableAndReadMetatrace()450 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
451   protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
452   DisableAndReadMetatraceInternal(result.get());
453   return result.SerializeAsArray();
454 }
455 
DisableAndReadMetatraceInternal(protos::pbzero::DisableAndReadMetatraceResult * result)456 void Rpc::DisableAndReadMetatraceInternal(
457     protos::pbzero::DisableAndReadMetatraceResult* result) {
458   std::vector<uint8_t> trace_proto;
459   util::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
460   if (status.ok()) {
461     result->set_metatrace(trace_proto.data(), trace_proto.size());
462   } else {
463     result->set_error(status.message());
464   }
465 }
466 
GetStatus()467 std::vector<uint8_t> Rpc::GetStatus() {
468   protozero::HeapBuffered<protos::pbzero::StatusResult> status;
469   status->set_loaded_trace_name(trace_processor_->GetCurrentTraceName());
470   status->set_human_readable_version(base::GetVersionString());
471   status->set_api_version(protos::pbzero::TRACE_PROCESSOR_CURRENT_API_VERSION);
472   return status.SerializeAsArray();
473 }
474 
475 }  // namespace trace_processor
476 }  // namespace perfetto
477