1
2 /*
3 * Copyright (C) 2020 The Android Open Source Project
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "src/trace_processor/rpc/query_result_serializer.h"
19
20 #include <deque>
21 #include <ostream>
22 #include <random>
23 #include <string>
24 #include <vector>
25
26 #include "perfetto/ext/base/string_utils.h"
27 #include "perfetto/trace_processor/basic_types.h"
28 #include "perfetto/trace_processor/trace_processor.h"
29 #include "test/gtest_and_gmock.h"
30
31 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
32
33 namespace perfetto {
34 namespace trace_processor {
35
36 // For ASSERT_THAT(ElementsAre(...))
operator ==(const SqlValue & a,const SqlValue & b)37 inline bool operator==(const SqlValue& a, const SqlValue& b) {
38 if (a.type != b.type)
39 return false;
40 if (a.type == SqlValue::kString)
41 return strcmp(a.string_value, b.string_value) == 0;
42 if (a.type == SqlValue::kBytes) {
43 if (a.bytes_count != b.bytes_count)
44 return false;
45 return memcmp(a.bytes_value, b.bytes_value, a.bytes_count) == 0;
46 }
47 return a.long_value == b.long_value;
48 }
49
operator <<(std::ostream & stream,const SqlValue & v)50 inline std::ostream& operator<<(std::ostream& stream, const SqlValue& v) {
51 stream << "SqlValue{";
52 switch (v.type) {
53 case SqlValue::kString:
54 return stream << "\"" << v.string_value << "\"}";
55 case SqlValue::kBytes:
56 return stream << "Bytes[" << v.bytes_count << "]:"
57 << base::ToHex(reinterpret_cast<const char*>(v.bytes_value),
58 v.bytes_count)
59 << "}";
60 case SqlValue::kLong:
61 return stream << "Long " << v.long_value << "}";
62 case SqlValue::kDouble:
63 return stream << "Double " << v.double_value << "}";
64 case SqlValue::kNull:
65 return stream << "NULL}";
66 }
67 return stream;
68 }
69
70 namespace {
71
72 using ::testing::ElementsAre;
73 using BatchProto = protos::pbzero::QueryResult::CellsBatch;
74 using ResultProto = protos::pbzero::QueryResult;
75
RunQueryChecked(TraceProcessor * tp,const std::string & query)76 void RunQueryChecked(TraceProcessor* tp, const std::string& query) {
77 auto iter = tp->ExecuteQuery(query);
78 iter.Next();
79 ASSERT_TRUE(iter.Status().ok()) << iter.Status().message();
80 }
81
82 // Implements a minimal deserializer for QueryResultSerializer.
83 class TestDeserializer {
84 public:
85 void SerializeAndDeserialize(QueryResultSerializer*);
86 void DeserializeBuffer(const uint8_t* start, size_t size);
87
88 std::vector<std::string> columns;
89 std::vector<SqlValue> cells;
90 std::string error;
91 bool eof_reached = false;
92
93 private:
94 std::vector<std::unique_ptr<char[]>> copied_buf_;
95 };
96
SerializeAndDeserialize(QueryResultSerializer * serializer)97 void TestDeserializer::SerializeAndDeserialize(
98 QueryResultSerializer* serializer) {
99 std::vector<uint8_t> buf;
100 error.clear();
101 for (eof_reached = false; !eof_reached;) {
102 serializer->Serialize(&buf);
103 DeserializeBuffer(buf.data(), buf.size());
104 buf.clear();
105 }
106 }
107
DeserializeBuffer(const uint8_t * start,size_t size)108 void TestDeserializer::DeserializeBuffer(const uint8_t* start, size_t size) {
109 ResultProto::Decoder result(start, size);
110 error += result.error().ToStdString();
111 for (auto it = result.column_names(); it; ++it)
112 columns.push_back(it->as_std_string());
113
114 for (auto batch_it = result.batch(); batch_it; ++batch_it) {
115 ASSERT_FALSE(eof_reached);
116 auto batch_bytes = batch_it->as_bytes();
117
118 ResultProto::CellsBatch::Decoder batch(batch_bytes.data, batch_bytes.size);
119 eof_reached = batch.is_last_batch();
120 std::deque<int64_t> varints;
121 std::deque<double> doubles;
122 std::deque<std::string> blobs;
123
124 bool parse_error = false;
125 for (auto it = batch.varint_cells(&parse_error); it; ++it)
126 varints.emplace_back(*it);
127
128 for (auto it = batch.float64_cells(&parse_error); it; ++it)
129 doubles.emplace_back(*it);
130
131 for (auto it = batch.blob_cells(); it; ++it)
132 blobs.emplace_back((*it).ToStdString());
133
134 std::string merged_strings = batch.string_cells().ToStdString();
135 std::deque<std::string> strings;
136 for (size_t pos = 0; pos < merged_strings.size();) {
137 // Will return npos for the last string, but it's fine
138 size_t next_sep = merged_strings.find('\0', pos);
139 strings.emplace_back(merged_strings.substr(pos, next_sep - pos));
140 pos = next_sep == std::string::npos ? next_sep : next_sep + 1;
141 }
142
143 uint32_t num_cells = 0;
144 for (auto it = batch.cells(&parse_error); it; ++it, ++num_cells) {
145 uint8_t cell_type = static_cast<uint8_t>(*it);
146 switch (cell_type) {
147 case BatchProto::CELL_INVALID:
148 break;
149 case BatchProto::CELL_NULL:
150 cells.emplace_back(SqlValue());
151 break;
152 case BatchProto::CELL_VARINT:
153 ASSERT_GT(varints.size(), 0u);
154 cells.emplace_back(SqlValue::Long(varints.front()));
155 varints.pop_front();
156 break;
157 case BatchProto::CELL_FLOAT64:
158 ASSERT_GT(doubles.size(), 0u);
159 cells.emplace_back(SqlValue::Double(doubles.front()));
160 doubles.pop_front();
161 break;
162 case BatchProto::CELL_STRING: {
163 ASSERT_GT(strings.size(), 0u);
164 const std::string& str = strings.front();
165 copied_buf_.emplace_back(new char[str.size() + 1]);
166 char* new_buf = copied_buf_.back().get();
167 memcpy(new_buf, str.c_str(), str.size() + 1);
168 cells.emplace_back(SqlValue::String(new_buf));
169 strings.pop_front();
170 break;
171 }
172 case BatchProto::CELL_BLOB: {
173 ASSERT_GT(blobs.size(), 0u);
174 auto bytes = blobs.front();
175 copied_buf_.emplace_back(new char[bytes.size()]);
176 memcpy(copied_buf_.back().get(), bytes.data(), bytes.size());
177 cells.emplace_back(
178 SqlValue::Bytes(copied_buf_.back().get(), bytes.size()));
179 blobs.pop_front();
180 break;
181 }
182 default:
183 FAIL() << "Unknown cell type " << cell_type;
184 }
185
186 EXPECT_FALSE(parse_error);
187 }
188 if (columns.empty()) {
189 EXPECT_EQ(num_cells, 0u);
190 } else {
191 EXPECT_EQ(num_cells % columns.size(), 0u);
192 }
193 }
194 }
195
// One row with a column for each kind of value (integers of growing
// magnitude, a double, a string and a blob): checks column names and cells
// after a serialize/deserialize round trip.
TEST(QueryResultSerializerTest, ShortBatch) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());

  QueryResultSerializer serializer(processor->ExecuteQuery(
      "select 1 as i8, 128 as i16, 100000 as i32, 42001001001 as i64, "
      "1e9 as f64, 'a_string' as str, cast('a_blob' as blob) as blb"));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_THAT(decoded.columns,
              ElementsAre("i8", "i16", "i32", "i64", "f64", "str", "blb"));
  EXPECT_THAT(
      decoded.cells,
      ElementsAre(SqlValue::Long(1), SqlValue::Long(128),
                  SqlValue::Long(100000), SqlValue::Long(42001001001),
                  SqlValue::Double(1e9), SqlValue::String("a_string"),
                  SqlValue::Bytes("a_blob", 6)));
}
214
// Serializes a result set of several thousand rows produced by the window
// table and verifies every row after deserialization, not just a prefix.
TEST(QueryResultSerializerTest, LongBatch) {
  // Must match window_dur in the update statement below: with quantum=1 the
  // size assertion expects exactly this many rows of 4 cells each.
  static constexpr uint32_t kNumRows = 8192;

  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());

  RunQueryChecked(tp.get(), "create virtual table win using window;");
  RunQueryChecked(tp.get(),
                  "update win set window_start=0, window_dur=8192, quantum=1 "
                  "where rowid = 0");

  auto iter = tp->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win");
  QueryResultSerializer ser(std::move(iter));

  TestDeserializer deser;
  deser.SerializeAndDeserialize(&ser);

  ASSERT_THAT(deser.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(deser.cells.size(), 4 * kNumRows);

  // Walk all the rows: checking only the first ones would miss corruption in
  // the later part of the result.
  for (uint32_t row = 0; row < kNumRows; row++) {
    uint32_t cell = row * 4;
    ASSERT_EQ(deser.cells[cell].type, SqlValue::kString);
    ASSERT_STREQ(deser.cells[cell].string_value, "x");

    ASSERT_EQ(deser.cells[cell + 1].type, SqlValue::kLong);
    ASSERT_EQ(deser.cells[cell + 1].long_value, row);

    ASSERT_EQ(deser.cells[cell + 2].type, SqlValue::kDouble);
    ASSERT_EQ(deser.cells[cell + 2].double_value, 1.0);

    ASSERT_EQ(deser.cells[cell + 3].type, SqlValue::kLong);
    ASSERT_EQ(deser.cells[cell + 3].long_value, row);
  }
}
247
// 1024 rows serialized with a small second argument (32) to
// set_batch_size_for_testing(), presumably so that batches get split on the
// payload size rather than on the cell count. All the cells must still come
// out on the other side.
TEST(QueryResultSerializerTest, BatchSaturatingBinaryPayload) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  TraceProcessor* tp = processor.get();

  RunQueryChecked(tp, "create virtual table win using window;");
  RunQueryChecked(tp,
                  "update win set window_start=0, window_dur=1024, quantum=1 "
                  "where rowid = 0");

  QueryResultSerializer serializer(tp->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win"));
  serializer.set_batch_size_for_testing(1024, 32);

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(decoded.cells.size(), 1024 * 4u);
}
266
// 4 rows x 4 columns with the first argument of set_batch_size_for_testing()
// set to 16, i.e. exactly the number of cells in the result. Checks that all
// 16 cells are decoded.
TEST(QueryResultSerializerTest, BatchSaturatingNumCells) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  TraceProcessor* tp = processor.get();

  RunQueryChecked(tp, "create virtual table win using window;");
  RunQueryChecked(tp,
                  "update win set window_start=0, window_dur=4, quantum=1 "
                  "where rowid = 0");

  QueryResultSerializer serializer(tp->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win"));
  serializer.set_batch_size_for_testing(16, 4096);

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(decoded.cells.size(), 16u);
}
285
// Inserts 32 single-column rows cycling through NULL, blob and string values
// (two strings per cycle), with payload lengths that are a pseudo-random
// multiple of 32 KiB (possibly zero), then checks the round trip cell by cell.
TEST(QueryResultSerializerTest, LargeStringAndBlobs) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
  RunQueryChecked(tp.get(), "create table tab (colz);");

  std::minstd_rand0 rnd_engine(0);
  std::vector<SqlValue> expected;
  std::string sql_values;
  // A deque, because |expected| keeps pointers into the stored strings and
  // those must not move when more entries are appended.
  std::deque<std::string> string_buf;
  for (size_t n = 0; n < 32; n++) {
    // The payload is generated for every row (NULL ones included), so that
    // the random engine advances once per row.
    std::string payload;
    payload.resize((rnd_engine() % 4) * 32 * 1024);
    for (size_t i = 0; i < payload.size(); i++)
      payload[i] = 'A' + ((n * 11 + i) % 25);

    switch (n % 4) {
      case 0:
        sql_values += "(NULL),";
        expected.emplace_back(SqlValue());
        break;
      case 1: {
        // Blob, written as a hex literal.
        sql_values += "(X'" + base::ToHex(payload) + "'),";
        string_buf.emplace_back(std::move(payload));
        const std::string& blob = string_buf.back();
        expected.emplace_back(SqlValue::Bytes(blob.data(), blob.size()));
        break;
      }
      default: {
        // String.
        sql_values += "('" + payload + "'),";
        string_buf.emplace_back(std::move(payload));
        expected.emplace_back(SqlValue::String(string_buf.back().c_str()));
        break;
      }
    }
  }
  sql_values.pop_back();  // Drop the trailing comma.
  RunQueryChecked(tp.get(), "insert into tab (colz) values " + sql_values);

  QueryResultSerializer serializer(tp->ExecuteQuery("select colz from tab"));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_EQ(decoded.cells.size(), expected.size());
  for (size_t idx = 0; idx < expected.size(); idx++) {
    EXPECT_EQ(decoded.cells[idx], expected[idx]) << "Cell " << idx;
  }
}
328
// Fills a 3-column table with 3000 pseudo-random cells (NULLs, integers,
// doubles, strings and blobs), then serializes and deserializes it 10 times
// with different pseudo-random batch limits, comparing every cell each time.
// The random engine has a fixed seed, so the test is deterministic.
TEST(QueryResultSerializerTest, RandomSizes) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
  static constexpr uint32_t kNumCells = 3 * 1000;

  RunQueryChecked(tp.get(), "create table tab (a, b, c);");
  std::vector<SqlValue> expected;
  expected.reserve(kNumCells);
  // A deque, because |expected| keeps pointers into the stored strings and
  // those must not move when more entries are appended.
  std::deque<std::string> string_buf;
  std::minstd_rand0 rnd_engine(0);
  std::string insert_values;

  for (uint32_t i = 0; i < kNumCells; i++) {
    const uint32_t col = i % 3;
    if (col == 0)
      insert_values += "(";

    const int kind = static_cast<int>(rnd_engine() % 5);
    switch (kind) {
      case 0:  // NULL.
        expected.emplace_back(SqlValue());
        insert_values += "NULL";
        break;
      case 1:  // Integer.
        expected.emplace_back(SqlValue::Long(static_cast<long>(rnd_engine())));
        insert_values += std::to_string(expected.back().long_value);
        break;
      case 2:  // Double.
        expected.emplace_back(
            SqlValue::Double(static_cast<double>(rnd_engine())));
        insert_values += std::to_string(expected.back().double_value);
        break;
      default: {  // 3: string (the hex dump of random bytes), 4: blob.
        const size_t len = (rnd_engine() % 5) * 32;
        std::string rndstr;
        rndstr.resize(len);
        for (size_t n = 0; n < len; n++)
          rndstr[n] = static_cast<char>(rnd_engine() % 256);
        auto rndstr_hex = base::ToHex(rndstr);
        if (kind == 3) {
          insert_values += "\"" + rndstr_hex + "\"";
          string_buf.emplace_back(std::move(rndstr_hex));
          expected.emplace_back(SqlValue::String(string_buf.back().c_str()));
        } else {
          insert_values += "X'" + rndstr_hex + "'";
          string_buf.emplace_back(std::move(rndstr));
          const std::string& blob = string_buf.back();
          expected.emplace_back(SqlValue::Bytes(blob.data(), blob.size()));
        }
        break;
      }
    }

    if (col < 2) {
      insert_values += ",";
      continue;
    }

    // End of a row. Flush the pending rows once the statement gets large, and
    // in any case after the very last cell.
    insert_values += "),";
    const bool is_last_cell = i == kNumCells - 1;
    if (insert_values.size() > 1024 * 1024 || is_last_cell) {
      insert_values.back() = ';';  // Replaces the trailing comma.
      auto query = "insert into tab (a,b,c) values " + insert_values;
      insert_values.clear();
      RunQueryChecked(tp.get(), query);
    }
  }

  // Serialize and de-serialize with different batch and payload sizes.
  for (int rep = 0; rep < 10; rep++) {
    QueryResultSerializer serializer(tp->ExecuteQuery("select * from tab"));
    // Drawn in this order: first the cells limit, then the payload limit.
    const uint32_t cells_per_batch = 1u << (rnd_engine() % 8 + 2);
    const uint32_t binary_payload_size = 1u << (rnd_engine() % 8 + 8);
    serializer.set_batch_size_for_testing(cells_per_batch,
                                          binary_payload_size);
    TestDeserializer decoded;
    decoded.SerializeAndDeserialize(&serializer);

    ASSERT_EQ(decoded.cells.size(), expected.size());
    for (size_t idx = 0; idx < expected.size(); idx++) {
      EXPECT_EQ(decoded.cells[idx], expected[idx]) << "Cell " << idx;
    }
  }
}
403
// A statement that is rejected before producing any row: the error must be
// reported, no cell must be emitted and the stream must still be terminated.
TEST(QueryResultSerializerTest, ErrorBeforeStartingQuery) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());

  QueryResultSerializer serializer(
      processor->ExecuteQuery("insert into incomplete_input"));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_EQ(decoded.cells.size(), 0u);
  EXPECT_EQ(decoded.error, "incomplete input");
  EXPECT_TRUE(decoded.eof_reached);
}
414
// A query that yields two valid rows and then fails on the third one: the
// cells produced before the failure must be kept, an error must be reported
// and the stream must still be terminated.
TEST(QueryResultSerializerTest, ErrorAfterSomeResults) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  TraceProcessor* tp = processor.get();

  RunQueryChecked(tp, "create table tab (x)");
  RunQueryChecked(tp, "insert into tab (x) values (0), (1), ('error')");

  QueryResultSerializer serializer(
      tp->ExecuteQuery("select str_split('a;b', ';', x) as s from tab"));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_NE(decoded.error, "");
  EXPECT_THAT(decoded.cells,
              ElementsAre(SqlValue::String("a"), SqlValue::String("b")));
  EXPECT_TRUE(decoded.eof_reached);
}
428
// A statement returning no rows must serialize to an error-free, terminated
// stream without cells, and its side effect must have actually taken place.
TEST(QueryResultSerializerTest, NoResultQuery) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());

  // Step 1: the DDL statement itself.
  {
    QueryResultSerializer serializer(
        processor->ExecuteQuery("create table tab (x)"));
    TestDeserializer decoded;
    decoded.SerializeAndDeserialize(&serializer);

    EXPECT_EQ(decoded.error, "");
    EXPECT_EQ(decoded.cells.size(), 0u);
    EXPECT_TRUE(decoded.eof_reached);
  }

  // Step 2: the table must exist for real, so counting its rows succeeds and
  // yields a single cell.
  {
    QueryResultSerializer serializer(
        processor->ExecuteQuery("select count(*) from tab"));
    TestDeserializer decoded;
    decoded.SerializeAndDeserialize(&serializer);

    EXPECT_EQ(decoded.error, "");
    EXPECT_EQ(decoded.cells.size(), 1u);
    EXPECT_TRUE(decoded.eof_reached);
  }
}
452
453 } // namespace
454 } // namespace trace_processor
455 } // namespace perfetto
456