1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/rpc/rpc.h"
18
19 #include <string.h>
20
21 #include <vector>
22
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/time.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/ext/base/version.h"
27 #include "perfetto/protozero/scattered_heap_buffer.h"
28 #include "perfetto/protozero/scattered_stream_writer.h"
29 #include "perfetto/trace_processor/trace_processor.h"
30 #include "src/protozero/proto_ring_buffer.h"
31 #include "src/trace_processor/rpc/query_result_serializer.h"
32 #include "src/trace_processor/tp_metatrace.h"
33
34 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
35
36 namespace perfetto {
37 namespace trace_processor {
38
39 namespace {
40 // Writes a "Loading trace ..." update every N bytes.
41 constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
42 using TraceProcessorRpcStream = protos::pbzero::TraceProcessorRpcStream;
43 using RpcProto = protos::pbzero::TraceProcessorRpc;
44
// Most RPC messages are either very small or a batch of query results.
// QueryResultSerializer splits rows into batches of approximately 128KB. Try
// to avoid extra heap allocations for the nominal case.
48 constexpr auto kSliceSize =
49 QueryResultSerializer::kDefaultBatchSplitThreshold + 4096;
50
51 // Holds a trace_processor::TraceProcessorRpc pbzero message. Avoids extra
52 // copies by doing direct scattered calls from the fragmented heap buffer onto
53 // the RpcResponseFunction (the receiver is expected to deal with arbitrary
54 // fragmentation anyways). It also takes care of prefixing each message with
55 // the proto preamble and varint size.
56 class Response {
57 public:
58 Response(int64_t seq, int method);
59 Response(const Response&) = delete;
60 Response& operator=(const Response&) = delete;
operator ->()61 RpcProto* operator->() { return msg_; }
62 void Send(Rpc::RpcResponseFunction);
63
64 private:
65 RpcProto* msg_ = nullptr;
66
67 // The reason why we use TraceProcessorRpcStream as root message is because
68 // the RPC wire protocol expects each message to be prefixed with a proto
69 // preamble and varint size. This happens to be the same serialization of a
70 // repeated field (this is really the same trick we use between
71 // Trace and TracePacket in trace.proto)
72 protozero::HeapBuffered<TraceProcessorRpcStream> buf_;
73 };
74
Response(int64_t seq,int method)75 Response::Response(int64_t seq, int method) : buf_(kSliceSize, kSliceSize) {
76 msg_ = buf_->add_msg();
77 msg_->set_seq(seq);
78 msg_->set_response(static_cast<RpcProto::TraceProcessorMethod>(method));
79 }
80
Send(Rpc::RpcResponseFunction send_fn)81 void Response::Send(Rpc::RpcResponseFunction send_fn) {
82 buf_->Finalize();
83 for (const auto& slice : buf_.GetSlices()) {
84 auto range = slice.GetUsedRange();
85 send_fn(range.begin, static_cast<uint32_t>(range.size()));
86 }
87 }
88
89 } // namespace
90
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)91 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
92 : trace_processor_(std::move(preloaded_instance)) {
93 if (!trace_processor_)
94 ResetTraceProcessorInternal(Config());
95 }
96
Rpc()97 Rpc::Rpc() : Rpc(nullptr) {}
98 Rpc::~Rpc() = default;
99
ResetTraceProcessorInternal(const Config & config)100 void Rpc::ResetTraceProcessorInternal(const Config& config) {
101 trace_processor_config_ = config;
102 trace_processor_ = TraceProcessor::CreateInstance(config);
103 bytes_parsed_ = bytes_last_progress_ = 0;
104 t_parse_started_ = base::GetWallTimeNs().count();
105 // Deliberately not resetting the RPC channel state (rxbuf_, {tx,rx}_seq_id_).
106 // This is invoked from the same client to clear the current trace state
107 // before loading a new one. The IPC channel is orthogonal to that and the
108 // message numbering continues regardless of the reset.
109 }
110
OnRpcRequest(const void * data,size_t len)111 void Rpc::OnRpcRequest(const void* data, size_t len) {
112 rxbuf_.Append(data, len);
113 for (;;) {
114 auto msg = rxbuf_.ReadMessage();
115 if (!msg.valid()) {
116 if (msg.fatal_framing_error) {
117 protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
118 err_msg->add_msg()->set_fatal_error("RPC framing error");
119 auto err = err_msg.SerializeAsArray();
120 rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
121 rpc_response_fn_(nullptr, 0); // Disconnect.
122 }
123 break;
124 }
125 ParseRpcRequest(msg.start, msg.len);
126 }
127 }
128
129 namespace {
130
131 using ProtoEnum = protos::pbzero::MetatraceCategories;
MetatraceCategoriesToPublicEnum(ProtoEnum categories)132 TraceProcessor::MetatraceCategories MetatraceCategoriesToPublicEnum(
133 ProtoEnum categories) {
134 switch (categories) {
135 case ProtoEnum::TOPLEVEL:
136 return TraceProcessor::MetatraceCategories::TOPLEVEL;
137 case ProtoEnum::QUERY:
138 return TraceProcessor::MetatraceCategories::QUERY;
139 case ProtoEnum::FUNCTION:
140 return TraceProcessor::MetatraceCategories::FUNCTION;
141 case ProtoEnum::ALL:
142 return TraceProcessor::MetatraceCategories::ALL;
143 case ProtoEnum::NONE:
144 return TraceProcessor::MetatraceCategories::NONE;
145 }
146 return TraceProcessor::MetatraceCategories::NONE;
147 }
148
149 } // namespace
150
151 // [data, len] here is a tokenized TraceProcessorRpc proto message, without the
152 // size header.
ParseRpcRequest(const uint8_t * data,size_t len)153 void Rpc::ParseRpcRequest(const uint8_t* data, size_t len) {
154 RpcProto::Decoder req(data, len);
155
156 // We allow restarting the sequence from 0. This happens when refreshing the
157 // browser while using the external trace_processor_shell --httpd.
158 if (req.seq() != 0 && rx_seq_id_ != 0 && req.seq() != rx_seq_id_ + 1) {
159 char err_str[255];
160 // "(ERR:rpc_seq)" is intercepted by error_dialog.ts in the UI.
161 snprintf(err_str, sizeof(err_str),
162 "RPC request out of order. Expected %" PRId64 ", got %" PRId64
163 " (ERR:rpc_seq)",
164 rx_seq_id_ + 1, req.seq());
165 PERFETTO_ELOG("%s", err_str);
166 protozero::HeapBuffered<TraceProcessorRpcStream> err_msg;
167 err_msg->add_msg()->set_fatal_error(err_str);
168 auto err = err_msg.SerializeAsArray();
169 rpc_response_fn_(err.data(), static_cast<uint32_t>(err.size()));
170 rpc_response_fn_(nullptr, 0); // Disconnect.
171 return;
172 }
173 rx_seq_id_ = req.seq();
174
175 // The static cast is to prevent that the compiler breaks future proofness.
176 const int req_type = static_cast<int>(req.request());
177 static const char kErrFieldNotSet[] = "RPC error: request field not set";
178 switch (req_type) {
179 case RpcProto::TPM_APPEND_TRACE_DATA: {
180 Response resp(tx_seq_id_++, req_type);
181 auto* result = resp->set_append_result();
182 if (!req.has_append_trace_data()) {
183 result->set_error(kErrFieldNotSet);
184 } else {
185 protozero::ConstBytes byte_range = req.append_trace_data();
186 util::Status res = Parse(byte_range.data, byte_range.size);
187 if (!res.ok()) {
188 result->set_error(res.message());
189 }
190 }
191 resp.Send(rpc_response_fn_);
192 break;
193 }
194 case RpcProto::TPM_FINALIZE_TRACE_DATA: {
195 Response resp(tx_seq_id_++, req_type);
196 NotifyEndOfFile();
197 resp.Send(rpc_response_fn_);
198 break;
199 }
200 case RpcProto::TPM_QUERY_STREAMING: {
201 if (!req.has_query_args()) {
202 Response resp(tx_seq_id_++, req_type);
203 auto* result = resp->set_query_result();
204 result->set_error(kErrFieldNotSet);
205 resp.Send(rpc_response_fn_);
206 } else {
207 protozero::ConstBytes args = req.query_args();
208 auto it = QueryInternal(args.data, args.size);
209 QueryResultSerializer serializer(std::move(it));
210 for (bool has_more = true; has_more;) {
211 Response resp(tx_seq_id_++, req_type);
212 has_more = serializer.Serialize(resp->set_query_result());
213 resp.Send(rpc_response_fn_);
214 }
215 }
216 break;
217 }
218 case RpcProto::TPM_COMPUTE_METRIC: {
219 Response resp(tx_seq_id_++, req_type);
220 auto* result = resp->set_metric_result();
221 if (!req.has_compute_metric_args()) {
222 result->set_error(kErrFieldNotSet);
223 } else {
224 protozero::ConstBytes args = req.compute_metric_args();
225 ComputeMetricInternal(args.data, args.size, result);
226 }
227 resp.Send(rpc_response_fn_);
228 break;
229 }
230 case RpcProto::TPM_GET_METRIC_DESCRIPTORS: {
231 Response resp(tx_seq_id_++, req_type);
232 auto descriptor_set = trace_processor_->GetMetricDescriptors();
233 auto* result = resp->set_metric_descriptors();
234 result->AppendRawProtoBytes(descriptor_set.data(), descriptor_set.size());
235 resp.Send(rpc_response_fn_);
236 break;
237 }
238 case RpcProto::TPM_RESTORE_INITIAL_TABLES: {
239 trace_processor_->RestoreInitialTables();
240 Response resp(tx_seq_id_++, req_type);
241 resp.Send(rpc_response_fn_);
242 break;
243 }
244 case RpcProto::TPM_ENABLE_METATRACE: {
245 using protos::pbzero::MetatraceCategories;
246 protozero::ConstBytes args = req.enable_metatrace_args();
247 EnableMetatrace(args.data, args.size);
248
249 Response resp(tx_seq_id_++, req_type);
250 resp.Send(rpc_response_fn_);
251 break;
252 }
253 case RpcProto::TPM_DISABLE_AND_READ_METATRACE: {
254 Response resp(tx_seq_id_++, req_type);
255 DisableAndReadMetatraceInternal(resp->set_metatrace());
256 resp.Send(rpc_response_fn_);
257 break;
258 }
259 case RpcProto::TPM_GET_STATUS: {
260 Response resp(tx_seq_id_++, req_type);
261 std::vector<uint8_t> status = GetStatus();
262 resp->set_status()->AppendRawProtoBytes(status.data(), status.size());
263 resp.Send(rpc_response_fn_);
264 break;
265 }
266 case RpcProto::TPM_RESET_TRACE_PROCESSOR: {
267 Response resp(tx_seq_id_++, req_type);
268 protozero::ConstBytes args = req.reset_trace_processor_args();
269 ResetTraceProcessor(args.data, args.size);
270 resp.Send(rpc_response_fn_);
271 break;
272 }
273 default: {
274 // This can legitimately happen if the client is newer. We reply with a
275 // generic "unkown request" response, so the client can do feature
276 // detection
277 PERFETTO_DLOG("[RPC] Uknown request type (%d), size=%zu", req_type, len);
278 Response resp(tx_seq_id_++, req_type);
279 resp->set_invalid_request(
280 static_cast<RpcProto::TraceProcessorMethod>(req_type));
281 resp.Send(rpc_response_fn_);
282 break;
283 }
284 } // switch(req_type)
285 }
286
Parse(const uint8_t * data,size_t len)287 util::Status Rpc::Parse(const uint8_t* data, size_t len) {
288 PERFETTO_TP_TRACE(
289 metatrace::Category::TOPLEVEL, "RPC_PARSE",
290 [&](metatrace::Record* r) { r->AddArg("length", std::to_string(len)); });
291 if (eof_) {
292 // Reset the trace processor state if another trace has been previously
293 // loaded. Use the same TraceProcessor Config.
294 ResetTraceProcessorInternal(trace_processor_config_);
295 }
296
297 eof_ = false;
298 bytes_parsed_ += len;
299 MaybePrintProgress();
300
301 if (len == 0)
302 return util::OkStatus();
303
304 // TraceProcessor needs take ownership of the memory chunk.
305 std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
306 memcpy(data_copy.get(), data, len);
307 return trace_processor_->Parse(std::move(data_copy), len);
308 }
309
NotifyEndOfFile()310 void Rpc::NotifyEndOfFile() {
311 PERFETTO_TP_TRACE(metatrace::Category::TOPLEVEL, "RPC_NOTIFY_END_OF_FILE");
312
313 trace_processor_->NotifyEndOfFile();
314 eof_ = true;
315 MaybePrintProgress();
316 }
317
ResetTraceProcessor(const uint8_t * args,size_t len)318 void Rpc::ResetTraceProcessor(const uint8_t* args, size_t len) {
319 protos::pbzero::ResetTraceProcessorArgs::Decoder reset_trace_processor_args(
320 args, len);
321 Config config;
322 if (reset_trace_processor_args.has_drop_track_event_data_before()) {
323 config.drop_track_event_data_before =
324 reset_trace_processor_args.drop_track_event_data_before() ==
325 protos::pbzero::ResetTraceProcessorArgs::
326 TRACK_EVENT_RANGE_OF_INTEREST
327 ? DropTrackEventDataBefore::kTrackEventRangeOfInterest
328 : DropTrackEventDataBefore::kNoDrop;
329 }
330 if (reset_trace_processor_args.has_ingest_ftrace_in_raw_table()) {
331 config.ingest_ftrace_in_raw_table =
332 reset_trace_processor_args.ingest_ftrace_in_raw_table();
333 }
334 if (reset_trace_processor_args.has_analyze_trace_proto_content()) {
335 config.analyze_trace_proto_content =
336 reset_trace_processor_args.analyze_trace_proto_content();
337 }
338 ResetTraceProcessorInternal(config);
339 }
340
MaybePrintProgress()341 void Rpc::MaybePrintProgress() {
342 if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
343 bytes_last_progress_ = bytes_parsed_;
344 auto t_load_s =
345 static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
346 1e9;
347 fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
348 static_cast<double>(bytes_parsed_) / 1e6,
349 static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
350 (eof_ ? "\n" : ""));
351 fflush(stderr);
352 }
353 }
354
Query(const uint8_t * args,size_t len,QueryResultBatchCallback result_callback)355 void Rpc::Query(const uint8_t* args,
356 size_t len,
357 QueryResultBatchCallback result_callback) {
358 auto it = QueryInternal(args, len);
359 QueryResultSerializer serializer(std::move(it));
360
361 std::vector<uint8_t> res;
362 for (bool has_more = true; has_more;) {
363 has_more = serializer.Serialize(&res);
364 result_callback(res.data(), res.size(), has_more);
365 res.clear();
366 }
367 }
368
QueryInternal(const uint8_t * args,size_t len)369 Iterator Rpc::QueryInternal(const uint8_t* args, size_t len) {
370 protos::pbzero::QueryArgs::Decoder query(args, len);
371 std::string sql = query.sql_query().ToStdString();
372 PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
373 PERFETTO_TP_TRACE(metatrace::Category::TOPLEVEL, "RPC_QUERY",
374 [&](metatrace::Record* r) {
375 r->AddArg("SQL", sql);
376 if (query.has_tag()) {
377 r->AddArg("tag", query.tag());
378 }
379 });
380
381 return trace_processor_->ExecuteQuery(sql.c_str());
382 }
383
RestoreInitialTables()384 void Rpc::RestoreInitialTables() {
385 trace_processor_->RestoreInitialTables();
386 }
387
ComputeMetric(const uint8_t * args,size_t len)388 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* args, size_t len) {
389 protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
390 ComputeMetricInternal(args, len, result.get());
391 return result.SerializeAsArray();
392 }
393
ComputeMetricInternal(const uint8_t * data,size_t len,protos::pbzero::ComputeMetricResult * result)394 void Rpc::ComputeMetricInternal(const uint8_t* data,
395 size_t len,
396 protos::pbzero::ComputeMetricResult* result) {
397 protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
398 std::vector<std::string> metric_names;
399 for (auto it = args.metric_names(); it; ++it) {
400 metric_names.emplace_back(it->as_std_string());
401 }
402
403 PERFETTO_TP_TRACE(metatrace::Category::TOPLEVEL, "RPC_COMPUTE_METRIC",
404 [&](metatrace::Record* r) {
405 for (const auto& metric : metric_names) {
406 r->AddArg("Metric", metric);
407 r->AddArg("Format", std::to_string(args.format()));
408 }
409 });
410
411 PERFETTO_DLOG("[RPC] ComputeMetrics(%zu, %s), format=%d", metric_names.size(),
412 metric_names.empty() ? "" : metric_names.front().c_str(),
413 args.format());
414 switch (args.format()) {
415 case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
416 std::vector<uint8_t> metrics_proto;
417 util::Status status =
418 trace_processor_->ComputeMetric(metric_names, &metrics_proto);
419 if (status.ok()) {
420 result->set_metrics(metrics_proto.data(), metrics_proto.size());
421 } else {
422 result->set_error(status.message());
423 }
424 break;
425 }
426 case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
427 std::string metrics_string;
428 util::Status status = trace_processor_->ComputeMetricText(
429 metric_names, TraceProcessor::MetricResultFormat::kProtoText,
430 &metrics_string);
431 if (status.ok()) {
432 result->set_metrics_as_prototext(metrics_string);
433 } else {
434 result->set_error(status.message());
435 }
436 break;
437 }
438 }
439 }
440
EnableMetatrace(const uint8_t * data,size_t len)441 void Rpc::EnableMetatrace(const uint8_t* data, size_t len) {
442 using protos::pbzero::MetatraceCategories;
443 TraceProcessor::MetatraceConfig config;
444 protos::pbzero::EnableMetatraceArgs::Decoder args(data, len);
445 config.categories = MetatraceCategoriesToPublicEnum(
446 static_cast<MetatraceCategories>(args.categories()));
447 trace_processor_->EnableMetatrace(config);
448 }
449
DisableAndReadMetatrace()450 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
451 protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
452 DisableAndReadMetatraceInternal(result.get());
453 return result.SerializeAsArray();
454 }
455
DisableAndReadMetatraceInternal(protos::pbzero::DisableAndReadMetatraceResult * result)456 void Rpc::DisableAndReadMetatraceInternal(
457 protos::pbzero::DisableAndReadMetatraceResult* result) {
458 std::vector<uint8_t> trace_proto;
459 util::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
460 if (status.ok()) {
461 result->set_metatrace(trace_proto.data(), trace_proto.size());
462 } else {
463 result->set_error(status.message());
464 }
465 }
466
GetStatus()467 std::vector<uint8_t> Rpc::GetStatus() {
468 protozero::HeapBuffered<protos::pbzero::StatusResult> status;
469 status->set_loaded_trace_name(trace_processor_->GetCurrentTraceName());
470 status->set_human_readable_version(base::GetVersionString());
471 status->set_api_version(protos::pbzero::TRACE_PROCESSOR_CURRENT_API_VERSION);
472 return status.SerializeAsArray();
473 }
474
475 } // namespace trace_processor
476 } // namespace perfetto
477