1#!/usr/bin/env python3 2# Copyright (C) 2020 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import io 17import os 18import unittest 19from typing import Optional 20 21import pandas as pd 22 23from perfetto.batch_trace_processor.api import BatchTraceProcessor 24from perfetto.batch_trace_processor.api import BatchTraceProcessorConfig 25from perfetto.batch_trace_processor.api import FailureHandling 26from perfetto.batch_trace_processor.api import Metadata 27from perfetto.batch_trace_processor.api import TraceListReference 28from perfetto.trace_processor.api import PLATFORM_DELEGATE 29from perfetto.trace_processor.api import TraceProcessor 30from perfetto.trace_processor.api import TraceProcessorException 31from perfetto.trace_processor.api import TraceProcessorConfig 32from perfetto.trace_processor.api import TraceReference 33from perfetto.trace_uri_resolver.resolver import TraceUriResolver 34from perfetto.trace_uri_resolver.path import PathUriResolver 35 36 37class SimpleResolver(TraceUriResolver): 38 PREFIX = 'simple' 39 40 def __init__(self, path, skip_resolve_file=False): 41 self.path = path 42 self.file = open(example_android_trace_path(), 'rb') 43 self.skip_resolve_file = skip_resolve_file 44 45 def file_gen(self): 46 with open(example_android_trace_path(), 'rb') as f: 47 yield f.read() 48 49 def resolve(self): 50 res = [ 51 TraceUriResolver.Result( 52 self.file_gen(), metadata={'source': 'generator'}), 53 TraceUriResolver.Result( 54 example_android_trace_path(), metadata={'source': 'path'}), 55 ] 56 if not self.skip_resolve_file: 57 res.extend([ 58 TraceUriResolver.Result( 59 PathUriResolver(example_android_trace_path()), 60 metadata={'source': 'path_resolver'}), 61 TraceUriResolver.Result(self.file, metadata={'source': 'file'}), 62 ]) 63 return res 64 65 66class RecursiveResolver(SimpleResolver): 67 PREFIX = 'recursive' 68 69 def __init__(self, path, skip_resolve_file): 70 super().__init__(path=path, skip_resolve_file=skip_resolve_file) 71 72 def resolve(self): 73 srf = self.skip_resolve_file 74 return [ 75 TraceUriResolver.Result( 76 self.file_gen(), metadata={'source': 'recursive_gen'}), 77 TraceUriResolver.Result( 78 f'simple:path={self.path};skip_resolve_file={srf}', 79 metadata={ 80 'source': 'recursive_path', 81 'root_source': 'recursive_path' 82 }), 83 TraceUriResolver.Result( 84 SimpleResolver( 85 path=self.path, skip_resolve_file=self.skip_resolve_file), 86 metadata={ 87 'source': 'recursive_obj', 88 'root_source': 'recursive_obj' 89 }), 90 ] 91 92 93class SimpleObserver(BatchTraceProcessor.Observer): 94 95 def __init__(self): 96 self.execution_times = [] 97 98 def trace_processed(self, metadata: Metadata, execution_time_seconds: float): 99 self.execution_times.append(execution_time_seconds) 100 101 102def create_batch_tp( 103 traces: TraceListReference, 104 load_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION, 105 execute_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION, 106 observer: Optional[BatchTraceProcessor.Observer] = None): 107 registry = PLATFORM_DELEGATE().default_resolver_registry() 108 registry.register(SimpleResolver) 109 registry.register(RecursiveResolver) 110 config = BatchTraceProcessorConfig( 111 load_failure_handling=load_failure_handling, 112 execute_failure_handling=execute_failure_handling, 113 tp_config=TraceProcessorConfig( 114 bin_path=os.environ["SHELL_PATH"], resolver_registry=registry)) 115 return BatchTraceProcessor(traces=traces, config=config, observer=observer) 116 117 118def create_tp(trace: TraceReference): 119 return TraceProcessor( 120 trace=trace, 121 config=TraceProcessorConfig(bin_path=os.environ["SHELL_PATH"])) 122 123 124def example_android_trace_path(): 125 return os.path.join(os.environ["ROOT_DIR"], 'test', 'data', 126 'example_android_trace_30s.pb') 127 128 129class TestApi(unittest.TestCase): 130 131 def test_invalid_trace(self): 132 f = io.BytesIO(b'<foo></foo>') 133 with self.assertRaises(TraceProcessorException): 134 _ = create_tp(trace=f) 135 136 def test_trace_path(self): 137 # Get path to trace_processor_shell and construct TraceProcessor 138 tp = create_tp(trace=example_android_trace_path()) 139 qr_iterator = tp.query('select * from slice limit 10') 140 dur_result = [ 141 178646, 119740, 58073, 155000, 173177, 20209377, 3589167, 90104, 275312, 142 65313 143 ] 144 145 for num, row in enumerate(qr_iterator): 146 self.assertEqual(row.type, 'internal_slice') 147 self.assertEqual(row.dur, dur_result[num]) 148 149 # Test the batching logic by issuing a large query and ensuring we receive 150 # all rows, not just a truncated subset. 151 qr_iterator = tp.query('select count(*) as cnt from slice') 152 expected_count = next(qr_iterator).cnt 153 self.assertGreater(expected_count, 0) 154 155 qr_iterator = tp.query('select * from slice') 156 count = sum(1 for _ in qr_iterator) 157 self.assertEqual(count, expected_count) 158 159 tp.close() 160 161 def test_trace_byteio(self): 162 f = io.BytesIO( 163 b'\n(\n&\x08\x00\x12\x12\x08\x01\x10\xc8\x01\x1a\x0b\x12\t' 164 b'B|200|foo\x12\x0e\x08\x02\x10\xc8\x01\x1a\x07\x12\x05E|200') 165 with create_tp(trace=f) as tp: 166 qr_iterator = tp.query('select * from slice limit 10') 167 res = list(qr_iterator) 168 169 self.assertEqual(len(res), 1) 170 171 row = res[0] 172 self.assertEqual(row.ts, 1) 173 self.assertEqual(row.dur, 1) 174 self.assertEqual(row.name, 'foo') 175 176 def test_trace_file(self): 177 with open(example_android_trace_path(), 'rb') as file: 178 with create_tp(trace=file) as tp: 179 qr_iterator = tp.query('select * from slice limit 10') 180 dur_result = [ 181 178646, 119740, 58073, 155000, 173177, 20209377, 3589167, 90104, 182 275312, 65313 183 ] 184 185 for num, row in enumerate(qr_iterator): 186 self.assertEqual(row.dur, dur_result[num]) 187 188 def test_trace_generator(self): 189 190 def reader_generator(): 191 with open(example_android_trace_path(), 'rb') as file: 192 yield file.read(1024) 193 194 with create_tp(trace=reader_generator()) as tp: 195 qr_iterator = tp.query('select * from slice limit 10') 196 dur_result = [ 197 178646, 119740, 58073, 155000, 173177, 20209377, 3589167, 90104, 198 275312, 65313 199 ] 200 201 for num, row in enumerate(qr_iterator): 202 self.assertEqual(row.dur, dur_result[num]) 203 204 def test_simple_resolver(self): 205 dur = [178646, 178646, 178646, 178646] 206 source = ['generator', 'path', 'path_resolver', 'file'] 207 expected = pd.DataFrame(list(zip(dur, source)), columns=['dur', 'source']) 208 209 with create_batch_tp( 210 traces='simple:path={}'.format(example_android_trace_path())) as btp: 211 df = btp.query_and_flatten('select dur from slice limit 1') 212 pd.testing.assert_frame_equal(df, expected, check_dtype=False) 213 214 with create_batch_tp( 215 traces=SimpleResolver(path=example_android_trace_path())) as btp: 216 df = btp.query_and_flatten('select dur from slice limit 1') 217 pd.testing.assert_frame_equal(df, expected, check_dtype=False) 218 219 def test_query_timing(self): 220 observer = SimpleObserver() 221 with create_batch_tp( 222 traces='simple:path={}'.format(example_android_trace_path()), 223 observer=observer) as btp: 224 btp.query_and_flatten('select dur from slice limit 1') 225 self.assertTrue( 226 all([x > 0 for x in observer.execution_times]), 227 'Running time should be positive') 228 229 def test_recursive_resolver(self): 230 dur = [ 231 178646, 178646, 178646, 178646, 178646, 178646, 178646, 178646, 178646 232 ] 233 source = ['recursive_gen', 'generator', 'path', 'generator', 'path'] 234 root_source = [ 235 None, 'recursive_path', 'recursive_path', 'recursive_obj', 236 'recursive_obj' 237 ] 238 expected = pd.DataFrame( 239 list(zip(dur, source, root_source)), 240 columns=['dur', 'source', 'root_source']) 241 242 uri = 'recursive:path={};skip_resolve_file=true'.format( 243 example_android_trace_path()) 244 with create_batch_tp(traces=uri) as btp: 245 df = btp.query_and_flatten('select dur from slice limit 1') 246 pd.testing.assert_frame_equal(df, expected, check_dtype=False) 247 248 with create_batch_tp( 249 traces=RecursiveResolver( 250 path=example_android_trace_path(), skip_resolve_file=True)) as btp: 251 df = btp.query_and_flatten('select dur from slice limit 1') 252 pd.testing.assert_frame_equal(df, expected, check_dtype=False) 253 254 def test_btp_load_failure(self): 255 f = io.BytesIO(b'<foo></foo>') 256 with self.assertRaises(TraceProcessorException): 257 _ = create_batch_tp(traces=f) 258 259 def test_btp_load_failure_increment_stat(self): 260 f = io.BytesIO(b'<foo></foo>') 261 btp = create_batch_tp( 262 traces=f, load_failure_handling=FailureHandling.INCREMENT_STAT) 263 self.assertEqual(btp.stats().load_failures, 1) 264 265 def test_btp_query_failure(self): 266 btp = create_batch_tp(traces=example_android_trace_path()) 267 with self.assertRaises(TraceProcessorException): 268 _ = btp.query('select * from sl') 269 270 def test_btp_query_failure_increment_stat(self): 271 btp = create_batch_tp( 272 traces=example_android_trace_path(), 273 execute_failure_handling=FailureHandling.INCREMENT_STAT) 274 _ = btp.query('select * from sl') 275 self.assertEqual(btp.stats().execute_failures, 1) 276 277 def test_btp_query_failure_message(self): 278 btp = create_batch_tp( 279 traces='simple:path={}'.format(example_android_trace_path())) 280 with self.assertRaisesRegex( 281 TraceProcessorException, expected_regex='.*source.*generator.*'): 282 _ = btp.query('select * from sl') 283