• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/rpc/rpc.h"
18 
19 #include <vector>
20 
21 #include "perfetto/base/time.h"
22 #include "perfetto/protozero/scattered_heap_buffer.h"
23 #include "perfetto/trace_processor/trace_processor.h"
24 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
25 #include "src/trace_processor/rpc/query_result_serializer.h"
26 #include "src/trace_processor/tp_metatrace.h"
27 
28 namespace perfetto {
29 namespace trace_processor {
30 
31 using ColumnValues = protos::pbzero::RawQueryResult::ColumnValues;
32 using ColumnDesc = protos::pbzero::RawQueryResult::ColumnDesc;
33 
34 // Writes a "Loading trace ..." update every N bytes.
35 constexpr size_t kProgressUpdateBytes = 50 * 1000 * 1000;
36 
Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)37 Rpc::Rpc(std::unique_ptr<TraceProcessor> preloaded_instance)
38     : trace_processor_(std::move(preloaded_instance)) {}
39 
Rpc()40 Rpc::Rpc() : Rpc(nullptr) {}
41 
42 Rpc::~Rpc() = default;
43 
Parse(const uint8_t * data,size_t len)44 util::Status Rpc::Parse(const uint8_t* data, size_t len) {
45   if (eof_) {
46     // Reset the trace processor state if this is either the first call ever or
47     // if another trace has been previously fully loaded.
48     trace_processor_ = TraceProcessor::CreateInstance(Config());
49     bytes_parsed_ = bytes_last_progress_ = 0;
50     t_parse_started_ = base::GetWallTimeNs().count();
51   }
52 
53   eof_ = false;
54   bytes_parsed_ += len;
55   MaybePrintProgress();
56 
57   if (len == 0)
58     return util::OkStatus();
59 
60   // TraceProcessor needs take ownership of the memory chunk.
61   std::unique_ptr<uint8_t[]> data_copy(new uint8_t[len]);
62   memcpy(data_copy.get(), data, len);
63   return trace_processor_->Parse(std::move(data_copy), len);
64 }
65 
NotifyEndOfFile()66 void Rpc::NotifyEndOfFile() {
67   if (!trace_processor_)
68     return;
69   trace_processor_->NotifyEndOfFile();
70   eof_ = true;
71   MaybePrintProgress();
72 }
73 
MaybePrintProgress()74 void Rpc::MaybePrintProgress() {
75   if (eof_ || bytes_parsed_ - bytes_last_progress_ > kProgressUpdateBytes) {
76     bytes_last_progress_ = bytes_parsed_;
77     auto t_load_s =
78         static_cast<double>(base::GetWallTimeNs().count() - t_parse_started_) /
79         1e9;
80     fprintf(stderr, "\rLoading trace %.2f MB (%.1f MB/s)%s",
81             static_cast<double>(bytes_parsed_) / 1e6,
82             static_cast<double>(bytes_parsed_) / 1e6 / t_load_s,
83             (eof_ ? "\n" : ""));
84     fflush(stderr);
85   }
86 }
87 
Query(const uint8_t * args,size_t len,QueryResultBatchCallback result_callback)88 void Rpc::Query(const uint8_t* args,
89                 size_t len,
90                 QueryResultBatchCallback result_callback) {
91   protos::pbzero::RawQueryArgs::Decoder query(args, len);
92   std::string sql = query.sql_query().ToStdString();
93   PERFETTO_DLOG("[RPC] Query < %s", sql.c_str());
94   PERFETTO_TP_TRACE("RPC_QUERY",
95                     [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
96 
97   if (!trace_processor_) {
98     static const char kErr[] = "Query() called before Parse()";
99     PERFETTO_ELOG("[RPC] %s", kErr);
100     protozero::HeapBuffered<protos::pbzero::QueryResult> result;
101     result->set_error(kErr);
102     auto vec = result.SerializeAsArray();
103     result_callback(vec.data(), vec.size(), /*has_more=*/false);
104     return;
105   }
106 
107   auto it = trace_processor_->ExecuteQuery(sql.c_str());
108   QueryResultSerializer serializer(std::move(it));
109 
110   std::vector<uint8_t> res;
111   for (bool has_more = true; has_more;) {
112     has_more = serializer.Serialize(&res);
113     result_callback(res.data(), res.size(), has_more);
114     res.clear();
115   }
116 }
117 
RawQuery(const uint8_t * args,size_t len)118 std::vector<uint8_t> Rpc::RawQuery(const uint8_t* args, size_t len) {
119   protozero::HeapBuffered<protos::pbzero::RawQueryResult> result;
120   protos::pbzero::RawQueryArgs::Decoder query(args, len);
121   std::string sql = query.sql_query().ToStdString();
122   PERFETTO_DLOG("[RPC] RawQuery < %s", sql.c_str());
123   PERFETTO_TP_TRACE("RPC_RAW_QUERY",
124                     [&](metatrace::Record* r) { r->AddArg("SQL", sql); });
125 
126   if (!trace_processor_) {
127     static const char kErr[] = "RawQuery() called before Parse()";
128     PERFETTO_ELOG("[RPC] %s", kErr);
129     result->set_error(kErr);
130     return result.SerializeAsArray();
131   }
132 
133   auto it = trace_processor_->ExecuteQuery(sql.c_str());
134 
135   // This vector contains a standalone protozero message per column. The problem
136   // it's solving is the following: (i) sqlite iterators are row-based; (ii) the
137   // RawQueryResult proto is column-based (that was a poor design choice we
138   // should revisit at some point); (iii) the protozero API doesn't allow to
139   // begin a new nested message before the previous one is completed.
140   // In order to avoid the interleaved-writing, we write each column in a
141   // dedicated heap buffer and then we merge all the column data at the end,
142   // after having iterated all rows.
143   std::vector<protozero::HeapBuffered<ColumnValues>> cols(it.ColumnCount());
144 
145   // This constexpr is to avoid ODR-use of protozero constants which are only
146   // declared but not defined. Putting directly UNKONWN in the vector ctor
147   // causes a linker error in the WASM toolchain.
148   static constexpr auto kUnknown = ColumnDesc::UNKNOWN;
149   std::vector<ColumnDesc::Type> col_types(it.ColumnCount(), kUnknown);
150   uint32_t rows = 0;
151 
152   for (; it.Next(); ++rows) {
153     for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
154       auto& col = cols[col_idx];
155       auto& col_type = col_types[col_idx];
156 
157       using SqlValue = trace_processor::SqlValue;
158       auto cell = it.Get(col_idx);
159       if (col_type == ColumnDesc::UNKNOWN) {
160         switch (cell.type) {
161           case SqlValue::Type::kLong:
162             col_type = ColumnDesc::LONG;
163             break;
164           case SqlValue::Type::kString:
165             col_type = ColumnDesc::STRING;
166             break;
167           case SqlValue::Type::kDouble:
168             col_type = ColumnDesc::DOUBLE;
169             break;
170           case SqlValue::Type::kBytes:
171             col_type = ColumnDesc::STRING;
172             break;
173           case SqlValue::Type::kNull:
174             break;
175         }
176       }
177 
178       // If either the column type is null or we still don't know the type,
179       // just add null values to all the columns.
180       if (cell.type == SqlValue::Type::kNull ||
181           col_type == ColumnDesc::UNKNOWN) {
182         col->add_long_values(0);
183         col->add_string_values("[NULL]");
184         col->add_double_values(0);
185         col->add_is_nulls(true);
186         continue;
187       }
188 
189       // Cast the sqlite value to the type of the column.
190       switch (col_type) {
191         case ColumnDesc::LONG:
192           PERFETTO_CHECK(cell.type == SqlValue::Type::kLong ||
193                          cell.type == SqlValue::Type::kDouble);
194           if (cell.type == SqlValue::Type::kLong) {
195             col->add_long_values(cell.long_value);
196           } else /* if (cell.type == SqlValue::Type::kDouble) */ {
197             col->add_long_values(static_cast<int64_t>(cell.double_value));
198           }
199           col->add_is_nulls(false);
200           break;
201         case ColumnDesc::STRING: {
202           if (cell.type == SqlValue::Type::kBytes) {
203             col->add_string_values("<bytes>");
204           } else {
205             PERFETTO_CHECK(cell.type == SqlValue::Type::kString);
206             col->add_string_values(cell.string_value);
207           }
208           col->add_is_nulls(false);
209           break;
210         }
211         case ColumnDesc::DOUBLE:
212           PERFETTO_CHECK(cell.type == SqlValue::Type::kLong ||
213                          cell.type == SqlValue::Type::kDouble);
214           if (cell.type == SqlValue::Type::kLong) {
215             col->add_double_values(static_cast<double>(cell.long_value));
216           } else /* if (cell.type == SqlValue::Type::kDouble) */ {
217             col->add_double_values(cell.double_value);
218           }
219           col->add_is_nulls(false);
220           break;
221         case ColumnDesc::UNKNOWN:
222           PERFETTO_FATAL("Handled in if statement above.");
223       }
224     }  // for(col)
225   }    // for(row)
226 
227   // Write the column descriptors.
228   for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
229     auto* descriptor = result->add_column_descriptors();
230     std::string col_name = it.GetColumnName(col_idx);
231     descriptor->set_name(col_name.data(), col_name.size());
232     descriptor->set_type(col_types[col_idx]);
233   }
234 
235   // Merge the column values.
236   for (uint32_t col_idx = 0; col_idx < it.ColumnCount(); ++col_idx) {
237     std::vector<uint8_t> col_data = cols[col_idx].SerializeAsArray();
238     result->AppendBytes(protos::pbzero::RawQueryResult::kColumnsFieldNumber,
239                         col_data.data(), col_data.size());
240   }
241 
242   util::Status status = it.Status();
243   result->set_num_records(rows);
244   if (!status.ok())
245     result->set_error(status.c_message());
246   PERFETTO_DLOG("[RPC] RawQuery > %d rows (err: %d)", rows, !status.ok());
247 
248   return result.SerializeAsArray();
249 }
250 
GetCurrentTraceName()251 std::string Rpc::GetCurrentTraceName() {
252   if (!trace_processor_)
253     return "";
254   return trace_processor_->GetCurrentTraceName();
255 }
256 
RestoreInitialTables()257 void Rpc::RestoreInitialTables() {
258   if (trace_processor_)
259     trace_processor_->RestoreInitialTables();
260 }
261 
ComputeMetric(const uint8_t * data,size_t len)262 std::vector<uint8_t> Rpc::ComputeMetric(const uint8_t* data, size_t len) {
263   protozero::HeapBuffered<protos::pbzero::ComputeMetricResult> result;
264   if (!trace_processor_) {
265     result->set_error("Null trace processor instance");
266     return result.SerializeAsArray();
267   }
268 
269   protos::pbzero::ComputeMetricArgs::Decoder args(data, len);
270   std::vector<std::string> metric_names;
271   for (auto it = args.metric_names(); it; ++it) {
272     metric_names.emplace_back(it->as_std_string());
273   }
274 
275   PERFETTO_TP_TRACE("RPC_COMPUTE_METRIC", [&](metatrace::Record* r) {
276     for (const auto& metric : metric_names) {
277       r->AddArg("Metric", metric);
278       r->AddArg("Format", std::to_string(args.format()));
279     }
280   });
281 
282   switch (args.format()) {
283     case protos::pbzero::ComputeMetricArgs::BINARY_PROTOBUF: {
284       std::vector<uint8_t> metrics_proto;
285       util::Status status =
286           trace_processor_->ComputeMetric(metric_names, &metrics_proto);
287       if (status.ok()) {
288         result->AppendBytes(
289             protos::pbzero::ComputeMetricResult::kMetricsFieldNumber,
290             metrics_proto.data(), metrics_proto.size());
291       } else {
292         result->set_error(status.message());
293       }
294       break;
295     }
296     case protos::pbzero::ComputeMetricArgs::TEXTPROTO: {
297       std::string metrics_string;
298       util::Status status = trace_processor_->ComputeMetricText(
299           metric_names, TraceProcessor::MetricResultFormat::kProtoText,
300           &metrics_string);
301       if (status.ok()) {
302         result->AppendString(
303             protos::pbzero::ComputeMetricResult::kMetricsAsPrototextFieldNumber,
304             metrics_string);
305       } else {
306         result->set_error(status.message());
307       }
308       break;
309     }
310   }
311   return result.SerializeAsArray();
312 }
313 
GetMetricDescriptors(const uint8_t *,size_t)314 std::vector<uint8_t> Rpc::GetMetricDescriptors(const uint8_t*, size_t) {
315   protozero::HeapBuffered<protos::pbzero::GetMetricDescriptorsResult> result;
316   if (!trace_processor_) {
317     return result.SerializeAsArray();
318   }
319   std::vector<uint8_t> descriptor_set =
320       trace_processor_->GetMetricDescriptors();
321   result->AppendBytes(
322       protos::pbzero::GetMetricDescriptorsResult::kDescriptorSetFieldNumber,
323       descriptor_set.data(), descriptor_set.size());
324   return result.SerializeAsArray();
325 }
326 
EnableMetatrace()327 void Rpc::EnableMetatrace() {
328   if (!trace_processor_)
329     return;
330   trace_processor_->EnableMetatrace();
331 }
332 
DisableAndReadMetatrace()333 std::vector<uint8_t> Rpc::DisableAndReadMetatrace() {
334   protozero::HeapBuffered<protos::pbzero::DisableAndReadMetatraceResult> result;
335   if (!trace_processor_) {
336     result->set_error("Null trace processor instance");
337     return result.SerializeAsArray();
338   }
339 
340   std::vector<uint8_t> trace_proto;
341   util::Status status = trace_processor_->DisableAndReadMetatrace(&trace_proto);
342   if (status.ok()) {
343     result->set_metatrace(trace_proto.data(), trace_proto.size());
344   } else {
345     result->set_error(status.message());
346   }
347   return result.SerializeAsArray();
348 }
349 
350 }  // namespace trace_processor
351 }  // namespace perfetto
352