• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/rpc/rpc.h"
18 
19 #include <cinttypes>
20 #include <cstdint>
21 #include <cstdio>
22 #include <cstring>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "perfetto/base/logging.h"
29 #include "perfetto/base/status.h"
30 #include "perfetto/base/time.h"
31 #include "perfetto/ext/base/version.h"
32 #include "perfetto/ext/protozero/proto_ring_buffer.h"
33 #include "perfetto/ext/trace_processor/rpc/query_result_serializer.h"
34 #include "perfetto/protozero/field.h"
35 #include "perfetto/protozero/proto_utils.h"
36 #include "perfetto/protozero/scattered_heap_buffer.h"
37 #include "perfetto/trace_processor/basic_types.h"
38 #include "perfetto/trace_processor/metatrace_config.h"
39 #include "perfetto/trace_processor/trace_processor.h"
40 #include "src/trace_processor/tp_metatrace.h"
41 #include "src/trace_processor/util/status_macros.h"
42 
43 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
44 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
45 
46 namespace perfetto::trace_processor {
47 
48 namespace {
49 // Writes a "Loading trace ..." update every N bytes.
50 constexpr size_t kProgressUpdateBytes = 50ul * 1000 * 1000;
51 using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
52 using RpcProto = protos::pbzero::TraceProcessorRpc;
53 
54 // Most RPC messages are either very small or a query results.
55 // QueryResultSerializer splits rows into batches of approximately 128KB. Try
56 // avoid extra heap allocations for the nominal case.
57 constexpr auto kSliceSize =
58     QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
59 
60 // Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
61 // copies by doing direct scattered calls from the fragmented heap buffer onto
62 // the RpcResponseFunction (the receiver is expected to deal with arbitrary
63 // fragmentation anyways). It also takes care of prefixing each message with
64 // the proto preamble and varint size.
65 class Response {
66  public:
67   Response(int64_t seq, int method);
68   Response(const Response&) = delete;
69   Response& operator=(const Response&) = delete;
operator ->()70   RpcProto* operator->() { return msg_; }
71   void Send(Rpc::RpcResponseFunction);
72 
73  private:
74   RpcProto* msg_ = nullptr;
75 
76   // The reason why we use TraceProcessorRpcStream as root message is because
77   // the RPC wire protocol expects each message to be prefixed with a proto
78   // preamble and varint size. This happens to be the same serialization of a
79   // repeated field (this is really the same trick we use between
80   // Trace and TracePacket in trace.proto)
81   protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
82 };
83 
Response(int64_t seq,int method)84 Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
85   msg_ = buf_->add_msg();
86   msg_->set_seq(seq);
87   msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
88 }
89 
Send(Rpc::RpcResponseFunction send_fn)90 void Response::Send(Rpc::RpcResponseFunction send_fn) {
91   buf_->Finalize();
92   for (const auto& slice : buf_.GetSlices()) {
93     auto range = slice.GetUsedRange();
94     send_fn(range.begin, static_cast<uint32_t>(range.size()));
95   }
96 }
97 
98 }  // namespace
99 
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)100 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
101     : trace_processor_(std::move(preloaded_instance)) {
102   if (!trace_processor_)
103     ResetTraceProcessorInternal(Config());
104 }
105 
Rpc()106 Rpc::Rpc() : Rpc(nullptr) {}
107 Rpc::~Rpc() = default;
108 
ResetTraceProcessorInternal(const Config & config)109 void Rpc::ResetTraceProcessorInternal(const Config& config) {
110   trace_processor_config_ = config;
111   trace_processor_ = TraceProcessor::CreateInstance(config);
112   bytes_parsed_ = bytes_last_progress_ = 0;
113   t_parse_started_ = base::GetWallTimeNs().count();
114   // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
115   // This is invoked from the same client to clear the current trace state
116   // before loading a new one. The IPC channel is orthogonal to that and the
117   // message numbering continues regardless of the reset.
118 }
119 
OnRpcRequest(const void * data,size_t len)120 void Rpc::OnRpcRequest(const void* data, size_t len) {
121   rxbuf_.Append(data, len);
122   for (;;) {
123     auto msg = rxbuf_.ReadMessage();
124     if (!msg.valid()) {
125       if (msg.fatal_framing_error) {
126         protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
127         err_msg->add_msg()->set_fatal_error("RPC framing error");
128         auto err = err_msg.SerializeAsArray();
129         rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
130         rpc_response_fn_(nullptr, 0);  // Disconnect.
131       }
132       break;
133     }
134     ParseRpcRequest(msg.start, msg.len);
135   }
136 }
137 
138 namespace {
139 
140 using ProtoEnum = protos::pbzero::MetatraceCategories;
MetatraceCategoriesToPublicEnum(ProtoEnum categories)141 TraceProcessor::MetatraceCategories MetatraceCategoriesToPublicEnum(
142     ProtoEnum categories) {
143   TraceProcessor::MetatraceCategories result =
144       TraceProcessor::MetatraceCategories::NONE;
145   if (categories & ProtoEnum::QUERY_TIMELINE) {
146     result = static_cast<TraceProcessor::MetatraceCategories>(
147         result | TraceProcessor::MetatraceCategories::QUERY_TIMELINE);
148   }
149   if (categories & ProtoEnum::QUERY_DETAILED) {
150     result = static_cast<TraceProcessor::MetatraceCategories>(
151         result | TraceProcessor::MetatraceCategories::QUERY_DETAILED);
152   }
153   if (categories & ProtoEnum::FUNCTION_CALL) {
154     result = static_cast<TraceProcessor::MetatraceCategories>(
155         result | TraceProcessor::MetatraceCategories::FUNCTION_CALL);
156   }
157   if (categories & ProtoEnum::DB) {
158     result = static_cast<TraceProcessor::MetatraceCategories>(
159         result | TraceProcessor::MetatraceCategories::DB);
160   }
161   if (categories & ProtoEnum::API_TIMELINE) {
162     result = static_cast<TraceProcessor::MetatraceCategories>(
163         result | TraceProcessor::MetatraceCategories::API_TIMELINE);
164   }
165   return result;
166 }
167 
168 }  // namespace
169 
170 // [data, len] here is a tokenized TraceProcessorRpc proto message, without the
171 // size header.
ParseRpcRequest(const uint8_t * data,size_t len)172 void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
173   RpcProto::Decoder req(data, len);
174 
175   // We allow restarting the sequence from 0. This happens when refreshing the
176   // browser while using the external trace_processor_shell --httpd.
177   if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
178     char err_str[255];
179     // "(ERR:rpc_seq)" is intercepted by error_dialog.ts in the UI.
180     snprintf(err_str, sizeof(err_str),
181              "RPC request out of order. Expected %" PRId64 ", got %" PRId64
182              " (ERR:rpc_seq)",
183              rx_seq_id_ + 1, req.seq());
184     PERFETTO_ELOG("%s", err_str);
185     protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
186     err_msg->add_msg()->set_fatal_error(err_str);
187     auto err = err_msg.SerializeAsArray();
188     rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
189     rpc_response_fn_(nullptr, 0);  // Disconnect.
190     return;
191   }
192   rx_seq_id_ = req.seq();
193 
194   // The static cast is to prevent that the compiler breaks future proofness.
195   const int req_type = static_cast<int>(req.request());
196   static const char kErrFieldNotSet[] = "RPC error: request field not set";
197   switch (req_type) {
198     case RpcProto::TPM_APPEND_TRACE_DATA: {
199       Response resp(tx_seq_id_++, req_type);
200       auto* result = resp->set_append_result();
201       if (!req.has_append_trace_data()) {
202         result->set_error(kErrFieldNotSet);
203       } else {
204         protozero::ConstBytes byte_range = req.append_trace_data();
205         base::Status res = Parse(byte_range.data, byte_range.size);
206         if (!res.ok()) {
207           result->set_error(res.message());
208         }
209       }
210       resp.Send(rpc_response_fn_);
211       break;
212     }
213     case RpcProto::TPM_FINALIZE_TRACE_DATA: {
214       Response resp(tx_seq_id_++, req_type);
215       auto* result = resp->set_finalize_data_result();
216       base::Status res = NotifyEndOfFile();
217       if (!res.ok()) {
218         result->set_error(res.message());
219       }
220       resp.Send(rpc_response_fn_);
221       break;
222     }
223     case RpcProto::TPM_QUERY_STREAMING: {
224       if (!req.has_query_args()) {
225         Response resp(tx_seq_id_++, req_type);
226         auto* result = resp->set_query_result();
227         result->set_error(kErrFieldNotSet);
228         resp.Send(rpc_response_fn_);
229       } else {
230         protozero::ConstBytes args = req.query_args();
231         protos::pbzero::QueryArgs::Decoder query(args.data, args.size);
232         std::string sql = query.sql_query().ToStdString();
233 
234         PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE, "RPC_QUERY",
235                           [&](metatrace::Record* r) {
236                             r->AddArg("SQL", sql);
237                             if (query.has_tag()) {
238                               r->AddArg("tag", query.tag());
239                             }
240                           });
241 
242         auto it = trace_processor_->ExecuteQuery(sql);
243         QueryResultSerializer serializer(std::move(it));
244         for (bool has_more = true; has_more;) {
245           const auto seq_id = tx_seq_id_++;
246           Response resp(seq_id, req_type);
247           has_more = serializer.Serialize(resp->set_query_result());
248           const uint32_t resp_size = resp->Finalize();
249           if (resp_size < protozero::proto_utils::kMaxMessageLength) {
250             // This is the nominal case.
251             resp.Send(rpc_response_fn_);
252             continue;
253           }
254           // In rare cases a query can end up with a batch which is too big.
255           // Normally batches are automatically split before hitting the limit,
256           // but one can come up with a query where a single cell is > 256MB.
257           // If this happens, just bail out gracefully rather than creating an
258           // unparsable proto which will cause a RPC framing error.
259           // If we hit this, we have to discard `resp` because it's
260           // unavoidably broken (due to have overflown the 4-bytes size) and
261           // can't be parsed. Instead create a new response with the error.
262           Response err_resp(seq_id, req_type);
263           auto* qres = err_resp->set_query_result();
264           qres->add_batch()->set_is_last_batch(true);
265           qres->set_error(
266               "The query ended up with a response that is too big (" +
267               std::to_string(resp_size) +
268               " bytes). This usually happens when a single row is >= 256 MiB. "
269               "See also WRITE_FILE for dealing with large rows.");
270           err_resp.Send(rpc_response_fn_);
271           break;
272         }
273       }
274       break;
275     }
276     case RpcProto::TPM_COMPUTE_METRIC: {
277       Response resp(tx_seq_id_++, req_type);
278       auto* result = resp->set_metric_result();
279       if (!req.has_compute_metric_args()) {
280         result->set_error(kErrFieldNotSet);
281       } else {
282         protozero::ConstBytes args = req.compute_metric_args();
283         ComputeMetricInternal(args.data, args.size, result);
284       }
285       resp.Send(rpc_response_fn_);
286       break;
287     }
288     case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
289       Response resp(tx_seq_id_++, req_type);
290       auto descriptor_set = trace_processor_->GetMetricDescriptors();
291       auto* result = resp->set_metric_descriptors();
292       result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
293       resp.Send(rpc_response_fn_);
294       break;
295     }
296     case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
297       trace_processor_->RestoreInitialTables();
298       Response resp(tx_seq_id_++, req_type);
299       resp.Send(rpc_response_fn_);
300       break;
301     }
302     case RpcProto::TPM_ENABLE_METATRACE: {
303       using protos::pbzero::MetatraceCategories;
304       protozero::ConstBytes args = req.enable_metatrace_args();
305       EnableMetatrace(args.data, args.size);
306 
307       Response resp(tx_seq_id_++, req_type);
308       resp.Send(rpc_response_fn_);
309       break;
310     }
311     case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
312       Response resp(tx_seq_id_++, req_type);
313       DisableAndReadMetatraceInternal(resp->set_metatrace());
314       resp.Send(rpc_response_fn_);
315       break;
316     }
317     case RpcProto::TPM_GET_STATUS: {
318       Response resp(tx_seq_id_++, req_type);
319       std::vector<uint8_t> status = GetStatus();
320       resp->set_status()->AppendRawProtoBytes(status.data(), status.size());
321       resp.Send(rpc_response_fn_);
322       break;
323     }
324     case RpcProto::TPM_RESET_TRACE_PROCESSOR: {
325       Response resp(tx_seq_id_++, req_type);
326       protozero::ConstBytes args = req.reset_trace_processor_args();
327       ResetTraceProcessor(args.data, args.size);
328       resp.Send(rpc_response_fn_);
329       break;
330     }
331     case RpcProto::TPM_REGISTER_SQL_PACKAGE: {
332       Response resp(tx_seq_id_++, req_type);
333       base::Status status = RegisterSqlPackage(req.register_sql_package_args());
334       auto* res = resp->set_register_sql_package_result();
335       if (!status.ok()) {
336         res->set_error(status.message());
337       }
338       resp.Send(rpc_response_fn_);
339       break;
340     }
341     case RpcProto::TPM_ANALYZE_STRUCTURED_QUERY: {
342       Response resp(tx_seq_id_++, req_type);
343       protozero::ConstBytes args = req.analyze_structured_query_args();
344       protos::pbzero::AnalyzeStructuredQueryArgs::Decoder decoder(args.data,
345                                                                   args.size);
346       std::vector<StructuredQueryBytes> queries;
347       for (auto it = decoder.queries(); it; ++it) {
348         StructuredQueryBytes n;
349         n.format = StructuredQueryBytes::Format::kBinaryProto;
350         n.ptr = it->data();
351         n.size = it->size();
352         queries.push_back(n);
353       }
354 
355       std::vector<AnalyzedStructuredQuery> analyzed_queries;
356       base::Status status = trace_processor_->AnalyzeStructuredQueries(
357           queries, &analyzed_queries);
358       auto* analyze_result = resp->set_analyze_structured_query_result();
359       if (!status.ok()) {
360         analyze_result->set_error(status.message());
361       }
362 
363       for (const auto& r : analyzed_queries) {
364         auto* query_res = analyze_result->add_results();
365         query_res->set_sql(r.sql);
366         query_res->set_textproto(r.textproto);
367         for (const std::string& m : r.modules) {
368           query_res->add_modules(m);
369         }
370         for (const std::string& p : r.preambles) {
371           query_res->add_preambles(p);
372         }
373       }
374       resp.Send(rpc_response_fn_);
375       break;
376     }
377     default: {
378       // This can legitimately happen if the client is newer. We reply with a
379       // generic "unkown request" response, so the client can do feature
380       // detection
381       PERFETTO_DLOG("[RPC] Uknown request type (%d), size=%zu", req_type, len);
382       Response resp(tx_seq_id_++, req_type);
383       resp->set_invalid_request(
384           static_cast<RpcProto::TraceProcessorMethod>(req_type));
385       resp.Send(rpc_response_fn_);
386       break;
387     }
388   }  // switch(req_type)
389 }
390 
Parse(const uint8_t * data,size_t len)391 base::Status Rpc::Parse(const uint8_t* data, size_t len) {
392   PERFETTO_TP_TRACE(
393       metatrace::Category::API_TIMELINE, "RPC_PARSE",
394       [&](metatrace::Record* r) { r->AddArg("length", std::to_string(len)); });
395   if (eof_) {
396     // Reset the trace processor state if another trace has been previously
397     // loaded. Use the same TraceProcessor Config.
398     ResetTraceProcessorInternal(trace_processor_config_);
399   }
400 
401   eof_ = false;
402   bytes_parsed_ += len;
403   MaybePrintProgress();
404 
405   if (len == 0)
406     return base::OkStatus();
407 
408   // TraceProcessor needs take ownership of the memory chunk.
409   std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
410   memcpy(data_copy.get(), data, len);
411   return trace_processor_->Parse(std::move(data_copy), len);
412 }
413 
NotifyEndOfFile()414 base::Status Rpc::NotifyEndOfFile() {
415   PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE,
416                     "RPC_NOTIFY_END_OF_FILE");
417 
418   eof_ = true;
419   RETURN_IF_ERROR(trace_processor_->NotifyEndOfFile());
420   MaybePrintProgress();
421   return base::OkStatus();
422 }
423 
ResetTraceProcessor(const uint8_t * args,size_t len)424 void Rpc::ResetTraceProcessor(const uint8_t* args, size_t len) {
425   protos::pbzero::ResetTraceProcessorArgs::Decoder reset_trace_processor_args(
426       args, len);
427   Config config;
428   if (reset_trace_processor_args.has_drop_track_event_data_before()) {
429     config.drop_track_event_data_before =
430         reset_trace_processor_args.drop_track_event_data_before() ==
431                 protos::pbzero::ResetTraceProcessorArgs::
432                     TRACK_EVENT_RANGE_OF_INTEREST
433             ? DropTrackEventDataBefore::kTrackEventRangeOfInterest
434             : DropTrackEventDataBefore::kNoDrop;
435   }
436   if (reset_trace_processor_args.has_ingest_ftrace_in_raw_table()) {
437     config.ingest_ftrace_in_raw_table =
438         reset_trace_processor_args.ingest_ftrace_in_raw_table();
439   }
440   if (reset_trace_processor_args.has_analyze_trace_proto_content()) {
441     config.analyze_trace_proto_content =
442         reset_trace_processor_args.analyze_trace_proto_content();
443   }
444   if (reset_trace_processor_args.has_ftrace_drop_until_all_cpus_valid()) {
445     config.soft_drop_ftrace_data_before =
446         reset_trace_processor_args.ftrace_drop_until_all_cpus_valid()
447             ? SoftDropFtraceDataBefore::kAllPerCpuBuffersValid
448             : SoftDropFtraceDataBefore::kNoDrop;
449   }
450   using Args = protos::pbzero::ResetTraceProcessorArgs;
451   switch (reset_trace_processor_args.parsing_mode()) {
452     case Args::ParsingMode::DEFAULT:
453       config.parsing_mode = ParsingMode::kDefault;
454       break;
455     case Args::ParsingMode::TOKENIZE_ONLY:
456       config.parsing_mode = ParsingMode::kTokenizeOnly;
457       break;
458     case Args::ParsingMode::TOKENIZE_AND_SORT:
459       config.parsing_mode = ParsingMode::kTokenizeAndSort;
460       break;
461   }
462   ResetTraceProcessorInternal(config);
463 }
464 
RegisterSqlPackage(protozero::ConstBytes bytes)465 base::Status Rpc::RegisterSqlPackage(protozero::ConstBytes bytes) {
466   protos::pbzero::RegisterSqlPackageArgs::Decoder args(bytes);
467   SqlPackage package;
468   package.name = args.package_name().ToStdString();
469   package.allow_override = args.allow_override();
470   for (auto it = args.modules(); it; ++it) {
471     protos::pbzero::RegisterSqlPackageArgs::Module::Decoder m(*it);
472     package.modules.emplace_back(m.name().ToStdString(), m.sql().ToStdString());
473   }
474   return trace_processor_->RegisterSqlPackage(package);
475 }
476 
MaybePrintProgress()477 void Rpc::MaybePrintProgress() {
478   if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
479     bytes_last_progress_ = bytes_parsed_;
480     auto t_load_s =
481         static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
482         1e9;
483     fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
484             static_cast<double>(bytes_parsed_) / 1e6,
485             static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
486             (eof_ ? "\n" : ""));
487     fflush(stderr);
488   }
489 }
490 
Query(const uint8_t * args,size_t len,const QueryResultBatchCallback & result_callback)491 void Rpc::Query(const uint8_t* args,
492                 size_t len,
493                 const QueryResultBatchCallback& result_callback) {
494   protos::pbzero::QueryArgs::Decoder query(args, len);
495   std::string sql = query.sql_query().ToStdString();
496   PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE, "RPC_QUERY",
497                     [&](metatrace::Record* r) {
498                       r->AddArg("SQL", sql);
499                       if (query.has_tag()) {
500                         r->AddArg("tag", query.tag());
501                       }
502                     });
503 
504   auto it = trace_processor_->ExecuteQuery(sql);
505 
506   QueryResultSerializer serializer(std::move(it));
507 
508   protozero::HeapBuffered<protos::pbzero::QueryResult> buffered(kSliceSize,
509                                                                 kSliceSize);
510   for (bool has_more = true; has_more;) {
511     has_more = serializer.Serialize(buffered.get());
512     const auto& res = buffered.GetSlices();
513     for (uint32_t i = 0; i < res.size(); ++i) {
514       auto used = res[i].GetUsedRange();
515       result_callback(used.begin, used.size(), has_more || i < res.size() - 1);
516     }
517     if (res.size() == 0 && !has_more) {
518       result_callback(nullptr, 0, false);
519     }
520     buffered.Reset();
521   }
522 }
523 
RestoreInitialTables()524 void Rpc::RestoreInitialTables() {
525   trace_processor_->RestoreInitialTables();
526 }
527 
ComputeMetric(const uint8_t * args,size_t len)528 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
529   protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
530   ComputeMetricInternal(args, len, result.get());
531   return result.SerializeAsArray();
532 }
533 
ComputeMetricInternal(const uint8_t * data,size_t len,protos::pbzero::ComputeMetricResult * result)534 void Rpc::ComputeMetricInternal(const uint8_t* data,
535                                 size_t len,
536                                 protos::pbzero::ComputeMetricResult* result) {
537   protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
538   std::vector<std::string> metric_names;
539   for (auto it = args.metric_names(); it; ++it) {
540     metric_names.emplace_back(it->as_std_string());
541   }
542 
543   PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE, "RPC_COMPUTE_METRIC",
544                     [&](metatrace::Record* r) {
545                       for (const auto& metric : metric_names) {
546                         r->AddArg("Metric", metric);
547                         r->AddArg("Format", std::to_string(args.format()));
548                       }
549                     });
550 
551   PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
552                 metric_names.empty() ? "" : metric_names.front().c_str(),
553                 args.format());
554   switch (args.format()) {
555     case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
556       std::vector<uint8_t> metrics_proto;
557       base::Status status =
558           trace_processor_->ComputeMetric(metric_names, &metrics_proto);
559       if (status.ok()) {
560         result->set_metrics(metrics_proto.data(), metrics_proto.size());
561       } else {
562         result->set_error(status.message());
563       }
564       break;
565     }
566     case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
567       std::string metrics_string;
568       base::Status status = trace_processor_->ComputeMetricText(
569           metric_names, TraceProcessor::MetricResultFormat::kProtoText,
570           &metrics_string);
571       if (status.ok()) {
572         result->set_metrics_as_prototext(metrics_string);
573       } else {
574         result->set_error(status.message());
575       }
576       break;
577     }
578     case protos::pbzero::ComputeMetricArgs::JSON: {
579       std::string metrics_string;
580       base::Status status = trace_processor_->ComputeMetricText(
581           metric_names, TraceProcessor::MetricResultFormat::kJson,
582           &metrics_string);
583       if (status.ok()) {
584         result->set_metrics_as_json(metrics_string);
585       } else {
586         result->set_error(status.message());
587       }
588       break;
589     }
590   }
591 }
592 
EnableMetatrace(const uint8_t * data,size_t len)593 void Rpc::EnableMetatrace(const uint8_t* data, size_t len) {
594   using protos::pbzero::MetatraceCategories;
595   TraceProcessor::MetatraceConfig config;
596   protos::pbzero::EnableMetatraceArgs::Decoder args(data, len);
597   config.categories = MetatraceCategoriesToPublicEnum(
598       static_cast<MetatraceCategories>(args.categories()));
599   trace_processor_->EnableMetatrace(config);
600 }
601 
DisableAndReadMetatrace()602 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
603   protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
604   DisableAndReadMetatraceInternal(result.get());
605   return result.SerializeAsArray();
606 }
607 
DisableAndReadMetatraceInternal(protos::pbzero::DisableAndReadMetatraceResult * result)608 void Rpc::DisableAndReadMetatraceInternal(
609     protos::pbzero::DisableAndReadMetatraceResult* result) {
610   std::vector<uint8_t> trace_proto;
611   base::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
612   if (status.ok()) {
613     result->set_metatrace(trace_proto.data(), trace_proto.size());
614   } else {
615     result->set_error(status.message());
616   }
617 }
618 
GetStatus()619 std::vector<uint8_t> Rpc::GetStatus() {
620   protozero::HeapBuffered<protos::pbzero::StatusResult> status;
621   status->set_loaded_trace_name(trace_processor_->GetCurrentTraceName());
622   status->set_human_readable_version(base::GetVersionString());
623   if (const char* version_code = base::GetVersionCode(); version_code) {
624     status->set_version_code(version_code);
625   }
626   status->set_api_version(protos::pbzero::TRACE_PROCESSOR_CURRENT_API_VERSION);
627   return status.SerializeAsArray();
628 }
629 
630 }  // namespace perfetto::trace_processor
631