1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/rpc/rpc.h"
18
19 #include <vector>
20
21 #include "perfetto/base/time.h"
22 #include "perfetto/protozero/scattered_heap_buffer.h"
23 #include "perfetto/trace_processor/trace_processor.h"
24 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
25 #include "src/trace_processor/rpc/query_result_serializer.h"
26 #include "src/trace_processor/tp_metatrace.h"
27
28 namespace perfetto {
29 namespace trace_processor {
30
31 using ColumnValues = protos::pbzero::RawQueryResult::ColumnValues;
32 using ColumnDesc = protos::pbzero::RawQueryResult::ColumnDesc;
33
34 // Writes a "Loading trace ..." update every N bytes.
35 constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
36
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)37 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
38 : trace_processor_(std::move(preloaded_instance)) {}
39
Rpc()40 Rpc::Rpc() : Rpc(nullptr) {}
41
42 Rpc::~Rpc() = default;
43
Parse(const uint8_t * data,size_t len)44 util::Status Rpc::Parse(const uint8_t* data, size_t len) {
45 if (eof_) {
46 // Reset the trace processor state if this is either the first call ever or
47 // if another trace has been previously fully loaded.
48 trace_processor_ = TraceProcessor::CreateInstance(Config());
49 bytes_parsed_ = bytes_last_progress_ = 0;
50 t_parse_started_ = base::GetWallTimeNs().count();
51 }
52
53 eof_ = false;
54 bytes_parsed_ += len;
55 MaybePrintProgress();
56
57 if (len == 0)
58 return util::OkStatus();
59
60 // TraceProcessor needs take ownership of the memory chunk.
61 std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
62 memcpy(data_copy.get(), data, len);
63 return trace_processor_->Parse(std::move(data_copy), len);
64 }
65
NotifyEndOfFile()66 void Rpc::NotifyEndOfFile() {
67 if (!trace_processor_)
68 return;
69 trace_processor_->NotifyEndOfFile();
70 eof_ = true;
71 MaybePrintProgress();
72 }
73
MaybePrintProgress()74 void Rpc::MaybePrintProgress() {
75 if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
76 bytes_last_progress_ = bytes_parsed_;
77 auto t_load_s =
78 static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
79 1e9;
80 fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
81 static_cast<double>(bytes_parsed_) / 1e6,
82 static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
83 (eof_ ? "\n" : ""));
84 fflush(stderr);
85 }
86 }
87
Query(const uint8_t * args,size_t len,QueryResultBatchCallback result_callback)88 void Rpc::Query(const uint8_t* args,
89 size_t len,
90 QueryResultBatchCallback result_callback) {
91 protos::pbzero::RawQueryArgs::Decoder query(args, len);
92 std::string sql = query.sql_query().ToStdString();
93 PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
94 PERFETTO_TP_TRACE("RPC_QUERY",
95 [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
96
97 if (!trace_processor_) {
98 static const char kErr[] = "Query() called before Parse()";
99 PERFETTO_ELOG("[RPC] %s", kErr);
100 protozero::HeapBuffered<protos::pbzero::QueryResult> result;
101 result->set_error(kErr);
102 auto vec = result.SerializeAsArray();
103 result_callback(vec.data(), vec.size(), /*has_more=*/false);
104 return;
105 }
106
107 auto it = trace_processor_->ExecuteQuery(sql.c_str());
108 QueryResultSerializer serializer(std::move(it));
109
110 std::vector<uint8_t> res;
111 for (bool has_more = true; has_more;) {
112 has_more = serializer.Serialize(&res);
113 result_callback(res.data(), res.size(), has_more);
114 res.clear();
115 }
116 }
117
RawQuery(const uint8_t * args,size_t len)118 std::vector<uint8_t> Rpc::RawQuery(const uint8_t* args, size_t len) {
119 protozero::HeapBuffered<protos::pbzero::RawQueryResult> result;
120 protos::pbzero::RawQueryArgs::Decoder query(args, len);
121 std::string sql = query.sql_query().ToStdString();
122 PERFETTO_DLOG("[RPC] RawQuery < %s", sql.c_str());
123 PERFETTO_TP_TRACE("RPC_RAW_QUERY",
124 [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
125
126 if (!trace_processor_) {
127 static const char kErr[] = "RawQuery() called before Parse()";
128 PERFETTO_ELOG("[RPC] %s", kErr);
129 result->set_error(kErr);
130 return result.SerializeAsArray();
131 }
132
133 auto it = trace_processor_->ExecuteQuery(sql.c_str());
134
135 // This vector contains a standalone protozero message per column. The problem
136 // it's solving is the following: (i) sqlite iterators are row-based; (ii) the
137 // RawQueryResult proto is column-based (that was a poor design choice we
138 // should revisit at some point); (iii) the protozero API doesn't allow to
139 // begin a new nested message before the previous one is completed.
140 // In order to avoid the interleaved-writing, we write each column in a
141 // dedicated heap buffer and then we merge all the column data at the end,
142 // after having iterated all rows.
143 std::vector<protozero::HeapBuffered<ColumnValues>> cols(it.ColumnCount());
144
145 // This constexpr is to avoid ODR-use of protozero constants which are only
146 // declared but not defined. Putting directly UNKONWN in the vector ctor
147 // causes a linker error in the WASM toolchain.
148 static constexpr auto kUnknown = ColumnDesc::UNKNOWN;
149 std::vector<ColumnDesc::Type> col_types(it.ColumnCount(), kUnknown);
150 uint32_t rows = 0;
151
152 for (; it.Next(); ++rows) {
153 for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
154 auto& col = cols[col_idx];
155 auto& col_type = col_types[col_idx];
156
157 using SqlValue = trace_processor::SqlValue;
158 auto cell = it.Get(col_idx);
159 if (col_type == ColumnDesc::UNKNOWN) {
160 switch (cell.type) {
161 case SqlValue::Type::kLong:
162 col_type = ColumnDesc::LONG;
163 break;
164 case SqlValue::Type::kString:
165 col_type = ColumnDesc::STRING;
166 break;
167 case SqlValue::Type::kDouble:
168 col_type = ColumnDesc::DOUBLE;
169 break;
170 case SqlValue::Type::kBytes:
171 col_type = ColumnDesc::STRING;
172 break;
173 case SqlValue::Type::kNull:
174 break;
175 }
176 }
177
178 // If either the column type is null or we still don't know the type,
179 // just add null values to all the columns.
180 if (cell.type == SqlValue::Type::kNull ||
181 col_type == ColumnDesc::UNKNOWN) {
182 col->add_long_values(0);
183 col->add_string_values("[NULL]");
184 col->add_double_values(0);
185 col->add_is_nulls(true);
186 continue;
187 }
188
189 // Cast the sqlite value to the type of the column.
190 switch (col_type) {
191 case ColumnDesc::LONG:
192 PERFETTO_CHECK(cell.type == SqlValue::Type::kLong ||
193 cell.type == SqlValue::Type::kDouble);
194 if (cell.type == SqlValue::Type::kLong) {
195 col->add_long_values(cell.long_value);
196 } else /* if (cell.type == SqlValue::Type::kDouble) */ {
197 col->add_long_values(static_cast<int64_t>(cell.double_value));
198 }
199 col->add_is_nulls(false);
200 break;
201 case ColumnDesc::STRING: {
202 if (cell.type == SqlValue::Type::kBytes) {
203 col->add_string_values("<bytes>");
204 } else {
205 PERFETTO_CHECK(cell.type == SqlValue::Type::kString);
206 col->add_string_values(cell.string_value);
207 }
208 col->add_is_nulls(false);
209 break;
210 }
211 case ColumnDesc::DOUBLE:
212 PERFETTO_CHECK(cell.type == SqlValue::Type::kLong ||
213 cell.type == SqlValue::Type::kDouble);
214 if (cell.type == SqlValue::Type::kLong) {
215 col->add_double_values(static_cast<double>(cell.long_value));
216 } else /* if (cell.type == SqlValue::Type::kDouble) */ {
217 col->add_double_values(cell.double_value);
218 }
219 col->add_is_nulls(false);
220 break;
221 case ColumnDesc::UNKNOWN:
222 PERFETTO_FATAL("Handled in if statement above.");
223 }
224 } // for(col)
225 } // for(row)
226
227 // Write the column descriptors.
228 for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
229 auto* descriptor = result->add_column_descriptors();
230 std::string col_name = it.GetColumnName(col_idx);
231 descriptor->set_name(col_name.data(), col_name.size());
232 descriptor->set_type(col_types[col_idx]);
233 }
234
235 // Merge the column values.
236 for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
237 std::vector<uint8_t> col_data = cols[col_idx].SerializeAsArray();
238 result->AppendBytes(protos::pbzero::RawQueryResult::kColumnsFieldNumber,
239 col_data.data(), col_data.size());
240 }
241
242 util::Status status = it.Status();
243 result->set_num_records(rows);
244 if (!status.ok())
245 result->set_error(status.c_message());
246 PERFETTO_DLOG("[RPC] RawQuery > %d rows (err: %d)", rows, !status.ok());
247
248 return result.SerializeAsArray();
249 }
250
GetCurrentTraceName()251 std::string Rpc::GetCurrentTraceName() {
252 if (!trace_processor_)
253 return "";
254 return trace_processor_->GetCurrentTraceName();
255 }
256
RestoreInitialTables()257 void Rpc::RestoreInitialTables() {
258 if (trace_processor_)
259 trace_processor_->RestoreInitialTables();
260 }
261
ComputeMetric(const uint8_t * data,size_t len)262 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* data, size_t len) {
263 protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
264 if (!trace_processor_) {
265 result->set_error("Null trace processor instance");
266 return result.SerializeAsArray();
267 }
268
269 protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
270 std::vector<std::string> metric_names;
271 for (auto it = args.metric_names(); it; ++it) {
272 metric_names.emplace_back(it->as_std_string());
273 }
274
275 PERFETTO_TP_TRACE("RPC_COMPUTE_METRIC", [&](metatrace::Record* r) {
276 for (const auto& metric : metric_names) {
277 r->AddArg("Metric", metric);
278 r->AddArg("Format", std::to_string(args.format()));
279 }
280 });
281
282 switch (args.format()) {
283 case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
284 std::vector<uint8_t> metrics_proto;
285 util::Status status =
286 trace_processor_->ComputeMetric(metric_names, &metrics_proto);
287 if (status.ok()) {
288 result->AppendBytes(
289 protos::pbzero::ComputeMetricResult::kMetricsFieldNumber,
290 metrics_proto.data(), metrics_proto.size());
291 } else {
292 result->set_error(status.message());
293 }
294 break;
295 }
296 case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
297 std::string metrics_string;
298 util::Status status = trace_processor_->ComputeMetricText(
299 metric_names, TraceProcessor::MetricResultFormat::kProtoText,
300 &metrics_string);
301 if (status.ok()) {
302 result->AppendString(
303 protos::pbzero::ComputeMetricResult::kMetricsAsPrototextFieldNumber,
304 metrics_string);
305 } else {
306 result->set_error(status.message());
307 }
308 break;
309 }
310 }
311 return result.SerializeAsArray();
312 }
313
GetMetricDescriptors(const uint8_t *,size_t)314 std::vector<uint8_t> Rpc::GetMetricDescriptors(const uint8_t*, size_t) {
315 protozero::HeapBuffered<protos::pbzero::GetMetricDescriptorsResult> result;
316 if (!trace_processor_) {
317 return result.SerializeAsArray();
318 }
319 std::vector<uint8_t> descriptor_set =
320 trace_processor_->GetMetricDescriptors();
321 result->AppendBytes(
322 protos::pbzero::GetMetricDescriptorsResult::kDescriptorSetFieldNumber,
323 descriptor_set.data(), descriptor_set.size());
324 return result.SerializeAsArray();
325 }
326
EnableMetatrace()327 void Rpc::EnableMetatrace() {
328 if (!trace_processor_)
329 return;
330 trace_processor_->EnableMetatrace();
331 }
332
DisableAndReadMetatrace()333 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
334 protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
335 if (!trace_processor_) {
336 result->set_error("Null trace processor instance");
337 return result.SerializeAsArray();
338 }
339
340 std::vector<uint8_t> trace_proto;
341 util::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
342 if (status.ok()) {
343 result->set_metatrace(trace_proto.data(), trace_proto.size());
344 } else {
345 result->set_error(status.message());
346 }
347 return result.SerializeAsArray();
348 }
349
350 } // namespace trace_processor
351 } // namespace perfetto
352