1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/rpc/rpc.h"
18
19 #include <cinttypes>
20 #include <cstdint>
21 #include <cstdio>
22 #include <cstring>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "perfetto/base/logging.h"
29 #include "perfetto/base/status.h"
30 #include "perfetto/base/time.h"
31 #include "perfetto/ext/base/version.h"
32 #include "perfetto/ext/protozero/proto_ring_buffer.h"
33 #include "perfetto/ext/trace_processor/rpc/query_result_serializer.h"
34 #include "perfetto/protozero/field.h"
35 #include "perfetto/protozero/scattered_heap_buffer.h"
36 #include "perfetto/trace_processor/basic_types.h"
37 #include "perfetto/trace_processor/metatrace_config.h"
38 #include "perfetto/trace_processor/trace_processor.h"
39 #include "src/trace_processor/tp_metatrace.h"
40
41 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
42 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
43
44 namespace perfetto::trace_processor {
45
46 namespace {
// Writes a "Loading trace ..." progress update each time more than this many
// bytes have been parsed since the previous update.
constexpr size_t kProgressUpdateBytes = 50ul * 1000 * 1000;
using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
using RpcProto = protos::pbzero::TraceProcessorRpc;

// Most RPC messages are either very small or query results.
// QueryResultSerializer splits rows into batches of approximately 128KB. Try
// to avoid extra heap allocations for the nominal case.
constexpr auto kSliceSize =
    QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
57
58 // Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
59 // copies by doing direct scattered calls from the fragmented heap buffer onto
60 // the RpcResponseFunction (the receiver is expected to deal with arbitrary
61 // fragmentation anyways). It also takes care of prefixing each message with
62 // the proto preamble and varint size.
63 class Response {
64 public:
65 Response(int64_t seq, int method);
66 Response(const Response&) = delete;
67 Response& operator=(const Response&) = delete;
operator ->()68 RpcProto* operator->() { return msg_; }
69 void Send(Rpc::RpcResponseFunction);
70
71 private:
72 RpcProto* msg_ = nullptr;
73
74 // The reason why we use TraceProcessorRpcStream as root message is because
75 // the RPC wire protocol expects each message to be prefixed with a proto
76 // preamble and varint size. This happens to be the same serialization of a
77 // repeated field (this is really the same trick we use between
78 // Trace and TracePacket in trace.proto)
79 protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
80 };
81
Response(int64_t seq,int method)82 Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
83 msg_ = buf_->add_msg();
84 msg_->set_seq(seq);
85 msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
86 }
87
Send(Rpc::RpcResponseFunction send_fn)88 void Response::Send(Rpc::RpcResponseFunction send_fn) {
89 buf_->Finalize();
90 for (const auto& slice : buf_.GetSlices()) {
91 auto range = slice.GetUsedRange();
92 send_fn(range.begin, static_cast<uint32_t>(range.size()));
93 }
94 }
95
96 } // namespace
97
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)98 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
99 : trace_processor_(std::move(preloaded_instance)) {
100 if (!trace_processor_)
101 ResetTraceProcessorInternal(Config());
102 }
103
// Default constructor: delegates with no preloaded instance, so the delegated
// constructor creates a new TraceProcessor.
Rpc::Rpc() : Rpc(nullptr) {}
Rpc::~Rpc() = default;
106
ResetTraceProcessorInternal(const Config & config)107 void Rpc::ResetTraceProcessorInternal(const Config& config) {
108 trace_processor_config_ = config;
109 trace_processor_ = TraceProcessor::CreateInstance(config);
110 bytes_parsed_ = bytes_last_progress_ = 0;
111 t_parse_started_ = base::GetWallTimeNs().count();
112 // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
113 // This is invoked from the same client to clear the current trace state
114 // before loading a new one. The IPC channel is orthogonal to that and the
115 // message numbering continues regardless of the reset.
116 }
117
OnRpcRequest(const void * data,size_t len)118 void Rpc::OnRpcRequest(const void* data, size_t len) {
119 rxbuf_.Append(data, len);
120 for (;;) {
121 auto msg = rxbuf_.ReadMessage();
122 if (!msg.valid()) {
123 if (msg.fatal_framing_error) {
124 protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
125 err_msg->add_msg()->set_fatal_error("RPC framing error");
126 auto err = err_msg.SerializeAsArray();
127 rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
128 rpc_response_fn_(nullptr, 0); // Disconnect.
129 }
130 break;
131 }
132 ParseRpcRequest(msg.start, msg.len);
133 }
134 }
135
136 namespace {
137
138 using ProtoEnum = protos::pbzero::MetatraceCategories;
MetatraceCategoriesToPublicEnum(ProtoEnum categories)139 TraceProcessor::MetatraceCategories MetatraceCategoriesToPublicEnum(
140 ProtoEnum categories) {
141 TraceProcessor::MetatraceCategories result =
142 TraceProcessor::MetatraceCategories::NONE;
143 if (categories & ProtoEnum::QUERY_TIMELINE) {
144 result = static_cast<TraceProcessor::MetatraceCategories>(
145 result | TraceProcessor::MetatraceCategories::QUERY_TIMELINE);
146 }
147 if (categories & ProtoEnum::QUERY_DETAILED) {
148 result = static_cast<TraceProcessor::MetatraceCategories>(
149 result | TraceProcessor::MetatraceCategories::QUERY_DETAILED);
150 }
151 if (categories & ProtoEnum::FUNCTION_CALL) {
152 result = static_cast<TraceProcessor::MetatraceCategories>(
153 result | TraceProcessor::MetatraceCategories::FUNCTION_CALL);
154 }
155 if (categories & ProtoEnum::DB) {
156 result = static_cast<TraceProcessor::MetatraceCategories>(
157 result | TraceProcessor::MetatraceCategories::DB);
158 }
159 if (categories & ProtoEnum::API_TIMELINE) {
160 result = static_cast<TraceProcessor::MetatraceCategories>(
161 result | TraceProcessor::MetatraceCategories::API_TIMELINE);
162 }
163 return result;
164 }
165
166 } // namespace
167
168 // [data, len] here is a tokenized TraceProcessorRpc proto message, without the
169 // size header.
ParseRpcRequest(const uint8_t * data,size_t len)170 void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
171 RpcProto::Decoder req(data, len);
172
173 // We allow restarting the sequence from 0. This happens when refreshing the
174 // browser while using the external trace_processor_shell --httpd.
175 if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
176 char err_str[255];
177 // "(ERR:rpc_seq)" is intercepted by error_dialog.ts in the UI.
178 snprintf(err_str, sizeof(err_str),
179 "RPC request out of order. Expected %" PRId64 ", got %" PRId64
180 " (ERR:rpc_seq)",
181 rx_seq_id_ + 1, req.seq());
182 PERFETTO_ELOG("%s", err_str);
183 protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
184 err_msg->add_msg()->set_fatal_error(err_str);
185 auto err = err_msg.SerializeAsArray();
186 rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
187 rpc_response_fn_(nullptr, 0); // Disconnect.
188 return;
189 }
190 rx_seq_id_ = req.seq();
191
192 // The static cast is to prevent that the compiler breaks future proofness.
193 const int req_type = static_cast<int>(req.request());
194 static const char kErrFieldNotSet[] = "RPC error: request field not set";
195 switch (req_type) {
196 case RpcProto::TPM_APPEND_TRACE_DATA: {
197 Response resp(tx_seq_id_++, req_type);
198 auto* result = resp->set_append_result();
199 if (!req.has_append_trace_data()) {
200 result->set_error(kErrFieldNotSet);
201 } else {
202 protozero::ConstBytes byte_range = req.append_trace_data();
203 base::Status res = Parse(byte_range.data, byte_range.size);
204 if (!res.ok()) {
205 result->set_error(res.message());
206 }
207 }
208 resp.Send(rpc_response_fn_);
209 break;
210 }
211 case RpcProto::TPM_FINALIZE_TRACE_DATA: {
212 Response resp(tx_seq_id_++, req_type);
213 NotifyEndOfFile();
214 resp.Send(rpc_response_fn_);
215 break;
216 }
217 case RpcProto::TPM_QUERY_STREAMING: {
218 if (!req.has_query_args()) {
219 Response resp(tx_seq_id_++, req_type);
220 auto* result = resp->set_query_result();
221 result->set_error(kErrFieldNotSet);
222 resp.Send(rpc_response_fn_);
223 } else {
224 protozero::ConstBytes args = req.query_args();
225 auto it = QueryInternal(args.data, args.size);
226 QueryResultSerializer serializer(std::move(it));
227 for (bool has_more = true; has_more;) {
228 Response resp(tx_seq_id_++, req_type);
229 has_more = serializer.Serialize(resp->set_query_result());
230 resp.Send(rpc_response_fn_);
231 }
232 }
233 break;
234 }
235 case RpcProto::TPM_COMPUTE_METRIC: {
236 Response resp(tx_seq_id_++, req_type);
237 auto* result = resp->set_metric_result();
238 if (!req.has_compute_metric_args()) {
239 result->set_error(kErrFieldNotSet);
240 } else {
241 protozero::ConstBytes args = req.compute_metric_args();
242 ComputeMetricInternal(args.data, args.size, result);
243 }
244 resp.Send(rpc_response_fn_);
245 break;
246 }
247 case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
248 Response resp(tx_seq_id_++, req_type);
249 auto descriptor_set = trace_processor_->GetMetricDescriptors();
250 auto* result = resp->set_metric_descriptors();
251 result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
252 resp.Send(rpc_response_fn_);
253 break;
254 }
255 case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
256 trace_processor_->RestoreInitialTables();
257 Response resp(tx_seq_id_++, req_type);
258 resp.Send(rpc_response_fn_);
259 break;
260 }
261 case RpcProto::TPM_ENABLE_METATRACE: {
262 using protos::pbzero::MetatraceCategories;
263 protozero::ConstBytes args = req.enable_metatrace_args();
264 EnableMetatrace(args.data, args.size);
265
266 Response resp(tx_seq_id_++, req_type);
267 resp.Send(rpc_response_fn_);
268 break;
269 }
270 case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
271 Response resp(tx_seq_id_++, req_type);
272 DisableAndReadMetatraceInternal(resp->set_metatrace());
273 resp.Send(rpc_response_fn_);
274 break;
275 }
276 case RpcProto::TPM_GET_STATUS: {
277 Response resp(tx_seq_id_++, req_type);
278 std::vector<uint8_t> status = GetStatus();
279 resp->set_status()->AppendRawProtoBytes(status.data(), status.size());
280 resp.Send(rpc_response_fn_);
281 break;
282 }
283 case RpcProto::TPM_RESET_TRACE_PROCESSOR: {
284 Response resp(tx_seq_id_++, req_type);
285 protozero::ConstBytes args = req.reset_trace_processor_args();
286 ResetTraceProcessor(args.data, args.size);
287 resp.Send(rpc_response_fn_);
288 break;
289 }
290 default: {
291 // This can legitimately happen if the client is newer. We reply with a
292 // generic "unkown request" response, so the client can do feature
293 // detection
294 PERFETTO_DLOG("[RPC] Uknown request type (%d), size=%zu", req_type, len);
295 Response resp(tx_seq_id_++, req_type);
296 resp->set_invalid_request(
297 static_cast<RpcProto::TraceProcessorMethod>(req_type));
298 resp.Send(rpc_response_fn_);
299 break;
300 }
301 } // switch(req_type)
302 }
303
Parse(const uint8_t * data,size_t len)304 base::Status Rpc::Parse(const uint8_t* data, size_t len) {
305 PERFETTO_TP_TRACE(
306 metatrace::Category::API_TIMELINE, "RPC_PARSE",
307 [&](metatrace::Record* r) { r->AddArg("length", std::to_string(len)); });
308 if (eof_) {
309 // Reset the trace processor state if another trace has been previously
310 // loaded. Use the same TraceProcessor Config.
311 ResetTraceProcessorInternal(trace_processor_config_);
312 }
313
314 eof_ = false;
315 bytes_parsed_ += len;
316 MaybePrintProgress();
317
318 if (len == 0)
319 return base::OkStatus();
320
321 // TraceProcessor needs take ownership of the memory chunk.
322 std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
323 memcpy(data_copy.get(), data, len);
324 return trace_processor_->Parse(std::move(data_copy), len);
325 }
326
NotifyEndOfFile()327 void Rpc::NotifyEndOfFile() {
328 PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE,
329 "RPC_NOTIFY_END_OF_FILE");
330
331 trace_processor_->NotifyEndOfFile();
332 eof_ = true;
333 MaybePrintProgress();
334 }
335
ResetTraceProcessor(const uint8_t * args,size_t len)336 void Rpc::ResetTraceProcessor(const uint8_t* args, size_t len) {
337 protos::pbzero::ResetTraceProcessorArgs::Decoder reset_trace_processor_args(
338 args, len);
339 Config config;
340 if (reset_trace_processor_args.has_drop_track_event_data_before()) {
341 config.drop_track_event_data_before =
342 reset_trace_processor_args.drop_track_event_data_before() ==
343 protos::pbzero::ResetTraceProcessorArgs::
344 TRACK_EVENT_RANGE_OF_INTEREST
345 ? DropTrackEventDataBefore::kTrackEventRangeOfInterest
346 : DropTrackEventDataBefore::kNoDrop;
347 }
348 if (reset_trace_processor_args.has_ingest_ftrace_in_raw_table()) {
349 config.ingest_ftrace_in_raw_table =
350 reset_trace_processor_args.ingest_ftrace_in_raw_table();
351 }
352 if (reset_trace_processor_args.has_analyze_trace_proto_content()) {
353 config.analyze_trace_proto_content =
354 reset_trace_processor_args.analyze_trace_proto_content();
355 }
356 if (reset_trace_processor_args.has_ftrace_drop_until_all_cpus_valid()) {
357 config.soft_drop_ftrace_data_before =
358 reset_trace_processor_args.ftrace_drop_until_all_cpus_valid()
359 ? SoftDropFtraceDataBefore::kAllPerCpuBuffersValid
360 : SoftDropFtraceDataBefore::kNoDrop;
361 }
362 ResetTraceProcessorInternal(config);
363 }
364
MaybePrintProgress()365 void Rpc::MaybePrintProgress() {
366 if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
367 bytes_last_progress_ = bytes_parsed_;
368 auto t_load_s =
369 static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
370 1e9;
371 fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
372 static_cast<double>(bytes_parsed_) / 1e6,
373 static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
374 (eof_ ? "\n" : ""));
375 fflush(stderr);
376 }
377 }
378
Query(const uint8_t * args,size_t len,const QueryResultBatchCallback & result_callback)379 void Rpc::Query(const uint8_t* args,
380 size_t len,
381 const QueryResultBatchCallback& result_callback) {
382 auto it = QueryInternal(args, len);
383 QueryResultSerializer serializer(std::move(it));
384
385 std::vector<uint8_t> res;
386 for (bool has_more = true; has_more;) {
387 has_more = serializer.Serialize(&res);
388 result_callback(res.data(), res.size(), has_more);
389 res.clear();
390 }
391 }
392
QueryInternal(const uint8_t * args,size_t len)393 Iterator Rpc::QueryInternal(const uint8_t* args, size_t len) {
394 protos::pbzero::QueryArgs::Decoder query(args, len);
395 std::string sql = query.sql_query().ToStdString();
396 PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
397 PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE, "RPC_QUERY",
398 [&](metatrace::Record* r) {
399 r->AddArg("SQL", sql);
400 if (query.has_tag()) {
401 r->AddArg("tag", query.tag());
402 }
403 });
404
405 return trace_processor_->ExecuteQuery(sql);
406 }
407
RestoreInitialTables()408 void Rpc::RestoreInitialTables() {
409 trace_processor_->RestoreInitialTables();
410 }
411
ComputeMetric(const uint8_t * args,size_t len)412 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
413 protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
414 ComputeMetricInternal(args, len, result.get());
415 return result.SerializeAsArray();
416 }
417
ComputeMetricInternal(const uint8_t * data,size_t len,protos::pbzero::ComputeMetricResult * result)418 void Rpc::ComputeMetricInternal(const uint8_t* data,
419 size_t len,
420 protos::pbzero::ComputeMetricResult* result) {
421 protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
422 std::vector<std::string> metric_names;
423 for (auto it = args.metric_names(); it; ++it) {
424 metric_names.emplace_back(it->as_std_string());
425 }
426
427 PERFETTO_TP_TRACE(metatrace::Category::API_TIMELINE, "RPC_COMPUTE_METRIC",
428 [&](metatrace::Record* r) {
429 for (const auto& metric : metric_names) {
430 r->AddArg("Metric", metric);
431 r->AddArg("Format", std::to_string(args.format()));
432 }
433 });
434
435 PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
436 metric_names.empty() ? "" : metric_names.front().c_str(),
437 args.format());
438 switch (args.format()) {
439 case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
440 std::vector<uint8_t> metrics_proto;
441 base::Status status =
442 trace_processor_->ComputeMetric(metric_names, &metrics_proto);
443 if (status.ok()) {
444 result->set_metrics(metrics_proto.data(), metrics_proto.size());
445 } else {
446 result->set_error(status.message());
447 }
448 break;
449 }
450 case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
451 std::string metrics_string;
452 base::Status status = trace_processor_->ComputeMetricText(
453 metric_names, TraceProcessor::MetricResultFormat::kProtoText,
454 &metrics_string);
455 if (status.ok()) {
456 result->set_metrics_as_prototext(metrics_string);
457 } else {
458 result->set_error(status.message());
459 }
460 break;
461 }
462 case protos::pbzero::ComputeMetricArgs::JSON: {
463 std::string metrics_string;
464 base::Status status = trace_processor_->ComputeMetricText(
465 metric_names, TraceProcessor::MetricResultFormat::kJson,
466 &metrics_string);
467 if (status.ok()) {
468 result->set_metrics_as_json(metrics_string);
469 } else {
470 result->set_error(status.message());
471 }
472 break;
473 }
474 }
475 }
476
EnableMetatrace(const uint8_t * data,size_t len)477 void Rpc::EnableMetatrace(const uint8_t* data, size_t len) {
478 using protos::pbzero::MetatraceCategories;
479 TraceProcessor::MetatraceConfig config;
480 protos::pbzero::EnableMetatraceArgs::Decoder args(data, len);
481 config.categories = MetatraceCategoriesToPublicEnum(
482 static_cast<MetatraceCategories>(args.categories()));
483 trace_processor_->EnableMetatrace(config);
484 }
485
DisableAndReadMetatrace()486 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
487 protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
488 DisableAndReadMetatraceInternal(result.get());
489 return result.SerializeAsArray();
490 }
491
DisableAndReadMetatraceInternal(protos::pbzero::DisableAndReadMetatraceResult * result)492 void Rpc::DisableAndReadMetatraceInternal(
493 protos::pbzero::DisableAndReadMetatraceResult* result) {
494 std::vector<uint8_t> trace_proto;
495 base::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
496 if (status.ok()) {
497 result->set_metatrace(trace_proto.data(), trace_proto.size());
498 } else {
499 result->set_error(status.message());
500 }
501 }
502
GetStatus()503 std::vector<uint8_t> Rpc::GetStatus() {
504 protozero::HeapBuffered<protos::pbzero::StatusResult> status;
505 status->set_loaded_trace_name(trace_processor_->GetCurrentTraceName());
506 status->set_human_readable_version(base::GetVersionString());
507 if (const char* version_code = base::GetVersionCode(); version_code) {
508 status->set_version_code(version_code);
509 }
510 status->set_api_version(protos::pbzero::TRACE_PROCESSOR_CURRENT_API_VERSION);
511 return status.SerializeAsArray();
512 }
513
514 } // namespace perfetto::trace_processor
515