1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/cloud_trace_processor/trace_processor_wrapper.h"
18 #include <cstdint>
19 #include <vector>
20
21 #include "perfetto/base/flat_set.h"
22 #include "perfetto/base/platform_handle.h"
23 #include "perfetto/base/status.h"
24 #include "perfetto/ext/base/status_or.h"
25 #include "perfetto/ext/base/string_utils.h"
26 #include "perfetto/ext/base/string_view.h"
27 #include "perfetto/ext/base/threading/stream.h"
28 #include "perfetto/ext/base/threading/thread_pool.h"
29 #include "perfetto/ext/base/threading/util.h"
30 #include "protos/perfetto/cloud_trace_processor/worker.pb.h"
31 #include "test/gtest_and_gmock.h"
32
33 namespace perfetto {
34 namespace cloud_trace_processor {
35 namespace {
36
37 using SF = TraceProcessorWrapper::Statefulness;
38
39 const char kSimpleSystrace[] = R"--(# tracer
40 surfaceflinger-598 ( 598) [004] .... 10852.771242: tracing_mark_write: B|598|some event
41 surfaceflinger-598 ( 598) [004] .... 10852.771245: tracing_mark_write: E|598
42 )--";
43
SimpleSystrace()44 base::StatusOr<std::vector<uint8_t>> SimpleSystrace() {
45 return std::vector<uint8_t>(kSimpleSystrace,
46 kSimpleSystrace + strlen(kSimpleSystrace));
47 }
48
SimpleSystraceChunked()49 std::vector<base::StatusOr<std::vector<uint8_t>>> SimpleSystraceChunked() {
50 std::string systrace(kSimpleSystrace);
51 std::vector<base::StatusOr<std::vector<uint8_t>>> chunks;
52 for (auto& chunk : base::SplitString(systrace, "\n")) {
53 auto with_newline = chunk + "\n";
54 chunks.push_back(std::vector<uint8_t>(
55 with_newline.data(), with_newline.data() + with_newline.size()));
56 }
57
58 return chunks;
59 }
60
61 template <typename T>
WaitForFutureReady(base::Future<T> & future)62 T WaitForFutureReady(base::Future<T>& future) {
63 base::FlatSet<base::PlatformHandle> ready;
64 base::FlatSet<base::PlatformHandle> interested;
65 base::PollContext ctx(&interested, &ready);
66 auto res = future.Poll(&ctx);
67 for (; res.IsPending(); res = future.Poll(&ctx)) {
68 PERFETTO_CHECK(interested.size() == 1);
69 base::BlockUntilReadableFd(*interested.begin());
70 interested = {};
71 }
72 return res.item();
73 }
74
75 template <typename T>
WaitForStreamReady(base::Stream<T> & stream)76 std::optional<T> WaitForStreamReady(base::Stream<T>& stream) {
77 base::FlatSet<base::PlatformHandle> ready;
78 base::FlatSet<base::PlatformHandle> interested;
79 base::PollContext ctx(&interested, &ready);
80 auto res = stream.PollNext(&ctx);
81 for (; res.IsPending(); res = stream.PollNext(&ctx)) {
82 PERFETTO_CHECK(interested.size() == 1);
83 base::BlockUntilReadableFd(*interested.begin());
84 interested = {};
85 }
86 return res.IsDone() ? std::nullopt : std::make_optional(res.item());
87 }
88
TEST(TraceProcessorWrapperUnittest,Stateful)89 TEST(TraceProcessorWrapperUnittest, Stateful) {
90 base::ThreadPool pool(1);
91 TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateful);
92 {
93 auto load = wrapper.LoadTrace(base::StreamOf(SimpleSystrace()));
94 base::Status status = WaitForFutureReady(load);
95 ASSERT_TRUE(status.ok()) << status.message();
96 }
97 {
98 auto stream = wrapper.Query("CREATE VIEW foo AS SELECT ts, dur FROM slice");
99 auto proto = WaitForStreamReady(stream);
100 ASSERT_TRUE(proto.has_value());
101 ASSERT_TRUE(proto->ok()) << proto->status().message();
102
103 ASSERT_FALSE(WaitForStreamReady(stream).has_value());
104 }
105 {
106 auto stream = wrapper.Query("SELECT ts, dur FROM foo");
107 auto proto = WaitForStreamReady(stream);
108
109 ASSERT_TRUE(proto.has_value());
110 ASSERT_TRUE(proto->ok()) << proto->status().message();
111
112 ASSERT_EQ(proto->value().trace(), "foobar");
113
114 auto& result = proto.value()->result();
115 ASSERT_EQ(result.batch_size(), 1);
116 ASSERT_EQ(result.batch(0).cells_size(), 2);
117
118 ASSERT_EQ(result.batch(0).cells(0),
119 protos::QueryResult::CellsBatch::CELL_VARINT);
120 ASSERT_EQ(result.batch(0).cells(1),
121 protos::QueryResult::CellsBatch::CELL_VARINT);
122 ASSERT_EQ(result.batch(0).varint_cells(0), 10852771242000);
123 ASSERT_EQ(result.batch(0).varint_cells(1), 3000);
124
125 ASSERT_FALSE(WaitForStreamReady(stream).has_value());
126 }
127 }
128
TEST(TraceProcessorWrapperUnittest,Stateless)129 TEST(TraceProcessorWrapperUnittest, Stateless) {
130 base::ThreadPool pool(1);
131 TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateless);
132 {
133 auto load = wrapper.LoadTrace(base::StreamOf(SimpleSystrace()));
134 base::Status status = WaitForFutureReady(load);
135 ASSERT_TRUE(status.ok()) << status.message();
136 }
137 {
138 auto stream = wrapper.Query("CREATE VIEW foo AS SELECT ts, dur FROM slice");
139 auto proto = WaitForStreamReady(stream);
140 ASSERT_TRUE(proto.has_value());
141 ASSERT_TRUE(proto->ok()) << proto->status().message();
142
143 ASSERT_FALSE(WaitForStreamReady(stream).has_value());
144 }
145
146 // Second CREATE VIEW should also succeed because the first one should have
147 // been wiped.
148 {
149 auto stream = wrapper.Query("CREATE VIEW foo AS SELECT ts, dur FROM slice");
150 auto proto = WaitForStreamReady(stream);
151 ASSERT_TRUE(proto.has_value());
152 ASSERT_TRUE(proto->ok()) << proto->status().message();
153
154 ASSERT_FALSE(WaitForStreamReady(stream).has_value());
155 }
156
157 // Selecting from it should return an error.
158 {
159 auto stream = wrapper.Query("SELECT ts, dur FROM foo");
160 auto proto = WaitForStreamReady(stream);
161 ASSERT_TRUE(proto.has_value());
162 ASSERT_TRUE(proto->ok()) << proto->status().message();
163 ASSERT_TRUE(proto->value().result().has_error());
164
165 ASSERT_FALSE(WaitForStreamReady(stream).has_value());
166 }
167 }
168
TEST(TraceProcessorWrapperUnittest,Chunked)169 TEST(TraceProcessorWrapperUnittest, Chunked) {
170 base::ThreadPool pool(1);
171 TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateless);
172 {
173 auto chunked = SimpleSystraceChunked();
174 ASSERT_EQ(chunked.size(), 3u);
175 auto load = wrapper.LoadTrace(base::StreamFrom(chunked));
176 base::Status status = WaitForFutureReady(load);
177 ASSERT_TRUE(status.ok()) << status.message();
178 }
179 {
180 auto stream = wrapper.Query("SELECT ts, dur FROM slice");
181 auto proto = WaitForStreamReady(stream);
182
183 ASSERT_TRUE(proto.has_value());
184 ASSERT_TRUE(proto->ok()) << proto->status().message();
185
186 ASSERT_EQ(proto->value().trace(), "foobar");
187
188 auto& result = proto.value()->result();
189 ASSERT_EQ(result.batch_size(), 1);
190 ASSERT_EQ(result.batch(0).cells_size(), 2);
191
192 ASSERT_EQ(result.batch(0).cells(0),
193 protos::QueryResult::CellsBatch::CELL_VARINT);
194 ASSERT_EQ(result.batch(0).cells(1),
195 protos::QueryResult::CellsBatch::CELL_VARINT);
196 ASSERT_EQ(result.batch(0).varint_cells(0), 10852771242000);
197 ASSERT_EQ(result.batch(0).varint_cells(1), 3000);
198
199 ASSERT_FALSE(WaitForStreamReady(stream).has_value());
200 }
201 }
202
203 } // namespace
204 } // namespace cloud_trace_processor
205 } // namespace perfetto
206