1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/rpc/rpc.h"
18
19 #include <string.h>
20
21 #include <vector>
22
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/time.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/ext/base/version.h"
27 #include "perfetto/protozero/scattered_heap_buffer.h"
28 #include "perfetto/protozero/scattered_stream_writer.h"
29 #include "perfetto/trace_processor/trace_processor.h"
30 #include "src/protozero/proto_ring_buffer.h"
31 #include "src/trace_processor/rpc/query_result_serializer.h"
32 #include "src/trace_processor/tp_metatrace.h"
33
34 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
35
36 namespace perfetto {
37 namespace trace_processor {
38
39 namespace {
40 // Writes a "Loading trace ..." update every N bytes.
41 constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
42 using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
43 using RpcProto = protos::pbzero::TraceProcessorRpc;
44
45 // Most RPC messages are either very small or a query results.
46 // QueryResultSerializer splits rows into batches of approximately 128KB. Try
47 // avoid extra heap allocations for the nominal case.
48 constexpr auto kSliceSize =
49 QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
50
51 // Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
52 // copies by doing direct scattered calls from the fragmented heap buffer onto
53 // the RpcResponseFunction (the receiver is expected to deal with arbitrary
54 // fragmentation anyways). It also takes care of prefixing each message with
55 // the proto preamble and varint size.
56 class Response {
57 public:
58 Response(int64_t seq, int method);
59 Response(const Response&) = delete;
60 Response& operator=(const Response&) = delete;
operator ->()61 RpcProto* operator->() { return msg_; }
62 void Send(Rpc::RpcResponseFunction);
63
64 private:
65 RpcProto* msg_ = nullptr;
66
67 // The reason why we use TraceProcessorRpcStream as root message is because
68 // the RPC wire protocol expects each message to be prefixed with a proto
69 // preamble and varint size. This happens to be the same serialization of a
70 // repeated field (this is really the same trick we use between
71 // Trace and TracePacket in trace.proto)
72 protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
73 };
74
Response(int64_t seq,int method)75 Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
76 msg_ = buf_->add_msg();
77 msg_->set_seq(seq);
78 msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
79 }
80
Send(Rpc::RpcResponseFunction send_fn)81 void Response::Send(Rpc::RpcResponseFunction send_fn) {
82 buf_->Finalize();
83 for (const auto& slice : buf_.GetSlices()) {
84 auto range = slice.GetUsedRange();
85 send_fn(range.begin, static_cast<uint32_t>(range.size()));
86 }
87 }
88
89 } // namespace
90
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)91 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
92 : trace_processor_(std::move(preloaded_instance)) {
93 if (!trace_processor_)
94 ResetTraceProcessor();
95 }
96
Rpc()97 Rpc::Rpc() : Rpc(nullptr) {}
98 Rpc::~Rpc() = default;
99
ResetTraceProcessor()100 void Rpc::ResetTraceProcessor() {
101 trace_processor_ = TraceProcessor::CreateInstance(Config());
102 bytes_parsed_ = bytes_last_progress_ = 0;
103 t_parse_started_ = base::GetWallTimeNs().count();
104 // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
105 // This is invoked from the same client to clear the current trace state
106 // before loading a new one. The IPC channel is orthogonal to that and the
107 // message numbering continues regardless of the reset.
108 }
109
OnRpcRequest(const void * data,size_t len)110 void Rpc::OnRpcRequest(const void* data, size_t len) {
111 rxbuf_.Append(data, len);
112 for (;;) {
113 auto msg = rxbuf_.ReadMessage();
114 if (!msg.valid()) {
115 if (msg.fatal_framing_error) {
116 protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
117 err_msg->add_msg()->set_fatal_error("RPC framing error");
118 auto err = err_msg.SerializeAsArray();
119 rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
120 rpc_response_fn_(nullptr, 0); // Disconnect.
121 }
122 break;
123 }
124 ParseRpcRequest(msg.start, msg.len);
125 }
126 }
127
128 // [data, len] here is a tokenized TraceProcessorRpc proto message, without the
129 // size header.
ParseRpcRequest(const uint8_t * data,size_t len)130 void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
131 RpcProto::Decoder req(data, len);
132
133 // We allow restarting the sequence from 0. This happens when refreshing the
134 // browser while using the external trace_processor_shell --httpd.
135 if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
136 char err_str[255];
137 // "(ERR:rpc_seq)" is intercepted by error_dialog.ts in the UI.
138 sprintf(err_str,
139 "RPC request out of order. Expected %" PRId64 ", got %" PRId64
140 " (ERR:rpc_seq)",
141 rx_seq_id_ + 1, req.seq());
142 PERFETTO_ELOG("%s", err_str);
143 protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
144 err_msg->add_msg()->set_fatal_error(err_str);
145 auto err = err_msg.SerializeAsArray();
146 rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
147 rpc_response_fn_(nullptr, 0); // Disconnect.
148 return;
149 }
150 rx_seq_id_ = req.seq();
151
152 // The static cast is to prevent that the compiler breaks future proofness.
153 const int req_type = static_cast<int>(req.request());
154 static const char kErrFieldNotSet[] = "RPC error: request field not set";
155 switch (req_type) {
156 case RpcProto::TPM_APPEND_TRACE_DATA: {
157 Response resp(tx_seq_id_++, req_type);
158 auto* result = resp->set_append_result();
159 if (!req.has_append_trace_data()) {
160 result->set_error(kErrFieldNotSet);
161 } else {
162 protozero::ConstBytes byte_range = req.append_trace_data();
163 util::Status res = Parse(byte_range.data, byte_range.size);
164 if (!res.ok()) {
165 result->set_error(res.message());
166 }
167 }
168 resp.Send(rpc_response_fn_);
169 break;
170 }
171 case RpcProto::TPM_FINALIZE_TRACE_DATA: {
172 Response resp(tx_seq_id_++, req_type);
173 NotifyEndOfFile();
174 resp.Send(rpc_response_fn_);
175 break;
176 }
177 case RpcProto::TPM_QUERY_STREAMING: {
178 if (!req.has_query_args()) {
179 Response resp(tx_seq_id_++, req_type);
180 auto* result = resp->set_query_result();
181 result->set_error(kErrFieldNotSet);
182 resp.Send(rpc_response_fn_);
183 } else {
184 protozero::ConstBytes args = req.query_args();
185 auto it = QueryInternal(args.data, args.size);
186 QueryResultSerializer serializer(std::move(it));
187 for (bool has_more = true; has_more;) {
188 Response resp(tx_seq_id_++, req_type);
189 has_more = serializer.Serialize(resp->set_query_result());
190 resp.Send(rpc_response_fn_);
191 }
192 }
193 break;
194 }
195 case RpcProto::TPM_QUERY_RAW_DEPRECATED: {
196 Response resp(tx_seq_id_++, req_type);
197 auto* result = resp->set_raw_query_result();
198 if (!req.has_raw_query_args()) {
199 result->set_error(kErrFieldNotSet);
200 } else {
201 protozero::ConstBytes args = req.raw_query_args();
202 RawQueryInternal(args.data, args.size, result);
203 }
204 resp.Send(rpc_response_fn_);
205 break;
206 }
207 case RpcProto::TPM_COMPUTE_METRIC: {
208 Response resp(tx_seq_id_++, req_type);
209 auto* result = resp->set_metric_result();
210 if (!req.has_compute_metric_args()) {
211 result->set_error(kErrFieldNotSet);
212 } else {
213 protozero::ConstBytes args = req.compute_metric_args();
214 ComputeMetricInternal(args.data, args.size, result);
215 }
216 resp.Send(rpc_response_fn_);
217 break;
218 }
219 case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
220 Response resp(tx_seq_id_++, req_type);
221 auto descriptor_set = trace_processor_->GetMetricDescriptors();
222 auto* result = resp->set_metric_descriptors();
223 result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
224 resp.Send(rpc_response_fn_);
225 break;
226 }
227 case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
228 trace_processor_->RestoreInitialTables();
229 Response resp(tx_seq_id_++, req_type);
230 resp.Send(rpc_response_fn_);
231 break;
232 }
233 case RpcProto::TPM_ENABLE_METATRACE: {
234 trace_processor_->EnableMetatrace();
235 Response resp(tx_seq_id_++, req_type);
236 resp.Send(rpc_response_fn_);
237 break;
238 }
239 case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
240 Response resp(tx_seq_id_++, req_type);
241 DisableAndReadMetatraceInternal(resp->set_metatrace());
242 resp.Send(rpc_response_fn_);
243 break;
244 }
245 case RpcProto::TPM_GET_STATUS: {
246 Response resp(tx_seq_id_++, req_type);
247 std::vector<uint8_t> status = GetStatus();
248 resp->set_status()->AppendRawProtoBytes(status.data(), status.size());
249 resp.Send(rpc_response_fn_);
250 break;
251 }
252 default: {
253 // This can legitimately happen if the client is newer. We reply with a
254 // generic "unkown request" response, so the client can do feature
255 // detection
256 PERFETTO_DLOG("[RPC] Uknown request type (%d), size=%zu", req_type, len);
257 Response resp(tx_seq_id_++, req_type);
258 resp->set_invalid_request(
259 static_cast<RpcProto::TraceProcessorMethod>(req_type));
260 resp.Send(rpc_response_fn_);
261 break;
262 }
263 } // switch(req_type)
264 }
265
Parse(const uint8_t * data,size_t len)266 util::Status Rpc::Parse(const uint8_t* data, size_t len) {
267 if (eof_) {
268 // Reset the trace processor state if another trace has been previously
269 // loaded.
270 ResetTraceProcessor();
271 }
272
273 eof_ = false;
274 bytes_parsed_ += len;
275 MaybePrintProgress();
276
277 if (len == 0)
278 return util::OkStatus();
279
280 // TraceProcessor needs take ownership of the memory chunk.
281 std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
282 memcpy(data_copy.get(), data, len);
283 return trace_processor_->Parse(std::move(data_copy), len);
284 }
285
NotifyEndOfFile()286 void Rpc::NotifyEndOfFile() {
287 trace_processor_->NotifyEndOfFile();
288 eof_ = true;
289 MaybePrintProgress();
290 }
291
MaybePrintProgress()292 void Rpc::MaybePrintProgress() {
293 if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
294 bytes_last_progress_ = bytes_parsed_;
295 auto t_load_s =
296 static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
297 1e9;
298 fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
299 static_cast<double>(bytes_parsed_) / 1e6,
300 static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
301 (eof_ ? "\n" : ""));
302 fflush(stderr);
303 }
304 }
305
Query(const uint8_t * args,size_t len,QueryResultBatchCallback result_callback)306 void Rpc::Query(const uint8_t* args,
307 size_t len,
308 QueryResultBatchCallback result_callback) {
309 auto it = QueryInternal(args, len);
310 QueryResultSerializer serializer(std::move(it));
311
312 std::vector<uint8_t> res;
313 for (bool has_more = true; has_more;) {
314 has_more = serializer.Serialize(&res);
315 result_callback(res.data(), res.size(), has_more);
316 res.clear();
317 }
318 }
319
QueryInternal(const uint8_t * args,size_t len)320 Iterator Rpc::QueryInternal(const uint8_t* args, size_t len) {
321 protos::pbzero::RawQueryArgs::Decoder query(args, len);
322 std::string sql = query.sql_query().ToStdString();
323 PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
324 PERFETTO_TP_TRACE("RPC_QUERY",
325 [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
326
327 return trace_processor_->ExecuteQuery(sql.c_str());
328 }
329
RawQuery(const uint8_t * args,size_t len)330 std::vector<uint8_t> Rpc::RawQuery(const uint8_t* args, size_t len) {
331 protozero::HeapBuffered<protos::pbzero::RawQueryResult> result;
332 RawQueryInternal(args, len, result.get());
333 return result.SerializeAsArray();
334 }
335
RawQueryInternal(const uint8_t * args,size_t len,protos::pbzero::RawQueryResult * result)336 void Rpc::RawQueryInternal(const uint8_t* args,
337 size_t len,
338 protos::pbzero::RawQueryResult* result) {
339 using ColumnValues = protos::pbzero::RawQueryResult::ColumnValues;
340 using ColumnDesc = protos::pbzero::RawQueryResult::ColumnDesc;
341
342 protos::pbzero::RawQueryArgs::Decoder query(args, len);
343 std::string sql = query.sql_query().ToStdString();
344 PERFETTO_DLOG("[RPC] RawQuery < %s", sql.c_str());
345 PERFETTO_TP_TRACE("RPC_RAW_QUERY",
346 [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
347
348 auto it = trace_processor_->ExecuteQuery(sql.c_str());
349
350 // This vector contains a standalone protozero message per column. The problem
351 // it's solving is the following: (i) sqlite iterators are row-based; (ii) the
352 // RawQueryResult proto is column-based (that was a poor design choice we
353 // should revisit at some point); (iii) the protozero API doesn't allow to
354 // begin a new nested message before the previous one is completed.
355 // In order to avoid the interleaved-writing, we write each column in a
356 // dedicated heap buffer and then we merge all the column data at the end,
357 // after having iterated all rows.
358 std::vector<protozero::HeapBuffered<ColumnValues>> cols(it.ColumnCount());
359
360 // This constexpr is to avoid ODR-use of protozero constants which are only
361 // declared but not defined. Putting directly UNKONWN in the vector ctor
362 // causes a linker error in the WASM toolchain.
363 static constexpr auto kUnknown = ColumnDesc::UNKNOWN;
364 std::vector<ColumnDesc::Type> col_types(it.ColumnCount(), kUnknown);
365 uint32_t rows = 0;
366
367 for (; it.Next(); ++rows) {
368 for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
369 auto& col = cols[col_idx];
370 auto& col_type = col_types[col_idx];
371
372 using SqlValue = trace_processor::SqlValue;
373 auto cell = it.Get(col_idx);
374 if (col_type == ColumnDesc::UNKNOWN) {
375 switch (cell.type) {
376 case SqlValue::Type::kLong:
377 col_type = ColumnDesc::LONG;
378 break;
379 case SqlValue::Type::kString:
380 col_type = ColumnDesc::STRING;
381 break;
382 case SqlValue::Type::kDouble:
383 col_type = ColumnDesc::DOUBLE;
384 break;
385 case SqlValue::Type::kBytes:
386 col_type = ColumnDesc::STRING;
387 break;
388 case SqlValue::Type::kNull:
389 break;
390 }
391 }
392
393 // If either the column type is null or we still don't know the type,
394 // just add null values to all the columns.
395 if (cell.type == SqlValue::Type::kNull ||
396 col_type == ColumnDesc::UNKNOWN) {
397 col->add_long_values(0);
398 col->add_string_values("[NULL]");
399 col->add_double_values(0);
400 col->add_is_nulls(true);
401 continue;
402 }
403
404 // Cast the sqlite value to the type of the column.
405 switch (col_type) {
406 case ColumnDesc::LONG:
407 PERFETTO_CHECK(cell.type == SqlValue::Type::kLong ||
408 cell.type == SqlValue::Type::kDouble);
409 if (cell.type == SqlValue::Type::kLong) {
410 col->add_long_values(cell.long_value);
411 } else /* if (cell.type == SqlValue::Type::kDouble) */ {
412 col->add_long_values(static_cast<int64_t>(cell.double_value));
413 }
414 col->add_is_nulls(false);
415 break;
416 case ColumnDesc::STRING: {
417 if (cell.type == SqlValue::Type::kBytes) {
418 col->add_string_values("<bytes>");
419 } else {
420 PERFETTO_CHECK(cell.type == SqlValue::Type::kString);
421 col->add_string_values(cell.string_value);
422 }
423 col->add_is_nulls(false);
424 break;
425 }
426 case ColumnDesc::DOUBLE:
427 PERFETTO_CHECK(cell.type == SqlValue::Type::kLong ||
428 cell.type == SqlValue::Type::kDouble);
429 if (cell.type == SqlValue::Type::kLong) {
430 col->add_double_values(static_cast<double>(cell.long_value));
431 } else /* if (cell.type == SqlValue::Type::kDouble) */ {
432 col->add_double_values(cell.double_value);
433 }
434 col->add_is_nulls(false);
435 break;
436 case ColumnDesc::UNKNOWN:
437 PERFETTO_FATAL("Handled in if statement above.");
438 }
439 } // for(col)
440 } // for(row)
441
442 // Write the column descriptors.
443 for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
444 auto* descriptor = result->add_column_descriptors();
445 std::string col_name = it.GetColumnName(col_idx);
446 descriptor->set_name(col_name.data(), col_name.size());
447 descriptor->set_type(col_types[col_idx]);
448 }
449
450 // Merge the column values.
451 for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
452 std::vector<uint8_t> col_data = cols[col_idx].SerializeAsArray();
453 result->AppendBytes(protos::pbzero::RawQueryResult::kColumnsFieldNumber,
454 col_data.data(), col_data.size());
455 }
456
457 util::Status status = it.Status();
458 result->set_num_records(rows);
459 if (!status.ok())
460 result->set_error(status.c_message());
461 PERFETTO_DLOG("[RPC] RawQuery > %d rows (err: %d)", rows, !status.ok());
462 }
463
RestoreInitialTables()464 void Rpc::RestoreInitialTables() {
465 trace_processor_->RestoreInitialTables();
466 }
467
ComputeMetric(const uint8_t * args,size_t len)468 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
469 protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
470 ComputeMetricInternal(args, len, result.get());
471 return result.SerializeAsArray();
472 }
473
ComputeMetricInternal(const uint8_t * data,size_t len,protos::pbzero::ComputeMetricResult * result)474 void Rpc::ComputeMetricInternal(const uint8_t* data,
475 size_t len,
476 protos::pbzero::ComputeMetricResult* result) {
477 protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
478 std::vector<std::string> metric_names;
479 for (auto it = args.metric_names(); it; ++it) {
480 metric_names.emplace_back(it->as_std_string());
481 }
482
483 PERFETTO_TP_TRACE("RPC_COMPUTE_METRIC", [&](metatrace::Record* r) {
484 for (const auto& metric : metric_names) {
485 r->AddArg("Metric", metric);
486 r->AddArg("Format", std::to_string(args.format()));
487 }
488 });
489
490 PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
491 metric_names.empty() ? "" : metric_names.front().c_str(),
492 args.format());
493 switch (args.format()) {
494 case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
495 std::vector<uint8_t> metrics_proto;
496 util::Status status =
497 trace_processor_->ComputeMetric(metric_names, &metrics_proto);
498 if (status.ok()) {
499 result->set_metrics(metrics_proto.data(), metrics_proto.size());
500 } else {
501 result->set_error(status.message());
502 }
503 break;
504 }
505 case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
506 std::string metrics_string;
507 util::Status status = trace_processor_->ComputeMetricText(
508 metric_names, TraceProcessor::MetricResultFormat::kProtoText,
509 &metrics_string);
510 if (status.ok()) {
511 result->set_metrics_as_prototext(metrics_string);
512 } else {
513 result->set_error(status.message());
514 }
515 break;
516 }
517 }
518 }
519
EnableMetatrace()520 void Rpc::EnableMetatrace() {
521 trace_processor_->EnableMetatrace();
522 }
523
DisableAndReadMetatrace()524 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
525 protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
526 DisableAndReadMetatraceInternal(result.get());
527 return result.SerializeAsArray();
528 }
529
DisableAndReadMetatraceInternal(protos::pbzero::DisableAndReadMetatraceResult * result)530 void Rpc::DisableAndReadMetatraceInternal(
531 protos::pbzero::DisableAndReadMetatraceResult* result) {
532 std::vector<uint8_t> trace_proto;
533 util::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
534 if (status.ok()) {
535 result->set_metatrace(trace_proto.data(), trace_proto.size());
536 } else {
537 result->set_error(status.message());
538 }
539 }
540
GetStatus()541 std::vector<uint8_t> Rpc::GetStatus() {
542 protozero::HeapBuffered<protos::pbzero::StatusResult> status;
543 status->set_loaded_trace_name(trace_processor_->GetCurrentTraceName());
544 status->set_human_readable_version(base::GetVersionString());
545 status->set_api_version(protos::pbzero::TRACE_PROCESSOR_CURRENT_API_VERSION);
546 return status.SerializeAsArray();
547 }
548
549 } // namespace trace_processor
550 } // namespace perfetto
551