• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#    Copyright 2015-2017 ARM Limited
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15
16import unittest
17from trappy.stats.Topology import Topology
18from trappy.stats.Trigger import Trigger
19from trappy.stats.Aggregator import MultiTriggerAggregator
20
21import collections
22import trappy
23from trappy.base import Base
24import pandas as pd
25from pandas.util.testing import assert_series_equal
26
27
28class TestTopology(unittest.TestCase):
29
30    def test_add_to_level(self):
31        """Test level creation"""
32
33        level_groups = [[1, 2], [0, 3, 4, 5]]
34        level = "test_level"
35        topology = Topology()
36        topology.add_to_level(level, level_groups)
37        check_groups = topology.get_level(level)
38
39        self.assertTrue(topology.has_level(level))
40        self.assertEqual(level_groups, check_groups)
41
42    def test_flatten(self):
43        """Test Topology: flatten"""
44
45        level_groups = [[1, 2], [0, 3, 4, 5]]
46        level = "test_level"
47        topology = Topology()
48        topology.add_to_level(level, level_groups)
49        flattened = [0, 1, 2, 3, 4, 5]
50
51        self.assertEqual(flattened, topology.flatten())
52
53    def test_cpu_topology_construction(self):
54        """Test CPU Topology Construction"""
55
56        cluster_0 = [0, 3, 4, 5]
57        cluster_1 = [1, 2]
58        clusters = [cluster_0, cluster_1]
59        topology = Topology(clusters=clusters)
60
61        # Check cluster level creation
62        cluster_groups = [[0, 3, 4, 5], [1, 2]]
63        self.assertTrue(topology.has_level("cluster"))
64        self.assertEqual(cluster_groups, topology.get_level("cluster"))
65
66        # Check cpu level creation
67        cpu_groups = [[0], [1], [2], [3], [4], [5]]
68        self.assertTrue(topology.has_level("cpu"))
69        self.assertEqual(cpu_groups, topology.get_level("cpu"))
70
71        # Check "all" level
72        all_groups = [[0, 1, 2, 3, 4, 5]]
73        self.assertEqual(all_groups, topology.get_level("all"))
74
75    def test_level_span(self):
76        """TestTopology: level_span"""
77
78        level_groups = [[1, 2], [0, 3, 4, 5]]
79        level = "test_level"
80        topology = Topology()
81        topology.add_to_level(level, level_groups)
82
83        self.assertEqual(topology.level_span(level), 2)
84
85    def test_group_index(self):
86        """TestTopology: get_index"""
87
88        level_groups = [[1, 2], [0, 3, 4, 5]]
89        level = "test_level"
90        topology = Topology()
91        topology.add_to_level(level, level_groups)
92
93        self.assertEqual(topology.get_index(level, [1, 2]), 0)
94        self.assertEqual(topology.get_index(level, [0, 3, 4, 5]), 1)
95
96class BaseTestStats(unittest.TestCase):
97    def setUp(self):
98        trace = trappy.BareTrace()
99        data = {
100
101            "identifier": [
102                0,
103                0,
104                0,
105                1,
106                1,
107                1,
108            ],
109            "result": [
110                "fire",
111                "blank",
112                "fire",
113                "blank",
114                "fire",
115                "blank",
116            ],
117        }
118
119        index = pd.Series([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], name="Time")
120        data_frame = pd.DataFrame(data, index=index)
121        trace.add_parsed_event("aim_and_fire", data_frame)
122        self._trace = trace
123        self.topology = Topology(clusters=[[0], [1]])
124
125
126class TestTrigger(BaseTestStats):
127
128    def test_trigger_generation(self):
129        """TestTrigger: generate"""
130
131        filters = {
132            "result": "fire"
133        }
134
135        event_class = self._trace.aim_and_fire
136        value = 1
137        pivot = "identifier"
138
139        trigger = Trigger(self._trace,
140                          event_class,
141                          filters,
142                          value,
143                          pivot)
144
145        expected = pd.Series([1, 1], index=pd.Index([0.1, 0.3], name="Time"))
146        assert_series_equal(expected, trigger.generate(0))
147
148        expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
149        assert_series_equal(expected, trigger.generate(1))
150
151    def test_trigger_with_func(self):
152        """Trigger works with a function or lambda as filter"""
153
154        def my_filter(val):
155            return val.startswith("fi")
156
157        trigger = Trigger(self._trace, self._trace.aim_and_fire,
158                          filters={"result": my_filter}, value=1,
159                          pivot="identifier")
160
161        expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
162        assert_series_equal(expected, trigger.generate(1))
163
164        my_filters = {"result": lambda x: x.startswith("bl")}
165        trigger = Trigger(self._trace, self._trace.aim_and_fire,
166                          filters=my_filters, value=1, pivot="identifier")
167
168        expected = pd.Series([1, 1], index=pd.Index([0.4, 0.6], name="Time"))
169        assert_series_equal(expected, trigger.generate(1))
170
171    def test_trigger_with_callable_class(self):
172        """Trigger works with a callable class as filter"""
173
174        class my_filter(object):
175            def __init__(self, val_out):
176                self.prev_val = 0
177                self.val_out = val_out
178
179            def __call__(self, val):
180                ret = self.prev_val == self.val_out
181                self.prev_val = val
182
183                return ret
184
185        trigger = Trigger(self._trace, self._trace.aim_and_fire,
186                          filters={"identifier": my_filter(1)}, value=1,
187                          pivot="result")
188
189        expected = pd.Series([1], index=pd.Index([0.6], name="Time"))
190        assert_series_equal(expected, trigger.generate("blank"))
191
192    def test_filter_prev_values(self):
193        """Trigger works with a filter that depends on previous values of the same pivot"""
194
195        # We generate an example in which we want a trigger whenever the
196        # identifier is no longer 1 for blank
197
198        class my_filter(object):
199            def __init__(self, val_out):
200                self.prev_val = 0
201                self.val_out = val_out
202
203            def __call__(self, val):
204                ret = self.prev_val == self.val_out
205                self.prev_val = val
206
207                return ret
208
209        trace = trappy.BareTrace()
210        data = collections.OrderedDict([
211            (0.1, ["blank", 1]),
212            (0.2, ["fire",  1]),
213            (0.3, ["blank", 0]), # value is no longer 1, trigger
214            (0.4, ["blank", 1]),
215            (0.5, ["fire",  0]), # This should NOT trigger
216            (0.6, ["blank", 0]), # value is no longer 1 for blank, trigger
217        ])
218        data_frame = pd.DataFrame.from_dict(data, orient="index", )
219        data_frame.columns = ["result", "identifier"]
220        trace.add_parsed_event("aim_and_fire", data_frame)
221
222        trigger = Trigger(trace, trace.aim_and_fire,
223                          filters={"identifier": my_filter(1)}, value=-1,
224                          pivot="result")
225
226        expected = pd.Series([-1, -1], index=[0.3, 0.6])
227        assert_series_equal(expected, trigger.generate("blank"))
228
229
230
231class TestAggregator(BaseTestStats):
232
233    def test_scalar_aggfunc_single_trigger(self):
234        """TestAggregator: 1 trigger scalar aggfunc"""
235
236        def aggfunc(series):
237            return series.sum()
238
239        filters = {
240            "result": "fire"
241        }
242
243        event_class = self._trace.aim_and_fire
244        value = 1
245        pivot = "identifier"
246
247        trigger = Trigger(self._trace,
248                          event_class,
249                          filters,
250                          value,
251                          pivot)
252
253        aggregator = MultiTriggerAggregator([trigger],
254                        self.topology,
255                        aggfunc=aggfunc)
256
257        # There are three "fire" in total
258        # The all level in topology looks like
259        # [[0, 1]]
260        result = aggregator.aggregate(level="all")
261        self.assertEqual(result, [3.0])
262
263        # There are two "fire" on the first node group and a
264        # a single "fire" on the second node group at the cluster
265        # level which looks like
266        # [[0], [1]]
267        result = aggregator.aggregate(level="cluster")
268        self.assertEqual(result, [2.0, 1.0])
269
270    def test_vector_aggfunc_single_trigger(self):
271        """TestAggregator: 1 trigger vector aggfunc"""
272
273        def aggfunc(series):
274            return series.cumsum()
275
276        filters = {
277            "result": "fire"
278        }
279
280        event_class = self._trace.aim_and_fire
281        value = 1
282        pivot = "identifier"
283
284        trigger = Trigger(self._trace, event_class, filters, value, pivot)
285
286        aggregator = MultiTriggerAggregator([trigger],
287                        self.topology,
288                        aggfunc=aggfunc)
289
290        # There are three "fire" in total
291        # The all level in topology looks like
292        # [[0, 1]]
293        result = aggregator.aggregate(level="all")
294        expected_result = pd.Series([1.0, 1.0, 2.0, 2.0, 3.0, 3.0],
295                                      index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
296                          )
297        assert_series_equal(result[0], expected_result)
298
299    def test_vector_aggfunc_multiple_trigger(self):
300        """TestAggregator: multi trigger vector aggfunc"""
301
302        def aggfunc(series):
303            return series.cumsum()
304
305        filters = {
306            "result": "fire"
307        }
308
309        event_class = self._trace.aim_and_fire
310        value = 1
311        pivot = "identifier"
312
313        trigger_fire = Trigger(self._trace,
314                          event_class,
315                          filters,
316                          value,
317                          pivot)
318
319        filters = {
320            "result": "blank"
321        }
322        value = -1
323        trigger_blank = Trigger(self._trace, event_class, filters, value,
324                                pivot)
325
326        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
327                        self.topology,
328                        aggfunc=aggfunc)
329
330        # There are three "fire" in total
331        # The all level in topology looks like
332        # [[0, 1]]
333        result = aggregator.aggregate(level="all")
334        expected_result = pd.Series([1.0, 0.0, 1.0, 0.0, 1.0, 0.0],
335                                      index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
336                          )
337        assert_series_equal(result[0], expected_result)
338
339    def test_default_aggfunc_multiple_trigger(self):
340        """MultiTriggerAggregator with the default aggfunc"""
341
342        trigger_fire = Trigger(self._trace, self._trace.aim_and_fire,
343                          filters={"result": "fire"},
344                          pivot="identifier", value=1)
345
346        trigger_blank = Trigger(self._trace, self._trace.aim_and_fire,
347                          filters={"result": "blank"},
348                          pivot="identifier", value=2)
349
350        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
351                                            self.topology)
352
353        results = aggregator.aggregate(level="cpu")
354        expected_results = [
355            pd.Series([1., 2., 1., 0., 0., 0.],
356                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
357            pd.Series([0., 0., 0., 2., 1., 2.],
358                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
359            ]
360
361        self.assertEquals(len(results), len(expected_results))
362        for result, expected in zip(results, expected_results):
363            assert_series_equal(result, expected)
364