# Copyright 2015 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import unittest from dashboard import find_change_points from dashboard import math_utils class FindChangePointsTest(unittest.TestCase): def testFindChangePoints(self): # Simple test that the output is as expected for a clear change. # Tests for specific aspects of the algorithm are below. data = [1, 1, 2, 1, 1, 8, 8, 8, 9, 8, 9] series = list(enumerate(data)) expected = [ find_change_points.ChangePoint( x_value=5, median_before=1, median_after=8, window_start=1, window_end=10, size_before=4, size_after=6, relative_change=7, std_dev_before=0.4330127018922193, t_statistic=-24.452628375754593, degrees_of_freedom=6.9938793160801023, p_value=0.001) ] actual = find_change_points.FindChangePoints( series, max_window_size=10, multiple_of_std_dev=3, min_relative_change=0.5, min_absolute_change=1, min_steppiness=0.4, min_segment_size=3) self.assertEqual(expected, actual) def testFindChangePoints_EmptySeries(self): # For an empty series, there are certainly no change points. self.assertEqual([], find_change_points.FindChangePoints([])) def testFindSplit(self): # Find split returns the first index of the segment after the split. self.assertEqual(3, find_change_points._FindSplit([3, 2, 3, 5, 6, 5])) # If there are several splits that are equally interesting, or no # interesting splits, then the first one is returned. self.assertEqual(2, find_change_points._FindSplit([2, 2, 4, 4, 6, 6])) self.assertEqual(1, find_change_points._FindSplit([4, 4, 4, 4, 4, 4])) def _PassesThresholds( self, data, index, multiple_of_std_dev=0, min_relative_change=0, min_absolute_change=0, min_segment_size=0, min_steppiness=0): """Tests whether a split passes the thresholds with the given parameters.""" # All of the threshold parameters are set to zero by default here, so that # we can test just one threshold at a time. return find_change_points._PassesThresholds( data, index, min_segment_size=min_segment_size, min_absolute_change=min_absolute_change, min_relative_change=min_relative_change, min_steppiness=min_steppiness, multiple_of_std_dev=multiple_of_std_dev) def testIsAnomalous_FiltersOnSegmentSize(self): sample = [1, 1, 5, 5, 5, 9, 9] self.assertFalse(self._PassesThresholds(sample, 2, min_segment_size=3)) self.assertFalse(self._PassesThresholds(sample, 5, min_segment_size=3)) self.assertTrue(self._PassesThresholds(sample, 2, min_segment_size=2)) self.assertTrue(self._PassesThresholds(sample, 5, min_segment_size=2)) def testIsAnomalous_FiltersOnAbsoluteChange(self): sample = [2, 2, 2, 4, 4, 4, 4, 8, 8, 8] self.assertTrue(self._PassesThresholds(sample, 7)) self.assertTrue(self._PassesThresholds(sample, 7, min_absolute_change=4)) self.assertFalse(self._PassesThresholds(sample, 3, min_absolute_change=4)) def testIsAnomalous_FiltersOnRelativeChange(self): sample = [2, 2, 2, 6, 6, 6, 6, 10, 10, 10] self.assertTrue(self._PassesThresholds(sample, 3)) self.assertTrue(self._PassesThresholds(sample, 3, min_relative_change=1)) self.assertFalse(self._PassesThresholds(sample, 7, min_relative_change=1)) def testIsAnomalous_FiltersOnMultipleOfStdDev(self): # In the first sample, the standard deviation on either side of index 5 # is between 2 and 4. In the second sample, the standard deviation within # each of the two segments is 0. noisy = [2, 8, 5, 1, 7, 13, 19, 10, 15, 17] steady = [5, 5, 5, 5, 5, 15, 15, 15, 15, 15] self.assertTrue(self._PassesThresholds(steady, 5, multiple_of_std_dev=1)) self.assertTrue(self._PassesThresholds(steady, 5, multiple_of_std_dev=8)) self.assertTrue(self._PassesThresholds(noisy, 5, multiple_of_std_dev=1)) self.assertFalse(self._PassesThresholds(noisy, 5, multiple_of_std_dev=8)) def testIsAnomalous_MultipleOfStdDevFilter_LowOnOneSide(self): # The standard deviation to use is the lower of the standard deviations # of the two sides, so if either side doesn't have a high standard # deviation then an alert can be fired. left = [3, 2, 3, 4, 3, 5, 30, 1, 50, 10] right = [5, 30, 1, 50, 10, 3, 2, 3, 4, 3] # Note that stddev([3, 2, 3, 4, 3]) is less than 1, and the difference # between the medians of the two sides is 7. self.assertTrue(self._PassesThresholds(left, 5, multiple_of_std_dev=7)) self.assertTrue(self._PassesThresholds(right, 5, multiple_of_std_dev=7)) def testZeroMedian_ReturnsValuesWithMedianEqualToZero(self): # The _ZeroMedian function adjusts values so that the median is zero. self.assertEqual([0, 0, 1], find_change_points._ZeroMedian([1, 1, 2])) self.assertEqual([-0.5, 0.5], find_change_points._ZeroMedian([45, 46])) def testZeroMedian_ResultProperties(self): nums = [3.4, 8, 100.2, 78, 3, -4, 12, 3.14, 1024] zeroed_nums = find_change_points._ZeroMedian(nums) # The output of _ZeroMedian has the same standard deviation as the input. self.assertEqual( math_utils.StandardDeviation(nums), math_utils.StandardDeviation(zeroed_nums)) # Also, the median of the output is always zero. self.assertEqual(0, math_utils.Median(zeroed_nums)) def _AssertFindsChangePoints( self, y_values, expected_indexes, max_window_size=50, min_segment_size=6, min_absolute_change=0, min_relative_change=0.01, min_steppiness=0.4, multiple_of_std_dev=2.5): """Asserts that change points are found at particular indexes.""" series = list(enumerate(y_values)) results = find_change_points.FindChangePoints( series, max_window_size=max_window_size, min_segment_size=min_segment_size, min_absolute_change=min_absolute_change, min_relative_change=min_relative_change, min_steppiness=min_steppiness, multiple_of_std_dev=multiple_of_std_dev) actual_indexes = [a.x_value for a in results] self.assertEqual(expected_indexes, actual_indexes) def testFindChangePoints_ShortSequences(self): self._AssertFindsChangePoints( [1, 1, 1, 5, 5, 5, 5, 9, 9, 9], [3], max_window_size=10, min_segment_size=3) self._AssertFindsChangePoints( [1, 1, 5, 5, 5, 5, 9, 9, 9, 9], [6], max_window_size=10, min_segment_size=3) self._AssertFindsChangePoints( [1, 1, 1, 1, 5, 5, 5, 5, 9, 9, 9], [8], max_window_size=11, min_segment_size=3) self._AssertFindsChangePoints( [1, 1, 1, 5, 5, 5, 5, 9, 9, 9, 9], [3], max_window_size=11, min_segment_size=3) self._AssertFindsChangePoints( [1, 1, 5, 5, 5, 5, 9, 9, 9, 9, 9], [6], max_window_size=11, min_segment_size=3) def testChangePoint_CanBeMadeAndConvertedToDict(self): series = list(enumerate([4, 4, 4, 8, 8, 8, 8])) change_point = find_change_points.MakeChangePoint(series, 3) self.assertEqual( find_change_points.ChangePoint( x_value=3, median_before=4.0, median_after=8.0, size_before=3, size_after=4, window_start=0, window_end=6, relative_change=1.0, std_dev_before=0.0, t_statistic=float('inf'), degrees_of_freedom=1.0, p_value=0.001), change_point) self.assertEqual( { 'x_value': 3, 'median_before': 4.0, 'median_after': 8.0, 'size_before': 3, 'size_after': 4, 'window_start': 0, 'window_end': 6, 'relative_change': 1.0, 'std_dev_before': 0.0, 't_statistic': float('inf'), 'degrees_of_freedom': 1.0, 'p_value': 0.001, }, change_point.AsDict()) if __name__ == '__main__': unittest.main()