# Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""The module responsible for correlation
and related functionality: aligning and correlating
the output of two aggregators.
"""
from trappy.stats import StatConf
from trappy.stats.Indexer import get_unified_indexer
import numpy as np
import math


class Correlator(object):
    """Class that allows to align and correlate two traces

    :param first: First Aggregator
    :type first: :mod:`trappy.stats.Aggregator`

    :param second: Second Aggregator
    :type second: :mod:`trappy.stats.Aggregator`

    :param corrfunc: Optional keyword argument. A callable
        ``corrfunc(s_x, s_y)`` used instead of
        :meth:`pandas.Series.corr` to compute a correlation value.

    Any remaining keyword arguments are forwarded to the aggregators'
    ``aggregate()`` calls made by :meth:`correlate` and :meth:`plot`.

    The index shift used to align the two traces is computed once,
    at construction time, by :meth:`_align_top_level`.
    """

    def __init__(self, first, second, **kwargs):

        self._first_agg = first
        self._second_agg = second
        # A single index spanning both traces, used for resampling.
        self.indexer = get_unified_indexer([first.indexer, second.indexer])
        self._corrfunc = kwargs.pop("corrfunc", None)
        self._agg_kwargs = kwargs
        self.corr_graphs = {}
        self._shift = self._align_top_level()

    def _resample(self, series, delta=StatConf.DELTA_DEFAULT):
        """Internal method to resample the series
        to a uniformly spaced index

        Values at new index points are forward-filled from the
        previous sample (``reindex(method="pad")``).

        :param series: Series to be resampled
        :type series: :mod:`pandas.Series`

        :param delta: spacing between indices
        :type delta: float

        :return: resampled :mod:`pandas.Series`
        """

        new_index = self.indexer.get_uniform(delta)
        return series.reindex(index=new_index, method="pad")

    def correlate(self, level, resample=True):
        """This function returns the correlation between two traces

        :param level: The level at which the correlation is
            required
        :type level: str

        :param resample: Resample data
        :type resample: bool

        :return: A tuple ``(corr_output, total)`` where ``corr_output``
            is a list holding one correlation value per group in the
            level, and ``total`` is the average of the non-NaN values
            weighted by each group's count of non-zero samples.
            ``total`` is 0 when every weight is 0.
        """
        result_1 = self._first_agg.aggregate(level=level, **self._agg_kwargs)
        result_2 = self._second_agg.aggregate(level=level, **self._agg_kwargs)

        corr_output = []
        weights = []

        for group_id, result_group in enumerate(result_1):
            series_x = result_group
            series_y = result_2[group_id]

            if resample:
                series_x = self._resample(series_x)
                series_y = self._resample(series_y)

            series_x, series_y = shift_series(series_x, series_y, self._shift)
            corr_output.append(self._correlate(series_x, series_y))
            # NOTE(review): NaN != 0 is True, so NaN samples introduced by
            # shifting/resampling are counted as non-zero here -- confirm
            # this is intended.
            weights.append(len(series_x[series_x != 0]) + len(series_y[series_y != 0]))

        # Hoisted out of the loop: the normaliser is the same for every
        # group. NaN correlations are skipped but their weight still
        # counts towards the normaliser, as before.
        total_weight = sum(weights)
        total = 0
        if total_weight:
            for weight, corr in zip(weights, corr_output):
                if math.isnan(corr):
                    continue
                total += (weight * corr) / total_weight

        return corr_output, total

    def plot(self, level, per_line=3, ylim=None):
        """Temporary function to plot data. Expected to be
        implemented in plotter

        :param level: Topological Level (level in :mod:`trappy.stats.Topology`)
        :type level: str

        :param per_line: Number of plots per line
        :type per_line: int

        :param ylim: y-axis limits applied to every subplot.
            Defaults to ``[-1, 3]``.
        :type ylim: list
        """
        from trappy.plotter.PlotLayout import PlotLayout

        if ylim is None:
            ylim = [-1, 3]

        num_plots = self._first_agg.topology.level_span(level)
        result_1 = self._first_agg.aggregate(level=level, **self._agg_kwargs)
        result_2 = self._second_agg.aggregate(level=level, **self._agg_kwargs)
        layout = PlotLayout(per_line, num_plots)

        plot_index = 0

        for group_id, result_group in enumerate(result_1):
            s_x = self._resample(result_group)
            s_y = self._resample(result_2[group_id])

            s_x, s_y = shift_series(s_x, s_y, self._shift)

            axis = layout.get_axis(plot_index)

            axis.plot(s_x.index, s_x.values)
            axis.plot(s_y.index, s_y.values)

            axis.set_ylim(ylim)
            plot_index += 1
        layout.finish(plot_index)

    def _correlate(self, s_x, s_y):
        """Correlate two series with the user-supplied ``corrfunc`` if
        one was given, otherwise with :meth:`pandas.Series.corr`.
        """

        if self._corrfunc is not None:
            return self._corrfunc(s_x, s_y)
        return s_x.corr(s_y)

    def _align_top_level(self):
        """Compute the index shift that best aligns the two traces.

        Both aggregators are evaluated at level ``"all"`` and resampled.
        Front alignment (first non-zero samples) and back alignment
        (last non-zero samples) are both tried. The shift of the
        alignment with the higher correlation is returned. A NaN
        correlation is treated as 0, and ties favour the front
        alignment.

        :return: shift, as expected by :func:`shift_series`
        """

        result_1 = self._first_agg.aggregate(level="all")
        result_2 = self._second_agg.aggregate(level="all")

        s_x = self._resample(result_1[0])
        s_y = self._resample(result_2[0])

        front_x, front_y, front_shift = align(s_x, s_y, mode="front")
        front_corr = self._correlate(front_x, front_y)

        back_x, back_y, back_shift = align(s_x, s_y, mode="back")
        back_corr = self._correlate(back_x, back_y)

        if math.isnan(back_corr):
            back_corr = 0
        if math.isnan(front_corr):
            front_corr = 0

        if front_corr >= back_corr:
            return front_shift
        else:
            return back_shift


def align(s_x, s_y, mode="front"):
    """Function to align the input series

    The shift is the difference between the positions of the first
    (``mode="front"``) or last (``mode="back"``) non-zero entries of the
    two series. NaN entries count as non-zero for this purpose.

    :param s_x: First Series
    :type s_x: :mod:`pandas.Series`

    :param s_y: Second Series
    :type s_y: :mod:`pandas.Series`

    :param mode: Align Front/Back; one of ``"front"`` or ``"back"``
    :type mode: str

    :return: ``(s_x, s_y, shift)`` -- the shifted series and the shift
        applied. If either series has no non-zero entry, both are returned
        unchanged with a shift of 0.

    :raises ValueError: if ``mode`` is not ``"front"`` or ``"back"``
    """

    if mode not in ("front", "back"):
        raise ValueError("Invalid align mode: {!r} "
                         "(expected 'front' or 'back')".format(mode))

    p_x = np.flatnonzero(s_x)
    p_y = np.flatnonzero(s_y)

    if not len(p_x) or not len(p_y):
        return s_x, s_y, 0

    if mode == "front":
        p_x = p_x[0]
        p_y = p_y[0]
    else:
        p_x = p_x[-1]
        p_y = p_y[-1]

    shift = p_x - p_y

    s_x, s_y = shift_series(s_x, s_y, shift)
    return s_x, s_y, shift


def shift_series(s_x, s_y, shift):
    """Shift series to align

    A positive ``shift`` moves ``s_y`` forward by ``shift`` positions.
    Otherwise ``s_x`` is moved forward by ``-shift`` positions. Vacated
    positions are filled by :meth:`pandas.Series.shift` (NaN by default).

    :param s_x: First Series
    :type s_x: :mod:`pandas.Series`

    :param s_y: Second Series
    :type s_y: :mod:`pandas.Series`

    :param shift: The number of index
        positions to be shifted
    :type shift: int

    :return: the ``(s_x, s_y)`` pair after shifting
    """

    if shift > 0:
        s_y = s_y.shift(shift)
    else:
        s_x = s_x.shift(-1 * shift)

    return s_x, s_y