1# Copyright (C) 2022 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import unittest 16 17from perfetto.trace_uri_resolver.util import parse_trace_uri 18from perfetto.trace_uri_resolver.resolver import _args_dict_from_uri 19from perfetto.trace_uri_resolver.resolver import TraceUriResolver 20from perfetto.trace_uri_resolver.registry import ResolverRegistry 21 22 23class SimpleResolver(TraceUriResolver): 24 PREFIX = 'simple' 25 26 def __init__(self, foo=None, bar=None): 27 self.foo = foo 28 self.bar = bar 29 30 def foo_gen(self): 31 yield self.foo.encode() if self.foo else b'' 32 33 def bar_gen(self): 34 yield self.bar.encode() if self.bar else b'' 35 36 def resolve(self): 37 return [ 38 TraceUriResolver.Result(self.foo_gen()), 39 TraceUriResolver.Result( 40 self.bar_gen(), metadata={ 41 'foo': self.foo, 42 'bar': self.bar 43 }) 44 ] 45 46 47class RecursiveResolver(SimpleResolver): 48 PREFIX = 'recursive' 49 50 def __init__(self, foo=None, bar=None): 51 super().__init__(foo=foo, bar=bar) 52 53 def resolve(self): 54 return [ 55 TraceUriResolver.Result(self.foo_gen()), 56 TraceUriResolver.Result( 57 self.bar_gen(), metadata={ 58 'foo': 'foo', 59 'bar': 'bar' 60 }), 61 TraceUriResolver.Result(f'simple:foo={self.foo};bar={self.bar}'), 62 TraceUriResolver.Result(SimpleResolver(foo=self.foo, bar=self.bar)), 63 ] 64 65 66class TestResolver(unittest.TestCase): 67 68 def test_simple_resolve(self): 69 registry = ResolverRegistry([SimpleResolver]) 70 71 res = registry.resolve('simple:foo=x;bar=y') 72 self.assertEqual(len(res), 2) 73 74 (foo_res, bar_res) = res 75 self._check_resolver_result(foo_res, bar_res) 76 77 (foo_res, bar_res) = registry.resolve(['simple:foo=x;bar=y']) 78 self._check_resolver_result(foo_res, bar_res) 79 80 resolver = SimpleResolver(foo='x', bar='y') 81 82 (foo_res, bar_res) = registry.resolve(resolver) 83 self._check_resolver_result(foo_res, bar_res) 84 85 (foo_res, bar_res) = registry.resolve([resolver]) 86 self._check_resolver_result(foo_res, bar_res) 87 88 (foo_a, bar_b, foo_x, 89 bar_y) = registry.resolve(['simple:foo=a;bar=b', resolver]) 90 self._check_resolver_result(foo_a, bar_b, foo='a', bar='b') 91 self._check_resolver_result(foo_x, bar_y) 92 93 def test_simple_resolve_missing_arg(self): 94 registry = ResolverRegistry([SimpleResolver]) 95 96 (foo_res, bar_res) = registry.resolve('simple:foo=x') 97 self._check_resolver_result(foo_res, bar_res, bar=None) 98 99 (foo_res, bar_res) = registry.resolve('simple:bar=y') 100 self._check_resolver_result(foo_res, bar_res, foo=None) 101 102 (foo_res, bar_res) = registry.resolve('simple:') 103 self._check_resolver_result(foo_res, bar_res, foo=None, bar=None) 104 105 def test_recursive_resolve(self): 106 registry = ResolverRegistry([SimpleResolver]) 107 registry.register(RecursiveResolver) 108 109 res = registry.resolve('recursive:foo=x;bar=y') 110 self.assertEqual(len(res), 6) 111 112 (non_rec_foo, non_rec_bar, rec_foo_str, rec_bar_str, rec_foo_obj, 113 rec_bar_obj) = res 114 115 self._check_resolver_result( 116 non_rec_foo, non_rec_bar, foo_metadata='foo', bar_metadata='bar') 117 self._check_resolver_result(rec_foo_str, rec_bar_str) 118 self._check_resolver_result(rec_foo_obj, rec_bar_obj) 119 120 def test_parse_trace_uri(self): 121 self.assertEqual(parse_trace_uri('/foo/bar'), (None, '/foo/bar')) 122 self.assertEqual(parse_trace_uri('foo/bar'), (None, 'foo/bar')) 123 self.assertEqual(parse_trace_uri('/foo/b:ar'), (None, '/foo/b:ar')) 124 self.assertEqual(parse_trace_uri('./foo/b:ar'), (None, './foo/b:ar')) 125 self.assertEqual(parse_trace_uri('foo/b:ar'), ('foo/b', 'ar')) 126 127 def test_args_dict_from_uri(self): 128 self.assertEqual(_args_dict_from_uri('foo:'), {}) 129 self.assertEqual(_args_dict_from_uri('foo:bar=baz'), { 130 'bar': 'baz', 131 }) 132 self.assertEqual( 133 _args_dict_from_uri('foo:key=v1,v2'), {'key': ['v1', 'v2']}) 134 self.assertEqual( 135 _args_dict_from_uri('foo:bar=baz;key=v1,v2'), { 136 'bar': 'baz', 137 'key': ['v1', 'v2'] 138 }) 139 140 def _check_resolver_result(self, 141 foo_res, 142 bar_res, 143 foo='x', 144 bar='y', 145 foo_metadata=None, 146 bar_metadata=None): 147 self.assertEqual( 148 tuple(foo_res.generator), (foo.encode() if foo else ''.encode(),)) 149 self.assertEqual( 150 tuple(bar_res.generator), (bar.encode() if bar else ''.encode(),)) 151 self.assertEqual( 152 bar_res.metadata, { 153 'foo': foo_metadata if foo_metadata else foo, 154 'bar': bar_metadata if bar_metadata else bar 155 })