1# Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Filtering postprocessors for SequentialTimeSeriesModels.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21import abc 22 23import six 24 25from tensorflow.contrib.timeseries.python.timeseries import math_utils 26 27from tensorflow.python.framework import dtypes 28from tensorflow.python.framework import ops 29from tensorflow.python.ops import array_ops 30from tensorflow.python.ops import check_ops 31from tensorflow.python.ops import math_ops 32from tensorflow.python.util import nest 33 34 35@six.add_metaclass(abc.ABCMeta) 36class FilteringStepPostprocessor(object): 37 """Base class for processors that are applied after each filter step.""" 38 39 @abc.abstractmethod 40 def process_filtering_step(self, current_times, current_values, 41 predicted_state, filtered_state, outputs): 42 """Extends/modifies a filtering step, altering state and loss. 43 44 Args: 45 current_times: A [batch size] integer Tensor of times. 46 current_values: A [batch size x num features] Tensor of values filtering 47 is being performed on. 48 predicted_state: A (possibly nested) list of Tensors indicating model 49 state which does not take `current_times` and `current_values` into 50 account. 51 filtered_state: Same structure as predicted_state, but updated to take 52 `current_times` and `current_values` into account. 53 outputs: A dictionary of outputs produced by model filtering 54 (SequentialTimeSeriesModel._process_filtering_step). 55 Returns: A tuple of (new_state, updated_outputs); 56 new_state: Updated state with the same structure as `filtered_state` and 57 `predicted_state`. 58 updated_outputs: The `outputs` dictionary, updated with any new outputs 59 from this filtering postprocessor. 60 """ 61 pass 62 63 @abc.abstractproperty 64 def output_names(self): 65 return [] 66 67 68def cauchy_alternative_to_gaussian(current_times, current_values, outputs): 69 """A Cauchy anomaly distribution, centered at a Gaussian prediction. 70 71 Performs an entropy-matching approximation of the scale parameters of 72 independent Cauchy distributions given the covariance matrix of a multivariate 73 Gaussian in outputs["covariance"], and centers the Cauchy distributions at 74 outputs["mean"]. This requires that the model that we are creating an 75 alternative/anomaly distribution for produces a mean and covariance. 76 77 Args: 78 current_times: A [batch size] Tensor of times, unused. 79 current_values: A [batch size x num features] Tensor of values to evaluate 80 the anomaly distribution at. 81 outputs: A dictionary of Tensors with keys "mean" and "covariance" 82 describing the Gaussian to construct an anomaly distribution from. The 83 value corresponding to "mean" has shape [batch size x num features], and 84 the value corresponding to "covariance" has shape [batch size x num 85 features x num features]. 86 Returns: 87 A [batch size] Tensor of log likelihoods; the anomaly log PDF evaluated at 88 `current_values`. 89 """ 90 del current_times # unused 91 cauchy_scale = math_utils.entropy_matched_cauchy_scale(outputs["covariance"]) 92 individual_log_pdfs = math_utils.cauchy_log_prob( 93 loc=outputs["mean"], 94 scale=cauchy_scale, 95 x=current_values) 96 return math_ops.reduce_sum(individual_log_pdfs, axis=1) 97 98 99def _interpolate_state_linear(first_state, second_state, first_responsibility): 100 """Interpolate between two model states linearly.""" 101 interpolated_state_flat = [] 102 for first_state_tensor, second_state_tensor in zip( 103 nest.flatten(first_state), nest.flatten(second_state)): 104 assert first_state_tensor.dtype == second_state_tensor.dtype 105 if first_state_tensor.dtype.is_floating: 106 # Pad the responsibility shape with ones up to the state's rank so that it 107 # broadcasts 108 first_responsibility_padded = array_ops.reshape( 109 tensor=first_responsibility, 110 shape=array_ops.concat([ 111 array_ops.shape(first_responsibility), array_ops.ones( 112 [array_ops.rank(first_state_tensor) - 1], dtype=dtypes.int32) 113 ], 0)) 114 interpolated_state = ( 115 first_responsibility_padded * first_state_tensor 116 + (1. - first_responsibility_padded) * second_state_tensor) 117 interpolated_state.set_shape(first_state_tensor.get_shape()) 118 interpolated_state_flat.append(interpolated_state) 119 else: 120 # Integer dtypes are probably representing times, and don't need 121 # interpolation. Make sure they're identical to be sure. 122 with ops.control_dependencies( 123 [check_ops.assert_equal(first_state_tensor, second_state_tensor)]): 124 interpolated_state_flat.append(array_ops.identity(first_state_tensor)) 125 return nest.pack_sequence_as(first_state, interpolated_state_flat) 126 127 128class StateInterpolatingAnomalyDetector(FilteringStepPostprocessor): 129 """An anomaly detector which guards model state against outliers. 130 131 Smoothly interpolates between a model's predicted and inferred states, based 132 on the posterior probability of an anomaly, p(anomaly | data). This is useful 133 if anomalies would otherwise lead to model state which is hard to recover 134 from (Gaussian state space models suffer from this, for example). 135 136 Relies on (1) an alternative distribution, typically with heavier tails than 137 the model's normal predictions, and (2) a prior probability of an anomaly. The 138 prior probability acts as a penalty, discouraging the system from marking too 139 many points as anomalies. The alternative distribution indicates the 140 probability of a datapoint given that it is an anomaly, and is a heavy-tailed 141 distribution (Cauchy) centered around the model's predictions by default. 142 143 Specifically, we have: 144 145 p(anomaly | data) = p(data | anomaly) * anomaly_prior_probability 146 / (p(data | not anomaly) * (1 - anomaly_prior_probability) 147 + p(data | anomaly) * anomaly_prior_probability) 148 149 This is simply Bayes' theorem, where p(data | anomaly) is the 150 alternative/anomaly distribution, p(data | not anomaly) is the model's 151 predicted distribution, and anomaly_prior_probability is the prior probability 152 of an anomaly occurring (user-specified, defaulting to 1%). 153 154 Rather than computing p(anomaly | data) directly, we use the odds ratio: 155 156 odds_ratio = p(data | anomaly) * anomaly_prior_probability 157 / (p(data | not anomaly) * (1 - anomaly_prior_probability)) 158 159 This has the same information as p(anomaly | data): 160 161 odds_ratio = p(anomaly | data) / p(not anomaly | data) 162 163 A "responsibility" score is computed for the model based on the log odds 164 ratio, and state interpolated based on this responsibility: 165 166 model_responsibility = 1 / (1 + exp(-responsibility_scaling 167 * ln(odds_ratio))) 168 model_state = filtered_model_state * model_responsibility 169 + predicted_model_state * (1 - model_responsibility) 170 loss = model_responsibility 171 * ln(p(data | not anomaly) * (1 - anomaly_prior_probability)) 172 + (1 - model_responsibility) 173 * ln(p(data | anomaly) * anomaly_prior_probability) 174 175 """ 176 177 output_names = ["anomaly_score"] 178 179 def __init__(self, 180 anomaly_log_likelihood=cauchy_alternative_to_gaussian, 181 anomaly_prior_probability=0.01, 182 responsibility_scaling=1.0): 183 """Configure the anomaly detector. 184 185 Args: 186 anomaly_log_likelihood: A function taking `current_times`, 187 `current_values`, and `outputs` (same as the corresponding arguments 188 to process_filtering_step) and returning a [batch size] Tensor of log 189 likelihoods under an anomaly distribution. 190 anomaly_prior_probability: A scalar value, between 0 and 1, indicating the 191 prior probability of a particular example being an anomaly. 192 responsibility_scaling: A positive scalar controlling how fast 193 interpolation transitions between not-anomaly and anomaly; lower 194 values (closer to 0) create a smoother/slower transition. 195 """ 196 self._anomaly_log_likelihood = anomaly_log_likelihood 197 self._responsibility_scaling = responsibility_scaling 198 self._anomaly_prior_probability = anomaly_prior_probability 199 200 def process_filtering_step(self, current_times, current_values, 201 predicted_state, filtered_state, outputs): 202 """Fall back on `predicted_state` for anomalies. 203 204 Args: 205 current_times: A [batch size] integer Tensor of times. 206 current_values: A [batch size x num features] Tensor of values filtering 207 is being performed on. 208 predicted_state: A (possibly nested) list of Tensors indicating model 209 state which does not take `current_times` and `current_values` into 210 account. 211 filtered_state: Same structure as predicted_state, but updated to take 212 `current_times` and `current_values` into account. 213 outputs: A dictionary of outputs produced by model filtering. Must 214 include `log_likelihood`, a [batch size] Tensor indicating the log 215 likelihood of the observations under the model's predictions. 216 Returns: 217 A tuple of (new_state, updated_outputs); 218 new_state: Updated state with the same structure as `filtered_state` and 219 `predicted_state`; predicted_state for anomalies and filtered_state 220 otherwise (per batch element). 221 updated_outputs: The `outputs` dictionary, updated with a new "loss" 222 (the interpolated negative log likelihoods under the model and 223 anomaly distributions) and "anomaly_score" (the log odds ratio of 224 each part of the batch being an anomaly). 225 """ 226 anomaly_log_likelihood = self._anomaly_log_likelihood( 227 current_times=current_times, 228 current_values=current_values, 229 outputs=outputs) 230 anomaly_prior_probability = ops.convert_to_tensor( 231 self._anomaly_prior_probability, dtype=current_values.dtype) 232 # p(data | anomaly) * p(anomaly) 233 data_and_anomaly_log_probability = ( 234 anomaly_log_likelihood + math_ops.log(anomaly_prior_probability)) 235 # p(data | no anomaly) * p(no anomaly) 236 data_and_no_anomaly_log_probability = ( 237 outputs["log_likelihood"] + math_ops.log(1. - anomaly_prior_probability) 238 ) 239 # A log odds ratio is slightly nicer here than computing p(anomaly | data), 240 # since it is centered around zero 241 anomaly_log_odds_ratio = ( 242 data_and_anomaly_log_probability 243 - data_and_no_anomaly_log_probability) 244 model_responsibility = math_ops.sigmoid(-self._responsibility_scaling * 245 anomaly_log_odds_ratio) 246 # Do a linear interpolation between predicted and inferred model state 247 # based on the model's "responsibility". If we knew for sure whether 248 # this was an anomaly or not (binary responsibility), this would be the 249 # correct thing to do, but given that we don't it's just a 250 # (differentiable) heuristic. 251 interpolated_state = _interpolate_state_linear( 252 first_state=filtered_state, 253 second_state=predicted_state, 254 first_responsibility=model_responsibility) 255 # TODO(allenl): Try different responsibility scalings and interpolation 256 # methods (e.g. average in probability space rather than log space). 257 interpolated_log_likelihood = ( 258 model_responsibility * data_and_no_anomaly_log_probability 259 + (1. - model_responsibility) * data_and_anomaly_log_probability) 260 outputs["loss"] = -interpolated_log_likelihood 261 outputs["anomaly_score"] = anomaly_log_odds_ratio 262 return (interpolated_state, outputs) 263