#    Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest
from trappy.stats.Topology import Topology
from trappy.stats.Trigger import Trigger
from trappy.stats.Aggregator import MultiTriggerAggregator

import collections
import trappy
from trappy.base import Base
import pandas as pd

# pandas.util.testing was deprecated in pandas 1.0 and removed in 2.0;
# pandas.testing is the supported location.  Keep the old path as a
# fallback so the tests still import on very old pandas releases.
try:
    from pandas.testing import assert_series_equal
except ImportError:
    from pandas.util.testing import assert_series_equal


class _PreviousValueFilter(object):
    """Stateful callable filter used by the Trigger tests.

    Each call returns whether the value seen on the *previous* call equals
    ``val_out`` and then remembers the current value.  The remembered value
    starts at 0, so the first call returns ``val_out == 0``.
    """

    def __init__(self, val_out):
        self.prev_val = 0
        self.val_out = val_out

    def __call__(self, val):
        ret = self.prev_val == self.val_out
        self.prev_val = val

        return ret


class TestTopology(unittest.TestCase):
    """Tests for level management in trappy.stats.Topology"""

    def test_add_to_level(self):
        """Test level creation"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)
        check_groups = topology.get_level(level)

        self.assertTrue(topology.has_level(level))
        self.assertEqual(level_groups, check_groups)

    def test_flatten(self):
        """Test Topology: flatten"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)
        # The groups are given out of order; flatten() is expected to
        # return the nodes in ascending order
        flattened = [0, 1, 2, 3, 4, 5]

        self.assertEqual(flattened, topology.flatten())

    def test_cpu_topology_construction(self):
        """Test CPU Topology Construction"""

        cluster_0 = [0, 3, 4, 5]
        cluster_1 = [1, 2]
        clusters = [cluster_0, cluster_1]
        topology = Topology(clusters=clusters)

        # Check cluster level creation
        cluster_groups = [[0, 3, 4, 5], [1, 2]]
        self.assertTrue(topology.has_level("cluster"))
        self.assertEqual(cluster_groups, topology.get_level("cluster"))

        # Check cpu level creation
        cpu_groups = [[0], [1], [2], [3], [4], [5]]
        self.assertTrue(topology.has_level("cpu"))
        self.assertEqual(cpu_groups, topology.get_level("cpu"))

        # Check "all" level
        all_groups = [[0, 1, 2, 3, 4, 5]]
        self.assertEqual(all_groups, topology.get_level("all"))

    def test_level_span(self):
        """TestTopology: level_span"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)

        # Two groups were added to the level
        self.assertEqual(topology.level_span(level), 2)

    def test_group_index(self):
        """TestTopology: get_index"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)

        # Indices follow the order in which the groups were supplied
        self.assertEqual(topology.get_index(level, [1, 2]), 0)
        self.assertEqual(topology.get_index(level, [0, 3, 4, 5]), 1)


class BaseTestStats(unittest.TestCase):
    """Common fixture for the Trigger and Aggregator tests

    Builds a BareTrace with one parsed event, "aim_and_fire", holding six
    samples at t=0.1..0.6: the first three have identifier 0, the last
    three identifier 1, and "result" alternates "fire"/"blank" starting
    with "fire".  The topology has one single-node cluster per identifier.
    """

    def setUp(self):
        trace = trappy.BareTrace()
        data = {
            "identifier": [
                0,
                0,
                0,
                1,
                1,
                1,
            ],
            "result": [
                "fire",
                "blank",
                "fire",
                "blank",
                "fire",
                "blank",
            ],
        }

        index = pd.Series([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], name="Time")
        data_frame = pd.DataFrame(data, index=index)
        trace.add_parsed_event("aim_and_fire", data_frame)
        self._trace = trace
        self.topology = Topology(clusters=[[0], [1]])


class TestTrigger(BaseTestStats):
    """Tests for trappy.stats.Trigger with value and callable filters"""

    def test_trigger_generation(self):
        """TestTrigger: generate"""

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace,
                          event_class,
                          filters,
                          value,
                          pivot)

        # identifier 0 fires at 0.1 and 0.3
        expected = pd.Series([1, 1], index=pd.Index([0.1, 0.3], name="Time"))
        assert_series_equal(expected, trigger.generate(0))

        # identifier 1 fires only at 0.5
        expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

    def test_trigger_with_func(self):
        """Trigger works with a function or lambda as filter"""

        def my_filter(val):
            return val.startswith("fi")

        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"result": my_filter}, value=1,
                          pivot="identifier")

        expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

        my_filters = {"result": lambda x: x.startswith("bl")}
        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters=my_filters, value=1, pivot="identifier")

        expected = pd.Series([1, 1], index=pd.Index([0.4, 0.6], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

    def test_trigger_with_callable_class(self):
        """Trigger works with a callable class as filter"""

        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"identifier": _PreviousValueFilter(1)},
                          value=1, pivot="result")

        # Among the "blank" samples (0.2, 0.4, 0.6) only the one at 0.6 is
        # preceded by a sample whose identifier is 1
        expected = pd.Series([1], index=pd.Index([0.6], name="Time"))
        assert_series_equal(expected, trigger.generate("blank"))

    def test_filter_prev_values(self):
        """Trigger works with a filter that depends on previous values of the same pivot"""

        # We generate an example in which we want a trigger whenever the
        # identifier is no longer 1 for blank

        trace = trappy.BareTrace()
        data = collections.OrderedDict([
            (0.1, ["blank", 1]),
            (0.2, ["fire", 1]),
            (0.3, ["blank", 0]),  # value is no longer 1, trigger
            (0.4, ["blank", 1]),
            (0.5, ["fire", 0]),   # This should NOT trigger
            (0.6, ["blank", 0]),  # value is no longer 1 for blank, trigger
        ])
        data_frame = pd.DataFrame.from_dict(data, orient="index")
        data_frame.columns = ["result", "identifier"]
        trace.add_parsed_event("aim_and_fire", data_frame)

        trigger = Trigger(trace, trace.aim_and_fire,
                          filters={"identifier": _PreviousValueFilter(1)},
                          value=-1, pivot="result")

        expected = pd.Series([-1, -1], index=[0.3, 0.6])
        assert_series_equal(expected, trigger.generate("blank"))


class TestAggregator(BaseTestStats):
    """Tests for trappy.stats.Aggregator.MultiTriggerAggregator"""

    def test_scalar_aggfunc_single_trigger(self):
        """TestAggregator: 1 trigger scalar aggfunc"""

        def aggfunc(series):
            return series.sum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace,
                          event_class,
                          filters,
                          value,
                          pivot)

        aggregator = MultiTriggerAggregator([trigger],
                                            self.topology,
                                            aggfunc=aggfunc)

        # There are three "fire" in total
        # The all level in topology looks like
        # [[0, 1]]
        result = aggregator.aggregate(level="all")
        self.assertEqual(result, [3.0])

        # There are two "fire" on the first node group and
        # a single "fire" on the second node group at the cluster
        # level which looks like
        # [[0], [1]]
        result = aggregator.aggregate(level="cluster")
        self.assertEqual(result, [2.0, 1.0])

    def test_vector_aggfunc_single_trigger(self):
        """TestAggregator: 1 trigger vector aggfunc"""

        def aggfunc(series):
            return series.cumsum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace, event_class, filters, value, pivot)

        aggregator = MultiTriggerAggregator([trigger],
                                            self.topology,
                                            aggfunc=aggfunc)

        # There are three "fire" in total
        # The all level in topology looks like
        # [[0, 1]]
        # The expected running count increases at each "fire" (0.1, 0.3,
        # 0.5) and is unchanged at the "blank" timestamps in between
        result = aggregator.aggregate(level="all")
        expected_result = pd.Series([1.0, 1.0, 2.0, 2.0, 3.0, 3.0],
                                    index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
                                    )
        assert_series_equal(result[0], expected_result)

    def test_vector_aggfunc_multiple_trigger(self):
        """TestAggregator: multi trigger vector aggfunc"""

        def aggfunc(series):
            return series.cumsum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger_fire = Trigger(self._trace,
                               event_class,
                               filters,
                               value,
                               pivot)

        filters = {
            "result": "blank"
        }
        value = -1
        trigger_blank = Trigger(self._trace, event_class, filters, value,
                                pivot)

        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
                                            self.topology,
                                            aggfunc=aggfunc)

        # "fire" contributes +1 and "blank" contributes -1, and the two
        # alternate in the trace, so the running sum over the all level
        # ([[0, 1]]) is expected to alternate between 1 and 0
        result = aggregator.aggregate(level="all")
        expected_result = pd.Series([1.0, 0.0, 1.0, 0.0, 1.0, 0.0],
                                    index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
                                    )
        assert_series_equal(result[0], expected_result)

    def test_default_aggfunc_multiple_trigger(self):
        """MultiTriggerAggregator with the default aggfunc"""

        trigger_fire = Trigger(self._trace, self._trace.aim_and_fire,
                               filters={"result": "fire"},
                               pivot="identifier", value=1)

        trigger_blank = Trigger(self._trace, self._trace.aim_and_fire,
                                filters={"result": "blank"},
                                pivot="identifier", value=2)

        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
                                            self.topology)

        # One series per cpu: "fire" shows up as 1, "blank" as 2 and
        # timestamps belonging to the other cpu as 0
        results = aggregator.aggregate(level="cpu")
        expected_results = [
            pd.Series([1., 2., 1., 0., 0., 0.],
                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
            pd.Series([0., 0., 0., 2., 1., 2.],
                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
        ]

        # assertEquals is a deprecated alias that was removed in Python 3.12
        self.assertEqual(len(results), len(expected_results))
        for result, expected in zip(results, expected_results):
            assert_series_equal(result, expected)