• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/rpc/rpc.h"
18 
19 #include <string.h>
20 
21 #include <vector>
22 
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/time.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/ext/base/version.h"
27 #include "perfetto/protozero/scattered_heap_buffer.h"
28 #include "perfetto/protozero/scattered_stream_writer.h"
29 #include "perfetto/trace_processor/trace_processor.h"
30 #include "src/protozero/proto_ring_buffer.h"
31 #include "src/trace_processor/rpc/query_result_serializer.h"
32 #include "src/trace_processor/tp_metatrace.h"
33 
34 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
35 
36 namespace perfetto {
37 namespace trace_processor {
38 
39 namespace {
40 // Writes a "Loading trace ..." update every N bytes.
41 constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
42 using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
43 using RpcProto = protos::pbzero::TraceProcessorRpc;
44 
45 // Most RPC messages are either very small or a query results.
46 // QueryResultSerializer splits rows into batches of approximately 128KB. Try
47 // avoid extra heap allocations for the nominal case.
48 constexpr auto kSliceSize =
49     QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
50 
51 // Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
52 // copies by doing direct scattered calls from the fragmented heap buffer onto
53 // the RpcResponseFunction (the receiver is expected to deal with arbitrary
54 // fragmentation anyways). It also takes care of prefixing each message with
55 // the proto preamble and varint size.
56 class Response {
57  public:
58   Response(int64_t seq, int method);
59   Response(const Response&) = delete;
60   Response& operator=(const Response&) = delete;
operator ->()61   RpcProto* operator->() { return msg_; }
62   void Send(Rpc::RpcResponseFunction);
63 
64  private:
65   RpcProto* msg_ = nullptr;
66 
67   // The reason why we use TraceProcessorRpcStream as root message is because
68   // the RPC wire protocol expects each message to be prefixed with a proto
69   // preamble and varint size. This happens to be the same serialization of a
70   // repeated field (this is really the same trick we use between
71   // Trace and TracePacket in trace.proto)
72   protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
73 };
74 
Response(int64_t seq,int method)75 Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
76   msg_ = buf_->add_msg();
77   msg_->set_seq(seq);
78   msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
79 }
80 
Send(Rpc::RpcResponseFunction send_fn)81 void Response::Send(Rpc::RpcResponseFunction send_fn) {
82   buf_->Finalize();
83   for (const auto& slice : buf_.GetSlices()) {
84     auto range = slice.GetUsedRange();
85     send_fn(range.begin, static_cast<uint32_t>(range.size()));
86   }
87 }
88 
89 }  // namespace
90 
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)91 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
92     : trace_processor_(std::move(preloaded_instance)) {
93   if (!trace_processor_)
94     ResetTraceProcessor();
95 }
96 
Rpc()97 Rpc::Rpc() : Rpc(nullptr) {}
98 Rpc::~Rpc() = default;
99 
ResetTraceProcessor()100 void Rpc::ResetTraceProcessor() {
101   trace_processor_ = TraceProcessor::CreateInstance(Config());
102   bytes_parsed_ = bytes_last_progress_ = 0;
103   t_parse_started_ = base::GetWallTimeNs().count();
104   // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
105   // This is invoked from the same client to clear the current trace state
106   // before loading a new one. The IPC channel is orthogonal to that and the
107   // message numbering continues regardless of the reset.
108 }
109 
OnRpcRequest(const void * data,size_t len)110 void Rpc::OnRpcRequest(const void* data, size_t len) {
111   rxbuf_.Append(data, len);
112   for (;;) {
113     auto msg = rxbuf_.ReadMessage();
114     if (!msg.valid()) {
115       if (msg.fatal_framing_error) {
116         protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
117         err_msg->add_msg()->set_fatal_error("RPC framing error");
118         auto err = err_msg.SerializeAsArray();
119         rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
120         rpc_response_fn_(nullptr, 0);  // Disconnect.
121       }
122       break;
123     }
124     ParseRpcRequest(msg.start, msg.len);
125   }
126 }
127 
128 // [data, len] here is a tokenized TraceProcessorRpc proto message, without the
129 // size header.
ParseRpcRequest(const uint8_t * data,size_t len)130 void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
131   RpcProto::Decoder req(data, len);
132 
133   // We allow restarting the sequence from 0. This happens when refreshing the
134   // browser while using the external trace_processor_shell --httpd.
135   if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
136     char err_str[255];
137     // "(ERR:rpc_seq)" is intercepted by error_dialog.ts in the UI.
138     sprintf(err_str,
139             "RPC request out of order. Expected %" PRId64 ", got %" PRId64
140             " (ERR:rpc_seq)",
141             rx_seq_id_ + 1, req.seq());
142     PERFETTO_ELOG("%s", err_str);
143     protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
144     err_msg->add_msg()->set_fatal_error(err_str);
145     auto err = err_msg.SerializeAsArray();
146     rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
147     rpc_response_fn_(nullptr, 0);  // Disconnect.
148     return;
149   }
150   rx_seq_id_ = req.seq();
151 
152   // The static cast is to prevent that the compiler breaks future proofness.
153   const int req_type = static_cast<int>(req.request());
154   static const char kErrFieldNotSet[] = "RPC error: request field not set";
155   switch (req_type) {
156     case RpcProto::TPM_APPEND_TRACE_DATA: {
157       Response resp(tx_seq_id_++, req_type);
158       auto* result = resp->set_append_result();
159       if (!req.has_append_trace_data()) {
160         result->set_error(kErrFieldNotSet);
161       } else {
162         protozero::ConstBytes byte_range = req.append_trace_data();
163         util::Status res = Parse(byte_range.data, byte_range.size);
164         if (!res.ok()) {
165           result->set_error(res.message());
166         }
167       }
168       resp.Send(rpc_response_fn_);
169       break;
170     }
171     case RpcProto::TPM_FINALIZE_TRACE_DATA: {
172       Response resp(tx_seq_id_++, req_type);
173       NotifyEndOfFile();
174       resp.Send(rpc_response_fn_);
175       break;
176     }
177     case RpcProto::TPM_QUERY_STREAMING: {
178       if (!req.has_query_args()) {
179         Response resp(tx_seq_id_++, req_type);
180         auto* result = resp->set_query_result();
181         result->set_error(kErrFieldNotSet);
182         resp.Send(rpc_response_fn_);
183       } else {
184         protozero::ConstBytes args = req.query_args();
185         auto it = QueryInternal(args.data, args.size);
186         QueryResultSerializer serializer(std::move(it));
187         for (bool has_more = true; has_more;) {
188           Response resp(tx_seq_id_++, req_type);
189           has_more = serializer.Serialize(resp->set_query_result());
190           resp.Send(rpc_response_fn_);
191         }
192       }
193       break;
194     }
195     case RpcProto::TPM_QUERY_RAW_DEPRECATED: {
196       Response resp(tx_seq_id_++, req_type);
197       auto* result = resp->set_raw_query_result();
198       if (!req.has_raw_query_args()) {
199         result->set_error(kErrFieldNotSet);
200       } else {
201         protozero::ConstBytes args = req.raw_query_args();
202         RawQueryInternal(args.data, args.size, result);
203       }
204       resp.Send(rpc_response_fn_);
205       break;
206     }
207     case RpcProto::TPM_COMPUTE_METRIC: {
208       Response resp(tx_seq_id_++, req_type);
209       auto* result = resp->set_metric_result();
210       if (!req.has_compute_metric_args()) {
211         result->set_error(kErrFieldNotSet);
212       } else {
213         protozero::ConstBytes args = req.compute_metric_args();
214         ComputeMetricInternal(args.data, args.size, result);
215       }
216       resp.Send(rpc_response_fn_);
217       break;
218     }
219     case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
220       Response resp(tx_seq_id_++, req_type);
221       auto descriptor_set = trace_processor_->GetMetricDescriptors();
222       auto* result = resp->set_metric_descriptors();
223       result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
224       resp.Send(rpc_response_fn_);
225       break;
226     }
227     case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
228       trace_processor_->RestoreInitialTables();
229       Response resp(tx_seq_id_++, req_type);
230       resp.Send(rpc_response_fn_);
231       break;
232     }
233     case RpcProto::TPM_ENABLE_METATRACE: {
234       trace_processor_->EnableMetatrace();
235       Response resp(tx_seq_id_++, req_type);
236       resp.Send(rpc_response_fn_);
237       break;
238     }
239     case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
240       Response resp(tx_seq_id_++, req_type);
241       DisableAndReadMetatraceInternal(resp->set_metatrace());
242       resp.Send(rpc_response_fn_);
243       break;
244     }
245     case RpcProto::TPM_GET_STATUS: {
246       Response resp(tx_seq_id_++, req_type);
247       std::vector<uint8_t> status = GetStatus();
248       resp->set_status()->AppendRawProtoBytes(status.data(), status.size());
249       resp.Send(rpc_response_fn_);
250       break;
251     }
252     default: {
253       // This can legitimately happen if the client is newer. We reply with a
254       // generic "unkown request" response, so the client can do feature
255       // detection
256       PERFETTO_DLOG("[RPC] Uknown request type (%d), size=%zu", req_type, len);
257       Response resp(tx_seq_id_++, req_type);
258       resp->set_invalid_request(
259           static_cast<RpcProto::TraceProcessorMethod>(req_type));
260       resp.Send(rpc_response_fn_);
261       break;
262     }
263   }  // switch(req_type)
264 }
265 
Parse(const uint8_t * data,size_t len)266 util::Status Rpc::Parse(const uint8_t* data, size_t len) {
267   if (eof_) {
268     // Reset the trace processor state if another trace has been previously
269     // loaded.
270     ResetTraceProcessor();
271   }
272 
273   eof_ = false;
274   bytes_parsed_ += len;
275   MaybePrintProgress();
276 
277   if (len == 0)
278     return util::OkStatus();
279 
280   // TraceProcessor needs take ownership of the memory chunk.
281   std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
282   memcpy(data_copy.get(), data, len);
283   return trace_processor_->Parse(std::move(data_copy), len);
284 }
285 
NotifyEndOfFile()286 void Rpc::NotifyEndOfFile() {
287   trace_processor_->NotifyEndOfFile();
288   eof_ = true;
289   MaybePrintProgress();
290 }
291 
MaybePrintProgress()292 void Rpc::MaybePrintProgress() {
293   if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
294     bytes_last_progress_ = bytes_parsed_;
295     auto t_load_s =
296         static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
297         1e9;
298     fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
299             static_cast<double>(bytes_parsed_) / 1e6,
300             static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
301             (eof_ ? "\n" : ""));
302     fflush(stderr);
303   }
304 }
305 
Query(const uint8_t * args,size_t len,QueryResultBatchCallback result_callback)306 void Rpc::Query(const uint8_t* args,
307                 size_t len,
308                 QueryResultBatchCallback result_callback) {
309   auto it = QueryInternal(args, len);
310   QueryResultSerializer serializer(std::move(it));
311 
312   std::vector<uint8_t> res;
313   for (bool has_more = true; has_more;) {
314     has_more = serializer.Serialize(&res);
315     result_callback(res.data(), res.size(), has_more);
316     res.clear();
317   }
318 }
319 
QueryInternal(const uint8_t * args,size_t len)320 Iterator Rpc::QueryInternal(const uint8_t* args, size_t len) {
321   protos::pbzero::RawQueryArgs::Decoder query(args, len);
322   std::string sql = query.sql_query().ToStdString();
323   PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
324   PERFETTO_TP_TRACE("RPC_QUERY",
325                     [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
326 
327   return trace_processor_->ExecuteQuery(sql.c_str());
328 }
329 
RawQuery(const uint8_t * args,size_t len)330 std::vector<uint8_t> Rpc::RawQuery(const uint8_t* args, size_t len) {
331   protozero::HeapBuffered<protos::pbzero::RawQueryResult> result;
332   RawQueryInternal(args, len, result.get());
333   return result.SerializeAsArray();
334 }
335 
RawQueryInternal(const uint8_t * args,size_t len,protos::pbzero::RawQueryResult * result)336 void Rpc::RawQueryInternal(const uint8_t* args,
337                            size_t len,
338                            protos::pbzero::RawQueryResult* result) {
339   using ColumnValues = protos::pbzero::RawQueryResult::ColumnValues;
340   using ColumnDesc = protos::pbzero::RawQueryResult::ColumnDesc;
341 
342   protos::pbzero::RawQueryArgs::Decoder query(args, len);
343   std::string sql = query.sql_query().ToStdString();
344   PERFETTO_DLOG("[RPC] RawQuery < %s", sql.c_str());
345   PERFETTO_TP_TRACE("RPC_RAW_QUERY",
346                     [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
347 
348   auto it = trace_processor_->ExecuteQuery(sql.c_str());
349 
350   // This vector contains a standalone protozero message per column. The problem
351   // it's solving is the following: (i) sqlite iterators are row-based; (ii) the
352   // RawQueryResult proto is column-based (that was a poor design choice we
353   // should revisit at some point); (iii) the protozero API doesn't allow to
354   // begin a new nested message before the previous one is completed.
355   // In order to avoid the interleaved-writing, we write each column in a
356   // dedicated heap buffer and then we merge all the column data at the end,
357   // after having iterated all rows.
358   std::vector<protozero::HeapBuffered<ColumnValues>> cols(it.ColumnCount());
359 
360   // This constexpr is to avoid ODR-use of protozero constants which are only
361   // declared but not defined. Putting directly UNKONWN in the vector ctor
362   // causes a linker error in the WASM toolchain.
363   static constexpr auto kUnknown = ColumnDesc::UNKNOWN;
364   std::vector<ColumnDesc::Type> col_types(it.ColumnCount(), kUnknown);
365   uint32_t rows = 0;
366 
367   for (; it.Next(); ++rows) {
368     for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
369       auto& col = cols[col_idx];
370       auto& col_type = col_types[col_idx];
371 
372       using SqlValue = trace_processor::SqlValue;
373       auto cell = it.Get(col_idx);
374       if (col_type == ColumnDesc::UNKNOWN) {
375         switch (cell.type) {
376           case SqlValue::Type::kLong:
377             col_type = ColumnDesc::LONG;
378             break;
379           case SqlValue::Type::kString:
380             col_type = ColumnDesc::STRING;
381             break;
382           case SqlValue::Type::kDouble:
383             col_type = ColumnDesc::DOUBLE;
384             break;
385           case SqlValue::Type::kBytes:
386             col_type = ColumnDesc::STRING;
387             break;
388           case SqlValue::Type::kNull:
389             break;
390         }
391       }
392 
393       // If either the column type is null or we still don't know the type,
394       // just add null values to all the columns.
395       if (cell.type == SqlValue::Type::kNull ||
396           col_type == ColumnDesc::UNKNOWN) {
397         col->add_long_values(0);
398         col->add_string_values("[NULL]");
399         col->add_double_values(0);
400         col->add_is_nulls(true);
401         continue;
402       }
403 
404       // Cast the sqlite value to the type of the column.
405       switch (col_type) {
406         case ColumnDesc::LONG:
407           PERFETTO_CHECK(cell.type == SqlValue::Type::kLong ||
408                          cell.type == SqlValue::Type::kDouble);
409           if (cell.type == SqlValue::Type::kLong) {
410             col->add_long_values(cell.long_value);
411           } else /* if (cell.type == SqlValue::Type::kDouble) */ {
412             col->add_long_values(static_cast<int64_t>(cell.double_value));
413           }
414           col->add_is_nulls(false);
415           break;
416         case ColumnDesc::STRING: {
417           if (cell.type == SqlValue::Type::kBytes) {
418             col->add_string_values("<bytes>");
419           } else {
420             PERFETTO_CHECK(cell.type == SqlValue::Type::kString);
421             col->add_string_values(cell.string_value);
422           }
423           col->add_is_nulls(false);
424           break;
425         }
426         case ColumnDesc::DOUBLE:
427           PERFETTO_CHECK(cell.type == SqlValue::Type::kLong ||
428                          cell.type == SqlValue::Type::kDouble);
429           if (cell.type == SqlValue::Type::kLong) {
430             col->add_double_values(static_cast<double>(cell.long_value));
431           } else /* if (cell.type == SqlValue::Type::kDouble) */ {
432             col->add_double_values(cell.double_value);
433           }
434           col->add_is_nulls(false);
435           break;
436         case ColumnDesc::UNKNOWN:
437           PERFETTO_FATAL("Handled in if statement above.");
438       }
439     }  // for(col)
440   }    // for(row)
441 
442   // Write the column descriptors.
443   for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
444     auto* descriptor = result->add_column_descriptors();
445     std::string col_name = it.GetColumnName(col_idx);
446     descriptor->set_name(col_name.data(), col_name.size());
447     descriptor->set_type(col_types[col_idx]);
448   }
449 
450   // Merge the column values.
451   for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
452     std::vector<uint8_t> col_data = cols[col_idx].SerializeAsArray();
453     result->AppendBytes(protos::pbzero::RawQueryResult::kColumnsFieldNumber,
454                         col_data.data(), col_data.size());
455   }
456 
457   util::Status status = it.Status();
458   result->set_num_records(rows);
459   if (!status.ok())
460     result->set_error(status.c_message());
461   PERFETTO_DLOG("[RPC] RawQuery > %d rows (err: %d)", rows, !status.ok());
462 }
463 
RestoreInitialTables()464 void Rpc::RestoreInitialTables() {
465   trace_processor_->RestoreInitialTables();
466 }
467 
ComputeMetric(const uint8_t * args,size_t len)468 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
469   protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
470   ComputeMetricInternal(args, len, result.get());
471   return result.SerializeAsArray();
472 }
473 
ComputeMetricInternal(const uint8_t * data,size_t len,protos::pbzero::ComputeMetricResult * result)474 void Rpc::ComputeMetricInternal(const uint8_t* data,
475                                 size_t len,
476                                 protos::pbzero::ComputeMetricResult* result) {
477   protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
478   std::vector<std::string> metric_names;
479   for (auto it = args.metric_names(); it; ++it) {
480     metric_names.emplace_back(it->as_std_string());
481   }
482 
483   PERFETTO_TP_TRACE("RPC_COMPUTE_METRIC", [&](metatrace::Record* r) {
484     for (const auto& metric : metric_names) {
485       r->AddArg("Metric", metric);
486       r->AddArg("Format", std::to_string(args.format()));
487     }
488   });
489 
490   PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
491                 metric_names.empty() ? "" : metric_names.front().c_str(),
492                 args.format());
493   switch (args.format()) {
494     case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
495       std::vector<uint8_t> metrics_proto;
496       util::Status status =
497           trace_processor_->ComputeMetric(metric_names, &metrics_proto);
498       if (status.ok()) {
499         result->set_metrics(metrics_proto.data(), metrics_proto.size());
500       } else {
501         result->set_error(status.message());
502       }
503       break;
504     }
505     case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
506       std::string metrics_string;
507       util::Status status = trace_processor_->ComputeMetricText(
508           metric_names, TraceProcessor::MetricResultFormat::kProtoText,
509           &metrics_string);
510       if (status.ok()) {
511         result->set_metrics_as_prototext(metrics_string);
512       } else {
513         result->set_error(status.message());
514       }
515       break;
516     }
517   }
518 }
519 
EnableMetatrace()520 void Rpc::EnableMetatrace() {
521   trace_processor_->EnableMetatrace();
522 }
523 
DisableAndReadMetatrace()524 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
525   protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
526   DisableAndReadMetatraceInternal(result.get());
527   return result.SerializeAsArray();
528 }
529 
DisableAndReadMetatraceInternal(protos::pbzero::DisableAndReadMetatraceResult * result)530 void Rpc::DisableAndReadMetatraceInternal(
531     protos::pbzero::DisableAndReadMetatraceResult* result) {
532   std::vector<uint8_t> trace_proto;
533   util::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
534   if (status.ok()) {
535     result->set_metatrace(trace_proto.data(), trace_proto.size());
536   } else {
537     result->set_error(status.message());
538   }
539 }
540 
GetStatus()541 std::vector<uint8_t> Rpc::GetStatus() {
542   protozero::HeapBuffered<protos::pbzero::StatusResult> status;
543   status->set_loaded_trace_name(trace_processor_->GetCurrentTraceName());
544   status->set_human_readable_version(base::GetVersionString());
545   status->set_api_version(protos::pbzero::TRACE_PROCESSOR_CURRENT_API_VERSION);
546   return status.SerializeAsArray();
547 }
548 
549 }  // namespace trace_processor
550 }  // namespace perfetto
551