• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/cloud_trace_processor/trace_processor_wrapper.h"
18 #include <cstdint>
19 #include <vector>
20 
21 #include "perfetto/base/flat_set.h"
22 #include "perfetto/base/platform_handle.h"
23 #include "perfetto/base/status.h"
24 #include "perfetto/ext/base/status_or.h"
25 #include "perfetto/ext/base/string_utils.h"
26 #include "perfetto/ext/base/string_view.h"
27 #include "perfetto/ext/base/threading/stream.h"
28 #include "perfetto/ext/base/threading/thread_pool.h"
29 #include "perfetto/ext/base/threading/util.h"
30 #include "protos/perfetto/cloud_trace_processor/worker.pb.h"
31 #include "test/gtest_and_gmock.h"
32 
33 namespace perfetto {
34 namespace cloud_trace_processor {
35 namespace {
36 
// Short alias for the wrapper's statefulness mode; the tests below pass
// SF::kStateful or SF::kStateless when constructing a TraceProcessorWrapper.
using SF = TraceProcessorWrapper::Statefulness;
38 
// Minimal systrace-format text shared by every test in this file: a
// "# tracer" header line followed by one begin ("B") and one end ("E")
// tracing_mark_write line for pid 598. Each of the three lines ends in '\n'.
constexpr char kSimpleSystrace[] =
    R"--(# tracer
  surfaceflinger-598   (  598) [004] .... 10852.771242: tracing_mark_write: B|598|some event
  surfaceflinger-598   (  598) [004] .... 10852.771245: tracing_mark_write: E|598
)--";
43 
SimpleSystrace()44 base::StatusOr<std::vector<uint8_t>> SimpleSystrace() {
45   return std::vector<uint8_t>(kSimpleSystrace,
46                               kSimpleSystrace + strlen(kSimpleSystrace));
47 }
48 
SimpleSystraceChunked()49 std::vector<base::StatusOr<std::vector<uint8_t>>> SimpleSystraceChunked() {
50   std::string systrace(kSimpleSystrace);
51   std::vector<base::StatusOr<std::vector<uint8_t>>> chunks;
52   for (auto& chunk : base::SplitString(systrace, "\n")) {
53     auto with_newline = chunk + "\n";
54     chunks.push_back(std::vector<uint8_t>(
55         with_newline.data(), with_newline.data() + with_newline.size()));
56   }
57 
58   return chunks;
59 }
60 
61 template <typename T>
WaitForFutureReady(base::Future<T> & future)62 T WaitForFutureReady(base::Future<T>& future) {
63   base::FlatSet<base::PlatformHandle> ready;
64   base::FlatSet<base::PlatformHandle> interested;
65   base::PollContext ctx(&interested, &ready);
66   auto res = future.Poll(&ctx);
67   for (; res.IsPending(); res = future.Poll(&ctx)) {
68     PERFETTO_CHECK(interested.size() == 1);
69     base::BlockUntilReadableFd(*interested.begin());
70     interested = {};
71   }
72   return res.item();
73 }
74 
75 template <typename T>
WaitForStreamReady(base::Stream<T> & stream)76 std::optional<T> WaitForStreamReady(base::Stream<T>& stream) {
77   base::FlatSet<base::PlatformHandle> ready;
78   base::FlatSet<base::PlatformHandle> interested;
79   base::PollContext ctx(&interested, &ready);
80   auto res = stream.PollNext(&ctx);
81   for (; res.IsPending(); res = stream.PollNext(&ctx)) {
82     PERFETTO_CHECK(interested.size() == 1);
83     base::BlockUntilReadableFd(*interested.begin());
84     interested = {};
85   }
86   return res.IsDone() ? std::nullopt : std::make_optional(res.item());
87 }
88 
// With SF::kStateful, a view created by one query must still be queryable by
// a later query on the same wrapper.
TEST(TraceProcessorWrapperUnittest, Stateful) {
  base::ThreadPool pool(1);
  TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateful);

  // Load the whole trace as a single chunk.
  {
    auto load_future = wrapper.LoadTrace(base::StreamOf(SimpleSystrace()));
    base::Status load_status = WaitForFutureReady(load_future);
    ASSERT_TRUE(load_status.ok()) << load_status.message();
  }

  // Create the view: expect exactly one successful response, then end of
  // stream.
  {
    auto query_stream =
        wrapper.Query("CREATE VIEW foo AS SELECT ts, dur FROM slice");
    auto response = WaitForStreamReady(query_stream);
    ASSERT_TRUE(response.has_value());
    ASSERT_TRUE(response->ok()) << response->status().message();

    ASSERT_FALSE(WaitForStreamReady(query_stream).has_value());
  }

  // Select from the view created above and check the single returned row.
  {
    auto query_stream = wrapper.Query("SELECT ts, dur FROM foo");
    auto response = WaitForStreamReady(query_stream);

    ASSERT_TRUE(response.has_value());
    ASSERT_TRUE(response->ok()) << response->status().message();

    ASSERT_EQ(response->value().trace(), "foobar");

    auto& result = response->value().result();
    ASSERT_EQ(result.batch_size(), 1);

    auto& batch = result.batch(0);
    ASSERT_EQ(batch.cells_size(), 2);

    ASSERT_EQ(batch.cells(0), protos::QueryResult::CellsBatch::CELL_VARINT);
    ASSERT_EQ(batch.cells(1), protos::QueryResult::CellsBatch::CELL_VARINT);
    ASSERT_EQ(batch.varint_cells(0), 10852771242000);
    ASSERT_EQ(batch.varint_cells(1), 3000);

    ASSERT_FALSE(WaitForStreamReady(query_stream).has_value());
  }
}
128 
// With SF::kStateless, nothing created by one query may be visible to the
// next query on the same wrapper.
TEST(TraceProcessorWrapperUnittest, Stateless) {
  base::ThreadPool pool(1);
  TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateless);

  // Load the whole trace as a single chunk.
  {
    auto load_future = wrapper.LoadTrace(base::StreamOf(SimpleSystrace()));
    base::Status load_status = WaitForFutureReady(load_future);
    ASSERT_TRUE(load_status.ok()) << load_status.message();
  }

  // First CREATE VIEW: one successful response, then end of stream.
  {
    auto query_stream =
        wrapper.Query("CREATE VIEW foo AS SELECT ts, dur FROM slice");
    auto response = WaitForStreamReady(query_stream);
    ASSERT_TRUE(response.has_value());
    ASSERT_TRUE(response->ok()) << response->status().message();

    ASSERT_FALSE(WaitForStreamReady(query_stream).has_value());
  }

  // An identical CREATE VIEW must succeed as well, since the view from the
  // previous query should have been wiped.
  {
    auto query_stream =
        wrapper.Query("CREATE VIEW foo AS SELECT ts, dur FROM slice");
    auto response = WaitForStreamReady(query_stream);
    ASSERT_TRUE(response.has_value());
    ASSERT_TRUE(response->ok()) << response->status().message();

    ASSERT_FALSE(WaitForStreamReady(query_stream).has_value());
  }

  // Selecting from the view should yield a response whose query result
  // carries an error.
  {
    auto query_stream = wrapper.Query("SELECT ts, dur FROM foo");
    auto response = WaitForStreamReady(query_stream);
    ASSERT_TRUE(response.has_value());
    ASSERT_TRUE(response->ok()) << response->status().message();
    ASSERT_TRUE(response->value().result().has_error());

    ASSERT_FALSE(WaitForStreamReady(query_stream).has_value());
  }
}
168 
// Loading the trace as several chunks (one per line) must produce the same
// slice row as loading it in a single chunk.
TEST(TraceProcessorWrapperUnittest, Chunked) {
  base::ThreadPool pool(1);
  TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateless);

  // Feed the trace in line by line; the sample has exactly three lines.
  {
    auto trace_chunks = SimpleSystraceChunked();
    ASSERT_EQ(trace_chunks.size(), 3u);
    auto load_future = wrapper.LoadTrace(base::StreamFrom(trace_chunks));
    base::Status load_status = WaitForFutureReady(load_future);
    ASSERT_TRUE(load_status.ok()) << load_status.message();
  }

  // Query the slice table directly and check the single returned row.
  {
    auto query_stream = wrapper.Query("SELECT ts, dur FROM slice");
    auto response = WaitForStreamReady(query_stream);

    ASSERT_TRUE(response.has_value());
    ASSERT_TRUE(response->ok()) << response->status().message();

    ASSERT_EQ(response->value().trace(), "foobar");

    auto& result = response->value().result();
    ASSERT_EQ(result.batch_size(), 1);

    auto& batch = result.batch(0);
    ASSERT_EQ(batch.cells_size(), 2);

    ASSERT_EQ(batch.cells(0), protos::QueryResult::CellsBatch::CELL_VARINT);
    ASSERT_EQ(batch.cells(1), protos::QueryResult::CellsBatch::CELL_VARINT);
    ASSERT_EQ(batch.varint_cells(0), 10852771242000);
    ASSERT_EQ(batch.varint_cells(1), 3000);

    ASSERT_FALSE(WaitForStreamReady(query_stream).has_value());
  }
}
202 
203 }  // namespace
204 }  // namespace cloud_trace_processor
205 }  // namespace perfetto
206