• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2022 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import unittest
16
17from perfetto.trace_uri_resolver.util import parse_trace_uri
18from perfetto.trace_uri_resolver.resolver import _args_dict_from_uri
19from perfetto.trace_uri_resolver.resolver import TraceUriResolver
20from perfetto.trace_uri_resolver.registry import ResolverRegistry
21
22
23class SimpleResolver(TraceUriResolver):
24  PREFIX = 'simple'
25
26  def __init__(self, foo=None, bar=None):
27    self.foo = foo
28    self.bar = bar
29
30  def foo_gen(self):
31    yield self.foo.encode() if self.foo else b''
32
33  def bar_gen(self):
34    yield self.bar.encode() if self.bar else b''
35
36  def resolve(self):
37    return [
38        TraceUriResolver.Result(self.foo_gen()),
39        TraceUriResolver.Result(
40            self.bar_gen(), metadata={
41                'foo': self.foo,
42                'bar': self.bar
43            })
44    ]
45
46
47class RecursiveResolver(SimpleResolver):
48  PREFIX = 'recursive'
49
50  def __init__(self, foo=None, bar=None):
51    super().__init__(foo=foo, bar=bar)
52
53  def resolve(self):
54    return [
55        TraceUriResolver.Result(self.foo_gen()),
56        TraceUriResolver.Result(
57            self.bar_gen(), metadata={
58                'foo': 'foo',
59                'bar': 'bar'
60            }),
61        TraceUriResolver.Result(f'simple:foo={self.foo};bar={self.bar}'),
62        TraceUriResolver.Result(SimpleResolver(foo=self.foo, bar=self.bar)),
63    ]
64
65
66class TestResolver(unittest.TestCase):
67
68  def test_simple_resolve(self):
69    registry = ResolverRegistry([SimpleResolver])
70
71    res = registry.resolve('simple:foo=x;bar=y')
72    self.assertEqual(len(res), 2)
73
74    (foo_res, bar_res) = res
75    self._check_resolver_result(foo_res, bar_res)
76
77    (foo_res, bar_res) = registry.resolve(['simple:foo=x;bar=y'])
78    self._check_resolver_result(foo_res, bar_res)
79
80    resolver = SimpleResolver(foo='x', bar='y')
81
82    (foo_res, bar_res) = registry.resolve(resolver)
83    self._check_resolver_result(foo_res, bar_res)
84
85    (foo_res, bar_res) = registry.resolve([resolver])
86    self._check_resolver_result(foo_res, bar_res)
87
88    (foo_a, bar_b, foo_x,
89     bar_y) = registry.resolve(['simple:foo=a;bar=b', resolver])
90    self._check_resolver_result(foo_a, bar_b, foo='a', bar='b')
91    self._check_resolver_result(foo_x, bar_y)
92
93  def test_simple_resolve_missing_arg(self):
94    registry = ResolverRegistry([SimpleResolver])
95
96    (foo_res, bar_res) = registry.resolve('simple:foo=x')
97    self._check_resolver_result(foo_res, bar_res, bar=None)
98
99    (foo_res, bar_res) = registry.resolve('simple:bar=y')
100    self._check_resolver_result(foo_res, bar_res, foo=None)
101
102    (foo_res, bar_res) = registry.resolve('simple:')
103    self._check_resolver_result(foo_res, bar_res, foo=None, bar=None)
104
105  def test_recursive_resolve(self):
106    registry = ResolverRegistry([SimpleResolver])
107    registry.register(RecursiveResolver)
108
109    res = registry.resolve('recursive:foo=x;bar=y')
110    self.assertEqual(len(res), 6)
111
112    (non_rec_foo, non_rec_bar, rec_foo_str, rec_bar_str, rec_foo_obj,
113     rec_bar_obj) = res
114
115    self._check_resolver_result(
116        non_rec_foo, non_rec_bar, foo_metadata='foo', bar_metadata='bar')
117    self._check_resolver_result(rec_foo_str, rec_bar_str)
118    self._check_resolver_result(rec_foo_obj, rec_bar_obj)
119
120  def test_parse_trace_uri(self):
121    self.assertEqual(parse_trace_uri('/foo/bar'), (None, '/foo/bar'))
122    self.assertEqual(parse_trace_uri('foo/bar'), (None, 'foo/bar'))
123    self.assertEqual(parse_trace_uri('/foo/b:ar'), (None, '/foo/b:ar'))
124    self.assertEqual(parse_trace_uri('./foo/b:ar'), (None, './foo/b:ar'))
125    self.assertEqual(parse_trace_uri('foo/b:ar'), ('foo/b', 'ar'))
126
127  def test_args_dict_from_uri(self):
128    self.assertEqual(_args_dict_from_uri('foo:'), {})
129    self.assertEqual(_args_dict_from_uri('foo:bar=baz'), {
130        'bar': 'baz',
131    })
132    self.assertEqual(
133        _args_dict_from_uri('foo:key=v1,v2'), {'key': ['v1', 'v2']})
134    self.assertEqual(
135        _args_dict_from_uri('foo:bar=baz;key=v1,v2'), {
136            'bar': 'baz',
137            'key': ['v1', 'v2']
138        })
139
140  def _check_resolver_result(self,
141                             foo_res,
142                             bar_res,
143                             foo='x',
144                             bar='y',
145                             foo_metadata=None,
146                             bar_metadata=None):
147    self.assertEqual(
148        tuple(foo_res.generator), (foo.encode() if foo else ''.encode(),))
149    self.assertEqual(
150        tuple(bar_res.generator), (bar.encode() if bar else ''.encode(),))
151    self.assertEqual(
152        bar_res.metadata, {
153            'foo': foo_metadata if foo_metadata else foo,
154            'bar': bar_metadata if bar_metadata else bar
155        })