# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Defines ways of splicing and re-arranging time series.

This file provides methods for reading, parsing, and re-arranging a time
series. The main departure from standard TensorFlow input pipelines is a focus
on "chunking" a time series, i.e. slicing it into small contiguous windows
which are then batched together for training, a form of truncated
backpropagation. This typically provides a significant speedup compared to
looping over the whole series sequentially, by exploiting data parallelism and
by reducing redundant contributions to gradients (due to redundant information
in the series itself).

A series, consisting of times (an increasing vector of integers) and values
(one or more floating point values for each time) along with any exogenous
features, is stored either in memory or on disk in various formats (e.g. "one
record per timestep" on disk, or as a dictionary of Numpy arrays in
memory). The location and format is specified by configuring a
`TimeSeriesReader` object (e.g. `NumpyReader`, `CSVReader`), which reads the
data into the TensorFlow graph. A `TimeSeriesInputFn` object (typically
`RandomWindowInputFn`) then performs windowing and batching.

Time series are passed through this pipeline as dictionaries mapping feature
names to their values. For training and evaluation, these require at minimum
`TrainEvalFeatures.TIMES` (scalar integers, one per timestep) and
`TrainEvalFeatures.VALUES` (may be either univariate or
multivariate). Exogenous features may have any shape, but are likewise
associated with a timestep. Times themselves need not be contiguous or regular
(although smaller/fewer gaps are generally better), but each timestep must
have all `VALUES` and any exogenous features (i.e. times may be missing, but
given that a time is specified, every other feature must also be specified for
that step; some models may support making exogenous updates conditional).

The expected use case of a `TimeSeriesInputFn` is that it is first configured
(for example setting a batch or window size) and passed a reader (a
`TimeSeriesReader` object). The `TimeSeriesInputFn` can then be passed as the
input_fn of an Estimator.

For example, `RandomWindowInputFn` is useful for creating batches of random
chunks of a series for training:

```
  # Read data in the default "time,value" CSV format with no header
  reader = input_pipeline.CSVReader(csv_file_name)
  # Set up windowing and batching for training
  train_input_fn = input_pipeline.RandomWindowInputFn(
      reader, batch_size=16, window_size=16)
  # Fit model parameters to data
  estimator.train(input_fn=train_input_fn, steps=150)
```

`RandomWindowInputFn` is the primary tool for training and quantitative
evaluation of time series. `WholeDatasetInputFn`, which reads a whole series
into memory, is useful for qualitative evaluation and preparing to make
predictions with `predict_continuation_input_fn`.
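
For example, continuing the training snippet above (a sketch; `estimator` is
assumed to be a time series `Estimator` trained as shown there):

```
  # Read the whole series into memory for in-sample evaluation
  evaluation_input_fn = input_pipeline.WholeDatasetInputFn(reader)
  evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)
  # Predict 10 steps past the end of the series
  predict_input_fn = input_pipeline.predict_continuation_input_fn(
      evaluation, steps=10)
  predictions = estimator.predict(input_fn=predict_input_fn)
```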
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc

import numpy

from tensorflow.contrib.timeseries.python.timeseries import feature_keys
from tensorflow.contrib.timeseries.python.timeseries import model_utils

from tensorflow.python.estimator import estimator_lib
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import io_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn
from tensorflow.python.ops import parsing_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import tensor_array_ops
from tensorflow.python.ops import variable_scope
from tensorflow.python.training import input as input_lib
from tensorflow.python.training import training
from tensorflow.python.util import nest


def predict_continuation_input_fn(
    evaluation, steps=None, times=None, exogenous_features=None):
  """An Estimator input_fn for running predict() after evaluate().

  If the evaluate() call that produced `evaluation` used a batch_size greater
  than one, predictions will start after each of those evaluation windows
  (i.e. the predictions will have the same batch dimension).

  Args:
    evaluation: The dictionary returned by `Estimator.evaluate`, with keys
      FilteringResults.STATE_TUPLE and FilteringResults.TIMES.
    steps: The number of steps to predict (scalar), starting after the
      evaluation. If `times` is specified, `steps` must not be; one is
      required.
    times: A [batch_size x window_size] array of integers (not a Tensor)
      indicating times to make predictions for. These times must be after the
      corresponding evaluation. If `steps` is specified, `times` must not be;
      one is required. If the batch dimension is omitted, it is assumed to be
      1.
    exogenous_features: Optional dictionary. If specified, indicates exogenous
      features for the model to use while making the predictions. Values must
      have shape [batch_size x window_size x ...], where `batch_size` matches
      the batch dimension used when creating `evaluation`, and `window_size`
      is either the `steps` argument or the `window_size` of the `times`
      argument (depending on which was specified).
  Returns:
    An `input_fn` suitable for passing to the `predict` function of a time
    series `Estimator`.
  Raises:
    ValueError: If `times` or `steps` are misspecified.
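
  For example, requesting predictions at specific times with a hypothetical
  exogenous feature (a sketch; "is_holiday" is an illustrative feature name
  and `estimator` is a trained time series `Estimator`):

    input_fn = predict_continuation_input_fn(
        evaluation,
        times=[[5, 6, 7]],  # shape [batch_size, window_size]
        exogenous_features={
            "is_holiday": [[[0], [1], [0]]]})  # [batch_size, window_size, 1]
    predictions = estimator.predict(input_fn=input_fn)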
  """
  if exogenous_features is None:
    exogenous_features = {}
  predict_times = model_utils.canonicalize_times_or_steps_from_output(
      times=times, steps=steps, previous_model_output=evaluation)
  features = {
      feature_keys.PredictionFeatures.STATE_TUPLE:
          evaluation[feature_keys.FilteringResults.STATE_TUPLE],
      feature_keys.PredictionFeatures.TIMES:
          predict_times
  }
  features.update(exogenous_features)
  def _predict_input_fn():
    """An input_fn for predict()."""
    # Prevents infinite iteration with a constant output in an Estimator's
    # predict().
    limited_features = {}
    for key, values in features.items():
      limited_values = nest.map_structure(
          lambda value: training.limit_epochs(value, num_epochs=1), values)
      limited_features[key] = limited_values
    return (limited_features, None)
  return _predict_input_fn


class TimeSeriesReader(object):
  """Reads from and parses a data source for a `TimeSeriesInputFn`.

  This class provides methods that read a few records (`read`) or the full
  data set at once (`read_full`), and returns them as dictionaries mapping
  feature names to feature Tensors. Please see note at the top of the file for
  the structure of these dictionaries. The output is generally chunked by a
  `TimeSeriesInputFn` before being passed to the model.
  """

  def check_dataset_size(self, minimum_dataset_size):
    """When possible, raises an error if the dataset is too small.

    This method allows TimeSeriesReaders to raise informative error messages
    if the user has selected a window size in their TimeSeriesInputFn which is
    larger than the dataset size. However, many TimeSeriesReaders will not
    have access to a dataset size, in which case they do not need to override
    this method.

    Args:
      minimum_dataset_size: The minimum number of records which should be
        contained in the dataset. Readers should attempt to raise an error
        when possible if an epoch of data contains fewer records.
    """
    pass

  @abc.abstractmethod
  def read(self):
    """Parses one or more records into a feature dictionary.

    This method is expected to be called by a `TimeSeriesInputFn` object, and
    is not for use with models directly.

    A `TimeSeriesReader` object reads multiple records at a single time for
    efficiency; the size of these batches is an implementation detail internal
    to the input pipeline. These records should generally be sequential,
    although some out-of-order records due to file wraparounds are expected
    and must be handled by callers.

    Returns:
      A dictionary mapping feature names to `Tensor` values, each with an
      arbitrary batch dimension (for efficiency) as their first dimension.
    """
    pass

  @abc.abstractmethod
  def read_full(self):
    """Return the full dataset.

    Largely for interactive use/plotting (or evaluation on small
    datasets). Generally not very efficient. Not recommended for training.

    Returns:
      Same return type as `read`, but with the full dataset rather than an
      arbitrary chunk of it. A dictionary mapping feature names to `Tensor`
      values, where the size of the first dimension of each `Tensor` is the
      number of samples in the entire dataset. These `Tensor`s should be
      constant across graph invocations, assuming that the underlying data
      remains constant.
      Current implementations re-read data on each graph invocation, although
      this may change in the future.
    """
    pass


class NumpyReader(TimeSeriesReader):
  """A time series parser for feeding Numpy arrays to a `TimeSeriesInputFn`.

  Avoids embedding data in the graph as constants.
  """

  def __init__(self, data, read_num_records_hint=4096):
    """Numpy array input for a `TimeSeriesInputFn`.

    Args:
      data: A dictionary mapping feature names to Numpy arrays, with two
        possible shapes (requires keys `TrainEvalFeatures.TIMES` and
        `TrainEvalFeatures.VALUES`):
          Univariate; `TIMES` and `VALUES` are both vectors of shape [series
            length]
          Multivariate; `TIMES` is a vector of shape [series length], `VALUES`
            has shape [series length x number of features].
        In any case, `VALUES` and any exogenous features must have their
        shapes prefixed by the shape of the value corresponding to the `TIMES`
        key.
      read_num_records_hint: The maximum number of samples to read at one
        time, for efficiency.
    """
    self._features = _canonicalize_numpy_data(
        data, require_single_batch=True)
    self._read_num_records_hint = read_num_records_hint

  def check_dataset_size(self, minimum_dataset_size):
    """Raise an error if the dataset is too small."""
    dataset_size = self._features[feature_keys.TrainEvalFeatures.TIMES].shape[1]
    if dataset_size < minimum_dataset_size:
      raise ValueError(
          ("A TimeSeriesInputFn is configured to create windows of size {}, "
           "but only {} records were available in the dataset. Either decrease "
           "the window size or provide more records.").format(
               minimum_dataset_size, dataset_size))

  def read(self):
    """Returns a large chunk of the Numpy arrays for later re-chunking."""
    # Remove the batch dimension from all features
    features = {key: numpy.squeeze(value, axis=0)
                for key, value in self._features.items()}
    return estimator_lib.inputs.numpy_input_fn(
        x=features,
        # The first dimensions of features are the series length, since we
        # have removed the batch dimension above. We now pull out
        # self._read_num_records_hint steps of this single time series to pass
        # to the TimeSeriesInputFn.
        batch_size=self._read_num_records_hint,
        num_epochs=None,
        shuffle=False)()

  def read_full(self):
    """Returns `Tensor` versions of the full Numpy arrays."""
    features = estimator_lib.inputs.numpy_input_fn(
        x=self._features,
        batch_size=1,
        num_epochs=None,
        queue_capacity=2,  # Each queue element is a full copy of the dataset
        shuffle=False)()
    # TimeSeriesInputFns expect just a batch dimension
    return {feature_name: array_ops.squeeze(feature_value, axis=0)
            for feature_name, feature_value in features.items()}


class ReaderBaseTimeSeriesParser(TimeSeriesReader):
  """Base for time series readers which wrap a `tf.ReaderBase`."""

  def __init__(self, filenames, read_num_records_hint=4096):
    """Configure the time series reader.

    Args:
      filenames: A string or list of strings indicating files to read records
        from.
      read_num_records_hint: When not reading a full dataset, indicates the
        number of records to transfer in a single chunk (for efficiency). The
        actual number transferred at one time may vary.
    """
    self._filenames = filenames
    self._read_num_records_hint = read_num_records_hint

  @abc.abstractmethod
  def _get_reader(self):
    """Get an instance of the tf.ReaderBase associated with this class."""
    pass

  @abc.abstractmethod
  def _process_records(self, lines):
    """Given string items, return a processed dictionary of Tensors.

    Args:
      lines: A 1-dimensional string Tensor, each representing a record to
        parse (source dependent, e.g. a line of a file, or a serialized
        protocol buffer).

    Returns:
      A dictionary mapping feature names to their values. The batch dimensions
      should match the length of `lines`.
    """
    pass

  def _get_filename_queue(self, epoch_limit):
    """Constructs a filename queue with an epoch limit.

    `epoch_limit` is intended as an error checking fallback to prevent a
    reader from infinitely looping in its requests for more work items if none
    are available in any file. It should be set high enough that it is never
    reached assuming at least one record exists in some file.

    Args:
      epoch_limit: The maximum number of times to read through the complete
        list of files before throwing an OutOfRangeError.
    Returns:
      A tuple of (filename_queue, epoch_limiter):
        filename_queue: A FIFOQueue with filename work items.
        epoch_limiter: The local variable used for epoch limitation. This
          should be set to zero before a reader is passed `filename_queue` in
          order to reset the epoch limiter's state.
    """
    epoch_limiter = variable_scope.variable(
        initial_value=constant_op.constant(0, dtype=dtypes.int64),
        name="epoch_limiter",
        trainable=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])
    filenames_tensor = array_ops.reshape(
        ops.convert_to_tensor(self._filenames), [-1])
    # We can't rely on epoch_limiter being initialized, since queue runners
    # are started before local variables are initialized. Instead, we ignore
    # epoch limits before variable initialization. This means that prior to
    # variable initialization, a QueueRunner may cause a reader to enter an
    # un-checked infinite loop. However, as soon as local variables are
    # initialized, we will start incrementing and checking epoch_limiter,
    # which will interrupt any in-progress loops.
    conditional_count_up_to = control_flow_ops.cond(
        state_ops.is_variable_initialized(epoch_limiter),
        lambda: epoch_limiter.count_up_to(epoch_limit),
        lambda: constant_op.constant(0, dtype=dtypes.int64))
    with ops.control_dependencies([conditional_count_up_to]):
      filenames_tensor = array_ops.identity(filenames_tensor)
    filename_queue = input_lib.string_input_producer(
        filenames_tensor, shuffle=False, capacity=1)
    return filename_queue, epoch_limiter

  def read(self):
    """Reads a chunk of data from the `tf.ReaderBase` for later re-chunking."""
    # Assuming there is at least one item to be read among all of the files in
    # self._filenames, we will not need to go through more than
    # self._read_num_records_hint epochs to get a batch of
    # self._read_num_records_hint records. Setting this limit and resetting it
    # before each reader.read_up_to call prevents infinite looping when there
    # are no records available in any of the files.
    filename_queue, epoch_limiter = self._get_filename_queue(
        epoch_limit=self._read_num_records_hint)
    reader = self._get_reader()
    epoch_reset_op = state_ops.assign(epoch_limiter, 0)
    with ops.control_dependencies([epoch_reset_op]):
      _, records = reader.read_up_to(
          filename_queue, self._read_num_records_hint)
    return self._process_records(records)

  def read_full(self):
    """Reads a full epoch of data into memory."""
    reader = self._get_reader()
    # Set a hard limit of 2 epochs through self._filenames. If there are any
    # records available, we should only end up reading the first record in the
    # second epoch before exiting the while loop and subsequently resetting
    # the epoch limit. If there are no records available in any of the files,
    # this hard limit prevents the reader.read_up_to call from looping
    # infinitely.
    filename_queue, epoch_limiter = self._get_filename_queue(epoch_limit=2)
    epoch_reset_op = state_ops.assign(epoch_limiter, 0)
    with ops.control_dependencies([epoch_reset_op]):
      first_key, first_value = reader.read_up_to(filename_queue, 1)
    # Read until we get a duplicate key (one epoch)
    def _while_condition(
        current_key, current_value, current_index, collected_records):
      del current_value, current_index, collected_records  # unused
      return math_ops.not_equal(array_ops.squeeze(current_key, axis=0),
                                array_ops.squeeze(first_key, axis=0))

    def _while_body(
        current_key, current_value, current_index, collected_records):
      del current_key  # unused
      new_key, new_value = reader.read_up_to(filename_queue, 1)
      new_key.set_shape([1])
      new_value.set_shape([1])
      return (new_key,
              new_value,
              current_index + 1,
              collected_records.write(current_index, current_value))
    _, _, _, records_ta = control_flow_ops.while_loop(
        _while_condition,
        _while_body,
        [constant_op.constant([""]), first_value,
         0,  # current_index starting value
         tensor_array_ops.TensorArray(  # collected_records
             dtype=dtypes.string, size=0, dynamic_size=True)])
    records = records_ta.concat()
    # Reset the reader when we're done so that subsequent requests for data
    # get the dataset in the proper order.
    with ops.control_dependencies([records]):
      reader_reset_op = reader.reset()
    with ops.control_dependencies([reader_reset_op]):
      records = array_ops.identity(records)
    return self._process_records(records)


class CSVReader(ReaderBaseTimeSeriesParser):
  """Reads from a collection of CSV-formatted files."""

  def __init__(self,
               filenames,
               column_names=(feature_keys.TrainEvalFeatures.TIMES,
                             feature_keys.TrainEvalFeatures.VALUES),
               column_dtypes=None,
               skip_header_lines=None,
               read_num_records_hint=4096):
    """CSV-parsing reader for a `TimeSeriesInputFn`.

    Args:
      filenames: A filename or list of filenames to read the time series
        from. Each line must have columns corresponding to `column_names`.
      column_names: A list indicating names for each
        feature. `TrainEvalFeatures.TIMES` and `TrainEvalFeatures.VALUES` are
        required; `VALUES` may be repeated to indicate a multivariate series.
      column_dtypes: If provided, must be a list with the same length as
        `column_names`, indicating dtypes for each column. Defaults to
        `tf.int64` for `TrainEvalFeatures.TIMES` and `tf.float32` for
        everything else.
      skip_header_lines: Passed on to `tf.TextLineReader`; skips this number
        of lines at the beginning of each file.
      read_num_records_hint: When not reading a full dataset, indicates the
        number of records to parse/transfer in a single chunk (for
        efficiency). The actual number transferred at one time may be more or
        less.
    Raises:
      ValueError: If required column names are not specified, or if lengths do
        not match.
    """
    if feature_keys.TrainEvalFeatures.TIMES not in column_names:
      raise ValueError("'{}' is a required column.".format(
          feature_keys.TrainEvalFeatures.TIMES))
    if feature_keys.TrainEvalFeatures.VALUES not in column_names:
      raise ValueError("'{}' is a required column.".format(
          feature_keys.TrainEvalFeatures.VALUES))
    if column_dtypes is not None and len(column_dtypes) != len(column_names):
      raise ValueError(
          ("If specified, the length of column_dtypes must match the length "
           "of column_names (got column_dtypes={} and column_names={})."
          ).format(column_dtypes, column_names))
    if sum(1 for column_name in column_names
           if column_name == feature_keys.TrainEvalFeatures.TIMES) != 1:
      raise ValueError(
          "Got more than one times column ('{}'), but exactly "
          "one is required.".format(feature_keys.TrainEvalFeatures.TIMES))
    self._column_names = column_names
    self._column_dtypes = column_dtypes
    self._skip_header_lines = skip_header_lines
    super(CSVReader, self).__init__(
        filenames=filenames, read_num_records_hint=read_num_records_hint)

  def _get_reader(self):
    return io_ops.TextLineReader(skip_header_lines=self._skip_header_lines)

  def _process_records(self, lines):
    """Parse `lines` as CSV records."""
    if self._column_dtypes is None:
      default_values = [(array_ops.zeros([], dtypes.int64),)
                        if column_name == feature_keys.TrainEvalFeatures.TIMES
                        else () for column_name in self._column_names]
    else:
      default_values = [(array_ops.zeros([], dtype),)
                        for dtype in self._column_dtypes]
    columns = parsing_ops.decode_csv(lines, default_values)
    features_lists = {}
    for column_name, value in zip(self._column_names, columns):
      features_lists.setdefault(column_name, []).append(value)
    features = {}
    for column_name, values in features_lists.items():
      if column_name == feature_keys.TrainEvalFeatures.TIMES:
        features[column_name] = values[0]
      else:
        features[column_name] = array_ops.stack(values, axis=1)
    return features


class TFExampleReader(ReaderBaseTimeSeriesParser):
  """Reads and parses `tf.Example`s from a TFRecords file."""

  def __init__(self,
               filenames,
               features):
    """Configure `tf.Example` parsing.

    Args:
      filenames: A filename or list of filenames to read the time series
        from. Files must contain serialized `tf.Example` protos (one per
        record) with features matching `features`.
      features: A dictionary mapping from feature keys to `tf.FixedLenFeature`
        objects. Must include `TrainEvalFeatures.TIMES` (scalar integer) and
        `TrainEvalFeatures.VALUES` (floating point vector) features.
    Raises:
      ValueError: If required times/values features are not present.
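
    For example, univariate data with one `tf.Example` proto per timestep
    might be configured as follows (a sketch; the filename is illustrative and
    the keys are the standard `TrainEvalFeatures` constants):

      reader = TFExampleReader(
          filenames=["series.tfrecord"],
          features={
              TrainEvalFeatures.TIMES: tf.FixedLenFeature([], tf.int64),
              TrainEvalFeatures.VALUES: tf.FixedLenFeature([1], tf.float32)})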
    """
    if feature_keys.TrainEvalFeatures.TIMES not in features:
      raise ValueError("'{}' is a required column.".format(
          feature_keys.TrainEvalFeatures.TIMES))
    if feature_keys.TrainEvalFeatures.VALUES not in features:
      raise ValueError("'{}' is a required column.".format(
          feature_keys.TrainEvalFeatures.VALUES))
    self._features = features
    super(TFExampleReader, self).__init__(filenames=filenames)

  def _get_reader(self):
    return io_ops.TFRecordReader()

  def _process_records(self, examples):
    """Parse `tf.Example`s into `Tensors`."""
    return parsing_ops.parse_example(
        serialized=examples, features=self._features)


class TimeSeriesInputFn(object):
  """Base for classes which create batches of windows from a time series."""

  @abc.abstractmethod
  def create_batch(self):
    """Creates chunked Tensors from times, values, and other features.

    Suitable for use as the input_fn argument of a tf.estimator.Estimator's
    train() or evaluate() method.

    Returns:
      A tuple of (features, targets):
        features: A dictionary with `TrainEvalFeatures.TIMES` and
          `TrainEvalFeatures.VALUES` as keys, `TIMES` having an associated
          value with shape [batch size x window length], `VALUES` with shape
          [batch size x window length x number of features]. Any other
          features will also have shapes prefixed with [batch size x window
          length].
        targets: Not used, but must have a value for compatibility with the
          Estimator API. That value should be None.
    """
    pass

  def __call__(self):
    # Allow a TimeSeriesInputFn to be used as an input function directly
    return self.create_batch()


class WholeDatasetInputFn(TimeSeriesInputFn):
  """Supports passing a full time series to a model for evaluation/inference.

  Note that this `TimeSeriesInputFn` is not designed for high throughput, and
  should not be used for training. It allows for sequential evaluation on a
  full dataset (with sequential in-sample predictions), which then feeds
  naturally into `predict_continuation_input_fn` for making out-of-sample
  predictions. While this is useful for plotting and interactive use,
  `RandomWindowInputFn` is better suited to training and quantitative
  evaluation.
  """
  # TODO(allenl): A SequentialWindowInputFn for getting model end state
  # without loading the whole dataset into memory (or for quantitative
  # evaluation of sequential models). Note that an Estimator using such a
  # TimeSeriesInputFn won't return in-sample predictions for the whole
  # dataset, which means it won't be terribly useful for interactive
  # use/plotting (unless the user passes in concat metrics). Also need to be
  # careful about state saving for sequential models, particularly the gaps
  # between chunks.

  def __init__(self, time_series_reader):
    """Initialize the `TimeSeriesInputFn`.

    Args:
      time_series_reader: A TimeSeriesReader object.
    """
    self._reader = time_series_reader
    super(WholeDatasetInputFn, self).__init__()

  def create_batch(self):
    """A suitable `input_fn` for an `Estimator`'s `evaluate()`.

    Returns:
      A dictionary mapping feature names to `Tensors`, each shape
      prefixed by [1, data set size] (i.e. a batch size of 1).
    """
    features = self._reader.read_full()
    # Add a batch dimension of one to each feature.
    return ({feature_name: feature_value[None, ...]
             for feature_name, feature_value in features.items()},
            None)


class RandomWindowInputFn(TimeSeriesInputFn):
  """Wraps a `TimeSeriesReader` to create random batches of windows.

  Tensors are first collected into sequential windows (in a windowing queue
  created by `tf.train.batch`, based on the order returned from
  `time_series_reader`), then these windows are randomly batched (in a
  `RandomShuffleQueue`), the Tensors returned by `create_batch` having shapes
  prefixed by [`batch_size`, `window_size`].

  This `TimeSeriesInputFn` is useful for both training and quantitative
  evaluation (but be sure to run several epochs for sequential models such as
  `StructuralEnsembleRegressor` to completely flush stale state left over from
  training). For qualitative evaluation or when preparing for predictions, use
  `WholeDatasetInputFn`.
  """

  def __init__(
      self, time_series_reader, window_size, batch_size,
      queue_capacity_multiplier=1000, shuffle_min_after_dequeue_multiplier=2,
      discard_out_of_order=True, discard_consecutive_batches_limit=1000,
      jitter=True, num_threads=2, shuffle_seed=None):
    """Configure the RandomWindowInputFn.

    Args:
      time_series_reader: A TimeSeriesReader object.
      window_size: The number of examples to keep together sequentially. This
        controls the length of truncated backpropagation: smaller values mean
        less sequential computation, which can lead to faster training, but
        create a coarser approximation to the gradient (which would ideally be
        computed by a forward pass over the entire sequence in order).
      batch_size: The number of windows to place together in a batch. Larger
        values will lead to more stable gradients during training.
      queue_capacity_multiplier: The capacity for the queues used to create
        batches, specified as a multiple of `batch_size` (for
        RandomShuffleQueue) and `batch_size * window_size` (for the
        FIFOQueue). Controls the maximum number of windows stored. Should be
        greater than `shuffle_min_after_dequeue_multiplier`.
      shuffle_min_after_dequeue_multiplier: The minimum number of windows in
        the RandomShuffleQueue after a dequeue, which controls the amount of
        entropy introduced during batching. Specified as a multiple of
        `batch_size`.
      discard_out_of_order: If True, windows of data which have times which
        decrease (a higher time followed by a lower time) are discarded. If
        False, the window and associated features are instead sorted so that
        times are non-decreasing. Discarding is typically faster, as models do
        not have to deal with artificial gaps in the data. However, discarding
        does create a bias where the beginnings and endings of files are
        under-sampled.
      discard_consecutive_batches_limit: Raise an OutOfRangeError if more than
        this number of batches are discarded without a single non-discarded
        window (prevents infinite looping when the dataset is too small).
      jitter: If True, randomly discards examples between some windows in
        order to avoid deterministic chunking patterns. This is important for
        models like AR which may otherwise overfit a fixed chunking.
      num_threads: Use this number of threads for queues. Setting a value of 1
        removes one source of non-determinism (and in combination with
        shuffle_seed should provide deterministic windowing).
      shuffle_seed: A seed for window shuffling. The default value of None
        provides random behavior. With `shuffle_seed` set and
        `num_threads=1`, provides deterministic behavior.
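
    For example, a configuration aimed at deterministic batching (a sketch;
    `reader` is any `TimeSeriesReader`, and the window/batch sizes are
    illustrative):

      train_input_fn = RandomWindowInputFn(
          reader, window_size=32, batch_size=16,
          num_threads=1, shuffle_seed=42)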
    """
    self._reader = time_series_reader
    self._window_size = window_size
    self._reader.check_dataset_size(minimum_dataset_size=self._window_size)
    self._batch_size = batch_size
    self._queue_capacity_multiplier = queue_capacity_multiplier
    self._shuffle_min_after_dequeue_multiplier = (
        shuffle_min_after_dequeue_multiplier)
    self._discard_out_of_order = discard_out_of_order
    self._discard_limit = discard_consecutive_batches_limit
    self._jitter = jitter
    if num_threads is None:
      self._num_threads = self._batch_size
    else:
      self._num_threads = num_threads
    self._shuffle_seed = shuffle_seed
    super(RandomWindowInputFn, self).__init__()

  def create_batch(self):
    """Create queues to window and batch time series data.

    Returns:
      A dictionary of Tensors corresponding to the output of `self._reader`
      (from the `time_series_reader` constructor argument), each with shapes
      prefixed by [`batch_size`, `window_size`].
    """
    features = self._reader.read()
    if self._jitter:
      # TODO(agarwal, allenl): Figure out if more jitter is needed here.
      jitter = random_ops.random_uniform(shape=[], maxval=2, dtype=dtypes.int32)
    else:
      jitter = 0
    # To keep things efficient, we pass from the windowing batcher to the
    # batch-of-windows batcher in batches. This avoids the need for huge
    # numbers of threads, but does mean that jitter is only applied
    # occasionally.
    # TODO(allenl): Experiment with different internal passing sizes.
    internal_passing_size = self._batch_size
    features_windowed = input_lib.batch(
        features,
        batch_size=self._window_size * internal_passing_size + jitter,
        enqueue_many=True,
        capacity=(self._queue_capacity_multiplier
                  * internal_passing_size * self._window_size),
        num_threads=self._num_threads)
    raw_features_windowed = features_windowed
    if self._jitter:
      features_windowed = {
          key: value[jitter:]
          for key, value in features_windowed.items()}
    features_windowed = {
        key: array_ops.reshape(
            value,
            array_ops.concat(
                [[internal_passing_size, self._window_size],
                 array_ops.shape(value)[1:]],
                axis=0))
        for key, value in features_windowed.items()}
    batch_and_window_shape = tensor_shape.TensorShape(
        [internal_passing_size, self._window_size])
    for key in features_windowed.keys():
      features_windowed[key].set_shape(
          batch_and_window_shape.concatenate(
              raw_features_windowed[key].get_shape()[1:]))
    # When switching files, we may end up with windows where times decrease
    # (a higher time followed by a lower time), even if times within each file
    # are sorted (and even if those files are visited in order, when looping
    # back around to the beginning of the first file). This is hard for models
    # to deal with, so we either discard such examples, creating a bias where
    # the beginning and end of the series is under-sampled, or we sort the
    # window, creating large gaps.
    times = features_windowed[feature_keys.TrainEvalFeatures.TIMES]
    if self._discard_out_of_order:
      non_decreasing = math_ops.reduce_all(
          times[:, 1:] >= times[:, :-1], axis=1)
      # Ensure that no more than self._discard_limit complete batches are
      # discarded contiguously (resetting the count when we find a single
      # clean window). This prevents infinite looping when the dataset is
      # smaller than the window size.
      # TODO(allenl): Figure out a way to return informative errors from
      # count_up_to.
      discarded_windows_limiter = variable_scope.variable(
          initial_value=constant_op.constant(0, dtype=dtypes.int64),
          name="discarded_windows_limiter",
          trainable=False,
          collections=[ops.GraphKeys.LOCAL_VARIABLES])
      def _initialized_limit_check():
        return control_flow_ops.cond(
            math_ops.reduce_any(non_decreasing),
            lambda: state_ops.assign(discarded_windows_limiter, 0),
            lambda: discarded_windows_limiter.count_up_to(self._discard_limit))
      discard_limit_op = control_flow_ops.cond(
          state_ops.is_variable_initialized(discarded_windows_limiter),
          _initialized_limit_check,
          lambda: constant_op.constant(0, dtype=dtypes.int64))
      with ops.control_dependencies([discard_limit_op]):
        non_decreasing = array_ops.identity(non_decreasing)
    else:
      _, indices_descending = nn.top_k(
          times, k=array_ops.shape(times)[-1], sorted=True)
      indices = array_ops.reverse(indices_descending, axis=[0])
      features_windowed = {
          key: array_ops.gather(params=value, indices=indices)
          for key, value in features_windowed.items()
      }
      non_decreasing = True
    features_batched = input_lib.maybe_shuffle_batch(
        features_windowed,
        num_threads=self._num_threads,
        seed=self._shuffle_seed,
        batch_size=self._batch_size,
        capacity=self._queue_capacity_multiplier * self._batch_size,
        min_after_dequeue=(self._shuffle_min_after_dequeue_multiplier *
                           self._batch_size),
        keep_input=non_decreasing,
        enqueue_many=True)
    return (features_batched, None)


def _canonicalize_numpy_data(data, require_single_batch):
  """Do basic checking and reshaping for Numpy data.

  Args:
    data: A dictionary mapping keys to Numpy arrays, with several possible
      shapes (requires keys `TrainEvalFeatures.TIMES` and
      `TrainEvalFeatures.VALUES`):
        Single example; `TIMES` is a scalar and `VALUES` is either a scalar or
          a vector of length [number of features].
        Sequence; `TIMES` is a vector of shape [series length], `VALUES`
          either has shape [series length] (univariate) or [series length x
          number of features] (multivariate).
        Batch of sequences; `TIMES` is a vector of shape [batch size x series
          length], `VALUES` has shape [batch size x series length] or [batch
          size x series length x number of features].
      In any case, `VALUES` and any exogenous features must have their shapes
      prefixed by the shape of the value corresponding to the `TIMES` key.
    require_single_batch: If True, raises an error if the provided data has a
      batch dimension > 1.
  Returns:
    A dictionary with features normalized to have shapes prefixed with [batch
    size x series length]. The sizes of dimensions which were omitted in the
    inputs are 1.
  Raises:
    ValueError: If dimensions are incorrect or do not match, or required
      features are missing.
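
  For example, a univariate sequence input (a sketch; resulting shapes noted
  in comments):

    features = _canonicalize_numpy_data(
        {feature_keys.TrainEvalFeatures.TIMES: numpy.arange(5),    # [5]
         feature_keys.TrainEvalFeatures.VALUES: numpy.zeros([5])},  # [5]
        require_single_batch=True)
    # features[TIMES] has shape [1, 5]; features[VALUES] has shape [1, 5, 1].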
  """
  features = {key: numpy.array(value) for key, value in data.items()}
  if (feature_keys.TrainEvalFeatures.TIMES not in features or
      feature_keys.TrainEvalFeatures.VALUES not in features):
    raise ValueError("{} and {} are required features.".format(
        feature_keys.TrainEvalFeatures.TIMES,
        feature_keys.TrainEvalFeatures.VALUES))
  times = features[feature_keys.TrainEvalFeatures.TIMES]
  for key, value in features.items():
    if value.shape[:len(times.shape)] != times.shape:
      raise ValueError(
          ("All features must have their shapes prefixed by the shape of the"
           " times feature. Got shape {} for feature '{}', but shape {} for"
           " '{}'").format(value.shape, key, times.shape,
                           feature_keys.TrainEvalFeatures.TIMES))
  if not times.shape:  # a single example
    if not features[feature_keys.TrainEvalFeatures.VALUES].shape:  # univariate
      # Add a feature dimension (with one feature)
      features[feature_keys.TrainEvalFeatures.VALUES] = features[
          feature_keys.TrainEvalFeatures.VALUES][..., None]
    elif len(features[feature_keys.TrainEvalFeatures.VALUES].shape) > 1:
      raise ValueError(
          ("Got an unexpected number of dimensions for the '{}' feature."
           " Was expecting at most 1 dimension"
           " ([number of features]) since '{}' does not "
           "have a batch or time dimension, but got shape {}").format(
               feature_keys.TrainEvalFeatures.VALUES,
               feature_keys.TrainEvalFeatures.TIMES,
               features[feature_keys.TrainEvalFeatures.VALUES].shape))
    # Add trivial batch and time dimensions for every feature
    features = {key: value[None, None, ...] for key, value in features.items()}
  if len(times.shape) == 1:  # shape [series length]
    if len(features[feature_keys.TrainEvalFeatures.VALUES]
           .shape) == 1:  # shape [series length]
      # Add a feature dimension (with one feature)
      features[feature_keys.TrainEvalFeatures.VALUES] = features[
          feature_keys.TrainEvalFeatures.VALUES][..., None]
    elif len(features[feature_keys.TrainEvalFeatures.VALUES].shape) > 2:
      raise ValueError(
          ("Got an unexpected number of dimensions for the '{}' feature."
           " Was expecting at most 2 dimensions"
           " ([series length, number of features]) since '{}' does not "
           "have a batch dimension, but got shape {}").format(
               feature_keys.TrainEvalFeatures.VALUES,
               feature_keys.TrainEvalFeatures.TIMES,
               features[feature_keys.TrainEvalFeatures.VALUES].shape))
    # Add trivial batch dimensions for every feature
    features = {key: value[None, ...] for key, value in features.items()}
  elif len(features[feature_keys.TrainEvalFeatures.TIMES]
           .shape) != 2:  # shape [batch size, series length]
    raise ValueError(
        ("Got an unexpected number of dimensions for times. Was expecting at "
         "most two ([batch size, series length]), but got shape {}.").format(
             times.shape))
  if require_single_batch:
    # We don't expect input to be already batched; batching is done later
    if features[feature_keys.TrainEvalFeatures.TIMES].shape[0] != 1:
      raise ValueError("Got batch input, was expecting unbatched input.")
  return features