1# Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Example of using an exogenous feature to ignore a known anomaly.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21import csv 22from os import path 23 24import numpy as np 25import tensorflow as tf 26 27 28try: 29 import matplotlib # pylint: disable=g-import-not-at-top 30 matplotlib.use("TkAgg") # Need Tk for interactive plots. 31 from matplotlib import pyplot # pylint: disable=g-import-not-at-top 32 HAS_MATPLOTLIB = True 33except ImportError: 34 # Plotting requires matplotlib, but the unit test running this code may 35 # execute in an environment without it (i.e. matplotlib is not a build 36 # dependency). We'd still like to test the TensorFlow-dependent parts of this 37 # example, namely train_and_predict. 38 HAS_MATPLOTLIB = False 39 40_MODULE_PATH = path.dirname(__file__) 41_DATA_FILE = path.join(_MODULE_PATH, "data/changepoints.csv") 42 43 44def state_space_estimator(exogenous_feature_columns): 45 """Constructs a StructuralEnsembleRegressor.""" 46 47 def _exogenous_update_condition(times, features): 48 del times # unused 49 # Make exogenous updates sparse by setting an update condition. This in 50 # effect allows missing exogenous features: if the condition evaluates to 51 # False, no update is performed. Otherwise we sometimes end up with "leaky" 52 # updates which add unnecessary uncertainty to the model even when there is 53 # no changepoint. 54 return tf.equal(tf.squeeze(features["is_changepoint"], axis=-1), "yes") 55 56 return ( 57 tf.contrib.timeseries.StructuralEnsembleRegressor( 58 periodicities=12, 59 # Extract a smooth period by constraining the number of latent values 60 # being cycled between. 61 cycle_num_latent_values=3, 62 num_features=1, 63 exogenous_feature_columns=exogenous_feature_columns, 64 exogenous_update_condition=_exogenous_update_condition), 65 # Use truncated backpropagation with a window size of 64, batching 66 # together 4 of these windows (random offsets) per training step. Training 67 # with exogenous features often requires somewhat larger windows. 68 4, 64) 69 70 71def autoregressive_estimator(exogenous_feature_columns): 72 input_window_size = 8 73 output_window_size = 2 74 return ( 75 tf.contrib.timeseries.ARRegressor( 76 periodicities=12, 77 num_features=1, 78 input_window_size=input_window_size, 79 output_window_size=output_window_size, 80 exogenous_feature_columns=exogenous_feature_columns), 81 64, input_window_size + output_window_size) 82 83 84def train_and_evaluate_exogenous( 85 estimator_fn, csv_file_name=_DATA_FILE, train_steps=300): 86 """Training, evaluating, and predicting on a series with changepoints.""" 87 # Indicate the format of our exogenous feature, in this case a string 88 # representing a boolean value. 89 string_feature = tf.feature_column.categorical_column_with_vocabulary_list( 90 key="is_changepoint", vocabulary_list=["no", "yes"]) 91 # Specify the way this feature is presented to the model, here using a one-hot 92 # encoding. 93 one_hot_feature = tf.feature_column.indicator_column( 94 categorical_column=string_feature) 95 96 estimator, batch_size, window_size = estimator_fn( 97 exogenous_feature_columns=[one_hot_feature]) 98 reader = tf.contrib.timeseries.CSVReader( 99 csv_file_name, 100 # Indicate the format of our CSV file. First we have two standard columns, 101 # one for times and one for values. The third column is a custom exogenous 102 # feature indicating whether each timestep is a changepoint. The 103 # changepoint feature name must match the string_feature column name 104 # above. 105 column_names=(tf.contrib.timeseries.TrainEvalFeatures.TIMES, 106 tf.contrib.timeseries.TrainEvalFeatures.VALUES, 107 "is_changepoint"), 108 # Indicate dtypes for our features. 109 column_dtypes=(tf.int64, tf.float32, tf.string), 110 # This CSV has a header line; here we just ignore it. 111 skip_header_lines=1) 112 train_input_fn = tf.contrib.timeseries.RandomWindowInputFn( 113 reader, batch_size=batch_size, window_size=window_size) 114 estimator.train(input_fn=train_input_fn, steps=train_steps) 115 evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader) 116 evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1) 117 # Create an input_fn for prediction, with a simulated changepoint. Since all 118 # of the anomalies in the training data are explained by the exogenous 119 # feature, we should get relatively confident predictions before the indicated 120 # changepoint (since we are telling the model that no changepoint exists at 121 # those times) and relatively uncertain predictions after. 122 (predictions,) = tuple(estimator.predict( 123 input_fn=tf.contrib.timeseries.predict_continuation_input_fn( 124 evaluation, steps=100, 125 exogenous_features={ 126 "is_changepoint": [["no"] * 49 + ["yes"] + ["no"] * 50]}))) 127 times = evaluation["times"][0] 128 observed = evaluation["observed"][0, :, 0] 129 mean = np.squeeze(np.concatenate( 130 [evaluation["mean"][0], predictions["mean"]], axis=0)) 131 variance = np.squeeze(np.concatenate( 132 [evaluation["covariance"][0], predictions["covariance"]], axis=0)) 133 all_times = np.concatenate([times, predictions["times"]], axis=0) 134 upper_limit = mean + np.sqrt(variance) 135 lower_limit = mean - np.sqrt(variance) 136 # Indicate the locations of the changepoints for plotting vertical lines. 137 anomaly_locations = [] 138 with open(csv_file_name, "r") as csv_file: 139 csv_reader = csv.DictReader(csv_file) 140 for row in csv_reader: 141 if row["is_changepoint"] == "yes": 142 anomaly_locations.append(int(row["time"])) 143 anomaly_locations.append(predictions["times"][49]) 144 return (times, observed, all_times, mean, upper_limit, lower_limit, 145 anomaly_locations) 146 147 148def make_plot(name, training_times, observed, all_times, mean, 149 upper_limit, lower_limit, anomaly_locations): 150 """Plot the time series and anomalies in a new figure.""" 151 pyplot.figure() 152 pyplot.plot(training_times, observed, "b", label="training series") 153 pyplot.plot(all_times, mean, "r", label="forecast") 154 pyplot.axvline(anomaly_locations[0], linestyle="dotted", label="changepoints") 155 for anomaly_location in anomaly_locations[1:]: 156 pyplot.axvline(anomaly_location, linestyle="dotted") 157 pyplot.fill_between(all_times, lower_limit, upper_limit, color="grey", 158 alpha="0.2") 159 pyplot.axvline(training_times[-1], color="k", linestyle="--") 160 pyplot.xlabel("time") 161 pyplot.ylabel("observations") 162 pyplot.legend(loc=0) 163 pyplot.title(name) 164 165 166def main(unused_argv): 167 if not HAS_MATPLOTLIB: 168 raise ImportError( 169 "Please install matplotlib to generate a plot from this example.") 170 make_plot("Ignoring a known anomaly (state space)", 171 *train_and_evaluate_exogenous( 172 estimator_fn=state_space_estimator)) 173 make_plot("Ignoring a known anomaly (autoregressive)", 174 *train_and_evaluate_exogenous( 175 estimator_fn=autoregressive_estimator, train_steps=3000)) 176 pyplot.show() 177 178 179if __name__ == "__main__": 180 tf.app.run(main=main) 181