• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/rpc/rpc.h"
18 
19 #include <cinttypes>
20 #include <cstdint>
21 #include <cstdio>
22 #include <cstring>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "perfetto/base/logging.h"
29 #include "perfetto/base/status.h"
30 #include "perfetto/base/time.h"
31 #include "perfetto/ext/base/version.h"
32 #include "perfetto/ext/protozero/proto_ring_buffer.h"
33 #include "perfetto/ext/trace_processor/rpc/query_result_serializer.h"
34 #include "perfetto/protozero/field.h"
35 #include "perfetto/protozero/scattered_heap_buffer.h"
36 #include "perfetto/trace_processor/basic_types.h"
37 #include "perfetto/trace_processor/metatrace_config.h"
38 #include "perfetto/trace_processor/trace_processor.h"
39 #include "src/trace_processor/tp_metatrace.h"
40 
41 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
42 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
43 
44 namespace perfetto::trace_processor {
45 
46 namespace {
47 // Writes a "Loading trace ..." update every N bytes.
48 constexpr size_t kProgressUpdateBytes = 50ul * 1000 * 1000;
49 using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
50 using RpcProto = protos::pbzero::TraceProcessorRpc;
51 
52 // Most RPC messages are either very small or a query results.
53 // QueryResultSerializer splits rows into batches of approximately 128KB. Try
54 // avoid extra heap allocations for the nominal case.
55 constexpr auto kSliceSize =
56     QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
57 
58 // Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
59 // copies by doing direct scattered calls from the fragmented heap buffer onto
60 // the RpcResponseFunction (the receiver is expected to deal with arbitrary
61 // fragmentation anyways). It also takes care of prefixing each message with
62 // the proto preamble and varint size.
63 class Response {
64  public:
65   Response(int64_t seq, int method);
66   Response(const Response&) = delete;
67   Response& operator=(const Response&) = delete;
operator ->()68   RpcProto* operator->() { return msg_; }
69   void Send(Rpc::RpcResponseFunction);
70 
71  private:
72   RpcProto* msg_ = nullptr;
73 
74   // The reason why we use TraceProcessorRpcStream as root message is because
75   // the RPC wire protocol expects each message to be prefixed with a proto
76   // preamble and varint size. This happens to be the same serialization of a
77   // repeated field (this is really the same trick we use between
78   // Trace and TracePacket in trace.proto)
79   protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
80 };
81 
Response(int64_t seq,int method)82 Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
83   msg_ = buf_->add_msg();
84   msg_->set_seq(seq);
85   msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
86 }
87 
Send(Rpc::RpcResponseFunction send_fn)88 void Response::Send(Rpc::RpcResponseFunction send_fn) {
89   buf_->Finalize();
90   for (const auto& slice : buf_.GetSlices()) {
91     auto range = slice.GetUsedRange();
92     send_fn(range.begin, static_cast<uint32_t>(range.size()));
93   }
94 }
95 
96 }  // namespace
97 
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)98 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
99     : trace_processor_(std::move(preloaded_instance)) {
100   if (!trace_processor_)
101     ResetTraceProcessorInternal(Config());
102 }
103 
Rpc()104 Rpc::Rpc() : Rpc(nullptr) {}
105 Rpc::~Rpc() = default;
106 
ResetTraceProcessorInternal(const Config & config)107 void Rpc::ResetTraceProcessorInternal(const Config& config) {
108   trace_processor_config_ = config;
109   trace_processor_ = TraceProcessor::CreateInstance(config);
110   bytes_parsed_ = bytes_last_progress_ = 0;
111   t_parse_started_ = base::GetWallTimeNs().count();
112   // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
113   // This is invoked from the same client to clear the current trace state
114   // before loading a new one. The IPC channel is orthogonal to that and the
115   // message numbering continues regardless of the reset.
116 }
117 
OnRpcRequest(const void * data,size_t len)118 void Rpc::OnRpcRequest(const void* data, size_t len) {
119   rxbuf_.Append(data, len);
120   for (;;) {
121     auto msg = rxbuf_.ReadMessage();
122     if (!msg.valid()) {
123       if (msg.fatal_framing_error) {
124         protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
125         err_msg->add_msg()->set_fatal_error("RPC framing error");
126         auto err = err_msg.SerializeAsArray();
127         rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
128         rpc_response_fn_(nullptr, 0);  // Disconnect.
129       }
130       break;
131     }
132     ParseRpcRequest(msg.start, msg.len);
133   }
134 }
135 
136 namespace {
137 
138 using ProtoEnum = protos::pbzero::MetatraceCategories;
MetatraceCategoriesToPublicEnum(ProtoEnum categories)139 TraceProcessor::MetatraceCategories MetatraceCategoriesToPublicEnum(
140     ProtoEnum categories) {
141   TraceProcessor::MetatraceCategories result =
142       TraceProcessor::MetatraceCategories::NONE;
143   if (categories & ProtoEnum::QUERY_TIMELINE) {
144     result = static_cast<TraceProcessor::MetatraceCategories>(
145         result | TraceProcessor::MetatraceCategories::QUERY_TIMELINE);
146   }
147   if (categories & ProtoEnum::QUERY_DETAILED) {
148     result = static_cast<TraceProcessor::MetatraceCategories>(
149         result | TraceProcessor::MetatraceCategories::QUERY_DETAILED);
150   }
151   if (categories & ProtoEnum::FUNCTION_CALL) {
152     result = static_cast<TraceProcessor::MetatraceCategories>(
153         result | TraceProcessor::MetatraceCategories::FUNCTION_CALL);
154   }
155   if (categories & ProtoEnum::DB) {
156     result = static_cast<TraceProcessor::MetatraceCategories>(
157         result | TraceProcessor::MetatraceCategories::DB);
158   }
159   if (categories & ProtoEnum::API_TIMELINE) {
160     result = static_cast<TraceProcessor::MetatraceCategories>(
161         result | TraceProcessor::MetatraceCategories::API_TIMELINE);
162   }
163   return result;
164 }
165 
166 }  // namespace
167 
168 // [data, len] here is a tokenized TraceProcessorRpc proto message, without the
169 // size header.
ParseRpcRequest(const uint8_t * data,size_t len)170 void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
171   RpcProto::Decoder req(data, len);
172 
173   // We allow restarting the sequence from 0. This happens when refreshing the
174   // browser while using the external trace_processor_shell --httpd.
175   if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
176     char err_str[255];
177     // "(ERR:rpc_seq)" is intercepted by error_dialog.ts in the UI.
178     snprintf(err_str, sizeof(err_str),
179              "RPC request out of order. Expected %" PRId64 ", got %" PRId64
180              " (ERR:rpc_seq)",
181              rx_seq_id_ + 1, req.seq());
182     PERFETTO_ELOG("%s", err_str);
183     protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
184     err_msg->add_msg()->set_fatal_error(err_str);
185     auto err = err_msg.SerializeAsArray();
186     rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
187     rpc_response_fn_(nullptr, 0);  // Disconnect.
188     return;
189   }
190   rx_seq_id_ = req.seq();
191 
192   // The static cast is to prevent that the compiler breaks future proofness.
193   const int req_type = static_cast<int>(req.request());
194   static const char kErrFieldNotSet[] = "RPC error: request field not set";
195   switch (req_type) {
196     case RpcProto::TPM_APPEND_TRACE_DATA: {
197       Response resp(tx_seq_id_++, req_type);
198       auto* result = resp->set_append_result();
199       if (!req.has_append_trace_data()) {
200         result->set_error(kErrFieldNotSet);
201       } else {
202         protozero::ConstBytes byte_range = req.append_trace_data();
203         base::Status res = Parse(byte_range.data, byte_range.size);
204         if (!res.ok()) {
205           result->set_error(res.message());
206         }
207       }
208       resp.Send(rpc_response_fn_);
209       break;
210     }
211     case RpcProto::TPM_FINALIZE_TRACE_DATA: {
212       Response resp(tx_seq_id_++, req_type);
213       NotifyEndOfFile();
214       resp.Send(rpc_response_fn_);
215       break;
216     }
217     case RpcProto::TPM_QUERY_STREAMING: {
218       if (!req.has_query_args()) {
219         Response resp(tx_seq_id_++, req_type);
220         auto* result = resp->set_query_result();
221         result->set_error(kErrFieldNotSet);
222         resp.Send(rpc_response_fn_);
223       } else {
224         protozero::ConstBytes args = req.query_args();
225         auto it = QueryInternal(args.data, args.size);
226         QueryResultSerializer serializer(std::move(it));
227         for (bool has_more = true; has_more;) {
228           Response resp(tx_seq_id_++, req_type);
229           has_more = serializer.Serialize(resp->set_query_result());
230           resp.Send(rpc_response_fn_);
231         }
232       }
233       break;
234     }
235     case RpcProto::TPM_COMPUTE_METRIC: {
236       Response resp(tx_seq_id_++, req_type);
237       auto* result = resp->set_metric_result();
238       if (!req.has_compute_metric_args()) {
239         result->set_error(kErrFieldNotSet);
240       } else {
241         protozero::ConstBytes args = req.compute_metric_args();
242         ComputeMetricInternal(args.data, args.size, result);
243       }
244       resp.Send(rpc_response_fn_);
245       break;
246     }
247     case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
248       Response resp(tx_seq_id_++, req_type);
249       auto descriptor_set = trace_processor_->GetMetricDescriptors();
250       auto* result = resp->set_metric_descriptors();
251       result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
252       resp.Send(rpc_response_fn_);
253       break;
254     }
255     case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
256       trace_processor_->RestoreInitialTables();
257       Response resp(tx_seq_id_++, req_type);
258       resp.Send(rpc_response_fn_);
259       break;
260     }
261     case RpcProto::TPM_ENABLE_METATRACE: {
262       using protos::pbzero::MetatraceCategories;
263       protozero::ConstBytes args = req.enable_metatrace_args();
264       EnableMetatrace(args.data, args.size);
265 
266       Response resp(tx_seq_id_++, req_type);
267       resp.Send(rpc_response_fn_);
268       break;
269     }
270     case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
271       Response resp(tx_seq_id_++, req_type);
272       DisableAndReadMetatraceInternal(resp->set_metatrace());
273       resp.Send(rpc_response_fn_);
274       break;
275     }
276     case RpcProto::TPM_GET_STATUS: {
277       Response resp(tx_seq_id_++, req_type);
278       std::vector<uint8_t> status = GetStatus();
279       resp->set_status()->AppendRawProtoBytes(status.data(), status.size());
280       resp.Send(rpc_response_fn_);
281       break;
282     }
283     case RpcProto::TPM_RESET_TRACE_PROCESSOR: {
284       Response resp(tx_seq_id_++, req_type);
285       protozero::ConstBytes args = req.reset_trace_processor_args();
286       ResetTraceProcessor(args.data, args.size);
287       resp.Send(rpc_response_fn_);
288       break;
289     }
290     default: {
291       // This can legitimately happen if the client is newer. We reply with a
292       // generic "unkown request" response, so the client can do feature
293       // detection
294       PERFETTO_DLOG("[RPC] Uknown request type (%d), size=%zu", req_type, len);
295       Response resp(tx_seq_id_++, req_type);
296       resp->set_invalid_request(
297           static_cast<RpcProto::TraceProcessorMethod>(req_type));
298       resp.Send(rpc_response_fn_);
299       break;
300     }
301   }  // switch(req_type)
302 }
303 
Parse(const uint8_t * data,size_t len)304 base::Status Rpc::Parse(const uint8_t* data, size_t len) {
305   PERFETTO_TP_TRACE(
306       metatrace::Category::API_TIMELINE, "RPC_PARSE",
307       [&](metatrace::Record* r) { r->AddArg("length", std::to_string(len)); });
308   if (eof_) {
309     // Reset the trace processor state if another trace has been previously
310     // loaded. Use the same TraceProcessor Config.
311     ResetTraceProcessorInternal(trace_processor_config_);
312   }
313 
314   eof_ = false;
315   bytes_parsed_ += len;
316   MaybePrintProgress();
317 
318   if (len == 0)
319     return base::OkStatus();
320 
321   // TraceProcessor needs take ownership of the memory chunk.
322   std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
323   memcpy(data_copy.get(), data, len);
324   return trace_processor_->Parse(std::move(data_copy), len);
325 }
326 
NotifyEndOfFile()327 void Rpc::NotifyEndOfFile() {
328   PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE,
329                     "RPC_NOTIFY_END_OF_FILE");
330 
331   trace_processor_->NotifyEndOfFile();
332   eof_ = true;
333   MaybePrintProgress();
334 }
335 
ResetTraceProcessor(const uint8_t * args,size_t len)336 void Rpc::ResetTraceProcessor(const uint8_t* args, size_t len) {
337   protos::pbzero::ResetTraceProcessorArgs::Decoder reset_trace_processor_args(
338       args, len);
339   Config config;
340   if (reset_trace_processor_args.has_drop_track_event_data_before()) {
341     config.drop_track_event_data_before =
342         reset_trace_processor_args.drop_track_event_data_before() ==
343                 protos::pbzero::ResetTraceProcessorArgs::
344                     TRACK_EVENT_RANGE_OF_INTEREST
345             ? DropTrackEventDataBefore::kTrackEventRangeOfInterest
346             : DropTrackEventDataBefore::kNoDrop;
347   }
348   if (reset_trace_processor_args.has_ingest_ftrace_in_raw_table()) {
349     config.ingest_ftrace_in_raw_table =
350         reset_trace_processor_args.ingest_ftrace_in_raw_table();
351   }
352   if (reset_trace_processor_args.has_analyze_trace_proto_content()) {
353     config.analyze_trace_proto_content =
354         reset_trace_processor_args.analyze_trace_proto_content();
355   }
356   if (reset_trace_processor_args.has_ftrace_drop_until_all_cpus_valid()) {
357     config.soft_drop_ftrace_data_before =
358         reset_trace_processor_args.ftrace_drop_until_all_cpus_valid()
359             ? SoftDropFtraceDataBefore::kAllPerCpuBuffersValid
360             : SoftDropFtraceDataBefore::kNoDrop;
361   }
362   ResetTraceProcessorInternal(config);
363 }
364 
MaybePrintProgress()365 void Rpc::MaybePrintProgress() {
366   if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
367     bytes_last_progress_ = bytes_parsed_;
368     auto t_load_s =
369         static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
370         1e9;
371     fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
372             static_cast<double>(bytes_parsed_) / 1e6,
373             static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
374             (eof_ ? "\n" : ""));
375     fflush(stderr);
376   }
377 }
378 
Query(const uint8_t * args,size_t len,const QueryResultBatchCallback & result_callback)379 void Rpc::Query(const uint8_t* args,
380                 size_t len,
381                 const QueryResultBatchCallback& result_callback) {
382   auto it = QueryInternal(args, len);
383   QueryResultSerializer serializer(std::move(it));
384 
385   std::vector<uint8_t> res;
386   for (bool has_more = true; has_more;) {
387     has_more = serializer.Serialize(&res);
388     result_callback(res.data(), res.size(), has_more);
389     res.clear();
390   }
391 }
392 
QueryInternal(const uint8_t * args,size_t len)393 Iterator Rpc::QueryInternal(const uint8_t* args, size_t len) {
394   protos::pbzero::QueryArgs::Decoder query(args, len);
395   std::string sql = query.sql_query().ToStdString();
396   PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
397   PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE, "RPC_QUERY",
398                     [&](metatrace::Record* r) {
399                       r->AddArg("SQL", sql);
400                       if (query.has_tag()) {
401                         r->AddArg("tag", query.tag());
402                       }
403                     });
404 
405   return trace_processor_->ExecuteQuery(sql);
406 }
407 
RestoreInitialTables()408 void Rpc::RestoreInitialTables() {
409   trace_processor_->RestoreInitialTables();
410 }
411 
ComputeMetric(const uint8_t * args,size_t len)412 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
413   protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
414   ComputeMetricInternal(args, len, result.get());
415   return result.SerializeAsArray();
416 }
417 
ComputeMetricInternal(const uint8_t * data,size_t len,protos::pbzero::ComputeMetricResult * result)418 void Rpc::ComputeMetricInternal(const uint8_t* data,
419                                 size_t len,
420                                 protos::pbzero::ComputeMetricResult* result) {
421   protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
422   std::vector<std::string> metric_names;
423   for (auto it = args.metric_names(); it; ++it) {
424     metric_names.emplace_back(it->as_std_string());
425   }
426 
427   PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE, "RPC_COMPUTE_METRIC",
428                     [&](metatrace::Record* r) {
429                       for (const auto& metric : metric_names) {
430                         r->AddArg("Metric", metric);
431                         r->AddArg("Format", std::to_string(args.format()));
432                       }
433                     });
434 
435   PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
436                 metric_names.empty() ? "" : metric_names.front().c_str(),
437                 args.format());
438   switch (args.format()) {
439     case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
440       std::vector<uint8_t> metrics_proto;
441       base::Status status =
442           trace_processor_->ComputeMetric(metric_names, &metrics_proto);
443       if (status.ok()) {
444         result->set_metrics(metrics_proto.data(), metrics_proto.size());
445       } else {
446         result->set_error(status.message());
447       }
448       break;
449     }
450     case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
451       std::string metrics_string;
452       base::Status status = trace_processor_->ComputeMetricText(
453           metric_names, TraceProcessor::MetricResultFormat::kProtoText,
454           &metrics_string);
455       if (status.ok()) {
456         result->set_metrics_as_prototext(metrics_string);
457       } else {
458         result->set_error(status.message());
459       }
460       break;
461     }
462     case protos::pbzero::ComputeMetricArgs::JSON: {
463       std::string metrics_string;
464       base::Status status = trace_processor_->ComputeMetricText(
465           metric_names, TraceProcessor::MetricResultFormat::kJson,
466           &metrics_string);
467       if (status.ok()) {
468         result->set_metrics_as_json(metrics_string);
469       } else {
470         result->set_error(status.message());
471       }
472       break;
473     }
474   }
475 }
476 
EnableMetatrace(const uint8_t * data,size_t len)477 void Rpc::EnableMetatrace(const uint8_t* data, size_t len) {
478   using protos::pbzero::MetatraceCategories;
479   TraceProcessor::MetatraceConfig config;
480   protos::pbzero::EnableMetatraceArgs::Decoder args(data, len);
481   config.categories = MetatraceCategoriesToPublicEnum(
482       static_cast<MetatraceCategories>(args.categories()));
483   trace_processor_->EnableMetatrace(config);
484 }
485 
DisableAndReadMetatrace()486 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
487   protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
488   DisableAndReadMetatraceInternal(result.get());
489   return result.SerializeAsArray();
490 }
491 
DisableAndReadMetatraceInternal(protos::pbzero::DisableAndReadMetatraceResult * result)492 void Rpc::DisableAndReadMetatraceInternal(
493     protos::pbzero::DisableAndReadMetatraceResult* result) {
494   std::vector<uint8_t> trace_proto;
495   base::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
496   if (status.ok()) {
497     result->set_metatrace(trace_proto.data(), trace_proto.size());
498   } else {
499     result->set_error(status.message());
500   }
501 }
502 
GetStatus()503 std::vector<uint8_t> Rpc::GetStatus() {
504   protozero::HeapBuffered<protos::pbzero::StatusResult> status;
505   status->set_loaded_trace_name(trace_processor_->GetCurrentTraceName());
506   status->set_human_readable_version(base::GetVersionString());
507   if (const char* version_code = base::GetVersionCode(); version_code) {
508     status->set_version_code(version_code);
509   }
510   status->set_api_version(protos::pbzero::TRACE_PROCESSOR_CURRENT_API_VERSION);
511   return status.SerializeAsArray();
512 }
513 
514 }  // namespace perfetto::trace_processor
515