# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Auto-Regressive models for time series data."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.contrib.rnn.python.ops import lstm_ops
from tensorflow.contrib.timeseries.python.timeseries import math_utils
from tensorflow.contrib.timeseries.python.timeseries import model
from tensorflow.contrib.timeseries.python.timeseries import model_utils
from tensorflow.contrib.timeseries.python.timeseries.feature_keys import PredictionFeatures
from tensorflow.contrib.timeseries.python.timeseries.feature_keys import TrainEvalFeatures

from tensorflow.python.estimator import estimator_lib
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.keras.engine import sequential
from tensorflow.python.keras.engine import training
from tensorflow.python.keras.layers import core
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import check_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import gen_math_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import tensor_array_ops
from tensorflow.python.ops import variable_scope


class FlatPredictionModel(training.Model):
  """Flattens input and output windows and puts them through dense layers.

  This model does not operate on its own, but rather is a plugin to
  `ARModel`. See `ARModel`'s constructor documentation
  (`prediction_model_factory`) for a usage example.
  """

  def __init__(self,
               num_features,
               input_window_size,
               output_window_size,
               hidden_layer_sizes=None):
    """Construct the flat prediction model.

    Args:
      num_features: number of input features per time step.
      input_window_size: Number of past time steps of data to look at when
        doing the regression.
      output_window_size: Number of future time steps to predict. Note that
        setting it to > 1 empirically seems to give a better fit.
      hidden_layer_sizes: list of sizes of hidden layers.
    """
    super(FlatPredictionModel, self).__init__()
    self._input_flatten = core.Flatten()
    self._output_flatten = core.Flatten()
    if hidden_layer_sizes:
      self._hidden_layers = sequential.Sequential([
          core.Dense(layer_size, activation=nn_ops.relu)
          for layer_size in hidden_layer_sizes])
    else:
      self._hidden_layers = None
    self._mean_transform = core.Dense(num_features * output_window_size,
                                      name="predicted_mean")
    self._covariance_transform = core.Dense(num_features * output_window_size,
                                            name="log_sigma_square")
    self._prediction_shape = [-1, output_window_size, num_features]

  def call(self, input_window_features, output_window_features):
    """Compute predictions from input and output windows.

    Args:
      input_window_features: A floating point Tensor with shape [batch size,
        input window size, input features]. The batch dimension may not have
        static shape information, but the window size and number of input
        features are known at graph construction time and recorded in the
        static shape information for the `input_window_features` `Tensor`.
        Note that `input_window_size` may be zero.
      output_window_features: A floating point Tensor with shape [batch size,
        output window size, output features]. As with `input_window_features`,
        the last two dimensions have static shape information. If there are no
        output features, the size of the last dimension will be zero.
    Returns:
      A dictionary of predictions with keys "mean" and "covariance" (only
      diagonal covariances are currently supported). Each has shape
      [batch size, output window size, num_features], where num_features is
      the same as the constructor argument.
    """
    if input_window_features.shape.dims[1].value == 0:
      # TODO(allenl): Make reshape()'s static shape information work on
      # zero-size Tensors? Currently this special case is required because
      # otherwise the Dense layers get unknown last dimensions.
      activation = self._output_flatten(output_window_features)
    elif output_window_features.shape.dims[2].value == 0:
      activation = self._input_flatten(input_window_features)
    else:
      activation = array_ops.concat(
          [self._input_flatten(input_window_features),
           self._output_flatten(output_window_features)],
          axis=1)
    if self._hidden_layers:
      activation = self._hidden_layers(activation)
    predicted_mean = array_ops.reshape(
        self._mean_transform(activation),
        self._prediction_shape)
    predicted_covariance = array_ops.reshape(
        gen_math_ops.exp(self._covariance_transform(activation)),
        self._prediction_shape)
    return {"mean": predicted_mean,
            "covariance": predicted_covariance}
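

# Illustrative only (not used anywhere in this module): `FlatPredictionModel`
# is normally constructed by `ARModel` through its `prediction_model_factory`
# argument, but it can also be built directly. The sizes below are arbitrary
# placeholders, not library defaults:
#
#   prediction_model = FlatPredictionModel(
#       num_features=2,
#       input_window_size=10,
#       output_window_size=5,
#       hidden_layer_sizes=[20, 20])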


class LSTMPredictionModel(training.Model):
  """A simple encoder/decoder model using an LSTM.

  This model does not operate on its own, but rather is a plugin to
  `ARModel`. See `ARModel`'s constructor documentation
  (`prediction_model_factory`) for a usage example.
  """

  def __init__(self,
               num_features,
               input_window_size,
               output_window_size,
               num_units=128):
    """Construct the LSTM prediction model.

    Args:
      num_features: number of input features per time step.
      input_window_size: Number of past time steps of data to look at when
        doing the regression.
      output_window_size: Number of future time steps to predict. Note that
        setting it to > 1 empirically seems to give a better fit.
      num_units: The number of units in the encoder and decoder LSTM cells.
    """
    super(LSTMPredictionModel, self).__init__()
    self._encoder = lstm_ops.LSTMBlockFusedCell(
        num_units=num_units, name="encoder")
    self._decoder = lstm_ops.LSTMBlockFusedCell(
        num_units=num_units, name="decoder")
    self._mean_transform = core.Dense(num_features,
                                      name="mean_transform")
    self._covariance_transform = core.Dense(num_features,
                                            name="covariance_transform")

  def call(self, input_window_features, output_window_features):
    """Compute predictions from input and output windows."""
    # Convert to time major
    input_window_features = array_ops.transpose(input_window_features,
                                                [1, 0, 2])
    output_window_features = array_ops.transpose(output_window_features,
                                                 [1, 0, 2])
    _, encoder_state = self._encoder(
        input_window_features, dtype=self.dtype)
    decoder_output, _ = self._decoder(
        output_window_features, dtype=self.dtype,
        initial_state=encoder_state)

    # Switch back to batch major
    decoder_output = array_ops.transpose(decoder_output, [1, 0, 2])
    predicted_mean = self._mean_transform(decoder_output)
    predicted_covariance = gen_math_ops.exp(
        self._covariance_transform(decoder_output))
    return {"mean": predicted_mean,
            "covariance": predicted_covariance}
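

# Illustrative only: as with `FlatPredictionModel`, `LSTMPredictionModel` is
# meant to be passed to `ARModel` (defined below) via
# `prediction_model_factory`. A minimal sketch, with arbitrary example sizes:
#
#   import functools
#   model = ARModel(
#       periodicities=24,
#       input_window_size=16,
#       output_window_size=4,
#       num_features=1,
#       prediction_model_factory=functools.partial(
#           LSTMPredictionModel, num_units=64))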


class ARModel(model.TimeSeriesModel):
  """Auto-regressive model, both linear and non-linear.

  Features to the model include time and values of input_window_size timesteps,
  and times for output_window_size timesteps. These are passed through a
  configurable prediction model, and then fed to a loss function (e.g. squared
  loss).

  Note that this class can also be used to regress against time only by setting
  the input_window_size to zero.

  Each periodicity in the `periodicities` arg is divided by the
  `num_time_buckets` into time buckets that are represented as features added
  to the model.

  A good heuristic for picking an appropriate periodicity for a given data set
  would be the length of cycles in the data. For example, energy usage in a
  home is typically cyclic each day. If the time feature in a home energy
  usage dataset is in the unit of hours, then 24 would be an appropriate
  periodicity. Similarly, a good heuristic for `num_time_buckets` is how often
  the data is expected to change within the cycle. For the aforementioned home
  energy usage dataset with a periodicity of 24, 48 would be a reasonable value
  if usage is expected to change every half hour.

  Each feature's value for a given example with time t is the difference
  between t and the start of the time bucket it falls under. If it doesn't fall
  under a feature's associated time bucket, then that feature's value is zero.

  For example: if `periodicities` = (9, 12) and `num_time_buckets` = 3, then 6
  features would be added to the model, 3 for periodicity 9 and 3 for
  periodicity 12.

  For an example data point where t = 17:
  - It's in the 3rd time bucket for periodicity 9 (the 2nd period is 9-18 and
    the 3rd time bucket is 15-18).
  - It's in the 2nd time bucket for periodicity 12 (the 2nd period is 12-24 and
    the 2nd time bucket is 16-20).

  Therefore the 6 added features for this row with t = 17 would be:

  # Feature name (periodicity#_timebucket#), feature value
  P9_T1, 0   # not in first time bucket
  P9_T2, 0   # not in second time bucket
  P9_T3, 2   # 17 - 15 since 15 is the start of the 3rd time bucket
  P12_T1, 0  # not in first time bucket
  P12_T2, 1  # 17 - 16 since 16 is the start of the 2nd time bucket
  P12_T3, 0  # not in third time bucket
  """
  SQUARED_LOSS = "squared_loss"
  NORMAL_LIKELIHOOD_LOSS = "normal_likelihood_loss"

  def __init__(self,
               periodicities,
               input_window_size,
               output_window_size,
               num_features,
               prediction_model_factory=FlatPredictionModel,
               num_time_buckets=10,
               loss=NORMAL_LIKELIHOOD_LOSS,
               exogenous_feature_columns=None):
    """Constructs an auto-regressive model.

    Args:
      periodicities: periodicities of the input data, in the same units as the
        time feature (for example 24 if feeding hourly data with a daily
        periodicity, or 60 * 24 if feeding minute-level data with daily
        periodicity). Note this can be a single value or a list of values for
        multiple periodicities.
      input_window_size: Number of past time steps of data to look at when
        doing the regression.
      output_window_size: Number of future time steps to predict. Note that
        setting it to > 1 empirically seems to give a better fit.
      num_features: number of input features per time step.
      prediction_model_factory: A callable taking arguments `num_features`,
        `input_window_size`, and `output_window_size` and returning a
        `tf.keras.Model`. The `Model`'s `call()` takes two arguments: an input
        window and an output window, and returns a dictionary of predictions.
        See `FlatPredictionModel` for an example. Example usage:

        ```python
        model = ar_model.ARModel(
            periodicities=2, num_features=3,
            prediction_model_factory=functools.partial(
                FlatPredictionModel,
                hidden_layer_sizes=[10, 10]))
        ```

        The default model computes predictions as a linear function of
        flattened input and output windows.
      num_time_buckets: Number of buckets into which to divide (time %
        periodicity). This value multiplied by the number of periodicities is
        the number of time features added to the model.
      loss: Loss function to use for training. Currently supported values are
        SQUARED_LOSS and NORMAL_LIKELIHOOD_LOSS. Note that for
        NORMAL_LIKELIHOOD_LOSS, we train the covariance term as well. For
        SQUARED_LOSS, the evaluation loss is reported based on un-scaled
        observations and predictions, while the training loss is computed on
        normalized data (if input statistics are available).
      exogenous_feature_columns: A list of `tf.feature_column`s (for example
        `tf.feature_column.embedding_column`) corresponding to features which
        provide extra information to the model but are not part of the series
        to be predicted.
    """
    self._model_factory = prediction_model_factory
    self.input_window_size = input_window_size
    self.output_window_size = output_window_size
    self.window_size = self.input_window_size + self.output_window_size
    self.loss = loss
    super(ARModel, self).__init__(
        num_features=num_features,
        exogenous_feature_columns=exogenous_feature_columns)
    if exogenous_feature_columns is not None:
      self.exogenous_size = self._get_exogenous_embedding_shape()[-1]
    else:
      self.exogenous_size = 0
    assert num_time_buckets > 0
    self._buckets = int(num_time_buckets)
    if periodicities is None or not periodicities:
      periodicities = []
    elif (not isinstance(periodicities, list) and
          not isinstance(periodicities, tuple)):
      periodicities = [periodicities]
    self._periodicities = [int(p) for p in periodicities]
    for p in self._periodicities:
      assert p > 0
    assert len(self._periodicities) or self.input_window_size
    assert output_window_size > 0

  def initialize_graph(self, input_statistics=None):
    super(ARModel, self).initialize_graph(input_statistics=input_statistics)
    self._model_scope = variable_scope.variable_scope(
        # The trailing slash means we strip all enclosing variable_scopes,
        # which unfortunately is necessary because the model gets called
        # inside and outside a "while" scope (for prediction and training
        # respectively), and the variable names need to match.
        "model/", use_resource=True)
    self._model_instance = self._model_factory(
        num_features=self.num_features,
        input_window_size=self.input_window_size,
        output_window_size=self.output_window_size)

  def get_start_state(self):
    # State which matches the format we'll return later. Typically this will
    # not be used by the model directly, but the shapes and dtypes should
    # match so that the serving input_receiver_fn gets placeholder shapes
    # correct.
    return (array_ops.zeros([self.input_window_size], dtype=dtypes.int64),
            array_ops.zeros(
                [self.input_window_size, self.num_features], dtype=self.dtype),
            array_ops.zeros(
                [self.input_window_size, self.exogenous_size],
                dtype=self.dtype))

  # TODO(allenl,agarwal): Support sampling for AR.
  def random_model_parameters(self, seed=None):
    pass

  def generate(self, number_of_series, series_length,
               model_parameters=None, seed=None):
    pass

  def _predicted_covariance_op(self, activations, num_values):
    activation, activation_size = activations[-1]
    if self.loss == ARModel.NORMAL_LIKELIHOOD_LOSS:
      log_sigma_square = model_utils.fully_connected(
          activation,
          activation_size,
          self.output_window_size * num_values,
          name="log_sigma_square",
          activation=None)
      predicted_covariance = gen_math_ops.exp(log_sigma_square)
      predicted_covariance = array_ops.reshape(
          predicted_covariance, [-1, self.output_window_size, num_values])
    else:
      shape = array_ops.stack([
          array_ops.shape(activation)[0],
          constant_op.constant(self.output_window_size),
          constant_op.constant(num_values)
      ])
      predicted_covariance = array_ops.ones(shape=shape, dtype=activation.dtype)
    return predicted_covariance

  def _predicted_mean_op(self, activations):
    activation, activation_size = activations[-1]
    predicted_mean = model_utils.fully_connected(
        activation,
        activation_size,
        self.output_window_size * self.num_features,
        name="predicted_mean",
        activation=None)
    return array_ops.reshape(predicted_mean,
                             [-1, self.output_window_size, self.num_features])

  def prediction_ops(self, times, values, exogenous_regressors):
    """Compute model predictions given input data.

    Args:
      times: A [batch size, self.window_size] integer Tensor, the first
        self.input_window_size times in each part of the batch indicating
        input features, and the last self.output_window_size times indicating
        prediction times.
      values: A [batch size, self.input_window_size, self.num_features] Tensor
        with input features.
      exogenous_regressors: A [batch size, self.window_size,
        self.exogenous_size] Tensor with exogenous features.
    Returns:
      A dictionary of predictions with keys "mean" and "covariance", each a
      Tensor with shape [batch size, self.output_window_size,
      self.num_features].
    """
    times.get_shape().assert_is_compatible_with([None, self.window_size])
    batch_size = array_ops.shape(times)[0]
    if self.input_window_size:
      values.get_shape().assert_is_compatible_with(
          [None, self.input_window_size, self.num_features])
    if exogenous_regressors is not None:
      exogenous_regressors.get_shape().assert_is_compatible_with(
          [None, self.window_size, self.exogenous_size])
    # Create input features.
    input_window_features = []
    input_feature_size = 0
    output_window_features = []
    output_feature_size = 0
    if self._periodicities:
      _, time_features = self._compute_time_features(times)
      num_time_features = self._buckets * len(self._periodicities)
      time_features = array_ops.reshape(
          time_features,
          [batch_size,
           self.window_size,
           num_time_features])
      input_time_features, output_time_features = array_ops.split(
          time_features, (self.input_window_size, self.output_window_size),
          axis=1)
      input_feature_size += num_time_features
      output_feature_size += num_time_features
      input_window_features.append(input_time_features)
      output_window_features.append(output_time_features)
    if self.input_window_size:
      inp = array_ops.slice(
          values, [0, 0, 0], [-1, self.input_window_size, -1])
      input_window_features.append(
          array_ops.reshape(
              inp,
              [batch_size, self.input_window_size, self.num_features]))
      input_feature_size += self.num_features
    if self.exogenous_size:
      input_exogenous_features, output_exogenous_features = array_ops.split(
          exogenous_regressors,
          (self.input_window_size, self.output_window_size),
          axis=1)
      input_feature_size += self.exogenous_size
      output_feature_size += self.exogenous_size
      input_window_features.append(input_exogenous_features)
      output_window_features.append(output_exogenous_features)
    assert input_window_features
    input_window_features = array_ops.concat(input_window_features, axis=2)
    if output_window_features:
      output_window_features = array_ops.concat(output_window_features, axis=2)
    else:
      output_window_features = array_ops.zeros(
          [batch_size, self.output_window_size, 0],
          dtype=self.dtype)
    static_batch_size = times.get_shape().dims[0].value
    input_window_features.set_shape(
        [static_batch_size, self.input_window_size, input_feature_size])
    output_window_features.set_shape(
        [static_batch_size, self.output_window_size, output_feature_size])
    return self._output_window_predictions(input_window_features,
                                           output_window_features)

  def _output_window_predictions(
      self, input_window_features, output_window_features):
    with self._model_scope:
      predictions = self._model_instance(
          input_window_features, output_window_features)
      result_shape = [None, self.output_window_size, self.num_features]
      for v in predictions.values():
        v.set_shape(result_shape)
      return predictions

  def loss_op(self, targets, prediction_ops):
    """Create loss_op."""
    prediction = prediction_ops["mean"]
    if self.loss == ARModel.NORMAL_LIKELIHOOD_LOSS:
      covariance = prediction_ops["covariance"]
      sigma = math_ops.sqrt(gen_math_ops.maximum(covariance, 1e-5))
      loss_op = -math_ops.reduce_sum(
          math_utils.normal_log_prob(targets, sigma, prediction))
    else:
      assert self.loss == ARModel.SQUARED_LOSS, self.loss
      loss_op = math_ops.reduce_sum(
          math_ops.squared_difference(prediction, targets))
    loss_op /= math_ops.cast(
        math_ops.reduce_prod(array_ops.shape(targets)), loss_op.dtype)
    return loss_op

  def _process_exogenous_features(self, times, features):
    embedded = super(ARModel, self)._process_exogenous_features(
        times=times, features=features)
    if embedded is None:
      assert self.exogenous_size == 0
      # No embeddings. Return a zero-size [batch, times, 0] array so we don't
      # have to special case it downstream.
      return array_ops.zeros(
          array_ops.concat([array_ops.shape(times), constant_op.constant([0])],
                           axis=0))
    else:
      return embedded

  # TODO(allenl, agarwal): Consider better ways of warm-starting predictions.
  def predict(self, features):
    """Computes predictions multiple steps into the future.

    Args:
      features: A dictionary with the following key/value pairs:
        PredictionFeatures.TIMES: A [batch size, predict window size]
          integer Tensor of times, after the window of data indicated by
          `STATE_TUPLE`, to make predictions for.
        PredictionFeatures.STATE_TUPLE: A tuple of (times, values,
          exogenous_regressors), times with shape [batch size,
          self.input_window_size], values with shape [batch size,
          self.input_window_size, self.num_features], representing a segment
          of the time series before `TIMES`. This data is used to start the
          autoregressive computation, and should have data for at least
          self.input_window_size timesteps.
        And any exogenous features, with shapes prefixed by the shape of
        `TIMES`.
    Returns:
      A dictionary with keys "mean" and "covariance". The values are Tensors
      of shape [batch_size, predict window size, num_features] and correspond
      to the values passed in `TIMES`.
    """
    if not self._graph_initialized:
      self.initialize_graph()
    predict_times = math_ops.cast(
        ops.convert_to_tensor(features[PredictionFeatures.TIMES]), dtypes.int32)
    exogenous_regressors = self._process_exogenous_features(
        times=predict_times,
        features={key: value for key, value in features.items()
                  if key not in [TrainEvalFeatures.TIMES,
                                 TrainEvalFeatures.VALUES,
                                 PredictionFeatures.STATE_TUPLE]})
    with ops.control_dependencies(
        [check_ops.assert_equal(array_ops.shape(predict_times)[1],
                                array_ops.shape(exogenous_regressors)[1])]):
      exogenous_regressors = array_ops.identity(exogenous_regressors)
    batch_size = array_ops.shape(predict_times)[0]
    num_predict_values = array_ops.shape(predict_times)[1]
    prediction_iterations = ((num_predict_values + self.output_window_size - 1)
                             // self.output_window_size)
    # Pad predict_times and exogenous regressors so as to have an exact
    # multiple of self.output_window_size values per example.
    padding_size = (prediction_iterations * self.output_window_size -
                    num_predict_values)
    predict_times = array_ops.pad(
        predict_times, [[0, 0], [0, padding_size]])
    exogenous_regressors = array_ops.pad(
        exogenous_regressors, [[0, 0], [0, padding_size], [0, 0]])
    state = features[PredictionFeatures.STATE_TUPLE]
    (state_times, state_values, state_exogenous_regressors) = state
    state_times = math_ops.cast(
        ops.convert_to_tensor(state_times), dtypes.int32)
    state_values = ops.convert_to_tensor(state_values, dtype=self.dtype)
    state_exogenous_regressors = ops.convert_to_tensor(
        state_exogenous_regressors, dtype=self.dtype)

    initial_input_times = predict_times[:, :self.output_window_size]
    initial_input_exogenous_regressors = (
        exogenous_regressors[:, :self.output_window_size, :])
    if self.input_window_size > 0:
      initial_input_times = array_ops.concat(
          [state_times[:, -self.input_window_size:], initial_input_times], 1)
      values_size = array_ops.shape(state_values)[1]
      times_size = array_ops.shape(state_times)[1]
      with ops.control_dependencies([
          check_ops.assert_greater_equal(values_size, self.input_window_size),
          check_ops.assert_equal(values_size, times_size)
      ]):
        initial_input_values = state_values[:, -self.input_window_size:, :]
        initial_input_exogenous_regressors = array_ops.concat(
            [state_exogenous_regressors[:, -self.input_window_size:, :],
             initial_input_exogenous_regressors[
                 :, :self.output_window_size, :]],
            axis=1)
    else:
      initial_input_values = 0

    # Iterate over the predict_times, predicting self.output_window_size
    # values in each iteration.
    def _while_condition(iteration_number, *unused_args):
      return math_ops.less(iteration_number, prediction_iterations)

    def _while_body(iteration_number, input_times, input_values,
                    input_exogenous_regressors, mean_ta, covariance_ta):
      """Predict self.output_window_size values."""
      prediction_ops = self.prediction_ops(
          input_times, input_values, input_exogenous_regressors)
      predicted_mean = prediction_ops["mean"]
      predicted_covariance = prediction_ops["covariance"]
      offset = self.output_window_size * gen_math_ops.minimum(
          iteration_number + 1, prediction_iterations - 1)
      if self.input_window_size > 0:
        if self.output_window_size < self.input_window_size:
          new_input_values = array_ops.concat(
              [input_values[:, self.output_window_size:, :], predicted_mean], 1)
          new_input_exogenous_regressors = array_ops.concat(
              [input_exogenous_regressors[:, -self.input_window_size:, :],
               exogenous_regressors[
                   :, offset:offset + self.output_window_size, :]],
              axis=1)
          new_input_times = array_ops.concat([
              input_times[:, -self.input_window_size:],
              predict_times[:, offset:offset + self.output_window_size]
          ], 1)
        else:
          new_input_values = predicted_mean[:, -self.input_window_size:, :]
          new_input_exogenous_regressors = exogenous_regressors[
              :,
              offset - self.input_window_size:offset + self.output_window_size,
              :]
          new_input_times = predict_times[
              :,
              offset - self.input_window_size:offset + self.output_window_size]
      else:
        new_input_values = input_values
        new_input_exogenous_regressors = exogenous_regressors[
            :, offset:offset + self.output_window_size, :]
        new_input_times = predict_times[
            :, offset:offset + self.output_window_size]
      new_input_times.set_shape(initial_input_times.get_shape())
      new_input_exogenous_regressors.set_shape(
          initial_input_exogenous_regressors.get_shape())
      new_mean_ta = mean_ta.write(iteration_number, predicted_mean)
      if isinstance(covariance_ta, tensor_array_ops.TensorArray):
        new_covariance_ta = covariance_ta.write(iteration_number,
                                                predicted_covariance)
      else:
        new_covariance_ta = covariance_ta
      return (iteration_number + 1,
              new_input_times,
              new_input_values,
              new_input_exogenous_regressors,
              new_mean_ta,
              new_covariance_ta)

    # Note that control_flow_ops.while_loop doesn't seem happy with None. Hence
    # using 0 for cases where we don't want to predict covariance.
    covariance_ta_init = (tensor_array_ops.TensorArray(
        dtype=self.dtype, size=prediction_iterations)
                          if self.loss != ARModel.SQUARED_LOSS else 0.)
    mean_ta_init = tensor_array_ops.TensorArray(
        dtype=self.dtype, size=prediction_iterations)
    _, _, _, _, mean_ta, covariance_ta = control_flow_ops.while_loop(
        _while_condition, _while_body, [
            0,
            initial_input_times,
            initial_input_values,
            initial_input_exogenous_regressors,
            mean_ta_init,
            covariance_ta_init
        ])

    def _parse_ta(values_ta):
      """Helper function to parse the returned TensorArrays."""
      if not isinstance(values_ta, tensor_array_ops.TensorArray):
        return None
      predictions_length = prediction_iterations * self.output_window_size
      # Shape [prediction_iterations, batch_size, self.output_window_size,
      #        self.num_features]
      values_packed = values_ta.stack()
      # Transpose to move batch dimension outside.
      output_values = array_ops.reshape(
          array_ops.transpose(values_packed, [1, 0, 2, 3]),
          array_ops.stack([batch_size, predictions_length, -1]))
      # Clip to desired size
      return output_values[:, :num_predict_values, :]

    predicted_mean = _parse_ta(mean_ta)
    predicted_covariance = _parse_ta(covariance_ta)
    if predicted_covariance is None:
      predicted_covariance = array_ops.ones_like(predicted_mean)

    # Transform and scale the mean and covariance appropriately.
    predicted_mean = self._scale_back_data(predicted_mean)
    predicted_covariance = self._scale_back_variance(predicted_covariance)

    return {"mean": predicted_mean,
            "covariance": predicted_covariance}

  def _process_window(self, features, mode, exogenous_regressors):
    """Compute model outputs on a single window of data."""
    times = math_ops.cast(features[TrainEvalFeatures.TIMES], dtypes.int64)
    values = math_ops.cast(features[TrainEvalFeatures.VALUES], dtype=self.dtype)
    exogenous_regressors = math_ops.cast(exogenous_regressors, dtype=self.dtype)
    original_values = values

    # Extra shape checking for the window size (above that in
    # `head.create_estimator_spec`).
    expected_times_shape = [None, self.window_size]
    if not times.get_shape().is_compatible_with(expected_times_shape):
      raise ValueError(
          ("ARModel with input_window_size={input_window_size} "
           "and output_window_size={output_window_size} expects "
           "feature '{times_feature}' to have shape (batch_size, "
           "{window_size}) (for any batch_size), but got shape {times_shape}. "
           "If you are using RandomWindowInputFn, set "
           "window_size={window_size} or adjust the input_window_size and "
           "output_window_size arguments to ARModel.").format(
               input_window_size=self.input_window_size,
               output_window_size=self.output_window_size,
               times_feature=TrainEvalFeatures.TIMES,
               window_size=self.window_size,
               times_shape=times.get_shape()))
    values = self._scale_data(values)
    if self.input_window_size > 0:
      input_values = values[:, :self.input_window_size, :]
    else:
      input_values = None
    prediction_ops = self.prediction_ops(
        times, input_values, exogenous_regressors)
    prediction = prediction_ops["mean"]
    covariance = prediction_ops["covariance"]
    targets = array_ops.slice(values, [0, self.input_window_size, 0],
                              [-1, -1, -1])
    targets.get_shape().assert_is_compatible_with(prediction.get_shape())
    if (mode == estimator_lib.ModeKeys.EVAL
        and self.loss == ARModel.SQUARED_LOSS):
      # Report an evaluation loss which matches the expected
      # (observed - predicted) ** 2.
      # Note that this affects only evaluation; the training loss is unaffected.
      loss = self.loss_op(
          self._scale_back_data(targets),
          {"mean": self._scale_back_data(prediction_ops["mean"])})
    else:
      loss = self.loss_op(targets, prediction_ops)

    # Scale back the prediction.
    prediction = self._scale_back_data(prediction)
    covariance = self._scale_back_variance(covariance)

    return model.ModelOutputs(
        loss=loss,
        end_state=(times[:, -self.input_window_size:],
                   values[:, -self.input_window_size:, :],
                   exogenous_regressors[:, -self.input_window_size:, :]),
        predictions={"mean": prediction, "covariance": covariance,
                     "observed": original_values[:, -self.output_window_size:]},
        prediction_times=times[:, -self.output_window_size:])

  def get_batch_loss(self, features, mode, state):
    """Computes predictions and a loss.

    Args:
      features: A dictionary (such as is produced by a chunker) with the
        following key/value pairs (shapes are given as required for training):
          TrainEvalFeatures.TIMES: A [batch size, self.window_size] integer
            Tensor with times for each observation. To train on longer
            sequences, the data should first be chunked.
          TrainEvalFeatures.VALUES: A [batch size, self.window_size,
            self.num_features] Tensor with values for each observation.
        When evaluating, `TIMES` and `VALUES` must have a window size of at
        least self.window_size, but it may be longer, in which case the last
        window_size - self.input_window_size times (or fewer if this is not
        divisible by self.output_window_size) will be evaluated on with
        non-overlapping output windows (and will have associated predictions).
        This is primarily to support qualitative evaluation/plotting, and is
        not a recommended way to compute evaluation losses (since there is no
        overlap in the output windows, which for window-based models is an
        undesirable bias).
      mode: The tf.estimator.ModeKeys mode to use (TRAIN or EVAL).
      state: Unused.
    Returns:
      A model.ModelOutputs object.
    Raises:
      ValueError: If `mode` is not TRAIN or EVAL, or if static shape information
        is incorrect.
    """
    features = {feature_name: ops.convert_to_tensor(feature_value)
                for feature_name, feature_value in features.items()}
    times = features[TrainEvalFeatures.TIMES]
    exogenous_regressors = self._process_exogenous_features(
        times=times,
        features={key: value for key, value in features.items()
                  if key not in [TrainEvalFeatures.TIMES,
                                 TrainEvalFeatures.VALUES,
                                 PredictionFeatures.STATE_TUPLE]})
    if mode == estimator_lib.ModeKeys.TRAIN:
      # For training, we require the window size to be self.window_size as
      # iterating sequentially on larger windows could introduce a bias.
      return self._process_window(
          features, mode=mode, exogenous_regressors=exogenous_regressors)
    elif mode == estimator_lib.ModeKeys.EVAL:
      # For evaluation, we allow the user to pass in a larger window, in which
      # case we try to cover as much of the window as possible without
      # overlap. Quantitative evaluation is more efficient/correct with fixed
      # windows matching self.window_size (as with training), but this looping
      # allows easy plotting of "in-sample" predictions.
      times.get_shape().assert_has_rank(2)
      static_window_size = times.get_shape().dims[1].value
      if (static_window_size is not None
          and static_window_size < self.window_size):
        raise ValueError(
            ("ARModel requires a window of at least input_window_size + "
             "output_window_size to evaluate on (input_window_size={}, "
             "output_window_size={}, and got shape {} for feature '{}' (batch "
             "size, window size)).").format(
                 self.input_window_size, self.output_window_size,
                 times.get_shape(), TrainEvalFeatures.TIMES))
      num_iterations = ((array_ops.shape(times)[1] - self.input_window_size)
                        // self.output_window_size)
      output_size = num_iterations * self.output_window_size
      # Rather than dealing with overlapping windows of output, discard a bit
      # at the beginning if output windows don't cover evenly.
      crop_length = output_size + self.input_window_size
      features = {feature_name: feature_value[:, -crop_length:]
                  for feature_name, feature_value in features.items()}
      # Note that, unlike the ARModel's predict() while_loop and the
      # SequentialTimeSeriesModel while_loop, each iteration here can run in
      # parallel, since we are not feeding predictions or state from previous
      # iterations.
      def _while_condition(iteration_number, loss_ta, mean_ta, covariance_ta):
        del loss_ta, mean_ta, covariance_ta  # unused
        return iteration_number < num_iterations

      def _while_body(iteration_number, loss_ta, mean_ta, covariance_ta):
        """Perform a processing step on a single window of data."""
        base_offset = iteration_number * self.output_window_size
        model_outputs = self._process_window(
            features={
                feature_name:
                    feature_value[:, base_offset:base_offset + self.window_size]
                for feature_name, feature_value in features.items()},
            mode=mode,
            exogenous_regressors=exogenous_regressors[
                :, base_offset:base_offset + self.window_size])
        # This code needs to be updated if new predictions are added in
        # self._process_window
        assert len(model_outputs.predictions) == 3
        assert "mean" in model_outputs.predictions
        assert "covariance" in model_outputs.predictions
        assert "observed" in model_outputs.predictions
        return (iteration_number + 1,
                loss_ta.write(
                    iteration_number, model_outputs.loss),
                mean_ta.write(
                    iteration_number, model_outputs.predictions["mean"]),
                covariance_ta.write(
                    iteration_number, model_outputs.predictions["covariance"]))
      _, loss_ta, mean_ta, covariance_ta = control_flow_ops.while_loop(
          _while_condition, _while_body,
          [0,
           tensor_array_ops.TensorArray(dtype=self.dtype, size=num_iterations),
           tensor_array_ops.TensorArray(dtype=self.dtype, size=num_iterations),
           tensor_array_ops.TensorArray(dtype=self.dtype, size=num_iterations)])
      values = math_ops.cast(features[TrainEvalFeatures.VALUES],
                             dtype=self.dtype)
      batch_size = array_ops.shape(times)[0]
      prediction_shape = [batch_size, self.output_window_size * num_iterations,
                          self.num_features]
      (previous_state_times,
       previous_state_values,
       previous_state_exogenous_regressors) = state
      # Make sure returned state always has windows of self.input_window_size,
      # even if we were passed fewer than self.input_window_size points this
      # time.
      if self.input_window_size > 0:
        new_state_times = array_ops.concat(
            [previous_state_times,
             math_ops.cast(times, dtype=dtypes.int64)],
            axis=1)[:, -self.input_window_size:]
        new_state_times.set_shape((None, self.input_window_size))
        new_state_values = array_ops.concat(
            [previous_state_values,
             self._scale_data(values)], axis=1)[:, -self.input_window_size:, :]
        new_state_values.set_shape((None, self.input_window_size,
                                    self.num_features))
        new_exogenous_regressors = array_ops.concat(
            [previous_state_exogenous_regressors,
             exogenous_regressors], axis=1)[:, -self.input_window_size:, :]
        new_exogenous_regressors.set_shape(
            (None,
             self.input_window_size,
             self.exogenous_size))
      else:
        # There is no state to keep, and the strided slices above do not handle
        # input_window_size=0.
        new_state_times = previous_state_times
        new_state_values = previous_state_values
        new_exogenous_regressors = previous_state_exogenous_regressors
      return model.ModelOutputs(
          loss=math_ops.reduce_mean(loss_ta.stack(), axis=0),
          end_state=(new_state_times,
                     new_state_values,
                     new_exogenous_regressors),
          predictions={
              "mean": array_ops.reshape(
                  array_ops.transpose(mean_ta.stack(), [1, 0, 2, 3]),
                  prediction_shape),
              "covariance": array_ops.reshape(
                  array_ops.transpose(covariance_ta.stack(), [1, 0, 2, 3]),
                  prediction_shape),
              "observed": values[:, -output_size:]},
          prediction_times=times[:, -output_size:])
    else:
      raise ValueError(
          "Unknown mode '{}' passed to get_batch_loss.".format(mode))

  def _compute_time_features(self, time):
    """Compute some features on the time value."""
    batch_size = array_ops.shape(time)[0]
    num_periods = len(self._periodicities)
    # Reshape to 3D.
    periods = constant_op.constant(
        self._periodicities, shape=[1, 1, num_periods, 1], dtype=time.dtype)
    time = array_ops.reshape(time, [batch_size, -1, 1, 1])
    window_offset = time / self._periodicities
    # Cast to appropriate type and scale to the [0, self._buckets) range.
    mod = (math_ops.cast(time % periods, self.dtype) * self._buckets /
           math_ops.cast(periods, self.dtype))
    # Bucketize based on some fixed width intervals. For a value t and interval
    # [a, b), we return (t - a) if a <= t < b, else 0.
    intervals = array_ops.reshape(
        math_ops.range(self._buckets, dtype=self.dtype),
        [1, 1, 1, self._buckets])
    mod = nn_ops.relu(mod - intervals)
    mod = array_ops.where(mod < 1.0, mod, array_ops.zeros_like(mod))
    return window_offset, mod


class AnomalyMixtureARModel(ARModel):
  """Model data as a mixture of normal and anomaly distributions.

  Note that this model works by changing the loss function to reduce the
  penalty when predicting an anomalous target. However the predictions are
  still based on anomalous input features, and this may affect the quality of
  fit. One possible solution is to downweight/filter anomalous inputs, but
  that requires more sequential processing instead of completely random
  windows.
  """

  GAUSSIAN_ANOMALY = "gaussian"
  CAUCHY_ANOMALY = "cauchy"

  def __init__(self,
               periodicities,
               anomaly_prior_probability,
               input_window_size,
               output_window_size,
               num_features,
               prediction_model_factory=FlatPredictionModel,
               anomaly_distribution=GAUSSIAN_ANOMALY,
               num_time_buckets=10,
               exogenous_feature_columns=None):
    assert (anomaly_prior_probability < 1.0 and
            anomaly_prior_probability > 0.0)
    self._anomaly_prior_probability = anomaly_prior_probability
    assert anomaly_distribution in [
        AnomalyMixtureARModel.GAUSSIAN_ANOMALY,
        AnomalyMixtureARModel.CAUCHY_ANOMALY]
    self._anomaly_distribution = anomaly_distribution
    super(AnomalyMixtureARModel, self).__init__(
        periodicities=periodicities,
        num_features=num_features,
        num_time_buckets=num_time_buckets,
        input_window_size=input_window_size,
        output_window_size=output_window_size,
        loss=ARModel.NORMAL_LIKELIHOOD_LOSS,
        prediction_model_factory=prediction_model_factory,
        exogenous_feature_columns=exogenous_feature_columns)

  def _create_anomaly_ops(self, times, values, prediction_ops_dict):
    anomaly_log_param = variable_scope.get_variable(
        "anomaly_log_param",
        shape=[],
        dtype=self.dtype,
        initializer=init_ops.zeros_initializer())
    # Anomaly param is the variance for Gaussian and scale for Cauchy
    # distribution.
    prediction_ops_dict["anomaly_params"] = gen_math_ops.exp(anomaly_log_param)

  def prediction_ops(self, times, values, exogenous_regressors):
    prediction_ops_dict = super(AnomalyMixtureARModel, self).prediction_ops(
        times, values, exogenous_regressors)
    self._create_anomaly_ops(times, values, prediction_ops_dict)
    return prediction_ops_dict

  def _anomaly_log_prob(self, targets, prediction_ops):
    prediction = prediction_ops["mean"]
    if self._anomaly_distribution == AnomalyMixtureARModel.GAUSSIAN_ANOMALY:
      anomaly_variance = prediction_ops["anomaly_params"]
      anomaly_sigma = math_ops.sqrt(
          gen_math_ops.maximum(anomaly_variance, 1e-5))
      log_prob = math_utils.normal_log_prob(targets, anomaly_sigma, prediction)
    else:
      assert self._anomaly_distribution == AnomalyMixtureARModel.CAUCHY_ANOMALY
      anomaly_scale = prediction_ops["anomaly_params"]
      log_prob = math_utils.cauchy_log_prob(targets, anomaly_scale, prediction)
    return log_prob

  def loss_op(self, targets, prediction_ops):
    """Create loss_op."""
    prediction = prediction_ops["mean"]
    covariance = prediction_ops["covariance"]
    # Normal data log probability.
    sigma = math_ops.sqrt(gen_math_ops.maximum(covariance, 1e-5))
    log_prob1 = math_utils.normal_log_prob(targets, sigma, prediction)
    log_prob1 += math_ops.log(1 - self._anomaly_prior_probability)
    # Anomaly log probability.
    log_prob2 = self._anomaly_log_prob(targets, prediction_ops)
    log_prob2 += math_ops.log(self._anomaly_prior_probability)
    # We need to compute log(exp(log_prob1) + exp(log_prob2)). For numerical
    # stability, we rewrite the expression as below.
    p1 = gen_math_ops.minimum(log_prob1, log_prob2)
    p2 = gen_math_ops.maximum(log_prob1, log_prob2)
    mixed_log_prob = p2 + math_ops.log(1 + gen_math_ops.exp(p1 - p2))
    loss_op = -math_ops.reduce_sum(mixed_log_prob)
    loss_op /= math_ops.cast(
        math_ops.reduce_prod(array_ops.shape(targets)), self.dtype)
    return loss_op